diff --git a/README.md b/README.md index 47c7b8db..e79d6649 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ The project adheres to [Semantic Versioning](https://semver.org) and [Convention - [Actor System](#actor-system) - [Behaviors](#behaviors) - [Mailbox](#mailbox) + - [Routers](#routers) - [Events Stream](#events-stream) - [Supported events](#supported-events) - [Messaging](#messaging) @@ -123,8 +124,11 @@ The fundamental building blocks of Go-Akt are actors. - supervise the failure behavior of (child) actors. The supervisory strategy to adopt is set during its creation. In Go-Akt that each child actor is treated separately. There is no concept of one-for-one and one-for-all strategies. The following directives are supported: - - [`Restart`](./actors/types.go): to restart the child actor - - [`Stop`](./actors/types.go): to stop the child actor which is the default one + - [`Restart`](./actors/supervisor.go): to restart the child actor. One can control how the restart is done using the following options: + - `maxNumRetries`: defines the maximum of restart attempts + - `timeout`: how to attempt restarting the faulty actor + - [`Stop`](./actors/supervisor.go): to stop the child actor which is the default one + - [`Resume`](./actors/supervisor.go): ignores the failure and process the next message, instead - remotely lookup for an actor on another node via their process id [`PID`](./actors/pid.go) `RemoteLookup`. This allows it to send messages remotely via `RemoteAsk` or `RemoteTell` methods @@ -177,6 +181,10 @@ To change the behavior, call the following methods on the [ReceiveContext interf Once can implement a custom mailbox. See [Mailbox](./actors/mailbox.go). The default mailbox makes use of buffered channels. +### Routers + +Go-Akt comes shipped with some routers. See [docs](./router/router.md) + ### Events Stream To receive some system events and act on them for some particular business cases, you just need to call the actor system `Subscribe`. diff --git a/actors/actor_system.go b/actors/actor_system.go index 1997c5ee..bdb17a40 100644 --- a/actors/actor_system.go +++ b/actors/actor_system.go @@ -129,6 +129,8 @@ type ActorSystem interface { handleRemoteAsk(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) // handleRemoteTell handles an asynchronous message to an actor handleRemoteTell(ctx context.Context, to PID, message proto.Message) error + // setActor sets actor in the actor system actors registry + setActor(actor PID) } // ActorSystem represent a collection of actors on a given node @@ -465,16 +467,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID, return nil, err } - x.actors.set(pid) - if x.clusterEnabled.Load() { - x.actorsChan <- &internalpb.WireActor{ - ActorName: name, - ActorAddress: actorPath.RemoteAddress(), - ActorPath: actorPath.String(), - ActorType: types.NameOf(actor), - } - } - + x.setActor(pid) return pid, nil } @@ -539,16 +532,7 @@ func (x *actorSystem) SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc return nil, err } - x.actors.set(pid) - if x.clusterEnabled.Load() { - x.actorsChan <- &internalpb.WireActor{ - ActorName: actorID, - ActorAddress: actorPath.RemoteAddress(), - ActorPath: actorPath.String(), - ActorType: types.NameOf(actor), - } - } - + x.setActor(pid) return pid, nil } @@ -1170,6 +1154,19 @@ func (x *actorSystem) handleRemoteTell(ctx context.Context, to PID, message prot return Tell(spanCtx, to, message) } +// setActor implements ActorSystem. +func (x *actorSystem) setActor(actor PID) { + x.actors.set(actor) + if x.clusterEnabled.Load() { + x.actorsChan <- &internalpb.WireActor{ + ActorName: actor.Name(), + ActorAddress: actor.ActorPath().RemoteAddress(), + ActorPath: actor.ActorPath().String(), + ActorType: types.NameOf(actor), + } + } +} + // enableClustering enables clustering. When clustering is enabled remoting is also enabled to facilitate remote // communication func (x *actorSystem) enableClustering(ctx context.Context) error { diff --git a/actors/actor_test.go b/actors/actor_test.go index be669662..c500e69e 100644 --- a/actors/actor_test.go +++ b/actors/actor_test.go @@ -197,7 +197,6 @@ func (x *userActor) Receive(ctx ReceiveContext) { // Authenticated behavior is executed when the actor receive the TestAuth message func (x *userActor) Authenticated(ctx ReceiveContext) { switch ctx.Message().(type) { - case *goaktpb.PostStart: case *testspb.TestReadiness: ctx.Response(new(testspb.TestReady)) ctx.UnBecome() @@ -206,7 +205,6 @@ func (x *userActor) Authenticated(ctx ReceiveContext) { func (x *userActor) CreditAccount(ctx ReceiveContext) { switch ctx.Message().(type) { - case *goaktpb.PostStart: case *testspb.CreditAccount: ctx.Response(new(testspb.AccountCredited)) ctx.BecomeStacked(x.DebitAccount) @@ -217,7 +215,6 @@ func (x *userActor) CreditAccount(ctx ReceiveContext) { func (x *userActor) DebitAccount(ctx ReceiveContext) { switch ctx.Message().(type) { - case *goaktpb.PostStart: case *testspb.DebitAccount: ctx.Response(new(testspb.AccountDebited)) ctx.UnBecomeStacked() diff --git a/actors/context.go b/actors/context.go index c88d0796..1e022a6b 100644 --- a/actors/context.go +++ b/actors/context.go @@ -193,8 +193,10 @@ func (c *receiveContext) Self() PID { // Err is used instead of panicking within a message handler. // One can also call panic which is not the recommended way func (c *receiveContext) Err(err error) { - // this will be recovered - panic(err) + if err != nil { + // this will be recovered + panic(err) + } } // Response sets the message response diff --git a/actors/pid.go b/actors/pid.go index 8c80fc3f..154b4cb7 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -157,7 +157,7 @@ type PID interface { // push a message to the actor's receiveContextBuffer doReceive(ctx ReceiveContext) // watchers returns the list of watchMen - watchers() *slices.ConcurrentSlice[*watcher] + watchers() *slices.ThreadSafe[*watcher] // setBehavior is a utility function that helps set the actor behavior setBehavior(behavior Behavior) // setBehaviorStacked adds a behavior to the actor's behaviors @@ -227,7 +227,7 @@ type pid struct { haltPassivationLnr chan types.Unit // set of watchersList watching the given actor - watchersList *slices.ConcurrentSlice[*watcher] + watchersList *slices.ThreadSafe[*watcher] // hold the list of the children children *pidMap @@ -257,7 +257,7 @@ type pid struct { // http client httpClient *stdhttp.Client - // specifies the current actor behavior + // specifies the actor behavior stack behaviorStack *behaviorStack // stash settings @@ -292,7 +292,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption mailboxSize: DefaultMailboxSize, children: newPIDMap(10), supervisorDirective: DefaultSupervisoryStrategy, - watchersList: slices.NewConcurrentSlice[*watcher](), + watchersList: slices.NewThreadSafe[*watcher](), telemetry: telemetry.New(), actorPath: actorPath, rwLocker: &sync.RWMutex{}, @@ -604,6 +604,11 @@ func (x *pid) SpawnChild(ctx context.Context, name string, actor Actor) (PID, er }) } + // set the actor in the given actor system registry + if x.ActorSystem() != nil { + x.ActorSystem().setActor(cid) + } + return cid, nil } @@ -1182,7 +1187,7 @@ func (x *pid) UnWatch(pid PID) { } // Watchers return the list of watchersList -func (x *pid) watchers() *slices.ConcurrentSlice[*watcher] { +func (x *pid) watchers() *slices.ThreadSafe[*watcher] { return x.watchersList } @@ -1247,7 +1252,7 @@ func (x *pid) reset() { x.lastProcessingDuration.Store(0) x.initTimeout.Store(DefaultInitTimeout) x.children = newPIDMap(10) - x.watchersList = slices.NewConcurrentSlice[*watcher]() + x.watchersList = slices.NewThreadSafe[*watcher]() x.telemetry = telemetry.New() x.mailbox.Reset() x.resetBehavior() @@ -1265,7 +1270,9 @@ func (x *pid) freeWatchers(ctx context.Context) { if watchers.Len() > 0 { for item := range watchers.Iter() { watcher := item.Value - terminated := &goaktpb.Terminated{} + terminated := &goaktpb.Terminated{ + ActorId: x.ID(), + } if watcher.WatcherID.IsRunning() { // TODO: handle error and push to some system dead-letters queue _ = x.Tell(ctx, watcher.WatcherID, terminated) @@ -1394,7 +1401,6 @@ func (x *pid) setBehavior(behavior Behavior) { // resetBehavior is a utility function resets the actor behavior func (x *pid) resetBehavior() { x.rwLocker.Lock() - x.behaviorStack.Clear() x.behaviorStack.Push(x.Receive) x.rwLocker.Unlock() } diff --git a/goaktpb/goakt.pb.go b/goaktpb/goakt.pb.go index 1a19e8b6..883e2e5a 100644 --- a/goaktpb/goakt.pb.go +++ b/goaktpb/goakt.pb.go @@ -605,6 +605,9 @@ type Terminated struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + + // Specifies the terminated actor + ActorId string `protobuf:"bytes,1,opt,name=actor_id,json=actorId,proto3" json:"actor_id,omitempty"` } func (x *Terminated) Reset() { @@ -639,6 +642,13 @@ func (*Terminated) Descriptor() ([]byte, []int) { return file_goakt_goakt_proto_rawDescGZIP(), []int{9} } +func (x *Terminated) GetActorId() string { + if x != nil { + return x.ActorId + } + return "" +} + // PoisonPill is sent the stop an actor. // It is enqueued as ordinary messages. // It will be handled after messages that were already queued in the mailbox. @@ -758,6 +768,55 @@ func (*PreStart) Descriptor() ([]byte, []int) { return file_goakt_goakt_proto_rawDescGZIP(), []int{12} } +// Broadcast is used to send message to a router +type Broadcast struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Specifies the actual message + Message *anypb.Any `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *Broadcast) Reset() { + *x = Broadcast{} + if protoimpl.UnsafeEnabled { + mi := &file_goakt_goakt_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Broadcast) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Broadcast) ProtoMessage() {} + +func (x *Broadcast) ProtoReflect() protoreflect.Message { + mi := &file_goakt_goakt_proto_msgTypes[13] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Broadcast.ProtoReflect.Descriptor instead. +func (*Broadcast) Descriptor() ([]byte, []int) { + return file_goakt_goakt_proto_rawDescGZIP(), []int{13} +} + +func (x *Broadcast) GetMessage() *anypb.Any { + if x != nil { + return x.Message + } + return nil +} + var File_goakt_goakt_proto protoreflect.FileDescriptor var file_goakt_goakt_proto_rawDesc = []byte{ @@ -839,19 +898,25 @@ var file_goakt_goakt_proto_rawDesc = []byte{ 0x73, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, 0x0c, 0x0a, 0x0a, 0x54, - 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x22, 0x0c, 0x0a, 0x0a, 0x50, 0x6f, 0x69, - 0x73, 0x6f, 0x6e, 0x50, 0x69, 0x6c, 0x6c, 0x22, 0x0b, 0x0a, 0x09, 0x50, 0x6f, 0x73, 0x74, 0x53, - 0x74, 0x61, 0x72, 0x74, 0x22, 0x0a, 0x0a, 0x08, 0x50, 0x72, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, - 0x42, 0x85, 0x01, 0x0a, 0x0b, 0x63, 0x6f, 0x6d, 0x2e, 0x67, 0x6f, 0x61, 0x6b, 0x74, 0x70, 0x62, - 0x42, 0x0a, 0x47, 0x6f, 0x61, 0x6b, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x02, 0x50, 0x01, - 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x6f, 0x63, - 0x68, 0x65, 0x6d, 0x65, 0x79, 0x2f, 0x67, 0x6f, 0x61, 0x6b, 0x74, 0x2f, 0x76, 0x32, 0x2f, 0x67, - 0x6f, 0x61, 0x6b, 0x74, 0x70, 0x62, 0x3b, 0x67, 0x6f, 0x61, 0x6b, 0x74, 0x70, 0x62, 0xa2, 0x02, - 0x03, 0x47, 0x58, 0x58, 0xaa, 0x02, 0x07, 0x47, 0x6f, 0x61, 0x6b, 0x74, 0x70, 0x62, 0xca, 0x02, - 0x07, 0x47, 0x6f, 0x61, 0x6b, 0x74, 0x70, 0x62, 0xe2, 0x02, 0x13, 0x47, 0x6f, 0x61, 0x6b, 0x74, - 0x70, 0x62, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, - 0x07, 0x47, 0x6f, 0x61, 0x6b, 0x74, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, 0x27, 0x0a, 0x0a, 0x54, + 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x63, 0x74, + 0x6f, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x63, 0x74, + 0x6f, 0x72, 0x49, 0x64, 0x22, 0x0c, 0x0a, 0x0a, 0x50, 0x6f, 0x69, 0x73, 0x6f, 0x6e, 0x50, 0x69, + 0x6c, 0x6c, 0x22, 0x0b, 0x0a, 0x09, 0x50, 0x6f, 0x73, 0x74, 0x53, 0x74, 0x61, 0x72, 0x74, 0x22, + 0x0a, 0x0a, 0x08, 0x50, 0x72, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x22, 0x3b, 0x0a, 0x09, 0x42, + 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x12, 0x2e, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, + 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x85, 0x01, 0x0a, 0x0b, 0x63, 0x6f, 0x6d, + 0x2e, 0x67, 0x6f, 0x61, 0x6b, 0x74, 0x70, 0x62, 0x42, 0x0a, 0x47, 0x6f, 0x61, 0x6b, 0x74, 0x50, + 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x02, 0x50, 0x01, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x6f, 0x63, 0x68, 0x65, 0x6d, 0x65, 0x79, 0x2f, 0x67, 0x6f, + 0x61, 0x6b, 0x74, 0x2f, 0x76, 0x32, 0x2f, 0x67, 0x6f, 0x61, 0x6b, 0x74, 0x70, 0x62, 0x3b, 0x67, + 0x6f, 0x61, 0x6b, 0x74, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x47, 0x58, 0x58, 0xaa, 0x02, 0x07, 0x47, + 0x6f, 0x61, 0x6b, 0x74, 0x70, 0x62, 0xca, 0x02, 0x07, 0x47, 0x6f, 0x61, 0x6b, 0x74, 0x70, 0x62, + 0xe2, 0x02, 0x13, 0x47, 0x6f, 0x61, 0x6b, 0x74, 0x70, 0x62, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x07, 0x47, 0x6f, 0x61, 0x6b, 0x74, 0x70, 0x62, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -866,7 +931,7 @@ func file_goakt_goakt_proto_rawDescGZIP() []byte { return file_goakt_goakt_proto_rawDescData } -var file_goakt_goakt_proto_msgTypes = make([]protoimpl.MessageInfo, 13) +var file_goakt_goakt_proto_msgTypes = make([]protoimpl.MessageInfo, 14) var file_goakt_goakt_proto_goTypes = []interface{}{ (*Address)(nil), // 0: goaktpb.Address (*Deadletter)(nil), // 1: goaktpb.Deadletter @@ -881,32 +946,34 @@ var file_goakt_goakt_proto_goTypes = []interface{}{ (*PoisonPill)(nil), // 10: goaktpb.PoisonPill (*PostStart)(nil), // 11: goaktpb.PostStart (*PreStart)(nil), // 12: goaktpb.PreStart - (*anypb.Any)(nil), // 13: google.protobuf.Any - (*timestamppb.Timestamp)(nil), // 14: google.protobuf.Timestamp + (*Broadcast)(nil), // 13: goaktpb.Broadcast + (*anypb.Any)(nil), // 14: google.protobuf.Any + (*timestamppb.Timestamp)(nil), // 15: google.protobuf.Timestamp } var file_goakt_goakt_proto_depIdxs = []int32{ 0, // 0: goaktpb.Deadletter.sender:type_name -> goaktpb.Address 0, // 1: goaktpb.Deadletter.receiver:type_name -> goaktpb.Address - 13, // 2: goaktpb.Deadletter.message:type_name -> google.protobuf.Any - 14, // 3: goaktpb.Deadletter.send_time:type_name -> google.protobuf.Timestamp + 14, // 2: goaktpb.Deadletter.message:type_name -> google.protobuf.Any + 15, // 3: goaktpb.Deadletter.send_time:type_name -> google.protobuf.Timestamp 0, // 4: goaktpb.ActorStarted.address:type_name -> goaktpb.Address - 14, // 5: goaktpb.ActorStarted.started_at:type_name -> google.protobuf.Timestamp + 15, // 5: goaktpb.ActorStarted.started_at:type_name -> google.protobuf.Timestamp 0, // 6: goaktpb.ActorStopped.address:type_name -> goaktpb.Address - 14, // 7: goaktpb.ActorStopped.stopped_at:type_name -> google.protobuf.Timestamp + 15, // 7: goaktpb.ActorStopped.stopped_at:type_name -> google.protobuf.Timestamp 0, // 8: goaktpb.ActorPassivated.address:type_name -> goaktpb.Address - 14, // 9: goaktpb.ActorPassivated.passivated_at:type_name -> google.protobuf.Timestamp + 15, // 9: goaktpb.ActorPassivated.passivated_at:type_name -> google.protobuf.Timestamp 0, // 10: goaktpb.ActorChildCreated.address:type_name -> goaktpb.Address 0, // 11: goaktpb.ActorChildCreated.parent:type_name -> goaktpb.Address - 14, // 12: goaktpb.ActorChildCreated.created_at:type_name -> google.protobuf.Timestamp + 15, // 12: goaktpb.ActorChildCreated.created_at:type_name -> google.protobuf.Timestamp 0, // 13: goaktpb.ActorRestarted.address:type_name -> goaktpb.Address - 14, // 14: goaktpb.ActorRestarted.restarted_at:type_name -> google.protobuf.Timestamp - 14, // 15: goaktpb.NodeJoined.timestamp:type_name -> google.protobuf.Timestamp - 14, // 16: goaktpb.NodeLeft.timestamp:type_name -> google.protobuf.Timestamp - 17, // [17:17] is the sub-list for method output_type - 17, // [17:17] is the sub-list for method input_type - 17, // [17:17] is the sub-list for extension type_name - 17, // [17:17] is the sub-list for extension extendee - 0, // [0:17] is the sub-list for field type_name + 15, // 14: goaktpb.ActorRestarted.restarted_at:type_name -> google.protobuf.Timestamp + 15, // 15: goaktpb.NodeJoined.timestamp:type_name -> google.protobuf.Timestamp + 15, // 16: goaktpb.NodeLeft.timestamp:type_name -> google.protobuf.Timestamp + 14, // 17: goaktpb.Broadcast.message:type_name -> google.protobuf.Any + 18, // [18:18] is the sub-list for method output_type + 18, // [18:18] is the sub-list for method input_type + 18, // [18:18] is the sub-list for extension type_name + 18, // [18:18] is the sub-list for extension extendee + 0, // [0:18] is the sub-list for field type_name } func init() { file_goakt_goakt_proto_init() } @@ -1071,6 +1138,18 @@ func file_goakt_goakt_proto_init() { return nil } } + file_goakt_goakt_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Broadcast); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -1078,7 +1157,7 @@ func file_goakt_goakt_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_goakt_goakt_proto_rawDesc, NumEnums: 0, - NumMessages: 13, + NumMessages: 14, NumExtensions: 0, NumServices: 0, }, diff --git a/internal/cluster/engine.go b/internal/cluster/engine.go index 823c5a64..85c6aeea 100644 --- a/internal/cluster/engine.go +++ b/internal/cluster/engine.go @@ -31,6 +31,7 @@ import ( "net" "slices" "strconv" + "sync" "time" "github.com/buraksezer/olric" @@ -117,6 +118,7 @@ type Interface interface { // Engine represents the Engine type Engine struct { + *sync.Mutex // specifies the total number of partitions // the default values is 20 partitionsCount uint64 @@ -184,6 +186,7 @@ func NewEngine(name string, disco discovery.Provider, host *discovery.Node, opts messagesChan: make(chan *redis.Message, 1), minimumPeersQuorum: 1, replicaCount: 1, + Mutex: new(sync.Mutex), } // apply the various options for _, opt := range opts { @@ -332,7 +335,12 @@ func (n *Engine) Stop(ctx context.Context) error { // IsLeader states whether the given cluster node is a leader or not at a given // point in time in the cluster func (n *Engine) IsLeader(ctx context.Context) bool { - stats, err := n.client.Stats(ctx, n.host.PeersAddress()) + n.Lock() + client := n.client + host := n.host + n.Unlock() + + stats, err := client.Stats(ctx, host.PeersAddress()) if err != nil { n.logger.Errorf("failed to fetch the cluster node=(%s) stats: %v", n.host.PeersAddress(), err) return false @@ -342,18 +350,27 @@ func (n *Engine) IsLeader(ctx context.Context) bool { // Host returns the Node Host func (n *Engine) Host() string { - return n.host.Host + n.Lock() + host := n.host + n.Unlock() + return host.Host } // RemotingPort returns the Node remoting port func (n *Engine) RemotingPort() int { - return n.host.RemotingPort + n.Lock() + host := n.host + n.Unlock() + return host.RemotingPort } // AdvertisedAddress returns the cluster node cluster address that is known by the // peers in the cluster func (n *Engine) AdvertisedAddress() string { - return n.host.PeersAddress() + n.Lock() + host := n.host + n.Unlock() + return host.PeersAddress() } // PutActor pushes to the cluster the peer sync request @@ -361,6 +378,9 @@ func (n *Engine) PutActor(ctx context.Context, actor *internalpb.WireActor) erro ctx, cancelFn := context.WithTimeout(ctx, n.writeTimeout) defer cancelFn() + n.Lock() + defer n.Unlock() + logger := n.logger logger.Infof("synchronization peer (%s)", n.host.PeersAddress()) @@ -408,6 +428,9 @@ func (n *Engine) GetState(ctx context.Context, peerAddress string) (*internalpb. ctx, cancelFn := context.WithTimeout(ctx, n.readTimeout) defer cancelFn() + n.Lock() + defer n.Unlock() + logger := n.logger logger.Infof("[%s] retrieving peer (%s) sync record", n.host.PeersAddress(), peerAddress) @@ -443,6 +466,9 @@ func (n *Engine) GetActor(ctx context.Context, actorName string) (*internalpb.Wi ctx, cancelFn := context.WithTimeout(ctx, n.readTimeout) defer cancelFn() + n.Lock() + defer n.Unlock() + logger := n.logger logger.Infof("[%s] retrieving actor (%s) from the cluster", n.host.PeersAddress(), actorName) @@ -477,6 +503,8 @@ func (n *Engine) GetActor(ctx context.Context, actorName string) (*internalpb.Wi // An actor is removed from the cluster when this actor has been passivated. func (n *Engine) RemoveActor(ctx context.Context, actorName string) error { logger := n.logger + n.Lock() + defer n.Unlock() logger.Infof("removing actor (%s)", actorName) @@ -495,6 +523,9 @@ func (n *Engine) SetKey(ctx context.Context, key string) error { ctx, cancelFn := context.WithTimeout(ctx, n.writeTimeout) defer cancelFn() + n.Lock() + defer n.Unlock() + logger := n.logger logger.Infof("replicating key (%s)", key) @@ -513,6 +544,9 @@ func (n *Engine) KeyExists(ctx context.Context, key string) (bool, error) { ctx, cancelFn := context.WithTimeout(ctx, n.readTimeout) defer cancelFn() + n.Lock() + defer n.Unlock() + logger := n.logger logger.Infof("checking key (%s) existence in the cluster", key) @@ -534,6 +568,9 @@ func (n *Engine) KeyExists(ctx context.Context, key string) (bool, error) { func (n *Engine) UnsetKey(ctx context.Context, key string) error { logger := n.logger + n.Lock() + defer n.Unlock() + logger.Infof("unsetting key (%s)", key) if _, err := n.dmap.Delete(ctx, key); err != nil { @@ -561,7 +598,11 @@ func (n *Engine) Events() <-chan *Event { // Peers returns a channel containing the list of peers at a given time func (n *Engine) Peers(ctx context.Context) ([]*Peer, error) { - members, err := n.client.Members(ctx) + n.Lock() + client := n.client + n.Unlock() + + members, err := client.Members(ctx) if err != nil { n.logger.Error(errors.Wrap(err, "failed to read cluster peers")) return nil, err diff --git a/internal/slices/slices.go b/internal/slices/slices.go index 4adff0d7..ff02152f 100644 --- a/internal/slices/slices.go +++ b/internal/slices/slices.go @@ -26,8 +26,8 @@ package slices import "sync" -// ConcurrentSlice type that can be safely shared between goroutines. -type ConcurrentSlice[T any] struct { +// ThreadSafe type that can be safely shared between goroutines. +type ThreadSafe[T any] struct { sync.RWMutex items []T } @@ -38,9 +38,9 @@ type Item[T any] struct { Value T } -// NewConcurrentSlice creates a new synchronized slice. -func NewConcurrentSlice[T any]() *ConcurrentSlice[T] { - cs := &ConcurrentSlice[T]{ +// NewThreadSafe creates a new synchronized slice. +func NewThreadSafe[T any]() *ThreadSafe[T] { + cs := &ThreadSafe[T]{ items: make([]T, 0), } @@ -48,21 +48,21 @@ func NewConcurrentSlice[T any]() *ConcurrentSlice[T] { } // Len returns the number of items -func (cs *ConcurrentSlice[T]) Len() int { +func (cs *ThreadSafe[T]) Len() int { cs.Lock() defer cs.Unlock() return len(cs.items) } // Append adds an item to the concurrent slice. -func (cs *ConcurrentSlice[T]) Append(item T) { +func (cs *ThreadSafe[T]) Append(item T) { cs.Lock() defer cs.Unlock() cs.items = append(cs.items, item) } // Get returns the slice item at the given index -func (cs *ConcurrentSlice[T]) Get(index int) (item any) { +func (cs *ThreadSafe[T]) Get(index int) (item any) { cs.RLock() defer cs.RUnlock() if isSet(cs.items, index) { @@ -72,7 +72,7 @@ func (cs *ConcurrentSlice[T]) Get(index int) (item any) { } // Delete an item from the slice -func (cs *ConcurrentSlice[T]) Delete(index int) { +func (cs *ThreadSafe[T]) Delete(index int) { cs.RLock() defer cs.RUnlock() var nilState T @@ -91,7 +91,7 @@ func isSet[T any](arr []T, index int) bool { // Iter iterates the items in the concurrent slice. // Each item is sent over a channel, so that // we can iterate over the slice using the builtin range keyword. -func (cs *ConcurrentSlice[T]) Iter() <-chan Item[T] { +func (cs *ThreadSafe[T]) Iter() <-chan Item[T] { c := make(chan Item[T]) f := func() { cs.RLock() diff --git a/internal/slices/slices_test.go b/internal/slices/slices_test.go index fa31a0d2..c023669f 100644 --- a/internal/slices/slices_test.go +++ b/internal/slices/slices_test.go @@ -30,9 +30,9 @@ import ( "github.com/stretchr/testify/assert" ) -func TestConcurrentSlice(t *testing.T) { +func TestNewThreadSafeSlice(t *testing.T) { // create a concurrent slice of integer - sl := NewConcurrentSlice[int]() + sl := NewThreadSafe[int]() // add some items sl.Append(2) diff --git a/log/log.go b/log/log.go index 92e75e99..cd8756d1 100644 --- a/log/log.go +++ b/log/log.go @@ -35,7 +35,7 @@ import ( ) // DefaultLogger represents the default Log to use -// This Log wraps zerolog under the hood +// This Log wraps zap under the hood var DefaultLogger = New(DebugLevel, os.Stdout) var DiscardLogger = New(DebugLevel, io.Discard) diff --git a/protos/goakt/goakt.proto b/protos/goakt/goakt.proto index 0841963e..99fee3dc 100644 --- a/protos/goakt/goakt.proto +++ b/protos/goakt/goakt.proto @@ -94,7 +94,10 @@ message NodeLeft { // Terminated is used to notify watching actors // of the shutdown of its child actor. -message Terminated {} +message Terminated { + // Specifies the terminated actor + string actor_id = 1; +} // PoisonPill is sent the stop an actor. // It is enqueued as ordinary messages. @@ -106,3 +109,9 @@ message PostStart {} // PreStart is used when an actor is about to start message PreStart {} + +// Broadcast is used to send message to a router +message Broadcast { + // Specifies the actual message + google.protobuf.Any message = 1; +} diff --git a/protos/test/test.proto b/protos/test/test.proto index 2d4cb9d3..5a9726d2 100644 --- a/protos/test/test.proto +++ b/protos/test/test.proto @@ -61,4 +61,15 @@ message Wait { message RunTask { uint64 duration = 1; } + message TaskComplete {} + +message DoLog { + string text = 1; +} + +message GetCount {} + +message Count { + int32 value = 1; +} diff --git a/router/broadcast.go b/router/broadcast.go new file mode 100644 index 00000000..7249a6cd --- /dev/null +++ b/router/broadcast.go @@ -0,0 +1,135 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package router + +import ( + "context" + "fmt" + + "github.com/tochemey/goakt/v2/actors" + "github.com/tochemey/goakt/v2/goaktpb" +) + +// Broadcaster defines a broadcast router. This is a pool router +// When the router receives a message to broadcast, every routee is checked whether alive or not. +// When a routee is not alive the router removes it from its set of routeesMap. +// When the last routee stops the router itself stops. +type Broadcaster struct { + // list of routeesMap + routeesMap map[string]actors.PID + routees []actors.Actor +} + +// enforce compilation error +var _ actors.Actor = (*Broadcaster)(nil) + +// NewBroadcaster creates an instance of Broadcaster router +// routeeRefs can be of different types as long as they can handle the router broadcast message +func NewBroadcaster(routees ...actors.Actor) *Broadcaster { + // create the router instance + router := &Broadcaster{ + routeesMap: make(map[string]actors.PID, len(routees)), + routees: routees, + } + return router +} + +// PreStart pre-starts the actor. +func (x *Broadcaster) PreStart(context.Context) error { + return nil +} + +// Receive handles messages sent to the Broadcaster router +func (x *Broadcaster) Receive(ctx actors.ReceiveContext) { + message := ctx.Message() + switch message.(type) { + case *goaktpb.PostStart: + x.postStart(ctx) + default: + ctx.Unhandled() + } +} + +// PostStop is executed when the actor is shutting down. +func (x *Broadcaster) PostStop(context.Context) error { + return nil +} + +// postStart spawns routeesMap +func (x *Broadcaster) postStart(ctx actors.ReceiveContext) { + for index, routee := range x.routees { + name := fmt.Sprintf("routee-%s-%d", ctx.Self().Name(), index) + routee := ctx.Spawn(name, routee) + x.routeesMap[routee.ID()] = routee + } + + ctx.Become(x.broadcast) +} + +// broadcast send message to all the routeesMap +func (x *Broadcaster) broadcast(ctx actors.ReceiveContext) { + var message *goaktpb.Broadcast + switch msg := ctx.Message().(type) { + case *goaktpb.Broadcast: + message = msg + case *goaktpb.Terminated: + delete(x.routeesMap, msg.GetActorId()) + return + default: + ctx.Unhandled() + return + } + + if !x.canProceed() { + // push message to deadletters + ctx.Unhandled() + // shutdown + ctx.Shutdown() + return + } + + msg, err := message.GetMessage().UnmarshalNew() + if err != nil { + ctx.Err(err) + return + } + + for _, routee := range x.routeesMap { + routee := routee + go func(pid actors.PID) { + ctx.Tell(pid, msg) + }(routee) + } +} + +// canProceed check whether there are available routeesMap to proceed +func (x *Broadcaster) canProceed() bool { + for _, routee := range x.routeesMap { + if !routee.IsRunning() { + delete(x.routeesMap, routee.ID()) + } + } + return len(x.routeesMap) > 0 +} diff --git a/router/broadcast_test.go b/router/broadcast_test.go new file mode 100644 index 00000000..c5c8c88d --- /dev/null +++ b/router/broadcast_test.go @@ -0,0 +1,153 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package router + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/tochemey/goakt/v2/actors" + "github.com/tochemey/goakt/v2/goaktpb" + "github.com/tochemey/goakt/v2/log" + "github.com/tochemey/goakt/v2/test/data/testpb" +) + +func TestBroadcast(t *testing.T) { + t.Run("With happy path", func(t *testing.T) { + ctx := context.TODO() + logger := log.DefaultLogger + system, err := actors.NewActorSystem( + "testSystem", + actors.WithPassivationDisabled(), + actors.WithLogger(logger), + actors.WithReplyTimeout(time.Minute)) + + require.NoError(t, err) + require.NotNil(t, system) + + require.NoError(t, system.Start(ctx)) + + time.Sleep(time.Second) + + // create a broadcast router with two routeeRefs + broadcaster := NewBroadcaster(newWorker(), newWorker()) + + router, err := system.Spawn(ctx, "worker-pool", broadcaster) + require.NoError(t, err) + require.NotNil(t, router) + + time.Sleep(time.Second) + + // send a broadcast message to the router + message, _ := anypb.New(&testpb.DoLog{Text: "msg"}) + err = actors.Tell(ctx, router, &goaktpb.Broadcast{Message: message}) + require.NoError(t, err) + + time.Sleep(time.Second) + + // this is just for tests purpose + workerOneName := fmt.Sprintf("routee-%s-%d", router.Name(), 0) + workerTwoName := fmt.Sprintf("routee-%s-%d", router.Name(), 1) + + workerOneRef, err := system.LocalActor(workerOneName) + require.NoError(t, err) + require.NotNil(t, workerOneRef) + + workerTwoRef, err := system.LocalActor(workerTwoName) + require.NoError(t, err) + require.NotNil(t, workerTwoRef) + + expected := &testpb.Count{Value: 2} + + reply, err := actors.Ask(ctx, workerOneRef, new(testpb.GetCount), time.Minute) + require.NoError(t, err) + require.NotNil(t, reply) + assert.True(t, proto.Equal(expected, reply)) + + reply, err = actors.Ask(ctx, workerTwoRef, new(testpb.GetCount), time.Minute) + require.NoError(t, err) + require.NotNil(t, reply) + assert.True(t, proto.Equal(expected, reply)) + + t.Cleanup(func() { + assert.NoError(t, system.Stop(ctx)) + }) + }) + t.Run("With no available routees router shuts down", func(t *testing.T) { + ctx := context.TODO() + logger := log.DefaultLogger + system, err := actors.NewActorSystem( + "testSystem", + actors.WithPassivationDisabled(), + actors.WithLogger(logger), + actors.WithReplyTimeout(time.Minute)) + + require.NoError(t, err) + require.NotNil(t, system) + + require.NoError(t, system.Start(ctx)) + + time.Sleep(time.Second) + + // create a broadcast router with two routeeRefs + broadcaster := NewBroadcaster(newWorker(), newWorker()) + + router, err := system.Spawn(ctx, "worker-pool", broadcaster) + require.NoError(t, err) + require.NotNil(t, router) + + time.Sleep(time.Second) + + // this is just for tests purpose + workerOneName := fmt.Sprintf("routee-%s-%d", router.Name(), 0) + workerTwoName := fmt.Sprintf("routee-%s-%d", router.Name(), 1) + + require.NoError(t, system.Kill(ctx, workerOneName)) + require.NoError(t, system.Kill(ctx, workerTwoName)) + + // send a broadcast message to the router + message, _ := anypb.New(&testpb.DoLog{Text: "msg"}) + err = actors.Tell(ctx, router, &goaktpb.Broadcast{Message: message}) + require.NoError(t, err) + + time.Sleep(time.Second) + + ref, err := system.LocalActor("worker-pool") + require.Error(t, err) + require.Nil(t, ref) + assert.EqualError(t, err, actors.ErrActorNotFound("worker-pool").Error()) + + t.Cleanup(func() { + assert.NoError(t, system.Stop(ctx)) + }) + }) +} diff --git a/router/random.go b/router/random.go new file mode 100644 index 00000000..202ce9d1 --- /dev/null +++ b/router/random.go @@ -0,0 +1,125 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package router + +import ( + "context" + "fmt" + "math/rand" + "time" + + "github.com/tochemey/goakt/v2/actors" + "github.com/tochemey/goakt/v2/goaktpb" +) + +// Random defines a Random router. +// The router selects a routee at random when a message is sent through the router. +type Random struct { + // list of routeeRefs + routeesMap map[string]actors.PID + routees []actors.Actor + rnd *rand.Rand +} + +// enforce compilation error +var _ actors.Actor = (*Random)(nil) + +// NewRandom creates an instance of Random router +func NewRandom(routees ...actors.Actor) *Random { + // create the router instance + router := &Random{ + routeesMap: make(map[string]actors.PID, len(routees)), + routees: routees, + } + return router +} + +// PreStart pre-starts the actor. +func (x *Random) PreStart(context.Context) error { + x.rnd = rand.New(rand.NewSource(time.Now().UTC().UnixNano())) //nolint:gosec + return nil +} + +// Receive handles messages sent to the Random router +func (x *Random) Receive(ctx actors.ReceiveContext) { + message := ctx.Message() + switch message.(type) { + case *goaktpb.PostStart: + x.postStart(ctx) + default: + ctx.Unhandled() + } +} + +// PostStop is executed when the actor is shutting down. +func (x *Random) PostStop(context.Context) error { + return nil +} + +// postStart spawns routeeRefs +func (x *Random) postStart(ctx actors.ReceiveContext) { + for index, routee := range x.routees { + name := fmt.Sprintf("routee-%s-%d", ctx.Self().Name(), index) + routee := ctx.Spawn(name, routee) + x.routeesMap[routee.ID()] = routee + } + ctx.Become(x.broadcast) +} + +// broadcast send message to all the routeeRefs +func (x *Random) broadcast(ctx actors.ReceiveContext) { + var message *goaktpb.Broadcast + switch msg := ctx.Message().(type) { + case *goaktpb.Broadcast: + message = msg + case *goaktpb.Terminated: + delete(x.routeesMap, msg.GetActorId()) + return + default: + ctx.Unhandled() + return + } + + routees := make([]actors.PID, 0, len(x.routeesMap)) + for _, routee := range x.routeesMap { + if routee.IsRunning() { + routees = append(routees, routee) + } + } + + if len(routees) == 0 { + // push message to deadletters + ctx.Unhandled() + return + } + + msg, err := message.GetMessage().UnmarshalNew() + if err != nil { + ctx.Err(err) + } + + routee := routees[x.rnd.Intn(len(routees))] + ctx.Tell(routee, msg) +} diff --git a/router/random_test.go b/router/random_test.go new file mode 100644 index 00000000..da45fab0 --- /dev/null +++ b/router/random_test.go @@ -0,0 +1,157 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package router + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/tochemey/goakt/v2/actors" + "github.com/tochemey/goakt/v2/goaktpb" + "github.com/tochemey/goakt/v2/log" + "github.com/tochemey/goakt/v2/test/data/testpb" +) + +func TestRandom(t *testing.T) { + t.Run("With happy path", func(t *testing.T) { + ctx := context.TODO() + logger := log.DefaultLogger + system, err := actors.NewActorSystem( + "testSystem", + actors.WithPassivationDisabled(), + actors.WithLogger(logger), + actors.WithReplyTimeout(time.Minute)) + + require.NoError(t, err) + require.NotNil(t, system) + + require.NoError(t, system.Start(ctx)) + + time.Sleep(time.Second) + + // create a random router with one routee + // this is for the purpose of testing to make sure given routee does receive the + // message sent + random := NewRandom(newWorker()) + + router, err := system.Spawn(ctx, "worker-pool", random) + require.NoError(t, err) + require.NotNil(t, router) + + time.Sleep(time.Second) + + // send a broadcast message to the router + message, _ := anypb.New(&testpb.DoLog{Text: "msg"}) + err = actors.Tell(ctx, router, &goaktpb.Broadcast{Message: message}) + require.NoError(t, err) + + time.Sleep(time.Second) + + // this is just for tests purpose + workerName := fmt.Sprintf("routee-%s-%d", router.Name(), 0) + + workerOneRef, err := system.LocalActor(workerName) + require.NoError(t, err) + require.NotNil(t, workerOneRef) + + expected := &testpb.Count{Value: 2} + + reply, err := actors.Ask(ctx, workerOneRef, new(testpb.GetCount), time.Minute) + require.NoError(t, err) + require.NotNil(t, reply) + assert.True(t, proto.Equal(expected, reply)) + + t.Cleanup(func() { + assert.NoError(t, system.Stop(ctx)) + }) + }) + t.Run("With no available routees router is alive and message in deadletter", func(t *testing.T) { + ctx := context.TODO() + logger := log.DefaultLogger + system, err := actors.NewActorSystem( + "testSystem", + actors.WithPassivationDisabled(), + actors.WithLogger(logger), + actors.WithReplyTimeout(time.Minute)) + + require.NoError(t, err) + require.NotNil(t, system) + require.NoError(t, system.Start(ctx)) + + time.Sleep(time.Second) + + // create a deadletter subscriber + consumer, err := system.Subscribe() + require.NoError(t, err) + require.NotNil(t, consumer) + + // create a random router with one routee + // this is for the purpose of testing to make sure given routee does receive the + // message sent + random := NewRandom(newWorker()) + + router, err := system.Spawn(ctx, "worker-pool", random) + require.NoError(t, err) + require.NotNil(t, router) + + time.Sleep(time.Second) + + // this is just for tests purpose + workerName := fmt.Sprintf("routee-%s-%d", router.Name(), 0) + err = system.Kill(ctx, workerName) + require.NoError(t, err) + + // send a broadcast message to the router + message, _ := anypb.New(&testpb.DoLog{Text: "msg"}) + err = actors.Tell(ctx, router, &goaktpb.Broadcast{Message: message}) + require.NoError(t, err) + + time.Sleep(time.Second) + assert.True(t, router.IsRunning()) + + var items []*goaktpb.Deadletter + for message := range consumer.Iterator() { + payload := message.Payload() + // only listening to deadletters + deadletter, ok := payload.(*goaktpb.Deadletter) + if ok { + items = append(items, deadletter) + } + } + + require.Len(t, items, 1) + + t.Cleanup(func() { + assert.NoError(t, system.Stop(ctx)) + }) + }) +} diff --git a/router/round_robin.go b/router/round_robin.go new file mode 100644 index 00000000..dee27d87 --- /dev/null +++ b/router/round_robin.go @@ -0,0 +1,124 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package router + +import ( + "context" + "fmt" + "sync/atomic" + + "github.com/tochemey/goakt/v2/actors" + "github.com/tochemey/goakt/v2/goaktpb" +) + +// RoundRobin defines a Random router. +// It rotates over the set of routeesMap making sure that if there are n routeesMap, +// then for n messages sent through the router, each actor is forwarded one message. +type RoundRobin struct { + // list of routees + // list of routeeRefs + routeesMap map[string]actors.PID + routees []actors.Actor + next uint32 +} + +// enforce compilation error +var _ actors.Actor = (*RoundRobin)(nil) + +// NewRoundRobin creates an instance of RoundRobin router +func NewRoundRobin(routees ...actors.Actor) *RoundRobin { + // create the router instance + router := &RoundRobin{ + routeesMap: make(map[string]actors.PID, len(routees)), + routees: routees, + } + return router +} + +// PreStart pre-starts the actor. +func (x *RoundRobin) PreStart(context.Context) error { + return nil +} + +// Receive handles messages sent to the Random router +func (x *RoundRobin) Receive(ctx actors.ReceiveContext) { + message := ctx.Message() + switch message.(type) { + case *goaktpb.PostStart: + x.postStart(ctx) + default: + ctx.Unhandled() + } +} + +// PostStop is executed when the actor is shutting down. +func (x *RoundRobin) PostStop(context.Context) error { + return nil +} + +// postStart spawns routees +func (x *RoundRobin) postStart(ctx actors.ReceiveContext) { + for index, routee := range x.routees { + name := fmt.Sprintf("routee-%s-%d", ctx.Self().Name(), index) + routee := ctx.Spawn(name, routee) + x.routeesMap[routee.ID()] = routee + } + ctx.Become(x.broadcast) +} + +// broadcast send message to all the routees +func (x *RoundRobin) broadcast(ctx actors.ReceiveContext) { + var message *goaktpb.Broadcast + switch msg := ctx.Message().(type) { + case *goaktpb.Broadcast: + message = msg + case *goaktpb.Terminated: + delete(x.routeesMap, msg.GetActorId()) + return + default: + ctx.Unhandled() + return + } + + routees := make([]actors.PID, 0, len(x.routeesMap)) + for _, routee := range x.routeesMap { + if routee.IsRunning() { + routees = append(routees, routee) + } + } + + if len(routees) == 0 { + // push message to deadletters + ctx.Unhandled() + } + + msg, err := message.GetMessage().UnmarshalNew() + if err != nil { + ctx.Err(err) + } + n := atomic.AddUint32(&x.next, 1) + routee := routees[(int(n)-1)%len(routees)] + ctx.Tell(routee, msg) +} diff --git a/router/round_robin_test.go b/router/round_robin_test.go new file mode 100644 index 00000000..a13ae4fe --- /dev/null +++ b/router/round_robin_test.go @@ -0,0 +1,157 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package router + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/tochemey/goakt/v2/actors" + "github.com/tochemey/goakt/v2/goaktpb" + "github.com/tochemey/goakt/v2/log" + "github.com/tochemey/goakt/v2/test/data/testpb" +) + +func TestRoundRobin(t *testing.T) { + t.Run("With happy path", func(t *testing.T) { + ctx := context.TODO() + logger := log.DefaultLogger + system, err := actors.NewActorSystem( + "testSystem", + actors.WithPassivationDisabled(), + actors.WithLogger(logger), + actors.WithReplyTimeout(time.Minute)) + + require.NoError(t, err) + require.NotNil(t, system) + + require.NoError(t, system.Start(ctx)) + + time.Sleep(time.Second) + + // create a roundrobin router with one routee + // this is for the purpose of testing to make sure given routee does receive the + // message sent + roundrobin := NewRoundRobin(newWorker()) + + router, err := system.Spawn(ctx, "worker-pool", roundrobin) + require.NoError(t, err) + require.NotNil(t, router) + + time.Sleep(time.Second) + + // send a broadcast message to the router + message, _ := anypb.New(&testpb.DoLog{Text: "msg"}) + err = actors.Tell(ctx, router, &goaktpb.Broadcast{Message: message}) + require.NoError(t, err) + + time.Sleep(time.Second) + + // this is just for tests purpose + workerName := fmt.Sprintf("routee-%s-%d", router.Name(), 0) + + workerOneRef, err := system.LocalActor(workerName) + require.NoError(t, err) + require.NotNil(t, workerOneRef) + + expected := &testpb.Count{Value: 2} + + reply, err := actors.Ask(ctx, workerOneRef, new(testpb.GetCount), time.Minute) + require.NoError(t, err) + require.NotNil(t, reply) + assert.True(t, proto.Equal(expected, reply)) + + t.Cleanup(func() { + assert.NoError(t, system.Stop(ctx)) + }) + }) + t.Run("With no available routees router is alive and message in deadletter", func(t *testing.T) { + ctx := context.TODO() + logger := log.DefaultLogger + system, err := actors.NewActorSystem( + "testSystem", + actors.WithPassivationDisabled(), + actors.WithLogger(logger), + actors.WithReplyTimeout(time.Minute)) + + require.NoError(t, err) + require.NotNil(t, system) + require.NoError(t, system.Start(ctx)) + + time.Sleep(time.Second) + + // create a deadletter subscriber + consumer, err := system.Subscribe() + require.NoError(t, err) + require.NotNil(t, consumer) + + // create a roundRobin router with one routee + // this is for the purpose of testing to make sure given routee does receive the + // message sent + roundRobin := NewRoundRobin(newWorker()) + + router, err := system.Spawn(ctx, "worker-pool", roundRobin) + require.NoError(t, err) + require.NotNil(t, router) + + time.Sleep(time.Second) + + // this is just for tests purpose + workerName := fmt.Sprintf("routee-%s-%d", router.Name(), 0) + err = system.Kill(ctx, workerName) + require.NoError(t, err) + + // send a broadcast message to the router + message, _ := anypb.New(&testpb.DoLog{Text: "msg"}) + err = actors.Tell(ctx, router, &goaktpb.Broadcast{Message: message}) + require.NoError(t, err) + + time.Sleep(time.Second) + assert.True(t, router.IsRunning()) + + var items []*goaktpb.Deadletter + for message := range consumer.Iterator() { + payload := message.Payload() + // only listening to deadletters + deadletter, ok := payload.(*goaktpb.Deadletter) + if ok { + items = append(items, deadletter) + } + } + + require.Len(t, items, 1) + + t.Cleanup(func() { + assert.NoError(t, system.Stop(ctx)) + }) + }) +} diff --git a/router/router.md b/router/router.md new file mode 100644 index 00000000..21ad1e17 --- /dev/null +++ b/router/router.md @@ -0,0 +1,23 @@ +# Routers + +Routers help send the same type of message to a set of actors to be processed in parallel depending upon +the type of the router used. Routers should be used with caution because they can hinder performance +Go-Akt comes shipped with the following routers: + +## Broadcast + +This router broadcasts the given message to all its available routees in parallel. +When the router receives a message to broadcast, every routee is checked whether alive or not. +When a routee is not alive the router removes it from its set of routees. +When the last routee stops the router itself stops. + +## Random + +This type of router randomly pick a routee in its set of routees and send the message to it. +Only routees that are alive are considered when the router receives a message. + +## Round Robin + +This type of router rotates over the set of routees making sure that if there are n routees. +Only routees that are alive are considered when the router receives a message. +For n messages sent through the router, each actor is forwarded one message. \ No newline at end of file diff --git a/router/router_test.go b/router/router_test.go new file mode 100644 index 00000000..0297c67f --- /dev/null +++ b/router/router_test.go @@ -0,0 +1,66 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package router + +import ( + "context" + + "github.com/tochemey/goakt/v2/actors" + "github.com/tochemey/goakt/v2/log" + "github.com/tochemey/goakt/v2/test/data/testpb" +) + +type worker struct { + counter int + logger log.Logger +} + +var _ actors.Actor = (*worker)(nil) + +func newWorker() *worker { + return &worker{} +} + +func (x *worker) PreStart(context.Context) error { + x.logger = log.DefaultLogger + return nil +} + +func (x *worker) Receive(ctx actors.ReceiveContext) { + switch msg := ctx.Message().(type) { + case *testpb.DoLog: + x.counter++ + x.logger.Infof("Got message: %s", msg.GetText()) + case *testpb.GetCount: + x.counter++ + ctx.Response(&testpb.Count{Value: int32(x.counter)}) + default: + ctx.Unhandled() + } +} + +func (x *worker) PostStop(context.Context) error { + return nil +} diff --git a/router/routing.go b/router/routing.go new file mode 100644 index 00000000..dff85121 --- /dev/null +++ b/router/routing.go @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package router + +// RoutingStrategy defines the routing strategy +type RoutingStrategy int + +const ( + // RoundRobinStrategy rotates over the set of routeesMap making sure that if there are n routeesMap, + // then for n messages sent through the router, each actor is forwarded one message. + RoundRobinStrategy RoutingStrategy = iota + // RandomStrategy selects a routee when a message is sent through the router. + RandomStrategy + // ConsistentHashingStrategy uses consistent hashing to select a routee based on the sent message. + // Consistent hashing delivers messages with the same hash to the same routee as long as the set of routeesMap stays the same. + // When the set of routeesMap changes, consistent hashing tries to make sure, + // but does not guarantee, that messages with the same hash are routed to the same routee. + ConsistentHashingStrategy +) diff --git a/test/data/testpb/test.pb.go b/test/data/testpb/test.pb.go index 995348d4..3b3602a8 100644 --- a/test/data/testpb/test.pb.go +++ b/test/data/testpb/test.pb.go @@ -1112,6 +1112,138 @@ func (*TaskComplete) Descriptor() ([]byte, []int) { return file_test_test_proto_rawDescGZIP(), []int{25} } +type DoLog struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Text string `protobuf:"bytes,1,opt,name=text,proto3" json:"text,omitempty"` +} + +func (x *DoLog) Reset() { + *x = DoLog{} + if protoimpl.UnsafeEnabled { + mi := &file_test_test_proto_msgTypes[26] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DoLog) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DoLog) ProtoMessage() {} + +func (x *DoLog) ProtoReflect() protoreflect.Message { + mi := &file_test_test_proto_msgTypes[26] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DoLog.ProtoReflect.Descriptor instead. +func (*DoLog) Descriptor() ([]byte, []int) { + return file_test_test_proto_rawDescGZIP(), []int{26} +} + +func (x *DoLog) GetText() string { + if x != nil { + return x.Text + } + return "" +} + +type GetCount struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetCount) Reset() { + *x = GetCount{} + if protoimpl.UnsafeEnabled { + mi := &file_test_test_proto_msgTypes[27] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetCount) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetCount) ProtoMessage() {} + +func (x *GetCount) ProtoReflect() protoreflect.Message { + mi := &file_test_test_proto_msgTypes[27] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetCount.ProtoReflect.Descriptor instead. +func (*GetCount) Descriptor() ([]byte, []int) { + return file_test_test_proto_rawDescGZIP(), []int{27} +} + +type Count struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Value int32 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *Count) Reset() { + *x = Count{} + if protoimpl.UnsafeEnabled { + mi := &file_test_test_proto_msgTypes[28] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Count) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Count) ProtoMessage() {} + +func (x *Count) ProtoReflect() protoreflect.Message { + mi := &file_test_test_proto_msgTypes[28] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Count.ProtoReflect.Descriptor instead. +func (*Count) Descriptor() ([]byte, []int) { + return file_test_test_proto_rawDescGZIP(), []int{28} +} + +func (x *Count) GetValue() int32 { + if x != nil { + return x.Value + } + return 0 +} + var File_test_test_proto protoreflect.FileDescriptor var file_test_test_proto_rawDesc = []byte{ @@ -1164,15 +1296,20 @@ var file_test_test_proto_rawDesc = []byte{ 0x07, 0x52, 0x75, 0x6e, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x0e, 0x0a, 0x0c, 0x54, 0x61, 0x73, 0x6b, 0x43, 0x6f, 0x6d, 0x70, - 0x6c, 0x65, 0x74, 0x65, 0x42, 0x80, 0x01, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x2e, 0x74, 0x65, 0x73, - 0x74, 0x70, 0x62, 0x42, 0x09, 0x54, 0x65, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x02, - 0x50, 0x01, 0x5a, 0x2d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, - 0x6f, 0x63, 0x68, 0x65, 0x6d, 0x65, 0x79, 0x2f, 0x67, 0x6f, 0x61, 0x6b, 0x74, 0x2f, 0x76, 0x32, - 0x2f, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x3b, 0x74, 0x65, 0x73, 0x74, 0x70, - 0x62, 0xa2, 0x02, 0x03, 0x54, 0x58, 0x58, 0xaa, 0x02, 0x06, 0x54, 0x65, 0x73, 0x74, 0x70, 0x62, - 0xca, 0x02, 0x06, 0x54, 0x65, 0x73, 0x74, 0x70, 0x62, 0xe2, 0x02, 0x12, 0x54, 0x65, 0x73, 0x74, - 0x70, 0x62, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, - 0x06, 0x54, 0x65, 0x73, 0x74, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6c, 0x65, 0x74, 0x65, 0x22, 0x1b, 0x0a, 0x05, 0x44, 0x6f, 0x4c, 0x6f, 0x67, 0x12, 0x12, 0x0a, + 0x04, 0x74, 0x65, 0x78, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, + 0x74, 0x22, 0x0a, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x1d, 0x0a, + 0x05, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x80, 0x01, 0x0a, + 0x0a, 0x63, 0x6f, 0x6d, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x62, 0x42, 0x09, 0x54, 0x65, 0x73, + 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x02, 0x50, 0x01, 0x5a, 0x2d, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x6f, 0x63, 0x68, 0x65, 0x6d, 0x65, 0x79, 0x2f, + 0x67, 0x6f, 0x61, 0x6b, 0x74, 0x2f, 0x76, 0x32, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x64, 0x61, + 0x74, 0x61, 0x3b, 0x74, 0x65, 0x73, 0x74, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x54, 0x58, 0x58, 0xaa, + 0x02, 0x06, 0x54, 0x65, 0x73, 0x74, 0x70, 0x62, 0xca, 0x02, 0x06, 0x54, 0x65, 0x73, 0x74, 0x70, + 0x62, 0xe2, 0x02, 0x12, 0x54, 0x65, 0x73, 0x74, 0x70, 0x62, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x06, 0x54, 0x65, 0x73, 0x74, 0x70, 0x62, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1187,7 +1324,7 @@ func file_test_test_proto_rawDescGZIP() []byte { return file_test_test_proto_rawDescData } -var file_test_test_proto_msgTypes = make([]protoimpl.MessageInfo, 26) +var file_test_test_proto_msgTypes = make([]protoimpl.MessageInfo, 29) var file_test_test_proto_goTypes = []interface{}{ (*TestReply)(nil), // 0: testpb.TestReply (*TestPanic)(nil), // 1: testpb.TestPanic @@ -1215,6 +1352,9 @@ var file_test_test_proto_goTypes = []interface{}{ (*Wait)(nil), // 23: testpb.Wait (*RunTask)(nil), // 24: testpb.RunTask (*TaskComplete)(nil), // 25: testpb.TaskComplete + (*DoLog)(nil), // 26: testpb.DoLog + (*GetCount)(nil), // 27: testpb.GetCount + (*Count)(nil), // 28: testpb.Count } var file_test_test_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type @@ -1542,6 +1682,42 @@ func file_test_test_proto_init() { return nil } } + file_test_test_proto_msgTypes[26].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DoLog); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_test_test_proto_msgTypes[27].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetCount); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_test_test_proto_msgTypes[28].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Count); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -1549,7 +1725,7 @@ func file_test_test_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_test_test_proto_rawDesc, NumEnums: 0, - NumMessages: 26, + NumMessages: 29, NumExtensions: 0, NumServices: 0, },