Skip to content

Commit

Permalink
Merge pull request #57 from berty/bidi-stream
Browse files Browse the repository at this point in the history
feat: enable bidi stream for blmod.RunModule
  • Loading branch information
n0izn0iz authored Feb 16, 2022
2 parents 978a286 + 33666cb commit aa2361f
Show file tree
Hide file tree
Showing 13 changed files with 202 additions and 84 deletions.
9 changes: 6 additions & 3 deletions api/blmod/v1/blmod.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ message ModuleInfo {

service LabsModulesService {
rpc AllModules(AllModulesRequest) returns (AllModulesResponse) {}
rpc RunModule(RunModuleRequest) returns (stream RunModuleResponse) {}
rpc RunModule(stream RunModuleRequest) returns (stream RunModuleResponse) {}
}


Expand All @@ -33,8 +33,11 @@ message AllModulesResponse {
}

message RunModuleRequest {
string name = 1;
bytes args = 2;
bytes payload = 1;

// Header data
string name = 2;
bytes args = 3;
}

message RunModuleResponse {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
github.com/ipfs/go-ds-flatfs v0.5.1
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-ds-sql v0.3.0
github.com/ipfs/go-ipfs-api v0.3.0
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-config v0.18.0
github.com/multiformats/go-multiaddr v0.5.0
Expand Down Expand Up @@ -92,6 +91,7 @@ require (
github.com/ipfs/go-fs-lock v0.0.7 // indirect
github.com/ipfs/go-graphsync v0.11.0 // indirect
github.com/ipfs/go-ipfs v0.11.0 // indirect
github.com/ipfs/go-ipfs-api v0.3.0 // indirect
github.com/ipfs/go-ipfs-blockstore v0.2.1 // indirect
github.com/ipfs/go-ipfs-cmds v0.6.0 // indirect
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
Expand Down Expand Up @@ -218,6 +218,7 @@ require (
github.com/prometheus/common v0.30.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1340,8 +1340,9 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
Expand Down
15 changes: 11 additions & 4 deletions go/cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,20 @@ func main() {
if len(*argsFlag) != 0 {
args = []byte(*argsFlag)
}
cl, err := client.RunModule(context.Background(), &blmod.RunModuleRequest{
Name: *runFlag,
Args: args,
})
cl, err := client.RunModule(context.Background())
if err != nil {
log.Fatalf("fail to start running module: %v", err)
}
if err := cl.Send(&blmod.RunModuleRequest{
Name: *runFlag,
Args: args,
}); err != nil {
log.Fatalf("fail to send header: %v", err)
}
if err := cl.CloseSend(); err != nil {
log.Fatalf("fail to close send: %v", err)
}

for {
reply, err := cl.Recv()
if err == io.EOF {
Expand Down
1 change: 1 addition & 0 deletions go/pkg/blmod/blmod.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

type ModuleContext interface {
Send(interface{}) error
Recv(interface{}) error
}

type Module interface {
Expand Down
57 changes: 34 additions & 23 deletions go/pkg/blmod/blmod.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 20 additions & 15 deletions go/pkg/blmod/blmod_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 18 additions & 1 deletion go/pkg/blmod/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type moduleContext struct {
srv LabsModulesService_RunModuleServer
}

var _ ModuleContext = (*moduleContext)(nil)

func (mc *moduleContext) Send(v interface{}) error {
bytes, err := json.Marshal(v)
if err != nil {
Expand All @@ -56,7 +58,22 @@ func (mc *moduleContext) Send(v interface{}) error {
return mc.srv.Send(&RunModuleResponse{Payload: bytes})
}

func (s *Server) RunModule(req *RunModuleRequest, srv LabsModulesService_RunModuleServer) error {
func (mc *moduleContext) Recv(v interface{}) error {
reply, err := mc.srv.Recv()
if err != nil {
return err
}
if err := json.Unmarshal(reply.GetPayload(), v); err != nil {
return errors.Wrap(err, "unmarshal JSON")
}
return nil
}

func (s *Server) RunModule(srv LabsModulesService_RunModuleServer) error {
req, err := srv.Recv()
if err != nil {
return errors.Wrap(err, "read header")
}
mod, err := s.reg.Get(req.GetName())
if err != nil {
return errors.Wrap(err, "get module")
Expand Down
6 changes: 6 additions & 0 deletions rn/src/api/blmod/v1/blmod_pb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ export namespace AllModulesResponse {
}

export class RunModuleRequest extends jspb.Message {
getPayload(): Uint8Array | string
getPayload_asU8(): Uint8Array
getPayload_asB64(): string
setPayload(value: Uint8Array | string): void

getName(): string
setName(value: string): void

Expand All @@ -117,6 +122,7 @@ export class RunModuleRequest extends jspb.Message {

export namespace RunModuleRequest {
export type AsObject = {
payload: Uint8Array | string
name: string
args: Uint8Array | string
}
Expand Down
Loading

0 comments on commit aa2361f

Please sign in to comment.