Skip to content

Commit

Permalink
struct migration to support storing events in gomemdb
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Oct 6, 2020
1 parent 58c0ce2 commit fd0d8de
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 39 deletions.
2 changes: 1 addition & 1 deletion nomad/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
}
defer subscription.Unsubscribe()

ndJsonCh := make(chan *stream.NDJson)
ndJsonCh := make(chan *structs.NDJson)
errCh := make(chan error)

jsonStream := stream.NewNDJsonStream(ndJsonCh, 30*time.Second)
Expand Down
File renamed without changes.
2 changes: 0 additions & 2 deletions nomad/state/state_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package state
import (
"fmt"

"github.com/davecgh/go-spew/spew"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -175,7 +174,6 @@ func (tx *txn) Commit() error {
}

if tx.persistChanges {
spew.Dump("YO!")
// persist events after processing changes
err := tx.Txn.Insert("events", events)
if err != nil {
Expand Down
27 changes: 8 additions & 19 deletions nomad/stream/ndjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,40 @@ import (
"fmt"
"sync"
"time"

"github.com/hashicorp/nomad/nomad/structs"
)

var (
// NDJsonHeartbeat is the NDJson to send as a heartbeat
// Avoids creating many heartbeat instances
NDJsonHeartbeat = &NDJson{Data: []byte("{}\n")}
NDJsonHeartbeat = &structs.NDJson{Data: []byte("{}\n")}
)

// NDJsonStream is used to send new line delimited JSON and heartbeats
// to a destination (out channel)
type NDJsonStream struct {
out chan<- *NDJson
out chan<- *structs.NDJson

// heartbeat is the interval to send heartbeat messages to keep a connection
// open.
heartbeat *time.Ticker

publishCh chan NDJson
publishCh chan structs.NDJson
exitCh chan struct{}

l sync.Mutex
running bool
}

// NNDJson is a wrapper for a Newline Delimited JSON object
type NDJson struct {
Data []byte
}

// NewNNewNDJsonStream creates a new NDJson stream that will output NDJson structs
// to the passed output channel
func NewNDJsonStream(out chan<- *NDJson, heartbeat time.Duration) *NDJsonStream {
func NewNDJsonStream(out chan<- *structs.NDJson, heartbeat time.Duration) *NDJsonStream {
return &NDJsonStream{
out: out,
heartbeat: time.NewTicker(heartbeat),
exitCh: make(chan struct{}),
publishCh: make(chan NDJson),
publishCh: make(chan structs.NDJson),
}
}

Expand Down Expand Up @@ -97,18 +94,10 @@ func (n *NDJsonStream) Send(obj interface{}) error {
}

select {
case n.publishCh <- NDJson{Data: buf.Bytes()}:
case n.publishCh <- structs.NDJson{Data: buf.Bytes()}:
case <-n.exitCh:
return fmt.Errorf("stream is no longer running")
}

return nil
}

func (j *NDJson) Copy() *NDJson {
n := new(NDJson)
*n = *j
n.Data = make([]byte, len(j.Data))
copy(n.Data, j.Data)
return n
}
7 changes: 4 additions & 3 deletions nomad/stream/ndjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

Expand All @@ -19,7 +20,7 @@ func TestNDJson(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

out := make(chan *NDJson)
out := make(chan *structs.NDJson)
s := NewNDJsonStream(out, 1*time.Second)
s.Run(ctx)

Expand All @@ -45,7 +46,7 @@ func TestNDJson_Send_After_Stop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

out := make(chan *NDJson)
out := make(chan *structs.NDJson)
s := NewNDJsonStream(out, 1*time.Second)
s.Run(ctx)

Expand All @@ -62,7 +63,7 @@ func TestNDJson_HeartBeat(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

out := make(chan *NDJson)
out := make(chan *structs.NDJson)
s := NewNDJsonStream(out, 10*time.Millisecond)
s.Run(ctx)

Expand Down
25 changes: 11 additions & 14 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/hashicorp/nomad/helper/constraints/semver"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/kheap"
"github.com/hashicorp/nomad/nomad/stream"
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
)

Expand Down Expand Up @@ -10679,7 +10678,7 @@ type EventStreamRequest struct {

type EventStreamWrapper struct {
Error *RpcError
Event *stream.NDJson
Event *NDJson
}

// RpcError is used for serializing errors with a potential error code
Expand All @@ -10700,6 +10699,7 @@ func (r *RpcError) Error() string {
}

type Topic string

type Event struct {
Topic Topic
Type string
Expand All @@ -10714,18 +10714,15 @@ type Events struct {
Events []Event
}

type Topic string

type Event struct {
Topic Topic
Type string
Key string
FilterKeys []string
Index uint64
Payload interface{}
// NNDJson is a wrapper for a Newline Delimited JSON object
type NDJson struct {
Data []byte
}

type Events struct {
Index uint64
Events []Event
func (j *NDJson) Copy() *NDJson {
n := new(NDJson)
*n = *j
n.Data = make([]byte, len(j.Data))
copy(n.Data, j.Data)
return n
}

0 comments on commit fd0d8de

Please sign in to comment.