Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context propagation support #783

Merged
merged 3 commits into from
Aug 7, 2024
Merged

Add context propagation support #783

merged 3 commits into from
Aug 7, 2024

Conversation

rgrandl
Copy link
Collaborator

@rgrandl rgrandl commented Jul 17, 2024

In the current implementation, weaver doesn't allow the user to propagate context information. We recommend users to define a struct that encapsulates the metadata information and add it as an argument to the method. However, more and more users are asking for an option to propagate metadata information using the context. This request comes especially from users that are using gRPC to communicate between their services, and gRPC provides a way to propagate metadata information using the context.

This PR implements a similar approach to gRPC [1] to propagate metadata information. We allow users to create a context where they can pass metadata as a map[string][]string.

E.g.,

...
ctx := context.Background()
meta := metadata.New(map[string][]string{"name":{"foo", "bar"}})
ctx = metadata.NewContext(ctx, meta)

dst.Get().Update(ctx, ....)
...
func( d *destination) Update(ctx, ...) (error) {
  meta := metadata.FromContext(ctx)
  vals := meta.Get("name")

  // Can call another component method with an updated context
  meta.Set("account", "x", "y")
  ctx = metadata.NewContext(ctx, meta)
  acct.Get().UpdateAccount(ctx, ...)
}

The user APIs are as follows:

// New creates a Metadata object that has operations
// to manipulate the metadata (e.g., Get, Set, Append, Delete.
func New(m map[string][]string) Metadata

// NewContext creates a new context that contains metadata meta, with
// parent context ctx.
func NewContext(ctx context.Context, meta Metadata) context.Context

// FromContext returns the metadata from the context. If the context
// doesn't have any metata, it returns an empty metadata.
func FromContext(ctx context.Context) Metadata

Note that similar to the gRPC metadata, we make sure the metadata keys are lowercased, because the user can create metadata w/o using our New() metadata constructor, hence they can set the keys however they want.

[1] https://pkg.go.dev/google.golang.org/grpc/metadata

@rgrandl rgrandl requested a review from ghemawat July 17, 2024 20:52
@rgrandl
Copy link
Collaborator Author

rgrandl commented Jul 17, 2024

@ghemawat do you mind taking a look? Tests are missing, but I would like to get your opinion on the APIs before I go ahead and polish the PR.

@rgrandl rgrandl force-pushed the context_propagation branch 2 times, most recently from a91af38 to 8225fbf Compare July 17, 2024 23:40
Copy link
Collaborator Author

@rgrandl rgrandl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Sanjay. Updated the PR to incorporate your feedback.

@rgrandl rgrandl self-assigned this Jul 17, 2024
Copy link
Collaborator

@ghemawat ghemawat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API looks good.

Can you update the commit message.

// component. Because the UpdateMetadata and GetMetadata methods are not
// routed, we can end up updating the metadata at replica 0, and reading
// the metadata from replica 1. To avoid this scenario we call the
// UpdateMetadata method twice (one for each replica).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are we sure that the two calls won't both go to the same replica?

Perhaps we should make it a bit better by calling it N times, where N is 10 for multi and 1 for others

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done

