Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: Support handlers of any arity #4630

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cli/azd/internal/vsrpc/aspire_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newAspireService(server *Server) *aspireService {
// GetAspireHostAsync is the server implementation of:
// ValueTask<AspireHost> GetAspireHostAsync(Session session, string aspireEnv, CancellationToken cancellationToken).
func (s *aspireService) GetAspireHostAsync(
ctx context.Context, rc RequestContext, aspireEnv string, observer IObserver[ProgressMessage],
ctx context.Context, rc RequestContext, aspireEnv string, observer *IObserver[ProgressMessage],
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's non-obvious from the diff why I went and did this. The issues that IObserver needs some special attention during unmarshalling. The way that works is we have an internal interface:

// connectionObserver is an interface that can be implemented by types that want to be notified when they are deserialized
// during a JSON RPC call.
type connectionObserver interface {
	attachConnection(c jsonrpc2.Conn)
}

When we unmarshall arguments, we check to see if the type we unmarshalled to implements this interface and if so, we call it. IObserver's implementation of the method stores the connection so it can send messages to it later, as items are emitted.

The interface is implemented on *IObserver[T] not IObserver[T].

The old version of unmarshalArg handled this by adding a level of indirection on thet ype during interface checking:

	if v, ok := (any(&arg)).(connectionObserver); ok {
		v.attachConnection(conn)
	}

But it felt cleaner to not do this extra level of indirection in the new version and instead ensure the argument was just *IObserver[T].

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can image that we could also do both - check to see if T and *T implement the interface, (preferring one if both are there) and calling the function.

) (*AspireHost, error) {
session, err := s.server.validateSession(rc.Session)
if err != nil {
Expand Down Expand Up @@ -65,7 +65,7 @@ func (s *aspireService) GetAspireHostAsync(
// RenameAspireHostAsync is the server implementation of:
// ValueTask RenameAspireHostAsync(Session session, string newPath, CancellationToken cancellationToken).
func (s *aspireService) RenameAspireHostAsync(
ctx context.Context, rc RequestContext, newPath string, observer IObserver[ProgressMessage],
ctx context.Context, rc RequestContext, newPath string, observer *IObserver[ProgressMessage],
) error {
_, err := s.server.validateSession(rc.Session)
if err != nil {
Expand All @@ -79,7 +79,7 @@ func (s *aspireService) RenameAspireHostAsync(
// ServeHTTP implements http.Handler.
func (s *aspireService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
serveRpc(w, r, map[string]Handler{
"GetAspireHostAsync": HandlerFunc3(s.GetAspireHostAsync),
"RenameAspireHostAsync": HandlerAction3(s.RenameAspireHostAsync),
"GetAspireHostAsync": NewHandler(s.GetAspireHostAsync),
"RenameAspireHostAsync": NewHandler(s.RenameAspireHostAsync),
})
}
10 changes: 5 additions & 5 deletions cli/azd/internal/vsrpc/debug_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s *debugService) TestCancelAsync(ctx context.Context, timeoutMs int) (bool
// ValueTask<bool> TestIObserverAsync(int, CancellationToken);
//
// It emits a sequence of integers to the observer, from 0 to max, and then completes the observer, before returning.
func (s *debugService) TestIObserverAsync(ctx context.Context, max int, observer IObserver[int]) error {
func (s *debugService) TestIObserverAsync(ctx context.Context, max int, observer *IObserver[int]) error {
for i := 0; i < max; i++ {
_ = observer.OnNext(ctx, i)
}
Expand Down Expand Up @@ -107,9 +107,9 @@ func (s *debugService) FetchTokenAsync(ctx context.Context, sessionId Session) (
// ServeHTTP implements http.Handler.
func (s *debugService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
serveRpc(w, r, map[string]Handler{
"TestCancelAsync": HandlerFunc1(s.TestCancelAsync),
"TestIObserverAsync": HandlerAction2(s.TestIObserverAsync),
"TestPanicAsync": HandlerAction1(s.TestPanicAsync),
"FetchTokenAsync": HandlerFunc1(s.FetchTokenAsync),
"TestCancelAsync": NewHandler(s.TestCancelAsync),
"TestIObserverAsync": NewHandler(s.TestIObserverAsync),
"TestPanicAsync": NewHandler(s.TestPanicAsync),
"FetchTokenAsync": NewHandler(s.FetchTokenAsync),
})
}
22 changes: 11 additions & 11 deletions cli/azd/internal/vsrpc/environment_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newEnvironmentService(server *Server) *environmentService {
// ValueTask<IEnumerable<EnvironmentInfo>> GetEnvironmentsAsync(
// RequestContext, IObserver<ProgressMessage>, CancellationToken);
func (s *environmentService) GetEnvironmentsAsync(
ctx context.Context, rc RequestContext, observer IObserver[ProgressMessage],
ctx context.Context, rc RequestContext, observer *IObserver[ProgressMessage],
) ([]*EnvironmentInfo, error) {
session, err := s.server.validateSession(rc.Session)
if err != nil {
Expand Down Expand Up @@ -71,7 +71,7 @@ func (s *environmentService) GetEnvironmentsAsync(
// SetCurrentEnvironmentAsync is the server implementation of:
// ValueTask<bool> SetCurrentEnvironmentAsync(RequestContext, string, IObserver<ProgressMessage>, CancellationToken);
func (s *environmentService) SetCurrentEnvironmentAsync(
ctx context.Context, rc RequestContext, name string, observer IObserver[ProgressMessage],
ctx context.Context, rc RequestContext, name string, observer *IObserver[ProgressMessage],
) (bool, error) {
session, err := s.server.validateSession(rc.Session)
if err != nil {
Expand Down Expand Up @@ -107,7 +107,7 @@ const (
// DeleteEnvironmentAsync is the server implementation of:
// ValueTask<bool> DeleteEnvironmentAsync(RequestContext, string, IObserver<ProgressMessage>, int, CancellationToken);
func (s *environmentService) DeleteEnvironmentAsync(
ctx context.Context, rc RequestContext, name string, mode int, observer IObserver[ProgressMessage],
ctx context.Context, rc RequestContext, name string, mode int, observer *IObserver[ProgressMessage],
) (bool, error) {
session, err := s.server.validateSession(rc.Session)
if err != nil {
Expand Down Expand Up @@ -196,13 +196,13 @@ func (s *environmentService) DeleteEnvironmentAsync(
// ServeHTTP implements http.Handler.
func (s *environmentService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
serveRpc(w, r, map[string]Handler{
"CreateEnvironmentAsync": HandlerFunc3(s.CreateEnvironmentAsync),
"GetEnvironmentsAsync": HandlerFunc2(s.GetEnvironmentsAsync),
"LoadEnvironmentAsync": HandlerFunc3(s.LoadEnvironmentAsync),
"OpenEnvironmentAsync": HandlerFunc3(s.OpenEnvironmentAsync),
"SetCurrentEnvironmentAsync": HandlerFunc3(s.SetCurrentEnvironmentAsync),
"DeleteEnvironmentAsync": HandlerFunc4(s.DeleteEnvironmentAsync),
"RefreshEnvironmentAsync": HandlerFunc3(s.RefreshEnvironmentAsync),
"DeployAsync": HandlerFunc3(s.DeployAsync),
"CreateEnvironmentAsync": NewHandler(s.CreateEnvironmentAsync),
"GetEnvironmentsAsync": NewHandler(s.GetEnvironmentsAsync),
"LoadEnvironmentAsync": NewHandler(s.LoadEnvironmentAsync),
"OpenEnvironmentAsync": NewHandler(s.OpenEnvironmentAsync),
"SetCurrentEnvironmentAsync": NewHandler(s.SetCurrentEnvironmentAsync),
"DeleteEnvironmentAsync": NewHandler(s.DeleteEnvironmentAsync),
"RefreshEnvironmentAsync": NewHandler(s.RefreshEnvironmentAsync),
"DeployAsync": NewHandler(s.DeployAsync),
})
}
2 changes: 1 addition & 1 deletion cli/azd/internal/vsrpc/environment_service_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
// CreateEnvironmentAsync is the server implementation of:
// ValueTask<bool> CreateEnvironmentAsync(RequestContext, Environment, IObserver<ProgressMessage>, CancellationToken);
func (s *environmentService) CreateEnvironmentAsync(
ctx context.Context, rc RequestContext, newEnv Environment, observer IObserver[ProgressMessage],
ctx context.Context, rc RequestContext, newEnv Environment, observer *IObserver[ProgressMessage],
) (bool, error) {
session, err := s.server.validateSession(rc.Session)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cli/azd/internal/vsrpc/environment_service_deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
//
// While it is named simply `DeployAsync`, it behaves as if the user had run `azd provision` and `azd deploy`.
func (s *environmentService) DeployAsync(
ctx context.Context, rc RequestContext, name string, observer IObserver[ProgressMessage],
ctx context.Context, rc RequestContext, name string, observer *IObserver[ProgressMessage],
) (*Environment, error) {
session, err := s.server.validateSession(rc.Session)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cli/azd/internal/vsrpc/environment_service_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
// already cached) and is faster than `LoadEnvironmentAsync` in cases where we have not cached the manifest. This means
// the Services array of the returned environment may be empty.
func (s *environmentService) OpenEnvironmentAsync(
ctx context.Context, rc RequestContext, name string, observer IObserver[ProgressMessage],
ctx context.Context, rc RequestContext, name string, observer *IObserver[ProgressMessage],
) (*Environment, error) {
session, err := s.server.validateSession(rc.Session)
if err != nil {
Expand All @@ -43,7 +43,7 @@ func (s *environmentService) OpenEnvironmentAsync(
// the environment (like service endpoints) may not be available. Use `RefreshEnvironmentAsync` to load the environment and
// fetch information from Azure.
func (s *environmentService) LoadEnvironmentAsync(
ctx context.Context, rc RequestContext, name string, observer IObserver[ProgressMessage],
ctx context.Context, rc RequestContext, name string, observer *IObserver[ProgressMessage],
) (*Environment, error) {
session, err := s.server.validateSession(rc.Session)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cli/azd/internal/vsrpc/environment_service_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// to accept some loss of information in favor of a faster load time, use `LoadEnvironmentAsync` instead, which does not
// contact azure to compute service endpoints or last deployment information.
func (s *environmentService) RefreshEnvironmentAsync(
ctx context.Context, rc RequestContext, name string, observer IObserver[ProgressMessage],
ctx context.Context, rc RequestContext, name string, observer *IObserver[ProgressMessage],
) (*Environment, error) {
session, err := s.server.validateSession(rc.Session)
if err != nil {
Expand All @@ -41,7 +41,7 @@ func (s *environmentService) RefreshEnvironmentAsync(
}

func (s *environmentService) refreshEnvironmentAsync(
ctx context.Context, container *container, name string, observer IObserver[ProgressMessage],
ctx context.Context, container *container, name string, observer *IObserver[ProgressMessage],
) (*Environment, error) {
env, err := s.loadEnvironmentAsync(ctx, container, name, true)
if err != nil {
Expand Down
Loading
Loading