Skip to content

Commit

Permalink
one ndjson line per index
Browse files Browse the repository at this point in the history
wrap returned events in a events wrapper that includes index
  • Loading branch information
drewbailey committed Oct 1, 2020
1 parent ccf1844 commit c6799c5
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 20 deletions.
15 changes: 6 additions & 9 deletions nomad/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,16 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
}

// Continue if there are no events
if events == nil {
if len(events.Events) == 0 {
continue
}

// Send each event as its own frame
for _, e := range events {
if err := jsonStream.Send(e); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
if err := jsonStream.Send(events); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
}
}
}()
Expand Down
12 changes: 6 additions & 6 deletions nomad/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ OUTER:
continue
}

var event stream.Event
var event stream.Events
err = json.Unmarshal(msg.Event.Data, &event)
require.NoError(t, err)

Expand All @@ -102,7 +102,7 @@ OUTER:
Result: &out,
}
dec, err := mapstructure.NewDecoder(cfg)
dec.Decode(event.Payload)
dec.Decode(event.Events[0].Payload)
require.NoError(t, err)
require.Equal(t, node.ID, out.ID)
break OUTER
Expand All @@ -123,7 +123,7 @@ func TestEventStream_StreamErr(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)

req := structs.EventStreamRequest{
Topics: map[stream.Topic][]string{"*": []string{"*"}},
Topics: map[stream.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: s1.Region(),
},
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestEventStream_RegionForward(t *testing.T) {

// Create request targed for region foo
req := structs.EventStreamRequest{
Topics: map[stream.Topic][]string{"*": []string{"*"}},
Topics: map[stream.Topic][]string{"*": {"*"}},
QueryOptions: structs.QueryOptions{
Region: "foo",
},
Expand Down Expand Up @@ -272,7 +272,7 @@ OUTER:
continue
}

var event stream.Event
var event stream.Events
err = json.Unmarshal(msg.Event.Data, &event)
require.NoError(t, err)

Expand All @@ -282,7 +282,7 @@ OUTER:
Result: &out,
}
dec, err := mapstructure.NewDecoder(cfg)
dec.Decode(event.Payload)
dec.Decode(event.Events[0].Payload)
require.NoError(t, err)
require.Equal(t, node.ID, out.ID)
break OUTER
Expand Down
5 changes: 5 additions & 0 deletions nomad/stream/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ type Event struct {
Index uint64
Payload interface{}
}

type Events struct {
Index uint64
Events []Event
}
10 changes: 5 additions & 5 deletions nomad/stream/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,26 @@ func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Sub
}
}

func (s *Subscription) Next(ctx context.Context) ([]Event, error) {
func (s *Subscription) Next(ctx context.Context) (Events, error) {
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
return nil, ErrSubscriptionClosed
return Events{}, ErrSubscriptionClosed
}

for {
next, err := s.currentItem.Next(ctx, s.forceClosed)
switch {
case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed:
return nil, ErrSubscriptionClosed
return Events{}, ErrSubscriptionClosed
case err != nil:
return nil, err
return Events{}, err
}
s.currentItem = next

events := filter(s.req, next.Events)
if len(events) == 0 {
continue
}
return events, nil
return Events{Index: next.Index, Events: events}, nil
}
}

Expand Down

0 comments on commit c6799c5

Please sign in to comment.