Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/74 send final ack after receiving result #121

Merged
2 changes: 2 additions & 0 deletions internal/capability/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ func (finController *FinController) Run() {

// Handle goroutine call from mqtt stack
func (finController *FinController) Handler(client mqtt.Client, msg mqtt.Message) {
// Might need to filter on fin topics in the future if communication is needed
if msg.Topic() != string("soarca") {
log.Trace("message was receive in wrong topic: " + msg.Topic())
return
}
payload := msg.Payload()
log.Trace(string(payload))
Expand Down
35 changes: 30 additions & 5 deletions internal/fin/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

const defaultTimeout = 1
const disconnectTimeout = 100
const clientId = "soarca"
const clientId = "soarca-fin-capability"
const defaultQos = AtLeastOnce

const (
Expand Down Expand Up @@ -55,6 +55,22 @@ func New(guid guid.IGuid, topic Topic, broker Broker, port int) FinProtocol {
return prot
}

func (protocol *FinProtocol) SendAck(result fin.Result, client mqttlib.Client) {
ack := fin.NewAck(result.MessageId)
json, _ := fin.Encode(ack)
log.Trace("Sending ack for message id: ", result.MessageId)
token := client.Publish(string(protocol.Topic), defaultQos, false, json)
token.Wait()
}

func (protocol *FinProtocol) SendNack(result fin.Result, client mqttlib.Client) {
nack := fin.NewNack(result.MessageId)
json, _ := fin.Encode(nack)
log.Trace("Sending nack for message id: ", result.MessageId)
token := client.Publish(string(protocol.Topic), defaultQos, false, json)
token.Wait()
}

func (protocol *FinProtocol) SendCommand(command fin.Command) (cacao.Variables, error) {

client, err := protocol.Connect(command.CommandSubstructure.Authentication)
Expand All @@ -69,14 +85,15 @@ func (protocol *FinProtocol) SendCommand(command fin.Command) (cacao.Variables,
protocol.Disconnect(client)
return map[string]cacao.Variable{}, err
}
result, err := protocol.AwaitResultOrTimeout(command)
result, err := protocol.AwaitResultOrTimeout(command, client)
protocol.Disconnect(client)

return result, err
}

func (protocol *FinProtocol) AwaitResultOrTimeout(command fin.Command) (map[string]cacao.Variable, error) {
func (protocol *FinProtocol) AwaitResultOrTimeout(command fin.Command, client mqttlib.Client) (map[string]cacao.Variable, error) {
timeout := command.CommandSubstructure.Context.Timeout

if command.CommandSubstructure.Context.Timeout == 0 {
log.Warning("no valid timeout will set 1 second")
timeout = defaultTimeout
Expand All @@ -99,6 +116,7 @@ func (protocol *FinProtocol) AwaitResultOrTimeout(command fin.Command) (map[stri
break
}
log.Info(finMessage)
// This now accepts any ack, should be changed
if finMessage.Type == fin.MessageTypeAck {
ackReceived = true
} else if finMessage.Type == fin.MessageTypeResult {
Expand All @@ -108,13 +126,21 @@ func (protocol *FinProtocol) AwaitResultOrTimeout(command fin.Command) (map[stri
log.Trace(err)
return map[string]cacao.Variable{}, err
}

if ackReceived {

if finResult.ResultStructure.Context.ExecutionId == command.CommandSubstructure.Context.ExecutionId {

protocol.SendAck(finResult, client)
return finResult.ResultStructure.Variables, nil
} else {
protocol.SendNack(finResult, client)
}

} else {
protocol.SendNack(finResult, client)
}
}

}

}
Expand All @@ -132,7 +158,6 @@ func (protocol *FinProtocol) Handler(client mqttlib.Client, msg mqttlib.Message)
}

func (protocol *FinProtocol) Subscribe(client mqttlib.Client) {

token := client.Subscribe(string(protocol.Topic), defaultQos, protocol.Handler)
token.Wait()

Expand Down
14 changes: 12 additions & 2 deletions test/unittest/finprotocol/finprotocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestTimeoutAndCallbackTimerElaspsed(t *testing.T) {
expectedCommand := model.NewCommand()
expectedCommand.CommandSubstructure.Context.Timeout = 1

result, err := prot.AwaitResultOrTimeout(expectedCommand)
result, err := prot.AwaitResultOrTimeout(expectedCommand, &mock_client)

assert.Equal(t, err, errors.New("no message received from fin while it was expected"))
assert.Equal(t, result, map[string]cacao.Variable{})
Expand All @@ -54,22 +54,32 @@ func TestTimeoutAndCallbackHandlerCalled(t *testing.T) {
mock_client := mock_mqtt.Mock_MqttClient{}
mock_token := mock_mqtt.Mock_MqttToken{}

mock_token_ack := mock_mqtt.Mock_MqttToken{}

guid := new(guid.Guid)

prot := protocol.New(guid, "testing", "localhost", 1883)
mock_token.On("Wait").Return(true)
mock_client.On("Subscribe", "testing", uint8(1), mock.Anything).Return(&mock_token)

prot.Subscribe(&mock_client)

expectedCommand := model.NewCommand()
expectedCommand.CommandSubstructure.Context.Timeout = 1

mock_token_ack.On("Wait").Return(true)
mock_client.On("Publish", "testing", uint8(1), false, mock.Anything).Return(&mock_token_ack)

fmt.Println("calling await")
go helper(&prot)
result, err := prot.AwaitResultOrTimeout(expectedCommand)
result, err := prot.AwaitResultOrTimeout(expectedCommand, &mock_client)
fmt.Println("done waiting")

assert.Equal(t, err, nil)
assert.Equal(t, result, map[string]cacao.Variable{"test": {Name: "test"}})
mock_client.AssertExpectations(t)
mock_token.AssertExpectations(t)
mock_token_ack.AssertExpectations(t)
}

// Helper for TestTimeoutAndCallbackHandlerCalled
Expand Down