Skip to content

Commit

Permalink
refactor: rename some method for self documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Aug 13, 2024
1 parent fddaedf commit b390fac
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 286 deletions.
8 changes: 4 additions & 4 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
}

system := &actorSystem{
actors: newPIDMap(1_000), // TODO need to check with memory footprint here since we change the map engine
actors: newPIDMap(1_000),
actorsChan: make(chan *internalpb.WireActor, 10),
name: name,
logger: log.DefaultLogger,
Expand Down Expand Up @@ -1214,7 +1214,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
x.locker.Unlock()

go x.clusterEventsLoop()
go x.clusterReplicationLoop()
go x.replicationLoop()
go x.peersStateLoop()
go x.redistributionLoop()

Expand Down Expand Up @@ -1359,8 +1359,8 @@ func (x *actorSystem) registerMetrics() error {
return err
}

// clusterReplicationLoop publishes newly created actor into the cluster when cluster is enabled
func (x *actorSystem) clusterReplicationLoop() {
// replicationLoop publishes newly created actor into the cluster when cluster is enabled
func (x *actorSystem) replicationLoop() {
for actor := range x.actorsChan {
if x.InCluster() {
ctx := context.Background()
Expand Down
34 changes: 17 additions & 17 deletions actors/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,15 @@ type ReceiveContext interface {
}

type receiveContext struct {
ctx context.Context
message proto.Message
sender PID
remoteSender *goaktpb.Address
response chan proto.Message
recipient PID
mu sync.Mutex
isAsyncMessage bool
sendTime atomic.Time
ctx context.Context
message proto.Message
sender PID
remoteSender *goaktpb.Address
response chan proto.Message
recipient PID
mu sync.Mutex
async bool
sendTime atomic.Time
}

// force compilation error
Expand All @@ -167,7 +167,7 @@ func newReceiveContext(ctx context.Context, from, to PID, message proto.Message,
context.sender = from
context.recipient = to
context.message = message
context.isAsyncMessage = async
context.async = async
context.mu = sync.Mutex{}
context.response = make(chan proto.Message, 1)
context.sendTime.Store(time.Now())
Expand Down Expand Up @@ -204,7 +204,7 @@ func (c *receiveContext) Response(resp proto.Message) {
c.mu.Lock()
defer c.mu.Unlock()
// only set a response when the message is sync message
if !c.isAsyncMessage {
if !c.async {
defer close(c.response)
c.response <- resp
}
Expand Down Expand Up @@ -497,12 +497,12 @@ func (c *receiveContext) Forward(to PID) {
if to.IsRunning() {
ctx := context.WithoutCancel(c.ctx)
receiveContext := &receiveContext{
ctx: ctx,
message: message,
sender: sender,
recipient: to,
mu: sync.Mutex{},
isAsyncMessage: false,
ctx: ctx,
message: message,
sender: sender,
recipient: to,
mu: sync.Mutex{},
async: false,
}
to.doReceive(receiveContext)
}
Expand Down
Loading

0 comments on commit b390fac

Please sign in to comment.