Skip to content

Commit

Permalink
feat: add logger to PID for accessibility within actors
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Sep 6, 2024
1 parent bddc103 commit d168d34
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 17 deletions.
36 changes: 36 additions & 0 deletions actors/actor_test.go → actors/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ package actors

import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -536,3 +538,37 @@ func pause(duration time.Duration) {
<-stopCh
timer.Stop()
}

func extractMessage(bytes []byte) (string, error) {
// a map container to decode the JSON structure into
c := make(map[string]json.RawMessage)

// unmarshal JSON
if err := json.Unmarshal(bytes, &c); err != nil {
return "", err
}
for k, v := range c {
if k == "msg" {
return strconv.Unquote(string(v))
}
}

return "", nil
}

func extractLevel(bytes []byte) (string, error) {
// a map container to decode the JSON structure into
c := make(map[string]json.RawMessage)

// unmarshal JSON
if err := json.Unmarshal(bytes, &c); err != nil {
return "", err
}
for k, v := range c {
if k == "level" {
return strconv.Unquote(string(v))
}
}

return "", nil
}
8 changes: 8 additions & 0 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,14 @@ func (pid *PID) UnWatch(cid *PID) {
}
}

// Logger returns the logger sets when creating the PID
func (pid *PID) Logger() log.Logger {
pid.fieldsLocker.Lock()
logger := pid.logger
pid.fieldsLocker.Unlock()
return logger
}

// watchers return the list of watchersList
func (pid *PID) watchers() *slice.Slice[*watcher] {
return pid.watchersList
Expand Down
33 changes: 33 additions & 0 deletions actors/pid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package actors

import (
"bytes"
"context"
"sort"
"sync"
Expand Down Expand Up @@ -2582,3 +2583,35 @@ func TestNewPID(t *testing.T) {
assert.Nil(t, pid)
})
}
func TestLogger(t *testing.T) {
ctx := context.TODO()
buffer := new(bytes.Buffer)
buffer.Reset()

actorPath := NewPath("Test", NewAddress("sys", "host", 1))
pid, err := newPID(
ctx,
actorPath,
&exchanger{},
withInitMaxRetries(1),
withCustomLogger(log.New(log.InfoLevel, buffer)),
withAskTimeout(replyTimeout))

require.NoError(t, err)
require.NotNil(t, pid)

buffer.Reset()

pid.Logger().Info("test debug")
actual, err := extractMessage(buffer.Bytes())
require.NoError(t, err)

expected := "test debug"
require.Equal(t, expected, actual)

t.Cleanup(func() {
// reset the buffer
buffer.Reset()
assert.NoError(t, pid.Shutdown(ctx))
})
}
34 changes: 17 additions & 17 deletions actors/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestScheduler(t *testing.T) {
err = newActorSystem.Start(ctx)
assert.NoError(t, err)

time.Sleep(time.Second)
pause(time.Second)

// create a test actor
actorName := "test"
Expand All @@ -63,14 +63,14 @@ func TestScheduler(t *testing.T) {
require.NoError(t, err)
assert.NotNil(t, actorRef)

time.Sleep(time.Second)
pause(time.Second)

// send a message to the actor after 100 ms
message := new(testpb.TestSend)
err = newActorSystem.ScheduleOnce(ctx, message, actorRef, 100*time.Millisecond)
require.NoError(t, err)

time.Sleep(time.Second)
pause(time.Second)
typedSystem := newActorSystem.(*actorSystem)
keys, err := typedSystem.scheduler.quartzScheduler.GetJobKeys()
require.NoError(t, err)
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestScheduler(t *testing.T) {
err = newActorSystem.Start(ctx)
assert.NoError(t, err)

time.Sleep(time.Second)
pause(time.Second)

// create a test actor
actorName := "test"
Expand All @@ -149,7 +149,7 @@ func TestScheduler(t *testing.T) {
err = newActorSystem.RemoteScheduleOnce(ctx, message, addr, 100*time.Millisecond)
require.NoError(t, err)

time.Sleep(time.Second)
pause(time.Second)
typedSystem := newActorSystem.(*actorSystem)
// for test purpose only
keys, err := typedSystem.scheduler.quartzScheduler.GetJobKeys()
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestScheduler(t *testing.T) {
err = newActorSystem.Start(ctx)
assert.NoError(t, err)

time.Sleep(time.Second)
pause(time.Second)

// test purpose only
typedSystem := newActorSystem.(*actorSystem)
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestScheduler(t *testing.T) {
err = newActorSystem.Start(ctx)
assert.NoError(t, err)

time.Sleep(time.Second)
pause(time.Second)

// create a test actor
actorName := "test"
Expand All @@ -237,7 +237,7 @@ func TestScheduler(t *testing.T) {
require.NoError(t, err)
assert.NotNil(t, actorRef)

time.Sleep(time.Second)
pause(time.Second)

// send a message to the actor after 100 ms
message := new(testpb.TestSend)
Expand All @@ -247,7 +247,7 @@ func TestScheduler(t *testing.T) {
require.NoError(t, err)

// wait for two seconds
time.Sleep(2 * time.Second)
pause(2 * time.Second)
assert.EqualValues(t, 2, actor.counter.Load())

// stop the actor
Expand All @@ -270,7 +270,7 @@ func TestScheduler(t *testing.T) {
err = newActorSystem.Start(ctx)
assert.NoError(t, err)

time.Sleep(time.Second)
pause(time.Second)

// create a test actor
actorName := "test"
Expand All @@ -279,7 +279,7 @@ func TestScheduler(t *testing.T) {
require.NoError(t, err)
assert.NotNil(t, actorRef)

time.Sleep(time.Second)
pause(time.Second)

// send a message to the actor after 100 ms
message := new(testpb.TestSend)
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestScheduler(t *testing.T) {
err = newActorSystem.Start(ctx)
assert.NoError(t, err)

time.Sleep(time.Second)
pause(time.Second)

// test purpose only
typedSystem := newActorSystem.(*actorSystem)
Expand All @@ -321,7 +321,7 @@ func TestScheduler(t *testing.T) {
require.NoError(t, err)
assert.NotNil(t, actorRef)

time.Sleep(time.Second)
pause(time.Second)

// send a message to the actor after 100 ms
message := new(testpb.TestSend)
Expand Down Expand Up @@ -358,7 +358,7 @@ func TestScheduler(t *testing.T) {
err = newActorSystem.Start(ctx)
assert.NoError(t, err)

time.Sleep(time.Second)
pause(time.Second)

// create a test actor
actorName := "test"
Expand All @@ -379,7 +379,7 @@ func TestScheduler(t *testing.T) {
require.NoError(t, err)

// wait for two seconds
time.Sleep(2 * time.Second)
pause(2 * time.Second)
assert.EqualValues(t, 2, actor.counter.Load())

// stop the actor
Expand Down Expand Up @@ -409,7 +409,7 @@ func TestScheduler(t *testing.T) {
err = newActorSystem.Start(ctx)
assert.NoError(t, err)

time.Sleep(time.Second)
pause(time.Second)

// create a test actor
actorName := "test"
Expand Down Expand Up @@ -456,7 +456,7 @@ func TestScheduler(t *testing.T) {
err = newActorSystem.Start(ctx)
assert.NoError(t, err)

time.Sleep(time.Second)
pause(time.Second)
// test purpose only
typedSystem := newActorSystem.(*actorSystem)
typedSystem.scheduler.Stop(ctx)
Expand Down

0 comments on commit d168d34

Please sign in to comment.