Skip to content

Commit

Permalink
feat: Allow cancelling in-flight RPC operations with per-Link*() fu…
Browse files Browse the repository at this point in the history
…nction context in Go registry implementation

Signed-off-by: Felicitas Pojtinger <felicitas@pojtinger.com>
  • Loading branch information
pojntfx committed Aug 9, 2024
1 parent a9052e5 commit 079231a
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions go/pkg/rpc/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,13 @@ type Registry[R, T any] struct {
remotes map[string]R
remotesLock *sync.Mutex

ctx context.Context

hooks *RegistryHooks
}

// NewRegistry creates a new registry
func NewRegistry[R, T any]( // Type of remote RPCs to implement, type of nested values
local any, // Struct of local RPCs to expose

ctx context.Context, // Global context, used for resolving RPC requests and responses not tied to any specific read or write operation

hooks *RegistryHooks, // Global hooks
) *Registry[R, T] {
if hooks == nil {
Expand All @@ -87,10 +83,14 @@ func NewRegistry[R, T any]( // Type of remote RPCs to implement, type of nested
closuresLock: sync.Mutex{},
closures: map[string]func(args ...interface{}) (interface{}, error){},
},
}, *new(R), map[string]R{}, &sync.Mutex{}, ctx, hooks}
}, *new(R), map[string]R{}, &sync.Mutex{}, hooks}
}

func (r Registry[R, T]) makeRPC(
// This is separate from the context that is the first argument to each RPC because we also
// want to be able to cancel all in-flight RPCs if the context passed to a `Link*()` function is cancelled
linkCtx context.Context,

name string,
functionType reflect.Type,
setErr func(err error),
Expand Down Expand Up @@ -222,8 +222,8 @@ func (r Registry[R, T]) makeRPC(

returnValues = append(returnValues, valueReturnValue.Elem(), errReturnValue.Elem())
}
case <-r.ctx.Done():
panic(r.ctx.Err())
case <-linkCtx.Done():
panic(linkCtx.Err())
}

return returnValues
Expand All @@ -232,7 +232,7 @@ func (r Registry[R, T]) makeRPC(

// LinkMessage exposes local RPCs and implements remote RPCs via a message-based transport
func (r Registry[R, T]) LinkMessage(
ctx context.Context, // Context for read and write operations
ctx context.Context, // Context for read, write and in-flight RPC operations

writeRequest, // Function to write requests with
writeResponse func(b T) error, // Function to write responses with
Expand Down Expand Up @@ -357,6 +357,8 @@ func (r Registry[R, T]) LinkMessage(
remote.
FieldByName(functionField.Name).
Set(r.makeRPC(
ctx,

functionField.Name,
functionType,
setErr,
Expand Down Expand Up @@ -485,6 +487,8 @@ func (r Registry[R, T]) LinkMessage(
}

rpc := r.makeRPC(
ctx,

"CallClosure",
reflect.TypeOf(callClosureType(nil)),
setErr,
Expand Down Expand Up @@ -739,7 +743,7 @@ func (r Registry[R, T]) LinkMessage(

// LinkStream exposes local RPCs and implements remote RPCs via a stream-based transport
func (r Registry[R, T]) LinkStream(
ctx context.Context, // Context for read and write operations
ctx context.Context, // Context for read, write and in-flight RPC operations

encode func(v Message[T]) error, // Function to encode messages with
decode func(v *Message[T]) error, // Function to decode messages with
Expand Down

0 comments on commit 079231a

Please sign in to comment.