Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(observability): Vector tap, v2 #6610

Merged
merged 88 commits into from
Apr 6, 2021
Merged

enhancement(observability): Vector tap, v2 #6610

merged 88 commits into from
Apr 6, 2021

Conversation

leebenson
Copy link
Member

@leebenson leebenson commented Mar 3, 2021

This PR is a retake of the API for Vector 'tap'.

It follows on from #6363 by attempting to reimplement the majority of the tap sink outside of topology.

Closes #3212.
Closes #6391.
Closes #6409.
Closes #6635.

Premise

This PR enables observability into the log events that pass through topology, via the GraphQL API.

The purpose of which is to support an (upcoming) CLI command and a web UI to allow a user to 'tap' into the stream of events that are being processed live.

Example query

Assuming this config:

[api]
  enabled = true

[sources.gen1]
  type = "generator"
  lines = ["From gen 1"]
  batch_interval = 0.05
  format = "shuffle"

[sinks.blackhole]
  type = "blackhole"
  inputs = ["gen1"]
  print_amount = 1000

This query:

subscription {
  outputLogEvents(componentNames:["gen2"], limit: 5) {
    __typename
    ... on LogEvent {
      componentName
      message
      timestamp
    }
  	... on LogEventNotification {
      componentName
      notification
    }
  }
}

Initially produces:

{
  "data": {
    "outputLogEvents": [
      {
        "__typename": "LogEventNotification",
        "componentName": "gen2",
        "notification": "NOT_MATCHED"
      }
    ]
  }
}

The subscription will remain open until the client explicitly terminates it (or the WebSocket connection goes away.)

If the config is then updated to:

[api]
  enabled = true

[sources.gen1]
  type = "generator"
  lines = ["From gen 1"]
  batch_interval = 0.05
  format = "shuffle"

[sources.gen2]
  type = "generator"
  lines = ["From gen 2"]
  batch_interval = 0.1
  format = "shuffle"

[sinks.blackhole]
  type = "blackhole"
  inputs = ["gen1"]
  print_amount = 1000

Then the next payload may look something like this:

{
  "data": {
    "outputLogEvents": [
      {
        "__typename": "LogEventNotification",
        "componentName": "gen2",
        "notification": "MATCHED"
      },
      {
        "__typename": "LogEvent",
        "componentName": "gen2",
        "message": "From gen 2",
        "timestamp": "2021-03-03T14:15:30.037735+00:00"
      },
      {
        "__typename": "LogEvent",
        "componentName": "gen2",
        "message": "From gen 2",
        "timestamp": "2021-03-03T14:15:30.138130+00:00"
      },
      {
        "__typename": "LogEvent",
        "componentName": "gen2",
        "message": "From gen 2",
        "timestamp": "2021-03-03T14:15:30.238785+00:00"
      },
      {
        "__typename": "LogEvent",
        "componentName": "gen2",
        "message": "From gen 2",
        "timestamp": "2021-03-03T14:15:30.337920+00:00"
      }
    ]
  }
}

Notes:

  1. When a componentNames value is matched, a LogEventNotification is returned with a notification.MATCHED value. This is intended to signal to the client that a component has been 'found', and therefore update the user interface in some way to indicate that fact.

  2. The LogEvent itself contains the payloads of log events.

  3. If a component goes away, it returns the notification.NOT_MATCHED value initially seen.

LogEvent

A log event is comprised of the following fields:

type LogEvent {
  # Name of the component associated with the log event
  componentName: String!

  # Log message
  message: String

  # Log timestamp
  timestamp: DateTime

  # Log event as a JSON string
  json: String!

  # Log event as a YAML string
  yaml: String
}

message and timestamp are opinionated fields based on the assumption that they will be relatively common in the event data.

json and yaml output text strings encoded to those respective formats.

TODO

  • Self-review before pulling out of draft.
  • Add unit tests.

Later work

Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
@leebenson leebenson added type: enhancement A value-adding code change that enhances its existing functionality. domain: api Anything related to Vector's GraphQL API labels Mar 3, 2021
@leebenson leebenson self-assigned this Mar 3, 2021
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
Cargo.toml Outdated
@@ -129,9 +129,10 @@ goauth = { version = "0.9.0", optional = true }
smpl_jwt = { version = "0.6.1", optional = true }

# API
async-graphql = { version = "=2.5.0", optional = true }
async-graphql-warp = { version = "=2.5.0", optional = true }
async-graphql = { version = "=2.4.11", optional = true }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Downgraded (and stapled) to 2.4.11, to unblock a fix for subscription context. Will be able to bump to the latest when #6383 lands.

