Skip to content

Commit

Permalink
Merge pull request #431 from umitkavala/identitlookup-support
Browse files Browse the repository at this point in the history
Initial port
  • Loading branch information
rogeralsing authored Apr 30, 2021
2 parents a793745 + b221d48 commit 80db82e
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 4,251 deletions.
18 changes: 11 additions & 7 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Cluster struct {
ActorSystem *actor.ActorSystem
Config *Config
remote *remote.Remote
pidCache *pidCacheValue
PidCache *pidCacheValue
MemberList *MemberList
partitionValue *partitionValue
partitionManager *PartitionManager
Expand Down Expand Up @@ -58,7 +58,7 @@ func (c *Cluster) Start() {

// for each known kind, spin up a partition-kind actor to handle all requests for that kind
c.partitionValue = setupPartition(c, kinds)
c.pidCache = setupPidCache(c.ActorSystem)
c.PidCache = setupPidCache(c.ActorSystem)
c.MemberList = setupMemberList(c)
c.partitionManager = newPartitionManager(c)
c.partitionManager.Start()
Expand All @@ -81,7 +81,7 @@ func (c *Cluster) StartClient() {

// for each known kind, spin up a partition-kind actor to handle all requests for that kind
c.partitionValue = setupPartition(c, kinds)
c.pidCache = setupPidCache(c.ActorSystem)
c.PidCache = setupPidCache(c.ActorSystem)
c.MemberList = setupMemberList(c)
c.partitionManager = newPartitionManager(c)
c.partitionManager.Start()
Expand All @@ -97,7 +97,7 @@ func (c *Cluster) Shutdown(graceful bool) {
// This is to wait ownership transferring complete.
time.Sleep(time.Millisecond * 2000)
c.MemberList.stopMemberList()
c.pidCache.stopPidCache()
c.PidCache.stopPidCache()
c.partitionValue.stopPartition()
c.partitionManager.Stop()
}
Expand All @@ -111,8 +111,7 @@ func (c *Cluster) Shutdown(graceful bool) {
// Get a PID to a virtual actor
func (c *Cluster) GetV1(name string, kind string) (*actor.PID, remote.ResponseStatusCode) {
// Check Cache
clusterActorId := kind + "/" + name
if pid, ok := c.pidCache.getCache(clusterActorId); ok {
if pid, ok := c.PidCache.getCache(name); ok {
return pid, remote.ResponseStatusCodeOK
}

Expand Down Expand Up @@ -150,7 +149,7 @@ func (c *Cluster) GetV1(name string, kind string) (*actor.PID, remote.ResponseSt
switch statusCode {
case remote.ResponseStatusCodeOK:
// save cache
c.pidCache.addCache(clusterActorId, response.Pid)
c.PidCache.addCache(name, response.Pid)
// tell the original requester that we have a response
return response.Pid, statusCode
default:
Expand Down Expand Up @@ -226,6 +225,11 @@ func (c *Cluster) GetClusterKinds() []string {
return c.remote.GetKnownKinds()
}

// ToShortString Get kind & identity
func (ci *ClusterIdentity) ToShortString() string {
return ci.Kind + "/" + ci.Identity
}

// Call is a wrap of context.RequestFuture with retries.
func (c *Cluster) Call(name string, kind string, msg interface{}, callopts ...*GrainCallOptions) (interface{}, error) {
var _callopts *GrainCallOptions = nil
Expand Down
88 changes: 88 additions & 0 deletions cluster/identity/identity_lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package identity

import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/cluster"
)

// Lookup contains
type Lookup interface {
Get(clusterIdentity *cluster.ClusterIdentity) *actor.PID

RemovePid(pid *actor.PID)

Setup(cluster *cluster.Cluster, kinds []string, isClient bool)

Shutdown()
}

// StorageLookup contains
type StorageLookup interface {
TryGetExistingActivation(clusterIdentity *cluster.ClusterIdentity) *StoredActivation

TryAcquireLock(clusterIdentity *cluster.ClusterIdentity) *SpawnLock

WaitForActivation(clusterIdentity *cluster.ClusterIdentity) *StoredActivation

RemoveLock(spawnLock SpawnLock)

StoreActivation(memberID string, spawnLock *SpawnLock, pid *actor.PID)

RemoveActivation(pid *SpawnLock)

RemoveMemberId(memberID string)
}

// SpawnLock contains
type SpawnLock struct {
LockID string
ClusterIdentity *cluster.ClusterIdentity
}

func newSpawnLock(lockID string, clusterIdentity *cluster.ClusterIdentity) *SpawnLock {
this := &SpawnLock{
LockID: lockID,
ClusterIdentity: clusterIdentity,
}

return this
}

// StoredActivation contains
type StoredActivation struct {
Pid string
MemberID string
}

func newStoredActivation(pid string, memberID string) *StoredActivation {
this := &StoredActivation{
Pid: pid,
MemberID: memberID,
}

return this
}

// GetPid contains
type GetPid struct {
ClusterIdentity *cluster.ClusterIdentity
}

func newGetPid(clusterIdentity *cluster.ClusterIdentity) *GetPid {
this := &GetPid{
ClusterIdentity: clusterIdentity,
}
return this
}

// PidResult contains
type PidResult struct {
Pid *actor.PID
}

func newPidResult(p *actor.PID) *PidResult {
this := &PidResult{
Pid: p,
}
return this
}
67 changes: 67 additions & 0 deletions cluster/identity/identity_storage_lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package identity

import (
"time"

"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/cluster"
)

const (
placementActorName = "placement-activator"
pidClusterIdentityStartIndex = len(placementActorName) + 1
)

// IdentityStorageLookup contains
type IdentityStorageLookup struct {
Storage StorageLookup
cluster *cluster.Cluster
isClient bool
placementActor *actor.PID
system *actor.ActorSystem
router *actor.PID
memberID string
}

func newIdentityStorageLookup(storage StorageLookup) *IdentityStorageLookup {
this := &IdentityStorageLookup{
Storage: storage,
}
return this
}

// RemoveMember
func (i *IdentityStorageLookup) RemoveMember(memberID string) {
i.Storage.RemoveMemberId(memberID)
}

// RemotePlacementActor
func RemotePlacementActor(address string) *actor.PID {
return actor.NewPID(address, placementActorName)
}

//
// Interface: Lookup
//

// Get
func (id *IdentityStorageLookup) Get(clusterIdentity *cluster.ClusterIdentity) *actor.PID {
msg := newGetPid(clusterIdentity)
timeout := 5 * time.Second

res, _ := id.system.Root.RequestFuture(id.router, msg, timeout).Result()
response := res.(*actor.Future)

return response.PID()
}

func (id *IdentityStorageLookup) Setup(cluster *cluster.Cluster, kinds []string, isClient bool) {
id.cluster = cluster
id.system = cluster.ActorSystem
id.memberID = string(cluster.Id())

//workerProps := actor.PropsFromProducer(func() actor.Actor { return newIdentityStorageWorker(id) })

//routerProps := id.system.Root.(workerProps, 50);

}
50 changes: 50 additions & 0 deletions cluster/identity/identity_storage_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package identity

import (
"log"

"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/cluster"
)

type IdentityStorageWorker struct {
cluster *cluster.Cluster
lookup *IdentityStorageLookup
storage StorageLookup
}

func newIdentityStorageWorker(storageLookup *IdentityStorageLookup) *IdentityStorageWorker {
this := &IdentityStorageWorker{
cluster: storageLookup.cluster,
lookup: storageLookup,
storage: storageLookup.Storage,
}
return this
}

// Receive func
func (ids *IdentityStorageWorker) Receive(c actor.Context) {
m := c.Message()
_, ok := m.(GetPid)

if !ok {
return
}

if c.Sender() == nil {
log.Println("No sender in GetPid request")
return
}

cid := m.(GetPid).ClusterIdentity.Identity + "." + m.(GetPid).ClusterIdentity.Kind

existing, _ := ids.cluster.PidCache.GetCache(cid)

if existing != nil {
log.Printf("Found %s in pidcache", m.(GetPid).ClusterIdentity.ToShortString())
c.Respond(newPidResult(existing))
}

return
// continue
}
2 changes: 1 addition & 1 deletion cluster/pid_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *pidCacheValue) onMemberStatusEvent(evn interface{}) {
}
}

func (c *pidCacheValue) getCache(name string) (*actor.PID, bool) {
func (c *pidCacheValue) GetCache(name string) (*actor.PID, bool) {
v, ok := c.cache.Get(name)
if !ok {
return nil, false
Expand Down
Loading

0 comments on commit 80db82e

Please sign in to comment.