From 9e51c8dd95b67e675822d1301f96bf637a86d6b7 Mon Sep 17 00:00:00 2001 From: kroskinskiis Date: Fri, 5 Apr 2024 11:06:00 +0200 Subject: [PATCH 1/8] Send ack back after receiving a result --- internal/fin/protocol/protocol.go | 61 ++++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/internal/fin/protocol/protocol.go b/internal/fin/protocol/protocol.go index 44711ccf..9ea2a2e2 100644 --- a/internal/fin/protocol/protocol.go +++ b/internal/fin/protocol/protocol.go @@ -55,6 +55,58 @@ func New(guid guid.IGuid, topic Topic, broker Broker, port int) FinProtocol { return prot } +func (protocol *FinProtocol) SendAck(result fin.Result, authentication cacao.AuthenticationInformation) error { + + client, err := protocol.Connect(authentication) + if err != nil { + log.Error("could not connect to mqtt broker") + return err + } + + ack := fin.NewAck(result.MessageId) + json, _ := fin.Encode(ack) + + protocol.Subscribe(client) + + log.Trace("Sending ack for message id: ", result.MessageId) + token := client.Publish(string(protocol.Topic), defaultQos, false, json) + + token.Wait() + protocol.Disconnect(client) + + if err := token.Error(); err != nil { + log.Error(err) + return err + } + return nil +} + +func (protocol *FinProtocol) SendNack(result fin.Result, authentication cacao.AuthenticationInformation) error { + + client, err := protocol.Connect(authentication) + if err != nil { + log.Error("could not connect to mqtt broker") + return err + } + + nack := fin.NewNack(result.MessageId) + json, _ := fin.Encode(nack) + + protocol.Subscribe(client) + + log.Trace("Sending ack for message id: ", result.MessageId) + token := client.Publish(string(protocol.Topic), defaultQos, false, json) + + token.Wait() + protocol.Disconnect(client) + + if err := token.Error(); err != nil { + log.Error(err) + return err + } + return nil +} + func (protocol *FinProtocol) SendCommand(command fin.Command) (cacao.Variables, error) { client, err := protocol.Connect(command.CommandSubstructure.Authentication) @@ -77,6 +129,7 @@ func (protocol *FinProtocol) SendCommand(command fin.Command) (cacao.Variables, func (protocol *FinProtocol) AwaitResultOrTimeout(command fin.Command) (map[string]cacao.Variable, error) { timeout := command.CommandSubstructure.Context.Timeout + if command.CommandSubstructure.Context.Timeout == 0 { log.Warning("no valid timeout will set 1 second") timeout = defaultTimeout @@ -110,11 +163,16 @@ func (protocol *FinProtocol) AwaitResultOrTimeout(command fin.Command) (map[stri } if ackReceived { if finResult.ResultStructure.Context.ExecutionId == command.CommandSubstructure.Context.ExecutionId { + protocol.SendAck(finResult, command.CommandSubstructure.Authentication) return finResult.ResultStructure.Variables, nil + } else { + protocol.SendAck(finResult, command.CommandSubstructure.Authentication) } + + } else { + protocol.SendNack(finResult, command.CommandSubstructure.Authentication) } } - } } @@ -132,7 +190,6 @@ func (protocol *FinProtocol) Handler(client mqttlib.Client, msg mqttlib.Message) } func (protocol *FinProtocol) Subscribe(client mqttlib.Client) { - token := client.Subscribe(string(protocol.Topic), defaultQos, protocol.Handler) token.Wait() From 0a78d048749b36649ed27a0b346783e7c0cfa2f9 Mon Sep 17 00:00:00 2001 From: kroskinskiis Date: Mon, 8 Apr 2024 14:18:31 +0200 Subject: [PATCH 2/8] Update fin_protocol_test to include final ack --- test/unittest/finprotocol/finprotocol_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/unittest/finprotocol/finprotocol_test.go b/test/unittest/finprotocol/finprotocol_test.go index c0c55481..3ab68fa3 100644 --- a/test/unittest/finprotocol/finprotocol_test.go +++ b/test/unittest/finprotocol/finprotocol_test.go @@ -54,15 +54,22 @@ func TestTimeoutAndCallbackHandlerCalled(t *testing.T) { mock_client := mock_mqtt.Mock_MqttClient{} mock_token := mock_mqtt.Mock_MqttToken{} + mock_token_ack := mock_mqtt.Mock_MqttToken{} + guid := new(guid.Guid) prot := protocol.New(guid, "testing", "localhost", 1883) mock_token.On("Wait").Return(true) mock_client.On("Subscribe", "testing", uint8(1), mock.Anything).Return(&mock_token) + prot.Subscribe(&mock_client) expectedCommand := model.NewCommand() expectedCommand.CommandSubstructure.Context.Timeout = 1 + + mock_token_ack.On("Wait").Return(true) + mock_client.On("Publish", "testing", uint8(1), false, mock.Anything).Return(&mock_token_ack) + fmt.Println("calling await") go helper(&prot) result, err := prot.AwaitResultOrTimeout(expectedCommand) @@ -70,6 +77,7 @@ func TestTimeoutAndCallbackHandlerCalled(t *testing.T) { assert.Equal(t, err, nil) assert.Equal(t, result, map[string]cacao.Variable{"test": {Name: "test"}}) + mock_token_ack.AssertExpectations(t) } // Helper for TestTimeoutAndCallbackHandlerCalled From 437e9b6f0466f2f6e2069aa8b4171a5f66b3d673 Mon Sep 17 00:00:00 2001 From: kroskinskiis Date: Mon, 8 Apr 2024 14:19:17 +0200 Subject: [PATCH 3/8] Fix mistake where ack should be nack --- internal/fin/protocol/protocol.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/fin/protocol/protocol.go b/internal/fin/protocol/protocol.go index 9ea2a2e2..6b445f30 100644 --- a/internal/fin/protocol/protocol.go +++ b/internal/fin/protocol/protocol.go @@ -166,7 +166,7 @@ func (protocol *FinProtocol) AwaitResultOrTimeout(command fin.Command) (map[stri protocol.SendAck(finResult, command.CommandSubstructure.Authentication) return finResult.ResultStructure.Variables, nil } else { - protocol.SendAck(finResult, command.CommandSubstructure.Authentication) + protocol.SendNack(finResult, command.CommandSubstructure.Authentication) } } else { From 456fcd7603663d5cfd1dae3120116d1332da09e7 Mon Sep 17 00:00:00 2001 From: kroskinskiis Date: Mon, 8 Apr 2024 15:47:56 +0200 Subject: [PATCH 4/8] Pass mqtt client to awaitResultOrTimeout method and ack + nack --- internal/fin/protocol/protocol.go | 56 +++++++------------------------ 1 file changed, 12 insertions(+), 44 deletions(-) diff --git a/internal/fin/protocol/protocol.go b/internal/fin/protocol/protocol.go index 6b445f30..ff9f0f26 100644 --- a/internal/fin/protocol/protocol.go +++ b/internal/fin/protocol/protocol.go @@ -55,56 +55,20 @@ func New(guid guid.IGuid, topic Topic, broker Broker, port int) FinProtocol { return prot } -func (protocol *FinProtocol) SendAck(result fin.Result, authentication cacao.AuthenticationInformation) error { - - client, err := protocol.Connect(authentication) - if err != nil { - log.Error("could not connect to mqtt broker") - return err - } - +func (protocol *FinProtocol) SendAck(result fin.Result, client mqttlib.Client) { ack := fin.NewAck(result.MessageId) json, _ := fin.Encode(ack) - - protocol.Subscribe(client) - log.Trace("Sending ack for message id: ", result.MessageId) token := client.Publish(string(protocol.Topic), defaultQos, false, json) - token.Wait() - protocol.Disconnect(client) - - if err := token.Error(); err != nil { - log.Error(err) - return err - } - return nil } -func (protocol *FinProtocol) SendNack(result fin.Result, authentication cacao.AuthenticationInformation) error { - - client, err := protocol.Connect(authentication) - if err != nil { - log.Error("could not connect to mqtt broker") - return err - } - +func (protocol *FinProtocol) SendNack(result fin.Result, client mqttlib.Client) { nack := fin.NewNack(result.MessageId) json, _ := fin.Encode(nack) - - protocol.Subscribe(client) - - log.Trace("Sending ack for message id: ", result.MessageId) + log.Trace("Sending nack for message id: ", result.MessageId) token := client.Publish(string(protocol.Topic), defaultQos, false, json) - token.Wait() - protocol.Disconnect(client) - - if err := token.Error(); err != nil { - log.Error(err) - return err - } - return nil } func (protocol *FinProtocol) SendCommand(command fin.Command) (cacao.Variables, error) { @@ -121,13 +85,13 @@ func (protocol *FinProtocol) SendCommand(command fin.Command) (cacao.Variables, protocol.Disconnect(client) return map[string]cacao.Variable{}, err } - result, err := protocol.AwaitResultOrTimeout(command) + result, err := protocol.AwaitResultOrTimeout(command, client) protocol.Disconnect(client) return result, err } -func (protocol *FinProtocol) AwaitResultOrTimeout(command fin.Command) (map[string]cacao.Variable, error) { +func (protocol *FinProtocol) AwaitResultOrTimeout(command fin.Command, client mqttlib.Client) (map[string]cacao.Variable, error) { timeout := command.CommandSubstructure.Context.Timeout if command.CommandSubstructure.Context.Timeout == 0 { @@ -152,6 +116,7 @@ func (protocol *FinProtocol) AwaitResultOrTimeout(command fin.Command) (map[stri break } log.Info(finMessage) + // This now accepts any ack, should be changed if finMessage.Type == fin.MessageTypeAck { ackReceived = true } else if finMessage.Type == fin.MessageTypeResult { @@ -161,16 +126,19 @@ func (protocol *FinProtocol) AwaitResultOrTimeout(command fin.Command) (map[stri log.Trace(err) return map[string]cacao.Variable{}, err } + log.Info("Outside") if ackReceived { + log.Info("Inside") if finResult.ResultStructure.Context.ExecutionId == command.CommandSubstructure.Context.ExecutionId { - protocol.SendAck(finResult, command.CommandSubstructure.Authentication) + log.Info("test") + protocol.SendAck(finResult, client) return finResult.ResultStructure.Variables, nil } else { - protocol.SendNack(finResult, command.CommandSubstructure.Authentication) + protocol.SendNack(finResult, client) } } else { - protocol.SendNack(finResult, command.CommandSubstructure.Authentication) + protocol.SendNack(finResult, client) } } } From 34463b8e3c958402c9daed379236c8c0a4f6675e Mon Sep 17 00:00:00 2001 From: kroskinskiis Date: Mon, 8 Apr 2024 15:48:24 +0200 Subject: [PATCH 5/8] Fix finprotocol tests --- test/unittest/finprotocol/finprotocol_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/unittest/finprotocol/finprotocol_test.go b/test/unittest/finprotocol/finprotocol_test.go index 3ab68fa3..42f828db 100644 --- a/test/unittest/finprotocol/finprotocol_test.go +++ b/test/unittest/finprotocol/finprotocol_test.go @@ -44,7 +44,7 @@ func TestTimeoutAndCallbackTimerElaspsed(t *testing.T) { expectedCommand := model.NewCommand() expectedCommand.CommandSubstructure.Context.Timeout = 1 - result, err := prot.AwaitResultOrTimeout(expectedCommand) + result, err := prot.AwaitResultOrTimeout(expectedCommand, &mock_client) assert.Equal(t, err, errors.New("no message received from fin while it was expected")) assert.Equal(t, result, map[string]cacao.Variable{}) @@ -72,11 +72,13 @@ func TestTimeoutAndCallbackHandlerCalled(t *testing.T) { fmt.Println("calling await") go helper(&prot) - result, err := prot.AwaitResultOrTimeout(expectedCommand) + result, err := prot.AwaitResultOrTimeout(expectedCommand, &mock_client) fmt.Println("done waiting") assert.Equal(t, err, nil) assert.Equal(t, result, map[string]cacao.Variable{"test": {Name: "test"}}) + mock_client.AssertExpectations(t) + mock_token.AssertExpectations(t) mock_token_ack.AssertExpectations(t) } From 17e0f4609224579d0e15717c1d288bc397c218d1 Mon Sep 17 00:00:00 2001 From: kroskinskiis Date: Mon, 8 Apr 2024 17:20:02 +0200 Subject: [PATCH 6/8] Fix additonal parsing in controller.go --- internal/capability/controller/controller.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/capability/controller/controller.go b/internal/capability/controller/controller.go index 99396e10..a948b8d6 100644 --- a/internal/capability/controller/controller.go +++ b/internal/capability/controller/controller.go @@ -86,8 +86,10 @@ func (finController *FinController) Run() { // Handle goroutine call from mqtt stack func (finController *FinController) Handler(client mqtt.Client, msg mqtt.Message) { + // Might need to filter on fin topics in the future if communication is needed if msg.Topic() != string("soarca") { log.Trace("message was receive in wrong topic: " + msg.Topic()) + return } payload := msg.Payload() log.Trace(string(payload)) From 57a815cd0361570d0d194f72c988d00a204fc0c1 Mon Sep 17 00:00:00 2001 From: kroskinskiis Date: Mon, 8 Apr 2024 17:22:36 +0200 Subject: [PATCH 7/8] Change name of fin protocol to not collide with fin controller --- internal/fin/protocol/protocol.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/fin/protocol/protocol.go b/internal/fin/protocol/protocol.go index ff9f0f26..3997fbbe 100644 --- a/internal/fin/protocol/protocol.go +++ b/internal/fin/protocol/protocol.go @@ -15,7 +15,7 @@ import ( const defaultTimeout = 1 const disconnectTimeout = 100 -const clientId = "soarca" +const clientId = "soarca-fin-capability" const defaultQos = AtLeastOnce const ( From 18671136a82456373f6cc98c8bdfac66f2df0ef3 Mon Sep 17 00:00:00 2001 From: kroskinskiis Date: Mon, 8 Apr 2024 17:26:05 +0200 Subject: [PATCH 8/8] Remove debugging log statements --- internal/fin/protocol/protocol.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/fin/protocol/protocol.go b/internal/fin/protocol/protocol.go index 3997fbbe..a923255d 100644 --- a/internal/fin/protocol/protocol.go +++ b/internal/fin/protocol/protocol.go @@ -126,11 +126,11 @@ func (protocol *FinProtocol) AwaitResultOrTimeout(command fin.Command, client mq log.Trace(err) return map[string]cacao.Variable{}, err } - log.Info("Outside") + if ackReceived { - log.Info("Inside") + if finResult.ResultStructure.Context.ExecutionId == command.CommandSubstructure.Context.ExecutionId { - log.Info("test") + protocol.SendAck(finResult, client) return finResult.ResultStructure.Variables, nil } else {