diff --git a/.travis.yml b/.travis.yml index 00f3e0f..223e9f8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,7 @@ language: go go: - 1.9 + - 1.8 env: global: - TEST_TIMEOUT_SCALE=40 diff --git a/participant.go b/participant.go index d788980..7c0b996 100644 --- a/participant.go +++ b/participant.go @@ -74,12 +74,11 @@ type participant struct { keyBuilder *KeyBuilder zkClient *uzk.Client // Mirrors org.apache.helix.participant.HelixStateMachineEngine - // stateModelName->stateModelProcessor - stateModelProcessors sync.Map + stateModelProcessorsMu sync.Mutex + stateModelProcessors map[string]*StateModelProcessor // state model name to processor stateModelProcessorLocks map[string]*sync.Mutex stateModel StateModel - sync.Mutex - dataAccessor *DataAccessor + dataAccessor *DataAccessor // fatalErrChan would notify user when a fatal error occurs fatalErrChan chan error @@ -122,6 +121,7 @@ func NewParticipant( port: port, keyBuilder: keyBuilder, zkClient: zkClient, + stateModelProcessors: make(map[string]*StateModelProcessor), stateModelProcessorLocks: make(map[string]*sync.Mutex), dataAccessor: newDataAccessor(zkClient, keyBuilder), stateModel: NewStateModel(), @@ -159,7 +159,9 @@ func (p *participant) IsConnected() bool { // RegisterStateModel associates state trasition functions with the participant func (p *participant) RegisterStateModel(stateModelName string, processor *StateModelProcessor) { - p.stateModelProcessors.Store(stateModelName, processor) + p.stateModelProcessorsMu.Lock() + p.stateModelProcessors[stateModelName] = processor + p.stateModelProcessorsMu.Unlock() p.stateModelProcessorLocks[stateModelName] = &sync.Mutex{} } @@ -435,7 +437,7 @@ func (p *participant) handleMsg(msg *model.Message) error { mu, ok := p.stateModelProcessorLocks[msg.GetStateModelDef()] if !ok { p.logger.Error("failed to find state model in stateModelProcessorLocks", - zap.String("StateModelDefinition", msg.GetStateModelDef()), + zap.String("stateModelDefinition", msg.GetStateModelDef()), zap.Any("stateModelProcessorLocks", p.stateModelProcessorLocks)) return errMsgMissingStateModelDef } @@ -556,8 +558,9 @@ func (p *participant) handleStateTransition(msg *model.Message) error { // set the msg execution time msg.SetExecuteStartTime(time.Now()) - if val, ok := p.stateModelProcessors.Load(msg.GetStateModelDef()); ok { - processor := val.(*StateModelProcessor) + p.stateModelProcessorsMu.Lock() + defer p.stateModelProcessorsMu.Unlock() + if processor, ok := p.stateModelProcessors[msg.GetStateModelDef()]; ok { if toStateHandler, ok := processor.Transitions[fromState]; ok { if handler, ok := toStateHandler[toState]; ok { // TODO: deal with handler error