Skip to content

Commit

Permalink
Stream event once instead of multiple lists
Browse files Browse the repository at this point in the history
  • Loading branch information
fieldryand committed Feb 17, 2024
1 parent 68397b5 commit e823bf6
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 28 deletions.
22 changes: 12 additions & 10 deletions execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (

// Execution of a job.
type execution struct {
ID uuid.UUID `json:"id"`
JobName string `json:"job"`
StartedAt string `json:"submitted"`
State state `json:"state"`
TaskExecutions []taskExecution `json:"tasks"`
ID uuid.UUID `json:"id"`
JobName string `json:"job"`
StartedAt string `json:"submitted"`
ModifiedTimestamp string `json:"modifiedTimestamp"`
State state `json:"state"`
TaskExecutions []taskExecution `json:"tasks"`
}

type taskExecution struct {
Expand All @@ -28,11 +29,12 @@ func (j *Job) newExecution() *execution {
taskExecutions = append(taskExecutions, taskrun)
}
return &execution{
ID: uuid.New(),
JobName: j.Name,
StartedAt: time.Now().UTC().Format(time.RFC3339Nano),
State: none,
TaskExecutions: taskExecutions}
ID: uuid.New(),
JobName: j.Name,
StartedAt: time.Now().UTC().Format(time.RFC3339Nano),
ModifiedTimestamp: time.Now().UTC().Format(time.RFC3339Nano),
State: none,
TaskExecutions: taskExecutions}
}

// Persist a new execution.
Expand Down
2 changes: 2 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"
"sync"
"time"

"github.com/philippgille/gokv"
)
Expand Down Expand Up @@ -194,6 +195,7 @@ func (j *Job) run(store gokv.Store, e *execution) error {

// Sync to store
e.State = j.loadState()
e.ModifiedTimestamp = time.Now().UTC().Format(time.RFC3339Nano)
syncStateToStore(store, e, write.key, write.val)

if j.allDone() {
Expand Down
43 changes: 25 additions & 18 deletions stream.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package goflow

import (
"encoding/json"
"io"
"time"

Expand All @@ -12,19 +11,40 @@ import (
func (g *Goflow) stream(clientDisconnect bool) func(*gin.Context) {

return func(c *gin.Context) {
chanStream := make(chan string)

history := make([]*execution, 0)

// open a channel for live executions
chanStream := make(chan *execution)

// periodically push the list of job runs into the stream
go func() {
defer close(chanStream)
// Periodically push the list of job runs into the stream
for {
for jobname := range g.Jobs {
executions, _ := readExecutions(g.Store, jobname)
marshalled, _ := marshalExecutions(jobname, executions)
chanStream <- string(marshalled)
for _, e := range executions {

// make sure it wasn't already sent
inHistory := false

for _, h := range history {
if e.ID == h.ID && e.ModifiedTimestamp == h.ModifiedTimestamp {
inHistory = true
}
}

if !inHistory {
chanStream <- e
history = append(history, e)
}

}
}
time.Sleep(time.Second * 1)
}
}()

c.Stream(func(w io.Writer) bool {
if msg, ok := <-chanStream; ok {
c.SSEvent("message", msg)
Expand All @@ -35,16 +55,3 @@ func (g *Goflow) stream(clientDisconnect bool) func(*gin.Context) {
}

}

// Obtain locks and put the response in the structure expected
// by the streaming endpoint.
func marshalExecutions(name string, executions []*execution) ([]byte, error) {
var msg struct {
JobName string `json:"jobName"`
Executions []*execution `json:"executions"`
}
msg.JobName = name
msg.Executions = executions
result, ok := json.Marshal(msg)
return result, ok
}

0 comments on commit e823bf6

Please sign in to comment.