feat: redistribute remaining traces during shutdown (#1261)
<!--
Thank you for contributing to the project! 💜
Please make sure to:
- Chat with us first if this is a big change
  - Open a new issue (or comment on an existing one)
  - We want to make sure you don't spend time implementing something we might have to say No to
- Add unit tests
- Mention any relevant issues in the PR description (e.g. "Fixes #123")

Please see our [OSS process
document](https://github.com/honeycombio/home/blob/main/honeycomb-oss-lifecycle-and-practices.md#)
to get an idea of how we operate.
-->

## Which problem is this PR solving?

When a node leaves the cluster, it gossips the news, calculates the new
destination for each trace it was holding on to, and forwards those traces
there. This means that removing a node from the Refinery cluster will be much
less disruptive than it is today.
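
To illustrate the idea, the new destination for each remaining trace can be computed by hashing the trace ID over the peer list with the departing node removed. This is a simplified stand-in, not Refinery's actual sharder; the `newDestination` helper and the hash-mod scheme are assumptions for the example:

```go
// A simplified stand-in for "calculate the new destination": hash the trace ID
// over the peers that remain after the departing node is removed.
package main

import (
	"fmt"
	"hash/fnv"
)

// newDestination picks which remaining peer should own traceID once the
// departing node has left the cluster.
func newDestination(traceID, departing string, peers []string) string {
	remaining := make([]string, 0, len(peers))
	for _, p := range peers {
		if p != departing {
			remaining = append(remaining, p)
		}
	}
	h := fnv.New32a()
	h.Write([]byte(traceID))
	return remaining[int(h.Sum32()%uint32(len(remaining)))]
}

func main() {
	peers := []string{"refinery-0:8081", "refinery-1:8081", "refinery-2:8081"}
	// Traces held by refinery-2 are forwarded to whichever peer this returns.
	fmt.Println(newDestination("trace-abc123", "refinery-2:8081", peers))
}
```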

## Short description of the changes

- During shutdown, the collector first stops accepting new traces by exiting
the `collect()` function.
- Remaining traces are distributed across three Go channels based on their state:
  - late spans (spans arriving for traces that already have a decision)
  - traces that have already received their root span or have exceeded their TraceTimeout
  - traces that can't have their decisions made yet
- A goroutine is created to pull items off the three channels. Late spans,
expired traces, and traces with a root span are sent to Honeycomb; traces that
don't have a decision yet are forwarded to their new home (another Refinery
peer). A sketch of this drain loop follows the list.
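
Here is a minimal, self-contained sketch of that drain loop. The `span`/`trace` types and the `sendToHoneycomb`/`forwardToPeer` helpers are simplified stand-ins, not the actual collector code:

```go
// A minimal sketch of the shutdown drain loop described above.
package main

import (
	"context"
	"fmt"
	"time"
)

type span struct{ TraceID string }

type trace struct {
	ID      string
	HasRoot bool // a root span has arrived
	Expired bool // TraceTimeout has elapsed
}

func sendToHoneycomb(id string) { fmt.Println("send to Honeycomb:", id) }
func forwardToPeer(id string)   { fmt.Println("forward to new peer:", id) }

// drainOnShutdown grabs items off the three channels until all are closed or
// the shutdown context is cancelled.
func drainOnShutdown(ctx context.Context, lateSpans <-chan span, ready, undecided <-chan trace) {
	for lateSpans != nil || ready != nil || undecided != nil {
		select {
		case s, ok := <-lateSpans:
			if !ok {
				lateSpans = nil // channel drained; stop selecting on it
				continue
			}
			sendToHoneycomb(s.TraceID) // decision already made for this trace
		case tr, ok := <-ready:
			if !ok {
				ready = nil
				continue
			}
			sendToHoneycomb(tr.ID) // root span arrived or TraceTimeout expired
		case tr, ok := <-undecided:
			if !ok {
				undecided = nil
				continue
			}
			forwardToPeer(tr.ID) // no decision yet; its new owner will decide
		case <-ctx.Done():
			return // shutdown deadline reached; stop draining
		}
	}
}

func main() {
	lateSpans := make(chan span, 1)
	ready := make(chan trace, 1)
	undecided := make(chan trace, 1)

	lateSpans <- span{TraceID: "a"}
	ready <- trace{ID: "b", HasRoot: true}
	undecided <- trace{ID: "c"}
	close(lateSpans)
	close(ready)
	close(undecided)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	drainOnShutdown(ctx, lateSpans, ready, undecided)
}
```

Setting a drained channel to `nil` removes its case from the `select`, so the loop ends once all three channels are closed or the shutdown deadline passes.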

I also added unit tests for the distribution logic, as well as a counter to
monitor the number of traces that go through each state.

Addresses #1204

---------

Co-authored-by: Yingrong Zhao <yingrongzhao@yingrongzhao.attlocal.net>
VinozzZ and Yingrong Zhao authored Aug 9, 2024
1 parent 41ca9d1 commit 8bbdda7
Showing 10 changed files with 457 additions and 68 deletions.
39 changes: 20 additions & 19 deletions app/app_test.go
@@ -243,13 +243,14 @@ func TestAppIntegration(t *testing.T) {

time.Sleep(5 * app.Config.GetSendTickerValue())

events := sender.Events()
require.Len(t, events, 1)

assert.Equal(t, "dataset", events[0].Dataset)
assert.Equal(t, "bar", events[0].Data["foo"])
assert.Equal(t, "1", events[0].Data["trace.trace_id"])
assert.Equal(t, uint(1), events[0].Data["meta.refinery.original_sample_rate"])
require.EventuallyWithT(t, func(collect *assert.CollectT) {
events := sender.Events()
require.Len(collect, events, 1)
assert.Equal(collect, "dataset", events[0].Dataset)
assert.Equal(collect, "bar", events[0].Data["foo"])
assert.Equal(collect, "1", events[0].Data["trace.trace_id"])
assert.Equal(collect, uint(1), events[0].Data["meta.refinery.original_sample_rate"])
}, 2*time.Second, 10*time.Millisecond)

err = startstop.Stop(graph.Objects(), nil)
assert.NoError(t, err)
@@ -400,8 +401,8 @@ func TestPeerRouting(t *testing.T) {
}
assert.Equal(t, expectedEvent, senders[0].Events()[0])

// Repeat, but deliver to host 1 on the peer channel, it should not be
// passed to host 0.
// Repeat, but deliver to host 1 on the peer channel, it should be
// passed to host 0 since that's who the trace belongs to.
req, err = http.NewRequest(
"POST",
"http://localhost:11003/1/batch/dataset",
@@ -414,7 +415,7 @@ func TestPeerRouting(t *testing.T) {
req.Body = io.NopCloser(strings.NewReader(blob))
post(t, req)
assert.Eventually(t, func() bool {
return len(senders[1].Events()) == 1
return len(senders[0].Events()) == 1
}, 2*time.Second, 2*time.Millisecond)
assert.Equal(t, expectedEvent, senders[0].Events()[0])
}
@@ -522,8 +523,8 @@ func TestEventsEndpoint(t *testing.T) {
senders[0].Events()[0],
)

// Repeat, but deliver to host 1 on the peer channel, it should not be
// passed to host 0.
// Repeat, but deliver to host 1 on the peer channel, it should be
// passed to host 0 since that's the host this trace belongs to.

blob = blob[:0]
buf := bytes.NewBuffer(blob)
@@ -545,7 +546,7 @@ func TestEventsEndpoint(t *testing.T) {

post(t, req)
assert.Eventually(t, func() bool {
return len(senders[1].Events()) == 1
return len(senders[0].Events()) == 1
}, 2*time.Second, 2*time.Millisecond)

assert.Equal(
@@ -565,10 +566,10 @@ func TestEventsEndpoint(t *testing.T) {
"api_host": "http://api.honeycomb.io",
"dataset": "dataset",
"environment": "",
"enqueued_at": senders[1].Events()[0].Metadata.(map[string]any)["enqueued_at"],
"enqueued_at": senders[0].Events()[0].Metadata.(map[string]any)["enqueued_at"],
},
},
senders[1].Events()[0],
senders[0].Events()[0],
)
}

@@ -644,7 +645,7 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
senders[0].Events()[0],
)

// Repeat, but deliver to host 1 on the peer channel, it should not be
// Repeat, but deliver to host 1 on the peer channel, it should be
// passed to host 0.

blob = blob[:0]
@@ -667,7 +668,7 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {

post(t, req)
assert.Eventually(t, func() bool {
return len(senders[1].Events()) == 1
return len(senders[0].Events()) == 1
}, 2*time.Second, 2*time.Millisecond)

assert.Equal(
@@ -687,10 +688,10 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
"api_host": "http://api.honeycomb.io",
"dataset": "dataset",
"environment": "test",
"enqueued_at": senders[1].Events()[0].Metadata.(map[string]any)["enqueued_at"],
"enqueued_at": senders[0].Events()[0].Metadata.(map[string]any)["enqueued_at"],
},
},
senders[1].Events()[0],
senders[0].Events()[0],
)
}

2 changes: 1 addition & 1 deletion cmd/refinery/main.go
@@ -249,7 +249,7 @@ func main() {
{Value: legacyMetrics, Name: "legacyMetrics"},
{Value: promMetrics, Name: "promMetrics"},
{Value: oTelMetrics, Name: "otelMetrics"},
{Value: tracer, Name: "tracer"},
{Value: tracer, Name: "tracer"}, // we need to use a named injection here because trace.Tracer's struct fields are all private
{Value: clockwork.NewRealClock()},
{Value: metricsSingleton, Name: "metrics"},
{Value: genericMetricsRecorder, Name: "genericMetrics"},
24 changes: 22 additions & 2 deletions collect/cache/cuckooSentCache.go
@@ -210,11 +210,11 @@ func (c *cuckooSentCache) Record(trace KeptTrace, keep bool, reason string) {
c.dropped.Add(trace.ID())
}

func (c *cuckooSentCache) Check(span *types.Span) (TraceSentRecord, string, bool) {
func (c *cuckooSentCache) CheckSpan(span *types.Span) (TraceSentRecord, string, bool) {
// was it dropped?
if c.dropped.Check(span.TraceID) {
// we recognize it as dropped, so just say so; there's nothing else to do
return &cuckooDroppedRecord{}, "", false
return &cuckooDroppedRecord{}, "", true
}
// was it kept?
c.keptMut.Lock()
@@ -261,3 +261,23 @@ func (c *cuckooSentCache) Resize(cfg config.SampleCacheConfig) error {
go c.monitor()
return nil
}

// CheckTrace checks if a trace was kept or dropped, and returns the reason if it was kept.
// The bool return value is true if the trace was found in the cache.
// It does not modify the count information.
func (c *cuckooSentCache) CheckTrace(traceID string) (TraceSentRecord, string, bool) {
// was it dropped?
if c.dropped.Check(traceID) {
// we recognize it as dropped, so just say so; there's nothing else to do
return &cuckooDroppedRecord{}, "", true
}
// was it kept?
c.keptMut.Lock()
defer c.keptMut.Unlock()
if sentRecord, found := c.kept.Get(traceID); found {
reason, _ := c.sentReasons.Get(uint(sentRecord.reason))
return sentRecord, reason, true
}
// we have no memory of this place
return nil, "", false
}
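
For context, a caller draining traces at shutdown might consult the new `CheckTrace` along these lines. This is a hedged sketch, not the collector's actual code: `forwardToPeer`, `sendLateSpans`, and `dropRemainingSpans` are illustrative stubs, and it assumes `TraceSentRecord` exposes a `Kept()` accessor.

```go
// Hypothetical shutdown-path lookup against the sent-trace cache: traces with a
// recorded decision are dispatched immediately; unknown traces go to a peer.
package redistribute

import (
	"fmt"

	"github.com/honeycombio/refinery/collect/cache"
)

func forwardToPeer(id string)         { fmt.Println("forward to new peer:", id) }
func sendLateSpans(id, reason string) { fmt.Println("send late spans:", id, reason) }
func dropRemainingSpans(id string)    { fmt.Println("drop remaining spans:", id) }

// redistributeTrace decides what to do with a trace this node still holds.
func redistributeTrace(sentCache cache.TraceSentCache, traceID string) {
	record, reason, found := sentCache.CheckTrace(traceID)
	switch {
	case !found:
		forwardToPeer(traceID) // no decision yet; the trace's new owner decides
	case record.Kept():
		sendLateSpans(traceID, reason) // kept: flush remaining spans to Honeycomb
	default:
		dropRemainingSpans(traceID) // dropped: discard the remaining spans
	}
}
```
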
7 changes: 5 additions & 2 deletions collect/cache/traceSentCache.go
@@ -25,9 +25,12 @@ type TraceSentRecord interface {
type TraceSentCache interface {
// Record preserves the record of a trace being sent or not.
Record(trace KeptTrace, keep bool, reason string)
// Check tests if a trace corresponding to the span is in the cache; if found, it returns the appropriate TraceSentRecord and true,
	// CheckTrace checks if a trace is in the cache; if found, it returns the appropriate TraceSentRecord and true, else nil and false.
// It does not modify the count information.
CheckTrace(traceID string) (TraceSentRecord, string, bool)
// CheckSpan tests if a trace corresponding to the span is in the cache; if found, it returns the appropriate TraceSentRecord and true,
// else nil and false.
Check(span *types.Span) (TraceSentRecord, string, bool)
CheckSpan(span *types.Span) (TraceSentRecord, string, bool)
// Stop halts the cache in preparation for shutdown
Stop()
// Resize adjusts the size of the cache according to the Config passed in