if err := dst.UpdateMetadata(ctx); err != nil {
t.Fatal(err)
}
if runner.Name == weavertest.Multi.Name {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The multi-replica comment belongs here, not above.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if err := dst.UpdateMetadata(ctx); err != nil {
t.Fatal(err)
}
if runner.Name == weavertest.Multi.Name {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repeated code. Maybe add a helper: setAndGetMetadata(map[string]string) map[string]string

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the code a little bit. Does it look better?

godeps.txt Outdated
@@ -983,6 +984,9 @@ github.com/ServiceWeaver/weaver/sim/internal/bank
go.opentelemetry.io/otel/codes
go.opentelemetry.io/otel/trace
reflect
github.com/ServiceWeaver/weaver/weavermetadata
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested rename of package "callmetadata".

Caller will look something like:

ctx = callmetadata.NewContext(ctx, map[string]string{...})

Or perhaps just "metadata", which is what gRPC does.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Called it "metadata".

msgHeaderSize = 16 + 8 + traceHeaderLen // handler_key + deadline + trace_context
)
// Size of the header included in each message.
const msgHeaderSize = 16 + 8 + traceHeaderLen + metadataHeaderLen
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation of format should be updated above.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use a fixed header anymore, so all these constants are gone.


// readContextMetadata returns the context metadata (if any).
func readContextMetadata(meta []byte) map[string]string {
dec := codegen.NewDecoder(meta)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should be using codegen encoder/decoder to format the header completely. I.e., writeHeader that does something like:

put request id
put payload length
put trace boolean if present and then encode trace
put #metadata entries
put metadata entries

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// Size of the header included in each message.
msgHeaderSize = 16 + 8 + traceHeaderLen // handler_key + deadline + trace_context
)
// Size of the header included in each message.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a format change, are there any versions do we need to update? Do we use "call.go" for any of our internal communication where the two sides may be at different versions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I think everything should work as expected

@@ -20,7 +20,7 @@ import (
"go.opentelemetry.io/otel/trace"
)

const traceHeaderLen = 25
const traceHeaderLen = 25 // handler_key + deadline + trace_context
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if all of the header constants that depend on each other were described next to each other in the same constant block

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

// See the License for the specific language governing permissions and
// limitations under the License.

// Package weavermetadata define the structure of the metadata supported by weaver.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reword:

Package weavermetadata provides support for the propagation of metadata information from a component method caller to the callee. The metadata is propagated to the callee even if the caller and callee are not colocated in the same process.

The metadata is a map from string to string stored in context.Context. The map can be added to a context by calling NewContext.

... include example here ...

The metadata map can be extracted from a context by calling FromContext:

... include example here ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// more complicated types, it's their responsibility to encode/decode these types
// to/from string values.
//
// Note that all keys are automatically lowercase. This ensures the user avoids
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should drop this lower-casing I think.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@rgrandl rgrandl force-pushed the context_propagation branch 2 times, most recently from a6d09c3 to b715910 Compare August 5, 2024 23:43
Copy link
Collaborator Author

@rgrandl rgrandl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Sanjay. Very good suggestions.

var hdr [msgHeaderSize]byte
copy(hdr[0:], h[:])
enc := codegen.NewEncoder()
enc.Bytes(h[:])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bytes() seems suitable for variable-length data - it encodes the length. How about using Encoder.Grow and Decoder.Read to write exactly the correct number of bytes.

It also means that we won't have to worry about the wrong number of bytes showing up at the reader.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Changed.

// Extract trace context and create a new child span to trace the method
// call on the server.
span := trace.SpanFromContext(ctx) // noop span
if sc := readTraceContext(dec); sc != nil && sc.IsValid() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if sc is invalid? Should we not fail the RPC with an error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. We are not failing the RPC right now if sc is invalid, but I think we should do that. Changed.

// Encode the length of the header. Note that the header can have variable length,
// so we need to know at which offset in the encoded message starts the payload
// when we decode the message at the receiver.
enc.Int(len(enc.Data()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enc.Uint? to avoid having to deal with negative number errors?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to encode the length of the header based on your suggestions.

ctx = readContextMetadata(ctx, dec)

// Compute the start offset of the payload in msg.
payloadOffset := dec.Int() + 8 /*int64 that stores the length of the header*/
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to check that the decoded number is correct.

The more I look at it, the more I think that we should not try to store the encoded header length inside the encoded header. Instead we can do:

header_length : uint32
header : byte[header_length]
... rest is payload ...

So we read the first 4 bytes to get the header length, use a Decoder to parse just the header, and leave the rest as the payload.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the encoding based on your suggestions.

"github.com/ServiceWeaver/weaver/runtime/logging"
"github.com/ServiceWeaver/weaver/runtime/retry"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

const (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a long comment in msg.go that explains the message format. That should be updated, but I don't see it in this commit.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Updated.

metadata/metadata.go Show resolved Hide resolved
if !ok {
return nil, false
}
out := make(map[string]string, len(meta))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use maps.Clone instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done

@@ -28,6 +28,8 @@ import (
"sync"

"github.com/ServiceWeaver/weaver"
"github.com/ServiceWeaver/weaver/metadata"
"golang.org/x/exp/maps"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need x/exp/maps, or can we use the maps package in the standard library now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, replaced with "maps". Thank you

Copy link
Collaborator Author

@rgrandl rgrandl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Sanjay

var hdr [msgHeaderSize]byte
copy(hdr[0:], h[:])
enc := codegen.NewEncoder()
enc.Bytes(h[:])
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Changed.

// Encode the length of the header. Note that the header can have variable length,
// so we need to know at which offset in the encoded message starts the payload
// when we decode the message at the receiver.
enc.Int(len(enc.Data()))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to encode the length of the header based on your suggestions.

// Extract trace context and create a new child span to trace the method
// call on the server.
span := trace.SpanFromContext(ctx) // noop span
if sc := readTraceContext(dec); sc != nil && sc.IsValid() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. We are not failing the RPC right now if sc is invalid, but I think we should do that. Changed.

ctx = readContextMetadata(ctx, dec)

// Compute the start offset of the payload in msg.
payloadOffset := dec.Int() + 8 /*int64 that stores the length of the header*/
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the encoding based on your suggestions.

metadata/metadata.go Show resolved Hide resolved
if !ok {
return nil, false
}
out := make(map[string]string, len(meta))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done

@@ -28,6 +28,8 @@ import (
"sync"

"github.com/ServiceWeaver/weaver"
"github.com/ServiceWeaver/weaver/metadata"
"golang.org/x/exp/maps"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, replaced with "maps". Thank you

"github.com/ServiceWeaver/weaver/runtime/logging"
"github.com/ServiceWeaver/weaver/runtime/retry"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

const (
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Updated.

@rgrandl
Copy link
Collaborator Author

rgrandl commented Aug 7, 2024

Sanjay, don't look at the PR yet. Some test is failing.

In the current implementation, weaver doesn't allow the user to
propagate context information. We recommend users to define a struct
that encapsulates the metadata information and add it as an argument to
the method. However, more and more users are asking for an option to
propagate metadata information using the context. This request comes
especially from users that are using gRPC to communicate between their
services, and gRPC provides a way to propagate metadata information
using the context.

This PR enables the users to propagate metadata information as a
map[string]string.

```main.go
// To attach metadata with key "foo" and value "bar" to the context, you can do:
ctx := context.Background()
ctx = metadata.NewContext(ctx, map[string]string{"foo": "bar"})

// To read the metadata value associated with a key "foo" in the context, you can do:
meta, found := metadata.FromContext(ctx)
if found {
  value := meta["foo"]
}
```

[1] https://pkg.go.dev/google.golang.org/grpc/metadata
@rgrandl
Copy link
Collaborator Author

rgrandl commented Aug 7, 2024

Sanjay, don't look at the PR yet. Some test is failing.

Never mind. Everything works as expected. We have a flaky test but that's not something introduced by this change.


// Note that we send the header and the payload as follows:
// [header_length][encoded_header][payload]
hdrSlice := make([]byte, 4)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary allocation. You can do:

var hdr [4]byte
binary...PutUint32(hdr[:], ...)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// Note that we send the header and the payload as follows:
// [header_length][encoded_header][payload]
hdrSlice := make([]byte, 4)
binary.BigEndian.PutUint32(hdrSlice, uint32(len(enc.Data())))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why BigEndian? I think we are using LittleEndian everywhere else.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

// Extract request header from front of payload.
if len(msg) < msgHeaderSize {
msgLen := uint32(len(msg))
hdrLenEndOffset := uint32(4)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a "const hdrLenEndOffset".

Also, maybe hdrLenLen would be a better name?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// call on the server.
span := trace.SpanFromContext(ctx) // noop span
if sc := readTraceContext(dec); sc != nil {
if sc.IsValid() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about checking for error and returning early on error instead of placing the error-handling in the else branch?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// traceContext [25]byte -- zero, or trace context
// remainder -- call argument serialization
// headerLen [4]byte -- length of the encoded header
// header [length]byte -- encoded header information
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reword the type of header of [headerLen]byte to match the name used above

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// remainder -- call argument serialization
// headerLen [4]byte -- length of the encoded header
// header [length]byte -- encoded header information
// headerKey [16]byte -- fingerprint of method name
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following part looks wrong? I think you need follow header by remainder and then extend the comment after that to say something like:

The header is encoded using Service Weaver's encoding format for a type that looks like:

struct {
  ...fill this in...
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Done

// deadline [8]byte -- zero, or deadline in microseconds
// traceContext [25]byte -- zero, or trace context
// metadataContext [length]byte -- encoded map[string]string
// remainder -- call argument serialization
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

payload would be a better name than remainder

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

d.mu.Lock()
defer d.mu.Unlock()
meta, found := metadata.FromContext(ctx)
if found {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can do:

if meta, found := ...; found {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -385,25 +381,36 @@ func (rc *reconnectingConnection) Call(ctx context.Context, h MethodKey, arg []b
}

func (rc *reconnectingConnection) callOnce(ctx context.Context, h MethodKey, arg []byte, opts CallOptions) ([]byte, error) {
var hdr [msgHeaderSize]byte
copy(hdr[0:], h[:])
enc := codegen.NewEncoder()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about moving the header encoding/decoding code into routines of their own. The code here can then be:

hdr := encodeHeader(ctx, h)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@rgrandl
Copy link
Collaborator Author

rgrandl commented Aug 7, 2024

Thanks Sanjay

var hdrLen [hdrLenLen]byte
binary.LittleEndian.PutUint32(hdrLen[:], uint32(len(hdr)))
hdrSlice := hdrLen[:]
hdrSlice = append(hdrSlice, hdr...)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just combine prev 2 lines?
hdrSlice := append(hdrLen[:], hdr...)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// The header is encoded using Service Weaver's encoding format for a type that
// looks like:
//
// struct header {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line up the types just like "go fmt" would.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Collaborator Author

@rgrandl rgrandl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Sanjay. Looks great after your suggestions

var hdrLen [hdrLenLen]byte
binary.LittleEndian.PutUint32(hdrLen[:], uint32(len(hdr)))
hdrSlice := hdrLen[:]
hdrSlice = append(hdrSlice, hdr...)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// The header is encoded using Service Weaver's encoding format for a type that
// looks like:
//
// struct header {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@rgrandl rgrandl merged commit d613ffe into main Aug 7, 2024
10 checks passed
@rgrandl rgrandl deleted the context_propagation branch August 7, 2024 23:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants