From 33666cb0d19d7dff056a0eba90e8636d502dd119 Mon Sep 17 00:00:00 2001 From: Norman Meier Date: Sun, 13 Feb 2022 15:30:37 +0100 Subject: [PATCH] feat: enable bidi stream for blmod.RunModule Signed-off-by: Norman Meier --- api/blmod/v1/blmod.proto | 9 ++- go.mod | 3 +- go.sum | 3 +- go/cmd/client/main.go | 15 +++-- go/pkg/blmod/blmod.go | 1 + go/pkg/blmod/blmod.pb.go | 57 ++++++++++------- go/pkg/blmod/blmod_grpc.pb.go | 35 +++++----- go/pkg/blmod/server.go | 19 +++++- rn/src/api/blmod/v1/blmod_pb.d.ts | 6 ++ rn/src/api/blmod/v1/blmod_pb.js | 78 +++++++++++++++++++---- rn/src/api/blmod/v1/blmod_pb_service.d.ts | 5 +- rn/src/api/blmod/v1/blmod_pb_service.js | 44 +++++++------ rn/src/screens/GoModule.tsx | 11 +++- 13 files changed, 202 insertions(+), 84 deletions(-) diff --git a/api/blmod/v1/blmod.proto b/api/blmod/v1/blmod.proto index 6ae3ce14..e0c7096f 100644 --- a/api/blmod/v1/blmod.proto +++ b/api/blmod/v1/blmod.proto @@ -20,7 +20,7 @@ message ModuleInfo { service LabsModulesService { rpc AllModules(AllModulesRequest) returns (AllModulesResponse) {} - rpc RunModule(RunModuleRequest) returns (stream RunModuleResponse) {} + rpc RunModule(stream RunModuleRequest) returns (stream RunModuleResponse) {} } @@ -33,8 +33,11 @@ message AllModulesResponse { } message RunModuleRequest { - string name = 1; - bytes args = 2; + bytes payload = 1; + + // Header data + string name = 2; + bytes args = 3; } message RunModuleResponse { diff --git a/go.mod b/go.mod index 7e79d899..7084fd75 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,6 @@ require ( github.com/ipfs/go-ds-flatfs v0.5.1 github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-ds-sql v0.3.0 - github.com/ipfs/go-ipfs-api v0.3.0 github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-config v0.18.0 github.com/multiformats/go-multiaddr v0.5.0 @@ -92,6 +91,7 @@ require ( github.com/ipfs/go-fs-lock v0.0.7 // indirect github.com/ipfs/go-graphsync v0.11.0 // indirect github.com/ipfs/go-ipfs v0.11.0 // indirect + github.com/ipfs/go-ipfs-api v0.3.0 // indirect github.com/ipfs/go-ipfs-blockstore v0.2.1 // indirect github.com/ipfs/go-ipfs-cmds v0.6.0 // indirect github.com/ipfs/go-ipfs-delay v0.0.1 // indirect @@ -218,6 +218,7 @@ require ( github.com/prometheus/common v0.30.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/prometheus/statsd_exporter v0.21.0 // indirect + github.com/rogpeppe/go-internal v1.8.0 // indirect github.com/rs/cors v1.7.0 // indirect github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect diff --git a/go.sum b/go.sum index 41e6ee30..d202a2f9 100644 --- a/go.sum +++ b/go.sum @@ -1340,8 +1340,9 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= +github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= diff --git a/go/cmd/client/main.go b/go/cmd/client/main.go index df1aea0a..ca257b4d 100644 --- a/go/cmd/client/main.go +++ b/go/cmd/client/main.go @@ -60,13 +60,20 @@ func main() { if len(*argsFlag) != 0 { args = []byte(*argsFlag) } - cl, err := client.RunModule(context.Background(), &blmod.RunModuleRequest{ - Name: *runFlag, - Args: args, - }) + cl, err := client.RunModule(context.Background()) if err != nil { log.Fatalf("fail to start running module: %v", err) } + if err := cl.Send(&blmod.RunModuleRequest{ + Name: *runFlag, + Args: args, + }); err != nil { + log.Fatalf("fail to send header: %v", err) + } + if err := cl.CloseSend(); err != nil { + log.Fatalf("fail to close send: %v", err) + } + for { reply, err := cl.Recv() if err == io.EOF { diff --git a/go/pkg/blmod/blmod.go b/go/pkg/blmod/blmod.go index d9b135e1..ffd5c83c 100644 --- a/go/pkg/blmod/blmod.go +++ b/go/pkg/blmod/blmod.go @@ -11,6 +11,7 @@ import ( type ModuleContext interface { Send(interface{}) error + Recv(interface{}) error } type Module interface { diff --git a/go/pkg/blmod/blmod.pb.go b/go/pkg/blmod/blmod.pb.go index a197d623..2ff6fcb7 100644 --- a/go/pkg/blmod/blmod.pb.go +++ b/go/pkg/blmod/blmod.pb.go @@ -241,8 +241,10 @@ type RunModuleRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` - Args []byte `protobuf:"bytes,2,opt,name=args,proto3" json:"args,omitempty"` + Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` + // Header data + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Args []byte `protobuf:"bytes,3,opt,name=args,proto3" json:"args,omitempty"` } func (x *RunModuleRequest) Reset() { @@ -277,6 +279,13 @@ func (*RunModuleRequest) Descriptor() ([]byte, []int) { return file_blmod_v1_blmod_proto_rawDescGZIP(), []int{3} } +func (x *RunModuleRequest) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + func (x *RunModuleRequest) GetName() string { if x != nil { return x.Name @@ -367,27 +376,29 @@ var file_blmod_v1_blmod_proto_rawDesc = []byte{ 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2e, 0x0a, 0x07, 0x6d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x62, 0x6c, 0x6d, 0x6f, 0x64, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x07, - 0x6d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x73, 0x22, 0x3a, 0x0a, 0x10, 0x52, 0x75, 0x6e, 0x4d, 0x6f, - 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, - 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, - 0x12, 0x0a, 0x04, 0x61, 0x72, 0x67, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x61, - 0x72, 0x67, 0x73, 0x22, 0x2d, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, - 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, - 0x61, 0x64, 0x32, 0xa9, 0x01, 0x0a, 0x12, 0x4c, 0x61, 0x62, 0x73, 0x4d, 0x6f, 0x64, 0x75, 0x6c, - 0x65, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x49, 0x0a, 0x0a, 0x41, 0x6c, 0x6c, - 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x73, 0x12, 0x1b, 0x2e, 0x62, 0x6c, 0x6d, 0x6f, 0x64, 0x2e, - 0x76, 0x31, 0x2e, 0x41, 0x6c, 0x6c, 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x73, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x62, 0x6c, 0x6d, 0x6f, 0x64, 0x2e, 0x76, 0x31, 0x2e, - 0x41, 0x6c, 0x6c, 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x12, 0x48, 0x0a, 0x09, 0x52, 0x75, 0x6e, 0x4d, 0x6f, 0x64, 0x75, 0x6c, - 0x65, 0x12, 0x1a, 0x2e, 0x62, 0x6c, 0x6d, 0x6f, 0x64, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e, - 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, - 0x62, 0x6c, 0x6d, 0x6f, 0x64, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e, 0x4d, 0x6f, 0x64, 0x75, - 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x1e, - 0x5a, 0x1c, 0x62, 0x65, 0x72, 0x74, 0x79, 0x2e, 0x74, 0x65, 0x63, 0x68, 0x2f, 0x6c, 0x61, 0x62, - 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x62, 0x6c, 0x6d, 0x6f, 0x64, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x73, 0x22, 0x54, 0x0a, 0x10, 0x52, 0x75, 0x6e, 0x4d, 0x6f, + 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x70, + 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, + 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x72, 0x67, + 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x22, 0x2d, 0x0a, + 0x11, 0x52, 0x75, 0x6e, 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x32, 0xab, 0x01, 0x0a, + 0x12, 0x4c, 0x61, 0x62, 0x73, 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x73, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x12, 0x49, 0x0a, 0x0a, 0x41, 0x6c, 0x6c, 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, + 0x73, 0x12, 0x1b, 0x2e, 0x62, 0x6c, 0x6d, 0x6f, 0x64, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x6c, 0x6c, + 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, + 0x2e, 0x62, 0x6c, 0x6d, 0x6f, 0x64, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x6c, 0x6c, 0x4d, 0x6f, 0x64, + 0x75, 0x6c, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4a, + 0x0a, 0x09, 0x52, 0x75, 0x6e, 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x12, 0x1a, 0x2e, 0x62, 0x6c, + 0x6d, 0x6f, 0x64, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e, 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x62, 0x6c, 0x6d, 0x6f, 0x64, 0x2e, + 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e, 0x4d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x1e, 0x5a, 0x1c, 0x62, 0x65, + 0x72, 0x74, 0x79, 0x2e, 0x74, 0x65, 0x63, 0x68, 0x2f, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x67, 0x6f, + 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x62, 0x6c, 0x6d, 0x6f, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/go/pkg/blmod/blmod_grpc.pb.go b/go/pkg/blmod/blmod_grpc.pb.go index db2dc344..6c4f8adc 100644 --- a/go/pkg/blmod/blmod_grpc.pb.go +++ b/go/pkg/blmod/blmod_grpc.pb.go @@ -23,7 +23,7 @@ const _ = grpc.SupportPackageIsVersion7 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type LabsModulesServiceClient interface { AllModules(ctx context.Context, in *AllModulesRequest, opts ...grpc.CallOption) (*AllModulesResponse, error) - RunModule(ctx context.Context, in *RunModuleRequest, opts ...grpc.CallOption) (LabsModulesService_RunModuleClient, error) + RunModule(ctx context.Context, opts ...grpc.CallOption) (LabsModulesService_RunModuleClient, error) } type labsModulesServiceClient struct { @@ -43,22 +43,17 @@ func (c *labsModulesServiceClient) AllModules(ctx context.Context, in *AllModule return out, nil } -func (c *labsModulesServiceClient) RunModule(ctx context.Context, in *RunModuleRequest, opts ...grpc.CallOption) (LabsModulesService_RunModuleClient, error) { +func (c *labsModulesServiceClient) RunModule(ctx context.Context, opts ...grpc.CallOption) (LabsModulesService_RunModuleClient, error) { stream, err := c.cc.NewStream(ctx, &LabsModulesService_ServiceDesc.Streams[0], "/blmod.v1.LabsModulesService/RunModule", opts...) if err != nil { return nil, err } x := &labsModulesServiceRunModuleClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } return x, nil } type LabsModulesService_RunModuleClient interface { + Send(*RunModuleRequest) error Recv() (*RunModuleResponse, error) grpc.ClientStream } @@ -67,6 +62,10 @@ type labsModulesServiceRunModuleClient struct { grpc.ClientStream } +func (x *labsModulesServiceRunModuleClient) Send(m *RunModuleRequest) error { + return x.ClientStream.SendMsg(m) +} + func (x *labsModulesServiceRunModuleClient) Recv() (*RunModuleResponse, error) { m := new(RunModuleResponse) if err := x.ClientStream.RecvMsg(m); err != nil { @@ -80,7 +79,7 @@ func (x *labsModulesServiceRunModuleClient) Recv() (*RunModuleResponse, error) { // for forward compatibility type LabsModulesServiceServer interface { AllModules(context.Context, *AllModulesRequest) (*AllModulesResponse, error) - RunModule(*RunModuleRequest, LabsModulesService_RunModuleServer) error + RunModule(LabsModulesService_RunModuleServer) error mustEmbedUnimplementedLabsModulesServiceServer() } @@ -91,7 +90,7 @@ type UnimplementedLabsModulesServiceServer struct { func (UnimplementedLabsModulesServiceServer) AllModules(context.Context, *AllModulesRequest) (*AllModulesResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method AllModules not implemented") } -func (UnimplementedLabsModulesServiceServer) RunModule(*RunModuleRequest, LabsModulesService_RunModuleServer) error { +func (UnimplementedLabsModulesServiceServer) RunModule(LabsModulesService_RunModuleServer) error { return status.Errorf(codes.Unimplemented, "method RunModule not implemented") } func (UnimplementedLabsModulesServiceServer) mustEmbedUnimplementedLabsModulesServiceServer() {} @@ -126,15 +125,12 @@ func _LabsModulesService_AllModules_Handler(srv interface{}, ctx context.Context } func _LabsModulesService_RunModule_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(RunModuleRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(LabsModulesServiceServer).RunModule(m, &labsModulesServiceRunModuleServer{stream}) + return srv.(LabsModulesServiceServer).RunModule(&labsModulesServiceRunModuleServer{stream}) } type LabsModulesService_RunModuleServer interface { Send(*RunModuleResponse) error + Recv() (*RunModuleRequest, error) grpc.ServerStream } @@ -146,6 +142,14 @@ func (x *labsModulesServiceRunModuleServer) Send(m *RunModuleResponse) error { return x.ServerStream.SendMsg(m) } +func (x *labsModulesServiceRunModuleServer) Recv() (*RunModuleRequest, error) { + m := new(RunModuleRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // LabsModulesService_ServiceDesc is the grpc.ServiceDesc for LabsModulesService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -163,6 +167,7 @@ var LabsModulesService_ServiceDesc = grpc.ServiceDesc{ StreamName: "RunModule", Handler: _LabsModulesService_RunModule_Handler, ServerStreams: true, + ClientStreams: true, }, }, Metadata: "blmod/v1/blmod.proto", diff --git a/go/pkg/blmod/server.go b/go/pkg/blmod/server.go index a68b0627..75990d3e 100644 --- a/go/pkg/blmod/server.go +++ b/go/pkg/blmod/server.go @@ -48,6 +48,8 @@ type moduleContext struct { srv LabsModulesService_RunModuleServer } +var _ ModuleContext = (*moduleContext)(nil) + func (mc *moduleContext) Send(v interface{}) error { bytes, err := json.Marshal(v) if err != nil { @@ -56,7 +58,22 @@ func (mc *moduleContext) Send(v interface{}) error { return mc.srv.Send(&RunModuleResponse{Payload: bytes}) } -func (s *Server) RunModule(req *RunModuleRequest, srv LabsModulesService_RunModuleServer) error { +func (mc *moduleContext) Recv(v interface{}) error { + reply, err := mc.srv.Recv() + if err != nil { + return err + } + if err := json.Unmarshal(reply.GetPayload(), v); err != nil { + return errors.Wrap(err, "unmarshal JSON") + } + return nil +} + +func (s *Server) RunModule(srv LabsModulesService_RunModuleServer) error { + req, err := srv.Recv() + if err != nil { + return errors.Wrap(err, "read header") + } mod, err := s.reg.Get(req.GetName()) if err != nil { return errors.Wrap(err, "get module") diff --git a/rn/src/api/blmod/v1/blmod_pb.d.ts b/rn/src/api/blmod/v1/blmod_pb.d.ts index b801e923..74bad469 100644 --- a/rn/src/api/blmod/v1/blmod_pb.d.ts +++ b/rn/src/api/blmod/v1/blmod_pb.d.ts @@ -94,6 +94,11 @@ export namespace AllModulesResponse { } export class RunModuleRequest extends jspb.Message { + getPayload(): Uint8Array | string + getPayload_asU8(): Uint8Array + getPayload_asB64(): string + setPayload(value: Uint8Array | string): void + getName(): string setName(value: string): void @@ -117,6 +122,7 @@ export class RunModuleRequest extends jspb.Message { export namespace RunModuleRequest { export type AsObject = { + payload: Uint8Array | string name: string args: Uint8Array | string } diff --git a/rn/src/api/blmod/v1/blmod_pb.js b/rn/src/api/blmod/v1/blmod_pb.js index 5c3f20d8..0f0eb554 100644 --- a/rn/src/api/blmod/v1/blmod_pb.js +++ b/rn/src/api/blmod/v1/blmod_pb.js @@ -710,7 +710,8 @@ proto.blmod.v1.RunModuleRequest.prototype.toObject = function(opt_includeInstanc */ proto.blmod.v1.RunModuleRequest.toObject = function(includeInstance, msg) { var f, obj = { - name: jspb.Message.getFieldWithDefault(msg, 1, ""), + payload: msg.getPayload_asB64(), + name: jspb.Message.getFieldWithDefault(msg, 2, ""), args: msg.getArgs_asB64() }; @@ -749,10 +750,14 @@ proto.blmod.v1.RunModuleRequest.deserializeBinaryFromReader = function(msg, read var field = reader.getFieldNumber(); switch (field) { case 1: + var value = /** @type {!Uint8Array} */ (reader.readBytes()); + msg.setPayload(value); + break; + case 2: var value = /** @type {string} */ (reader.readString()); msg.setName(value); break; - case 2: + case 3: var value = /** @type {!Uint8Array} */ (reader.readBytes()); msg.setArgs(value); break; @@ -785,17 +790,24 @@ proto.blmod.v1.RunModuleRequest.prototype.serializeBinary = function() { */ proto.blmod.v1.RunModuleRequest.serializeBinaryToWriter = function(message, writer) { var f = undefined; + f = message.getPayload_asU8(); + if (f.length > 0) { + writer.writeBytes( + 1, + f + ); + } f = message.getName(); if (f.length > 0) { writer.writeString( - 1, + 2, f ); } f = message.getArgs_asU8(); if (f.length > 0) { writer.writeBytes( - 2, + 3, f ); } @@ -803,11 +815,53 @@ proto.blmod.v1.RunModuleRequest.serializeBinaryToWriter = function(message, writ /** - * optional string name = 1; + * optional bytes payload = 1; + * @return {!(string|Uint8Array)} + */ +proto.blmod.v1.RunModuleRequest.prototype.getPayload = function() { + return /** @type {!(string|Uint8Array)} */ (jspb.Message.getFieldWithDefault(this, 1, "")); +}; + + +/** + * optional bytes payload = 1; + * This is a type-conversion wrapper around `getPayload()` + * @return {string} + */ +proto.blmod.v1.RunModuleRequest.prototype.getPayload_asB64 = function() { + return /** @type {string} */ (jspb.Message.bytesAsB64( + this.getPayload())); +}; + + +/** + * optional bytes payload = 1; + * Note that Uint8Array is not supported on all browsers. + * @see http://caniuse.com/Uint8Array + * This is a type-conversion wrapper around `getPayload()` + * @return {!Uint8Array} + */ +proto.blmod.v1.RunModuleRequest.prototype.getPayload_asU8 = function() { + return /** @type {!Uint8Array} */ (jspb.Message.bytesAsU8( + this.getPayload())); +}; + + +/** + * @param {!(string|Uint8Array)} value + * @return {!proto.blmod.v1.RunModuleRequest} returns this + */ +proto.blmod.v1.RunModuleRequest.prototype.setPayload = function(value) { + return jspb.Message.setProto3BytesField(this, 1, value); +}; + + +/** + * optional string name = 2; * @return {string} */ proto.blmod.v1.RunModuleRequest.prototype.getName = function() { - return /** @type {string} */ (jspb.Message.getFieldWithDefault(this, 1, "")); + return /** @type {string} */ (jspb.Message.getFieldWithDefault(this, 2, "")); }; @@ -816,21 +870,21 @@ proto.blmod.v1.RunModuleRequest.prototype.getName = function() { * @return {!proto.blmod.v1.RunModuleRequest} returns this */ proto.blmod.v1.RunModuleRequest.prototype.setName = function(value) { - return jspb.Message.setProto3StringField(this, 1, value); + return jspb.Message.setProto3StringField(this, 2, value); }; /** - * optional bytes args = 2; + * optional bytes args = 3; * @return {!(string|Uint8Array)} */ proto.blmod.v1.RunModuleRequest.prototype.getArgs = function() { - return /** @type {!(string|Uint8Array)} */ (jspb.Message.getFieldWithDefault(this, 2, "")); + return /** @type {!(string|Uint8Array)} */ (jspb.Message.getFieldWithDefault(this, 3, "")); }; /** - * optional bytes args = 2; + * optional bytes args = 3; * This is a type-conversion wrapper around `getArgs()` * @return {string} */ @@ -841,7 +895,7 @@ proto.blmod.v1.RunModuleRequest.prototype.getArgs_asB64 = function() { /** - * optional bytes args = 2; + * optional bytes args = 3; * Note that Uint8Array is not supported on all browsers. * @see http://caniuse.com/Uint8Array * This is a type-conversion wrapper around `getArgs()` @@ -858,7 +912,7 @@ proto.blmod.v1.RunModuleRequest.prototype.getArgs_asU8 = function() { * @return {!proto.blmod.v1.RunModuleRequest} returns this */ proto.blmod.v1.RunModuleRequest.prototype.setArgs = function(value) { - return jspb.Message.setProto3BytesField(this, 2, value); + return jspb.Message.setProto3BytesField(this, 3, value); }; diff --git a/rn/src/api/blmod/v1/blmod_pb_service.d.ts b/rn/src/api/blmod/v1/blmod_pb_service.d.ts index 08858b8e..be80b051 100644 --- a/rn/src/api/blmod/v1/blmod_pb_service.d.ts +++ b/rn/src/api/blmod/v1/blmod_pb_service.d.ts @@ -17,7 +17,7 @@ type LabsModulesServiceAllModules = { type LabsModulesServiceRunModule = { readonly methodName: string readonly service: typeof LabsModulesService - readonly requestStream: false + readonly requestStream: true readonly responseStream: true readonly requestType: typeof blmod_v1_blmod_pb.RunModuleRequest readonly responseType: typeof blmod_v1_blmod_pb.RunModuleResponse @@ -77,7 +77,6 @@ export class LabsModulesServiceClient { ) => void, ): UnaryResponse runModule( - requestMessage: blmod_v1_blmod_pb.RunModuleRequest, metadata?: grpc.Metadata, - ): ResponseStream + ): BidirectionalStream } diff --git a/rn/src/api/blmod/v1/blmod_pb_service.js b/rn/src/api/blmod/v1/blmod_pb_service.js index 83416c84..816f52e6 100644 --- a/rn/src/api/blmod/v1/blmod_pb_service.js +++ b/rn/src/api/blmod/v1/blmod_pb_service.js @@ -22,7 +22,7 @@ LabsModulesService.AllModules = { LabsModulesService.RunModule = { methodName: 'RunModule', service: LabsModulesService, - requestStream: false, + requestStream: true, responseStream: true, requestType: blmod_v1_blmod_pb.RunModuleRequest, responseType: blmod_v1_blmod_pb.RunModuleResponse, @@ -70,38 +70,44 @@ LabsModulesServiceClient.prototype.allModules = function allModules( } } -LabsModulesServiceClient.prototype.runModule = function runModule(requestMessage, metadata) { +LabsModulesServiceClient.prototype.runModule = function runModule(metadata) { var listeners = { data: [], end: [], status: [], } - var client = grpc.invoke(LabsModulesService.RunModule, { - request: requestMessage, + var client = grpc.client(LabsModulesService.RunModule, { host: this.serviceHost, metadata: metadata, transport: this.options.transport, - debug: this.options.debug, - onMessage: function (responseMessage) { - listeners.data.forEach(function (handler) { - handler(responseMessage) - }) - }, - onEnd: function (status, statusMessage, trailers) { - listeners.status.forEach(function (handler) { - handler({ code: status, details: statusMessage, metadata: trailers }) - }) - listeners.end.forEach(function (handler) { - handler({ code: status, details: statusMessage, metadata: trailers }) - }) - listeners = null - }, }) + client.onEnd(function (status, statusMessage, trailers) { + listeners.status.forEach(function (handler) { + handler({ code: status, details: statusMessage, metadata: trailers }) + }) + listeners.end.forEach(function (handler) { + handler({ code: status, details: statusMessage, metadata: trailers }) + }) + listeners = null + }) + client.onMessage(function (message) { + listeners.data.forEach(function (handler) { + handler(message) + }) + }) + client.start(metadata) return { on: function (type, handler) { listeners[type].push(handler) return this }, + write: function (requestMessage) { + client.send(requestMessage) + return this + }, + end: function () { + client.finishSend() + }, cancel: function () { listeners = null client.close() diff --git a/rn/src/screens/GoModule.tsx b/rn/src/screens/GoModule.tsx index f581e212..b4ca3fdb 100644 --- a/rn/src/screens/GoModule.tsx +++ b/rn/src/screens/GoModule.tsx @@ -18,7 +18,11 @@ import { utf8 } from '@berty-labs/encoding' const space = 15 -const streams: { [key: string]: blmod.ResponseStream | undefined } = {} +const streams: { + [key: string]: + | blmod.BidirectionalStream + | undefined +} = {} export const GoModule: ScreenFC<'GoModule'> = ({ route: { @@ -45,7 +49,7 @@ export const GoModule: ScreenFC<'GoModule'> = ({ req.setArgs(utf8.encode(args)) } - const cl = modulesClient.runModule(req) + const cl = modulesClient.runModule() streams[name] = cl cl.on('data', reply => { @@ -83,6 +87,9 @@ export const GoModule: ScreenFC<'GoModule'> = ({ cl.on('status', (...args) => { console.log('status:', ...args) }) + + cl.write(req) + cl.end() }, [args, dispatch, modulesClient, name, state]) const handleArgsChange = useCallback( (args: string) => dispatch(setModuleArgs({ name, args })),