ctx: &'a Context<'a>,
component_names: Vec<String>,
#[graphql(default = 500)] interval: i32,
#[graphql(default = 100, validator(IntRange(min = "1", max = "10_000")))] limit: usize,
Copy link
Member Author

@leebenson leebenson Mar 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows for observing log events in batch sizes of up to 10,000. This feels a little large to me (it's probably quite expensive to unwind 10k records on the client, at default intervals of 0.5 seconds -- not to mention collect/serialize a JSON payload of that size in Vector) -- but this defaults to 100, and is user-definable.

We can adjust the upper bound if needed.

Signed-off-by: Lee Benson <lee@leebenson.com>
@leebenson leebenson marked this pull request as ready for review March 4, 2021 15:26
@jszwedko
Copy link
Member

@leebenson awesome!

I think I can see why you collapsed EventNotification into that union too, but I think it might actually still be good to leave that out so that we can reuse the Event type in other places besides tap that might not return notifications. I was thinking it would basically just map to the internal Event enum which would be Log, Metric, and, at some point, Trace.

You could still define another union type that combines Event with EventNotification though like:

union Event = LogEvent
union OutputEventsPayload = Event | EventNotification

@leebenson
Copy link
Member Author

@jszwedko - unfortunately, I don't think that'll work due to this spec constraint:

The member types of a Union type must all be Object base types; Scalar, Interface and Union types must not be member types of a Union. Similarly, wrapping types must not be member types of a Union.

@jszwedko
Copy link
Member

@jszwedko - unfortunately, I don't think that'll work due to this spec constraint:

The member types of a Union type must all be Object base types; Scalar, Interface and Union types must not be member types of a Union. Similarly, wrapping types must not be member types of a Union.

Ah 😢

I still think we should split them up though. So I guess we'll need to do:

union Event = LogEvent
union OutputEventsPayload = LogEvent | EventNotification

I'm also a fan of dropping Event from LogEvent and just calling it Log unless there is a reason not to.

@leebenson
Copy link
Member Author

leebenson commented Mar 26, 2021

I still think we should split them up though. So I guess we'll need to do:

union Event = LogEvent
union OutputEventsPayload = LogEvent | EventNotification

Gotcha, yeah, I think it does make sense to scope the union name here to a more precise use-case than Event.

That also opens up the possibility of making Event an interface, rather than a union. There's likely to be some common fields and it's one less switch statement in a connecting client to unravel the type.

I'm also a fan of dropping Event from LogEvent and just calling it Log unless there is a reason not to.

I went with LogEvent just because that's part of the lexicon we're using internally, but I like the idea of simplifying for a client.

I'll update 👍🏻

Signed-off-by: Lee Benson <lee@leebenson.com>
@leebenson
Copy link
Member Author

@jszwedko - made the change in 10df625.

I punted on the Event interface for now, since we don't have an immediate need. I'll tackle that in #6892.

Copy link
Member

@jszwedko jszwedko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, thanks @leebenson !

I still defer to @lukesteensen's thoughts on the topology bits, but this looks good to me!

Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
@leebenson
Copy link
Member Author

leebenson commented Mar 29, 2021

I found a subtle bug at the weekend that should be fixed by ab0252b.

If a configuration is changed while an active subscription is in flight, and the subscription is later closed by the client, topology would panic because the event handler went away. This didn't happen for subscriptions that were terminated before the change, or for subscriptions that didn't change over subsequent reloads.

To fix this, the control channel is now cloned and a fanout::ControlMessage::Remove(id) is triggered at the point of falling out of scope in the event handler itself, which is determined with a oneshot shutdown channel.

Since we're not doing anything with ConfigDiff in the tap handler, subscriptions with the same name spawn a second event handler and replace the cache with it. This has the effect of dropping the first handler, since the sink holds on to the oneshot sender. Since the new channel is added right before it replaces the sink cache, this effectively takes over from the previous event handler.

When a subscription is dropped, the HashMap holding the component name -> shutdown channel is dropped, triggering a shutdown of all event handlers.

Copy link
Member

@lukesteensen lukesteensen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I hadn't really thought about letting things subscribe to topology changes like this but it seems to work nicely here.

src/api/schema/events/encoding.rs Show resolved Hide resolved
src/api/schema/events/mod.rs Outdated Show resolved Hide resolved

// Random number generator to allow for sampling. Speed trumps cryptographic security here.
// The RNG must be Send + Sync to use with the `select!` loop below, hence `SmallRng`.
let mut rng = SmallRng::from_entropy();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this block? From the docs it seems possible but I'm not sure if it's something to worry about or not.

Copy link
Member Author

@leebenson leebenson Mar 31, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the docs:

RNG recommended when small state, cheap initialization and good performance are required. The PRNG algorithm in SmallRng is chosen to be efficient on the current platform, without consideration for cryptography or security. The size of its state is much smaller than for StdRng

This is initialised once in an async task (each GraphQL subscription, and possibly each field, is a separate task), and is invoked a maximum of the total number of logs for the observed component(s) over a given interval, minus the provided limit.

Running the bench for seq_iter_choose_from_1000 locally produces 5354 MB/s of usize values, which I think is ~701,759,488/sec on a 64 bit platform (macOS, 2.3 Ghz 8 core i9 in this case.)

I don't think this is anything to be concerned with, but open to alternatives -- I chose this because it purports to be fast and thread safe.

#[cfg(feature = "api")]
// Pass the new config to the API server.
if let Some(ref api_server) = api_server {
api_server.update_config(topology.config());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need to update the config here if the server is able to watch the topology?

Copy link
Member Author

@leebenson leebenson Mar 31, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, this would be updated to follow the same pattern. It's a fair amount of work, since metrics subscriptions are built around the notion of global state. I think this is a good follow-up issue. Tracking in #6939.


/// Returns a tap handler that listens for topology changes, and connects sinks to observe
/// `LogEvent`s` when a component matches one or more of the provided patterns.
async fn tap_handler(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm understanding correctly, we spawn one of these per client connection?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. When the subscription comes in, the process is roughly:

  1. The client hits the /graphql endpoint; the request is upgraded to a WebSocket.
  2. RunningTopology's watch_tx is injected into the request context, persisting for the life of the request.
  3. The subscription (and possibly the individual field; in this case, output_events) is resolved asynchronously.
  4. In the body of the output_events resolver, create_events_stream is called, returning an impl Stream<Item = Vec<OutputEventsPayload>>
  5. Inside create_events_stream, an event is spawned, calling TapSink::new, which spawns the tap_handler
  6. Each tap_handler receives a pattern of components to match against, and selects over RunningTopology.watch to get recent/updated config.
  7. When a component matches, a sink is connected to the output ControlChannel, which spawns an event handler, returning the sink and a oneshot::Sender to trigger event shutdown. The shutdown channel is stored along with the component name in a HashMap.
  8. LogEvents fan-in to the subscription's event_rx, collecting events from component(s).
  9. Events are sampled per the reservoir sampling strategy here, randomly evicting entries from a subscription cache.
  10. Each interval, results are drained and emitted to event_rx.
  11. Results stream back to the client over the active WebSocket.

If event_rx go away (either explicitly with a "stop" WS frame from the client, or the WS connection itself going away), the stream is terminated which drops each of the event handlers along the way.

Likewise, if topology changes, sinks are reconnected. The process of overwriting the sink cache replaces the existing sink cache, which implicitly drops the oneshot::Sender, dropping the event handler for the previous sink. This is in lieu of reading ConfigDiff to determine which are "new" components with the same name, and which are unchanged.

It's a little complicated to follow, but the end result is a fan-in pattern with results streamed back to the client, while also cleaning up when the client goes away or topology changes, with minimal touch points with RunningTopology.

src/api/tap.rs Outdated Show resolved Hide resolved
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
Signed-off-by: Lee Benson <lee@leebenson.com>
@leebenson
Copy link
Member Author

@lukesteensen, TapSink now implements futures::Sink -- would you mind giving 6d80f8e a look over and letting me know if this is the kind of thing you had in mind? Thanks!

@leebenson leebenson requested a review from lukesteensen March 31, 2021 19:05
src/api/tap.rs Outdated Show resolved Hide resolved
src/api/tap.rs Outdated Show resolved Hide resolved
src/api/tap.rs Outdated Show resolved Hide resolved
src/api/tap.rs Outdated Show resolved Hide resolved
@leebenson
Copy link
Member Author

I've updated this PR to work alongside the tokio 1.x upgrade, and refactored the tap sink to be non-blocking.

Signed-off-by: Lee Benson <lee@leebenson.com>
@leebenson leebenson merged commit e062486 into master Apr 6, 2021
@leebenson leebenson deleted the leebenson/tap2 branch April 6, 2021 18:35
@leebenson
Copy link
Member Author

@lukesteensen, this is merged after refactoring TapSink to be non-blocking. I think I caught everything, but if any further changes are required, just let me know and I'll prioritise in a follow-up. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
domain: api Anything related to Vector's GraphQL API type: enhancement A value-adding code change that enhances its existing functionality.
Projects
None yet
4 participants