Skip to content

Commit

Permalink
add basic command processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Jul 5, 2019
1 parent 57b6355 commit 2ef365b
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 8 deletions.
5 changes: 3 additions & 2 deletions cmd/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ func (_ *StdinCommandReader) ReadCommand(timeout time.Duration) *string {

go func() {
reader := bufio.NewReader(os.Stdin)
line, err := reader.ReadString('\n')
line, _, err := reader.ReadLine()
sline := string(line)
if err != nil {
e <- err
} else {
s <- line
s <- sline
}
close(s)
close(e)
Expand Down
66 changes: 60 additions & 6 deletions cmd/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"log"
"time"

"github.com/google/uuid"

"github.com/ctron/hot/pkg/utils"
"pack.ag/amqp"
)
Expand Down Expand Up @@ -89,20 +91,27 @@ func consume(messageType string, uri string, tenant string) error {

utils.PrintMessage(msg)
if processCommands {
processCommand(msg)
if err := processCommand(session, tenant, msg); err != nil {
log.Print("Failed to send command: ", err)
}
}
}
}

func processCommand(msg *amqp.Message) {
func processCommand(session *amqp.Session, tenant string, msg *amqp.Message) error {
ttd, ok := msg.ApplicationProperties["ttd"].(int32)

if !ok {
return
return nil
}

if ttd < 0 {
return
return nil
}

deviceId, ok := msg.Annotations["device_id"].(string)
if !ok || deviceId == "" {
return nil
}

reader := &StdinCommandReader{}
Expand All @@ -114,8 +123,53 @@ func processCommand(msg *amqp.Message) {
if cmd == nil {
fmt.Print("Timeout!")
fmt.Println()
return
return nil
}

// FIXME: implement
// open sender

sender, err := session.NewSender(
amqp.LinkTargetAddress("control/" + tenant + "/" + deviceId),
)

if err != nil {
return err
}

// defer: close sender

defer func() {
if err := sender.Close(context.Background()); err != nil {
log.Print("Failed to close sender: ", err)
}
}()

// prepare message

send := amqp.NewMessage([]byte(*cmd))
send.Properties = &amqp.MessageProperties{
Subject: "CMD",
To: "control/" + tenant + "/" + deviceId,
}

// set message id

id, err := uuid.NewRandom()
if err != nil {
return err
}
send.Properties.MessageID = amqp.UUID(id).String()

// send message

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := sender.Send(ctx, send); err != nil {
return err
}

fmt.Println("Command delivered!")

return nil
}

0 comments on commit 2ef365b

Please sign in to comment.