From 21ac520ebe6327494b10ca9a2aa2f1b2540e9c8b Mon Sep 17 00:00:00 2001 From: Kruno Tomola Fabro Date: Sun, 25 Apr 2021 17:28:13 +0200 Subject: [PATCH] enhancement(graphql api): Expose `events_in` & `events_out` metrics (#6888) * Add in/out to API Signed-off-by: ktf * Update client Signed-off-by: ktf * Update tests Signed-off-by: ktf * Add new line Signed-off-by: ktf * Apply suggestions Co-authored-by: Lee Benson Signed-off-by: ktf * Update schema Signed-off-by: ktf Co-authored-by: Lee Benson --- .../graphql/queries/components.graphql | 18 + lib/vector-api-client/graphql/schema.json | 919 ++++++++++++++++-- .../component_events_in_throughputs.graphql | 6 + .../component_events_in_totals.graphql | 8 + .../component_events_out_throughputs.graphql | 6 + .../component_events_out_totals.graphql | 8 + .../events_in_throughput.graphql | 3 + .../subscriptions/events_in_total.graphql | 5 + .../events_out_throughput.graphql | 3 + .../subscriptions/events_out_total.graphql | 5 + lib/vector-api-client/src/gql/components.rs | 46 + lib/vector-api-client/src/gql/metrics.rs | 230 ++++- .../tests/queries/file_source_metrics.graphql | 3 + src/api/schema/metrics/events_in.rs | 96 ++ src/api/schema/metrics/events_out.rs | 96 ++ src/api/schema/metrics/filter.rs | 36 +- src/api/schema/metrics/mod.rs | 96 ++ src/api/schema/metrics/sink/generic.rs | 10 + src/api/schema/metrics/sink/mod.rs | 6 +- src/api/schema/metrics/source/file.rs | 44 + src/api/schema/metrics/source/generic.rs | 10 + src/api/schema/metrics/source/mod.rs | 6 +- src/api/schema/metrics/transform/generic.rs | 10 + src/api/schema/metrics/transform/mod.rs | 6 +- tests/api.rs | 50 + 25 files changed, 1643 insertions(+), 83 deletions(-) create mode 100644 lib/vector-api-client/graphql/subscriptions/component_events_in_throughputs.graphql create mode 100644 lib/vector-api-client/graphql/subscriptions/component_events_in_totals.graphql create mode 100644 lib/vector-api-client/graphql/subscriptions/component_events_out_throughputs.graphql create mode 100644 lib/vector-api-client/graphql/subscriptions/component_events_out_totals.graphql create mode 100644 lib/vector-api-client/graphql/subscriptions/events_in_throughput.graphql create mode 100644 lib/vector-api-client/graphql/subscriptions/events_in_total.graphql create mode 100644 lib/vector-api-client/graphql/subscriptions/events_out_throughput.graphql create mode 100644 lib/vector-api-client/graphql/subscriptions/events_out_total.graphql create mode 100644 src/api/schema/metrics/events_in.rs create mode 100644 src/api/schema/metrics/events_out.rs diff --git a/lib/vector-api-client/graphql/queries/components.graphql b/lib/vector-api-client/graphql/queries/components.graphql index d26488edb42f9..e8e3170a0fe3d 100644 --- a/lib/vector-api-client/graphql/queries/components.graphql +++ b/lib/vector-api-client/graphql/queries/components.graphql @@ -14,6 +14,12 @@ query ComponentsQuery($first: Int!) { processedBytesTotal { processedBytesTotal } + eventsInTotal { + eventsInTotal + } + eventsOutTotal { + eventsOutTotal + } } } ... on Transform { @@ -25,6 +31,12 @@ query ComponentsQuery($first: Int!) { processedBytesTotal { processedBytesTotal } + eventsInTotal { + eventsInTotal + } + eventsOutTotal { + eventsOutTotal + } } } ... on Sink { @@ -36,6 +48,12 @@ query ComponentsQuery($first: Int!) { processedBytesTotal { processedBytesTotal } + eventsInTotal { + eventsInTotal + } + eventsOutTotal { + eventsOutTotal + } } } } diff --git a/lib/vector-api-client/graphql/schema.json b/lib/vector-api-client/graphql/schema.json index 11d8802f5a549..08f3521b82015 100644 --- a/lib/vector-api-client/graphql/schema.json +++ b/lib/vector-api-client/graphql/schema.json @@ -283,6 +283,178 @@ "name": "ComponentErrorsTotal", "possibleTypes": null }, + { + "description": null, + "enumValues": null, + "fields": [ + { + "args": [], + "deprecationReason": null, + "description": "Component name", + "isDeprecated": false, + "name": "name", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "String", + "ofType": null + } + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Events processed throughput", + "isDeprecated": false, + "name": "throughput", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Int", + "ofType": null + } + } + } + ], + "inputFields": null, + "interfaces": [], + "kind": "OBJECT", + "name": "ComponentEventsInThroughput", + "possibleTypes": null + }, + { + "description": null, + "enumValues": null, + "fields": [ + { + "args": [], + "deprecationReason": null, + "description": "Component name", + "isDeprecated": false, + "name": "name", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "String", + "ofType": null + } + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Total incoming events metric", + "isDeprecated": false, + "name": "metric", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "OBJECT", + "name": "EventsInTotal", + "ofType": null + } + } + } + ], + "inputFields": null, + "interfaces": [], + "kind": "OBJECT", + "name": "ComponentEventsInTotal", + "possibleTypes": null + }, + { + "description": null, + "enumValues": null, + "fields": [ + { + "args": [], + "deprecationReason": null, + "description": "Component name", + "isDeprecated": false, + "name": "name", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "String", + "ofType": null + } + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Events processed throughput", + "isDeprecated": false, + "name": "throughput", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Int", + "ofType": null + } + } + } + ], + "inputFields": null, + "interfaces": [], + "kind": "OBJECT", + "name": "ComponentEventsOutThroughput", + "possibleTypes": null + }, + { + "description": null, + "enumValues": null, + "fields": [ + { + "args": [], + "deprecationReason": null, + "description": "Component name", + "isDeprecated": false, + "name": "name", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "String", + "ofType": null + } + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Total outgoing events metric", + "isDeprecated": false, + "name": "metric", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "OBJECT", + "name": "EventsOutTotal", + "ofType": null + } + } + } + ], + "inputFields": null, + "interfaces": [], + "kind": "OBJECT", + "name": "ComponentEventsOutTotal", + "possibleTypes": null + }, { "description": null, "enumValues": [ @@ -666,7 +838,7 @@ "inputFields": null, "interfaces": [], "kind": "OBJECT", - "name": "Cpumetrics", + "name": "CpuMetrics", "possibleTypes": null }, { @@ -905,6 +1077,84 @@ "name": "EventNotificationType", "possibleTypes": null }, + { + "description": null, + "enumValues": null, + "fields": [ + { + "args": [], + "deprecationReason": null, + "description": "Metric timestamp", + "isDeprecated": false, + "name": "timestamp", + "type": { + "kind": "SCALAR", + "name": "DateTime", + "ofType": null + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Total incoming events", + "isDeprecated": false, + "name": "eventsInTotal", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Float", + "ofType": null + } + } + } + ], + "inputFields": null, + "interfaces": [], + "kind": "OBJECT", + "name": "EventsInTotal", + "possibleTypes": null + }, + { + "description": null, + "enumValues": null, + "fields": [ + { + "args": [], + "deprecationReason": null, + "description": "Metric timestamp", + "isDeprecated": false, + "name": "timestamp", + "type": { + "kind": "SCALAR", + "name": "DateTime", + "ofType": null + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Total outgoing events", + "isDeprecated": false, + "name": "eventsOutTotal", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Float", + "ofType": null + } + } + } + ], + "inputFields": null, + "interfaces": [], + "kind": "OBJECT", + "name": "EventsOutTotal", + "possibleTypes": null + }, { "description": null, "enumValues": null, @@ -948,6 +1198,30 @@ "name": "ProcessedBytesTotal", "ofType": null } + }, + { + "args": [], + "deprecationReason": null, + "description": "Metric indicating incoming events for the current file", + "isDeprecated": false, + "name": "eventsInTotal", + "type": { + "kind": "OBJECT", + "name": "EventsInTotal", + "ofType": null + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Metric indicating outgoing events for the current file", + "isDeprecated": false, + "name": "eventsOutTotal", + "type": { + "kind": "OBJECT", + "name": "EventsOutTotal", + "ofType": null + } } ], "inputFields": null, @@ -1117,6 +1391,18 @@ "description": null, "isDeprecated": false, "name": "PROCESSED_EVENTS_TOTAL" + }, + { + "deprecationReason": null, + "description": null, + "isDeprecated": false, + "name": "EVENTS_IN_TOTAL" + }, + { + "deprecationReason": null, + "description": null, + "isDeprecated": false, + "name": "EVENTS_OUT_TOTAL" } ], "fields": null, @@ -1238,6 +1524,30 @@ "name": "ProcessedBytesTotal", "ofType": null } + }, + { + "args": [], + "deprecationReason": null, + "description": "Total incoming events for the current file source", + "isDeprecated": false, + "name": "eventsInTotal", + "type": { + "kind": "OBJECT", + "name": "EventsInTotal", + "ofType": null + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Total outgoing events for the current file source", + "isDeprecated": false, + "name": "eventsOutTotal", + "type": { + "kind": "OBJECT", + "name": "EventsOutTotal", + "ofType": null + } } ], "inputFields": null, @@ -1395,6 +1705,30 @@ "name": "ProcessedBytesTotal", "ofType": null } + }, + { + "args": [], + "deprecationReason": null, + "description": "Total incoming events for the current sink", + "isDeprecated": false, + "name": "eventsInTotal", + "type": { + "kind": "OBJECT", + "name": "EventsInTotal", + "ofType": null + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Total outgoing events for the current sink", + "isDeprecated": false, + "name": "eventsOutTotal", + "type": { + "kind": "OBJECT", + "name": "EventsOutTotal", + "ofType": null + } } ], "inputFields": null, @@ -1416,24 +1750,48 @@ { "args": [], "deprecationReason": null, - "description": "Events processed for the current source", + "description": "Events processed for the current source", + "isDeprecated": false, + "name": "processedEventsTotal", + "type": { + "kind": "OBJECT", + "name": "ProcessedEventsTotal", + "ofType": null + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Bytes processed for the current source", + "isDeprecated": false, + "name": "processedBytesTotal", + "type": { + "kind": "OBJECT", + "name": "ProcessedBytesTotal", + "ofType": null + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Total incoming events for the current source", "isDeprecated": false, - "name": "processedEventsTotal", + "name": "eventsInTotal", "type": { "kind": "OBJECT", - "name": "ProcessedEventsTotal", + "name": "EventsInTotal", "ofType": null } }, { "args": [], "deprecationReason": null, - "description": "Bytes processed for the current source", + "description": "Total outgoing events for the current source", "isDeprecated": false, - "name": "processedBytesTotal", + "name": "eventsOutTotal", "type": { "kind": "OBJECT", - "name": "ProcessedBytesTotal", + "name": "EventsOutTotal", "ofType": null } } @@ -1477,6 +1835,30 @@ "name": "ProcessedBytesTotal", "ofType": null } + }, + { + "args": [], + "deprecationReason": null, + "description": "Total incoming events for the current transform", + "isDeprecated": false, + "name": "eventsInTotal", + "type": { + "kind": "OBJECT", + "name": "EventsInTotal", + "ofType": null + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Total outgoing events for the current transform", + "isDeprecated": false, + "name": "eventsOutTotal", + "type": { + "kind": "OBJECT", + "name": "EventsOutTotal", + "ofType": null + } } ], "inputFields": null, @@ -1565,7 +1947,7 @@ "name": null, "ofType": { "kind": "OBJECT", - "name": "Cpumetrics", + "name": "CpuMetrics", "ofType": null } } @@ -1657,6 +2039,16 @@ "name": "Int", "possibleTypes": null }, + { + "description": "Raw JSON data`", + "enumValues": null, + "fields": null, + "inputFields": null, + "interfaces": null, + "kind": "SCALAR", + "name": "Json", + "possibleTypes": null + }, { "description": null, "enumValues": null, @@ -1790,6 +2182,33 @@ "ofType": null } } + }, + { + "args": [ + { + "defaultValue": null, + "description": null, + "name": "field", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "String", + "ofType": null + } + } + } + ], + "deprecationReason": null, + "description": "Get JSON field data on the log event, by field name", + "isDeprecated": false, + "name": "json", + "type": { + "kind": "SCALAR", + "name": "Json", + "ofType": null + } } ], "inputFields": null, @@ -2980,6 +3399,30 @@ "name": "ProcessedBytesTotal", "ofType": null } + }, + { + "args": [], + "deprecationReason": null, + "description": null, + "isDeprecated": false, + "name": "eventsInTotal", + "type": { + "kind": "OBJECT", + "name": "EventsInTotal", + "ofType": null + } + }, + { + "args": [], + "deprecationReason": null, + "description": null, + "isDeprecated": false, + "name": "eventsOutTotal", + "type": { + "kind": "OBJECT", + "name": "EventsOutTotal", + "ofType": null + } } ], "inputFields": null, @@ -3379,6 +3822,30 @@ "name": "ProcessedBytesTotal", "ofType": null } + }, + { + "args": [], + "deprecationReason": null, + "description": null, + "isDeprecated": false, + "name": "eventsInTotal", + "type": { + "kind": "OBJECT", + "name": "EventsInTotal", + "ofType": null + } + }, + { + "args": [], + "deprecationReason": null, + "description": null, + "isDeprecated": false, + "name": "eventsOutTotal", + "type": { + "kind": "OBJECT", + "name": "EventsOutTotal", + "ofType": null + } } ], "inputFields": null, @@ -3675,51 +4142,229 @@ } }, { - "defaultValue": null, - "description": null, - "name": "endsWith", + "defaultValue": null, + "description": null, + "name": "endsWith", + "type": { + "kind": "SCALAR", + "name": "String", + "ofType": null + } + } + ], + "interfaces": null, + "kind": "INPUT_OBJECT", + "name": "StringFilter", + "possibleTypes": null + }, + { + "description": null, + "enumValues": null, + "fields": [ + { + "args": [ + { + "defaultValue": null, + "description": null, + "name": "componentNames", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "LIST", + "name": null, + "ofType": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "String", + "ofType": null + } + } + } + } + }, + { + "defaultValue": "500", + "description": null, + "name": "interval", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Int", + "ofType": null + } + } + }, + { + "defaultValue": "100", + "description": null, + "name": "limit", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Int", + "ofType": null + } + } + } + ], + "deprecationReason": null, + "description": "A stream of events emitted from matched component(s)", + "isDeprecated": false, + "name": "outputEvents", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "LIST", + "name": null, + "ofType": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "UNION", + "name": "OutputEventsPayload", + "ofType": null + } + } + } + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Subscribes to all newly added components", + "isDeprecated": false, + "name": "componentAdded", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "INTERFACE", + "name": "Component", + "ofType": null + } + } + }, + { + "args": [], + "deprecationReason": null, + "description": "Subscribes to all removed components", + "isDeprecated": false, + "name": "componentRemoved", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "INTERFACE", + "name": "Component", + "ofType": null + } + } + }, + { + "args": [ + { + "defaultValue": "1000", + "description": null, + "name": "interval", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Int", + "ofType": null + } + } + } + ], + "deprecationReason": null, + "description": "Metrics for how long the Vector instance has been running.", + "isDeprecated": false, + "name": "uptime", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "OBJECT", + "name": "Uptime", + "ofType": null + } + } + }, + { + "args": [ + { + "defaultValue": "1000", + "description": null, + "name": "interval", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Int", + "ofType": null + } + } + } + ], + "deprecationReason": null, + "description": "Event processing metrics.", + "isDeprecated": false, + "name": "processedEventsTotal", "type": { - "kind": "SCALAR", - "name": "String", - "ofType": null + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "OBJECT", + "name": "ProcessedEventsTotal", + "ofType": null + } } - } - ], - "interfaces": null, - "kind": "INPUT_OBJECT", - "name": "StringFilter", - "possibleTypes": null - }, - { - "description": null, - "enumValues": null, - "fields": [ + }, { "args": [ { - "defaultValue": null, + "defaultValue": "1000", "description": null, - "name": "componentNames", + "name": "interval", "type": { "kind": "NON_NULL", "name": null, "ofType": { - "kind": "LIST", - "name": null, - "ofType": { - "kind": "NON_NULL", - "name": null, - "ofType": { - "kind": "SCALAR", - "name": "String", - "ofType": null - } - } + "kind": "SCALAR", + "name": "Int", + "ofType": null } } - }, + } + ], + "deprecationReason": null, + "description": "Event processing throughput sampled over the provided millisecond `interval`.", + "isDeprecated": false, + "name": "processedEventsThroughput", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Int", + "ofType": null + } + } + }, + { + "args": [ { - "defaultValue": "500", + "defaultValue": "1000", "description": null, "name": "interval", "type": { @@ -3731,11 +4376,36 @@ "ofType": null } } - }, + } + ], + "deprecationReason": null, + "description": "Component event processing throughput metrics over `interval`.", + "isDeprecated": false, + "name": "componentProcessedEventsThroughputs", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "LIST", + "name": null, + "ofType": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "OBJECT", + "name": "ComponentProcessedEventsThroughput", + "ofType": null + } + } + } + } + }, + { + "args": [ { - "defaultValue": "100", + "defaultValue": "1000", "description": null, - "name": "limit", + "name": "interval", "type": { "kind": "NON_NULL", "name": null, @@ -3748,9 +4418,9 @@ } ], "deprecationReason": null, - "description": "A stream of events emitted from matched component(s)", + "description": "Component event processing metrics over `interval`.", "isDeprecated": false, - "name": "outputEvents", + "name": "componentProcessedEventsTotals", "type": { "kind": "NON_NULL", "name": null, @@ -3761,8 +4431,8 @@ "kind": "NON_NULL", "name": null, "ofType": { - "kind": "UNION", - "name": "OutputEventsPayload", + "kind": "OBJECT", + "name": "ComponentProcessedEventsTotal", "ofType": null } } @@ -3770,33 +4440,63 @@ } }, { - "args": [], + "args": [ + { + "defaultValue": "1000", + "description": null, + "name": "interval", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Int", + "ofType": null + } + } + } + ], "deprecationReason": null, - "description": "Subscribes to all newly added components", + "description": "Total incoming events metrics", "isDeprecated": false, - "name": "componentAdded", + "name": "eventsInTotal", "type": { "kind": "NON_NULL", "name": null, "ofType": { - "kind": "INTERFACE", - "name": "Component", + "kind": "OBJECT", + "name": "EventsInTotal", "ofType": null } } }, { - "args": [], + "args": [ + { + "defaultValue": "1000", + "description": null, + "name": "interval", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Int", + "ofType": null + } + } + } + ], "deprecationReason": null, - "description": "Subscribes to all removed components", + "description": "Total incoming events throughput sampled over the provided millisecond `interval`", "isDeprecated": false, - "name": "componentRemoved", + "name": "eventsInThroughput", "type": { "kind": "NON_NULL", "name": null, "ofType": { - "kind": "INTERFACE", - "name": "Component", + "kind": "SCALAR", + "name": "Int", "ofType": null } } @@ -3819,16 +4519,24 @@ } ], "deprecationReason": null, - "description": "Metrics for how long the Vector instance has been running.", + "description": "Total incoming component events throughput metrics over `interval`", "isDeprecated": false, - "name": "uptime", + "name": "componentEventsInThroughputs", "type": { "kind": "NON_NULL", "name": null, "ofType": { - "kind": "OBJECT", - "name": "Uptime", - "ofType": null + "kind": "LIST", + "name": null, + "ofType": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "OBJECT", + "name": "ComponentEventsInThroughput", + "ofType": null + } + } } } }, @@ -3850,15 +4558,54 @@ } ], "deprecationReason": null, - "description": "Event processing metrics.", + "description": "Total incoming component event metrics over `interval`", "isDeprecated": false, - "name": "processedEventsTotal", + "name": "componentEventsInTotals", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "LIST", + "name": null, + "ofType": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "OBJECT", + "name": "ComponentEventsInTotal", + "ofType": null + } + } + } + } + }, + { + "args": [ + { + "defaultValue": "1000", + "description": null, + "name": "interval", + "type": { + "kind": "NON_NULL", + "name": null, + "ofType": { + "kind": "SCALAR", + "name": "Int", + "ofType": null + } + } + } + ], + "deprecationReason": null, + "description": "Total outgoing events metrics", + "isDeprecated": false, + "name": "eventsOutTotal", "type": { "kind": "NON_NULL", "name": null, "ofType": { "kind": "OBJECT", - "name": "ProcessedEventsTotal", + "name": "EventsOutTotal", "ofType": null } } @@ -3881,9 +4628,9 @@ } ], "deprecationReason": null, - "description": "Event processing throughput sampled over the provided millisecond `interval`.", + "description": "Total outgoing events throughput sampled over the provided millisecond `interval`", "isDeprecated": false, - "name": "processedEventsThroughput", + "name": "eventsOutThroughput", "type": { "kind": "NON_NULL", "name": null, @@ -3912,9 +4659,9 @@ } ], "deprecationReason": null, - "description": "Component event processing throughput metrics over `interval`.", + "description": "Total outgoing component event throughput metrics over `interval`", "isDeprecated": false, - "name": "componentProcessedEventsThroughputs", + "name": "componentEventsOutThroughputs", "type": { "kind": "NON_NULL", "name": null, @@ -3926,7 +4673,7 @@ "name": null, "ofType": { "kind": "OBJECT", - "name": "ComponentProcessedEventsThroughput", + "name": "ComponentEventsOutThroughput", "ofType": null } } @@ -3951,9 +4698,9 @@ } ], "deprecationReason": null, - "description": "Component event processing metrics over `interval`.", + "description": "Total outgoing component event metrics over `interval`", "isDeprecated": false, - "name": "componentProcessedEventsTotals", + "name": "componentEventsOutTotals", "type": { "kind": "NON_NULL", "name": null, @@ -3965,7 +4712,7 @@ "name": null, "ofType": { "kind": "OBJECT", - "name": "ComponentProcessedEventsTotal", + "name": "ComponentEventsOutTotal", "ofType": null } } @@ -4600,6 +5347,30 @@ "name": "ProcessedBytesTotal", "ofType": null } + }, + { + "args": [], + "deprecationReason": null, + "description": null, + "isDeprecated": false, + "name": "eventsInTotal", + "type": { + "kind": "OBJECT", + "name": "EventsInTotal", + "ofType": null + } + }, + { + "args": [], + "deprecationReason": null, + "description": null, + "isDeprecated": false, + "name": "eventsOutTotal", + "type": { + "kind": "OBJECT", + "name": "EventsOutTotal", + "ofType": null + } } ], "inputFields": null, @@ -4787,7 +5558,7 @@ "possibleTypes": null }, { - "description": "A Directive provides a way to describe alternate runtime execution and type validation behavior in a GraphQL document.", + "description": "A Directive provides a way to describe alternate runtime execution and type validation behavior in a GraphQL document.\n\nIn some cases, you need to provide options to alter GraphQL's execution behavior in ways field arguments will not suffice, such as conditionally including or skipping a field. Directives provide this by describing additional information to the executor.", "enumValues": null, "fields": [ { diff --git a/lib/vector-api-client/graphql/subscriptions/component_events_in_throughputs.graphql b/lib/vector-api-client/graphql/subscriptions/component_events_in_throughputs.graphql new file mode 100644 index 0000000000000..226052fd8bbc7 --- /dev/null +++ b/lib/vector-api-client/graphql/subscriptions/component_events_in_throughputs.graphql @@ -0,0 +1,6 @@ +subscription ComponentEventsInThroughputsSubscription($interval: Int!) { + componentEventsInThroughputs(interval: $interval) { + name + throughput + } +} diff --git a/lib/vector-api-client/graphql/subscriptions/component_events_in_totals.graphql b/lib/vector-api-client/graphql/subscriptions/component_events_in_totals.graphql new file mode 100644 index 0000000000000..5d1f842fd44fb --- /dev/null +++ b/lib/vector-api-client/graphql/subscriptions/component_events_in_totals.graphql @@ -0,0 +1,8 @@ +subscription ComponentEventsInTotalsSubscription($interval: Int!) { + componentEventsInTotals(interval: $interval) { + name + metric { + eventsInTotal + } + } +} diff --git a/lib/vector-api-client/graphql/subscriptions/component_events_out_throughputs.graphql b/lib/vector-api-client/graphql/subscriptions/component_events_out_throughputs.graphql new file mode 100644 index 0000000000000..78496802f3cfc --- /dev/null +++ b/lib/vector-api-client/graphql/subscriptions/component_events_out_throughputs.graphql @@ -0,0 +1,6 @@ +subscription ComponentEventsOutThroughputsSubscription($interval: Int!) { + componentEventsOutThroughputs(interval: $interval) { + name + throughput + } +} diff --git a/lib/vector-api-client/graphql/subscriptions/component_events_out_totals.graphql b/lib/vector-api-client/graphql/subscriptions/component_events_out_totals.graphql new file mode 100644 index 0000000000000..e5ae651105a30 --- /dev/null +++ b/lib/vector-api-client/graphql/subscriptions/component_events_out_totals.graphql @@ -0,0 +1,8 @@ +subscription ComponentEventsOutTotalsSubscription($interval: Int!) { + componentEventsOutTotals(interval: $interval) { + name + metric { + eventsOutTotal + } + } +} diff --git a/lib/vector-api-client/graphql/subscriptions/events_in_throughput.graphql b/lib/vector-api-client/graphql/subscriptions/events_in_throughput.graphql new file mode 100644 index 0000000000000..9e36ce5a90f45 --- /dev/null +++ b/lib/vector-api-client/graphql/subscriptions/events_in_throughput.graphql @@ -0,0 +1,3 @@ +subscription EventsInThroughputSubscription($interval: Int!) { + eventsInThroughput(interval: $interval) +} diff --git a/lib/vector-api-client/graphql/subscriptions/events_in_total.graphql b/lib/vector-api-client/graphql/subscriptions/events_in_total.graphql new file mode 100644 index 0000000000000..0c95f786aea65 --- /dev/null +++ b/lib/vector-api-client/graphql/subscriptions/events_in_total.graphql @@ -0,0 +1,5 @@ +subscription EventsInTotalSubscription($interval: Int!) { + eventsInTotal(interval: $interval) { + eventsInTotal + } +} diff --git a/lib/vector-api-client/graphql/subscriptions/events_out_throughput.graphql b/lib/vector-api-client/graphql/subscriptions/events_out_throughput.graphql new file mode 100644 index 0000000000000..230b1b42c1753 --- /dev/null +++ b/lib/vector-api-client/graphql/subscriptions/events_out_throughput.graphql @@ -0,0 +1,3 @@ +subscription EventsOutThroughputSubscription($interval: Int!) { + eventsOutThroughput(interval: $interval) +} diff --git a/lib/vector-api-client/graphql/subscriptions/events_out_total.graphql b/lib/vector-api-client/graphql/subscriptions/events_out_total.graphql new file mode 100644 index 0000000000000..68fe8aea590af --- /dev/null +++ b/lib/vector-api-client/graphql/subscriptions/events_out_total.graphql @@ -0,0 +1,5 @@ +subscription EventsOutTotalSubscription($interval: Int!) { + eventsOutTotal(interval: $interval) { + eventsOutTotal + } +} diff --git a/lib/vector-api-client/src/gql/components.rs b/lib/vector-api-client/src/gql/components.rs index f8696e308152a..44ff33c1d2b01 100644 --- a/lib/vector-api-client/src/gql/components.rs +++ b/lib/vector-api-client/src/gql/components.rs @@ -113,6 +113,52 @@ impl components_query::ComponentsQueryComponentsEdgesNodeOn { .unwrap_or(0), } } + + pub fn events_in_total(&self) -> i64 { + match self { + components_query::ComponentsQueryComponentsEdgesNodeOn::Source(s) => s + .metrics + .events_in_total + .as_ref() + .map(|p| p.events_in_total as i64) + .unwrap_or(0), + components_query::ComponentsQueryComponentsEdgesNodeOn::Transform(t) => t + .metrics + .events_in_total + .as_ref() + .map(|p| p.events_in_total as i64) + .unwrap_or(0), + components_query::ComponentsQueryComponentsEdgesNodeOn::Sink(s) => s + .metrics + .events_in_total + .as_ref() + .map(|p| p.events_in_total as i64) + .unwrap_or(0), + } + } + + pub fn events_out_total(&self) -> i64 { + match self { + components_query::ComponentsQueryComponentsEdgesNodeOn::Source(s) => s + .metrics + .events_out_total + .as_ref() + .map(|p| p.events_out_total as i64) + .unwrap_or(0), + components_query::ComponentsQueryComponentsEdgesNodeOn::Transform(t) => t + .metrics + .events_out_total + .as_ref() + .map(|p| p.events_out_total as i64) + .unwrap_or(0), + components_query::ComponentsQueryComponentsEdgesNodeOn::Sink(s) => s + .metrics + .events_out_total + .as_ref() + .map(|p| p.events_out_total as i64) + .unwrap_or(0), + } + } } impl fmt::Display for components_query::ComponentsQueryComponentsEdgesNodeOn { diff --git a/lib/vector-api-client/src/gql/metrics.rs b/lib/vector-api-client/src/gql/metrics.rs index b6a55c7e9a9ce..6017d427d8601 100644 --- a/lib/vector-api-client/src/gql/metrics.rs +++ b/lib/vector-api-client/src/gql/metrics.rs @@ -43,6 +43,46 @@ pub struct ProcessedEventsThroughputSubscription; )] pub struct ProcessedBytesThroughputSubscription; +/// EventsInTotalSubscription contains metrics on the number of events +/// that have been accepted for processing by a Vector instance. +#[derive(GraphQLQuery, Debug, Copy, Clone)] +#[graphql( + schema_path = "graphql/schema.json", + query_path = "graphql/subscriptions/events_in_total.graphql", + response_derives = "Debug" +)] +pub struct EventsInTotalSubscription; + +/// EventsInThroughputSubscription contains metrics on the number of events +/// that have been accepted for processing between `interval` samples. +#[derive(GraphQLQuery, Debug, Copy, Clone)] +#[graphql( + schema_path = "graphql/schema.json", + query_path = "graphql/subscriptions/events_in_throughput.graphql", + response_derives = "Debug" +)] +pub struct EventsInThroughputSubscription; + +/// EventsOutTotalSubscription contains metrics on the number of events +/// that have been emitted by a Vector instance. +#[derive(GraphQLQuery, Debug, Copy, Clone)] +#[graphql( + schema_path = "graphql/schema.json", + query_path = "graphql/subscriptions/events_out_total.graphql", + response_derives = "Debug" +)] +pub struct EventsOutTotalSubscription; + +/// EventsOutThroughputSubscription contains metrics on the number of events +/// that have been emitted between `interval` samples. +#[derive(GraphQLQuery, Debug, Copy, Clone)] +#[graphql( + schema_path = "graphql/schema.json", + query_path = "graphql/subscriptions/events_out_throughput.graphql", + response_derives = "Debug" +)] +pub struct EventsOutThroughputSubscription; + /// ComponentProcessedEventsThroughputsSubscription contains metrics on the number of events /// that have been processed between `interval` samples, against specific components. #[derive(GraphQLQuery, Debug, Copy, Clone)] @@ -83,7 +123,47 @@ pub struct ComponentProcessedBytesThroughputsSubscription; )] pub struct ComponentProcessedBytesTotalsSubscription; -/// Extension methods for metrics subscriptions. +/// ComponentEventsInThroughputsSubscription contains metrics on the number of events +/// that have been accepted for processing between `interval` samples, against specific components. +#[derive(GraphQLQuery, Debug, Copy, Clone)] +#[graphql( + schema_path = "graphql/schema.json", + query_path = "graphql/subscriptions/component_events_in_throughputs.graphql", + response_derives = "Debug" +)] +pub struct ComponentEventsInThroughputsSubscription; + +/// ComponentEventsInTotalsSubscription contains metrics on the number of events +/// that have been accepted for processing by a Vector instance, against specific components. +#[derive(GraphQLQuery, Debug, Copy, Clone)] +#[graphql( + schema_path = "graphql/schema.json", + query_path = "graphql/subscriptions/component_events_in_totals.graphql", + response_derives = "Debug" +)] +pub struct ComponentEventsInTotalsSubscription; + +/// ComponentEventsOutThroughputsSubscription contains metrics on the number of events +/// that have been emitted between `interval` samples, against specific components. +#[derive(GraphQLQuery, Debug, Copy, Clone)] +#[graphql( + schema_path = "graphql/schema.json", + query_path = "graphql/subscriptions/component_events_out_throughputs.graphql", + response_derives = "Debug" +)] +pub struct ComponentEventsOutThroughputsSubscription; + +/// ComponentEventsOutTotalsSubscription contains metrics on the number of events +/// that have been emitted by a Vector instance, against specific components. +#[derive(GraphQLQuery, Debug, Copy, Clone)] +#[graphql( + schema_path = "graphql/schema.json", + query_path = "graphql/subscriptions/component_events_out_totals.graphql", + response_derives = "Debug" +)] +pub struct ComponentEventsOutTotalsSubscription; + +/// Extension methods for metrics subscriptions pub trait MetricsSubscriptionExt { /// Executes an uptime metrics subscription. fn uptime_subscription(&self) -> crate::BoxedSubscription; @@ -106,7 +186,31 @@ pub trait MetricsSubscriptionExt { interval: i64, ) -> crate::BoxedSubscription; - /// Executes an component events processed totals subscription. + /// Executes an events in metrics subscription + fn events_in_total_subscription( + &self, + interval: i64, + ) -> crate::BoxedSubscription; + + /// Executes an events in throughput subscription. + fn events_in_throughput_subscription( + &self, + interval: i64, + ) -> crate::BoxedSubscription; + + /// Executes an events out metrics subscription. + fn events_out_total_subscription( + &self, + interval: i64, + ) -> crate::BoxedSubscription; + + /// Executes an events out throughput subscription. + fn events_out_throughput_subscription( + &self, + interval: i64, + ) -> crate::BoxedSubscription; + + /// Executes an component events processed totals subscription fn component_processed_events_totals_subscription( &self, interval: i64, @@ -129,6 +233,30 @@ pub trait MetricsSubscriptionExt { &self, interval: i64, ) -> crate::BoxedSubscription; + + /// Executes an component events in totals subscription. + fn component_events_in_totals_subscription( + &self, + interval: i64, + ) -> crate::BoxedSubscription; + + /// Executes an component events in throughputs subscription. + fn component_events_in_throughputs_subscription( + &self, + interval: i64, + ) -> crate::BoxedSubscription; + + /// Executes an component events out totals subscription. + fn component_events_out_totals_subscription( + &self, + interval: i64, + ) -> crate::BoxedSubscription; + + /// Executes an component events in throughputs subscription. + fn component_events_out_throughputs_subscription( + &self, + interval: i64, + ) -> crate::BoxedSubscription; } impl MetricsSubscriptionExt for crate::SubscriptionClient { @@ -175,6 +303,56 @@ impl MetricsSubscriptionExt for crate::SubscriptionClient { self.start::(&request_body) } + /// Executes an events in metrics subscription. + fn events_in_total_subscription( + &self, + interval: i64, + ) -> BoxedSubscription { + let request_body = + EventsInTotalSubscription::build_query(events_in_total_subscription::Variables { + interval, + }); + + self.start::(&request_body) + } + + /// Executes an events in throughput subscription. + fn events_in_throughput_subscription( + &self, + interval: i64, + ) -> BoxedSubscription { + let request_body = EventsInThroughputSubscription::build_query( + events_in_throughput_subscription::Variables { interval }, + ); + + self.start::(&request_body) + } + + /// Executes an events out metrics subscription. + fn events_out_total_subscription( + &self, + interval: i64, + ) -> BoxedSubscription { + let request_body = + EventsOutTotalSubscription::build_query(events_out_total_subscription::Variables { + interval, + }); + + self.start::(&request_body) + } + + /// Executes an events out throughput subscription. + fn events_out_throughput_subscription( + &self, + interval: i64, + ) -> BoxedSubscription { + let request_body = EventsOutThroughputSubscription::build_query( + events_out_throughput_subscription::Variables { interval }, + ); + + self.start::(&request_body) + } + /// Executes an all component events processed totals subscription. fn component_processed_events_totals_subscription( &self, @@ -222,4 +400,52 @@ impl MetricsSubscriptionExt for crate::SubscriptionClient { self.start::(&request_body) } + + /// Executes an all component events in totals subscription. + fn component_events_in_totals_subscription( + &self, + interval: i64, + ) -> BoxedSubscription { + let request_body = ComponentEventsInTotalsSubscription::build_query( + component_events_in_totals_subscription::Variables { interval }, + ); + + self.start::(&request_body) + } + + /// Executes an all component events in throughputs subscription. + fn component_events_in_throughputs_subscription( + &self, + interval: i64, + ) -> BoxedSubscription { + let request_body = ComponentEventsInThroughputsSubscription::build_query( + component_events_in_throughputs_subscription::Variables { interval }, + ); + + self.start::(&request_body) + } + + /// Executes an all component events out totals subscription. + fn component_events_out_totals_subscription( + &self, + interval: i64, + ) -> BoxedSubscription { + let request_body = ComponentEventsOutTotalsSubscription::build_query( + component_events_out_totals_subscription::Variables { interval }, + ); + + self.start::(&request_body) + } + + /// Executes an all component events out throughputs subscription. + fn component_events_out_throughputs_subscription( + &self, + interval: i64, + ) -> BoxedSubscription { + let request_body = ComponentEventsOutThroughputsSubscription::build_query( + component_events_out_throughputs_subscription::Variables { interval }, + ); + + self.start::(&request_body) + } } diff --git a/lib/vector-api-client/tests/queries/file_source_metrics.graphql b/lib/vector-api-client/tests/queries/file_source_metrics.graphql index c019de45f435f..5f99f2aaf2746 100644 --- a/lib/vector-api-client/tests/queries/file_source_metrics.graphql +++ b/lib/vector-api-client/tests/queries/file_source_metrics.graphql @@ -14,6 +14,9 @@ query FileSourceMetricsQuery($after: String, $before: String, $first: Int, $last processedEventsTotal { processedEventsTotal } + eventsInTotal { + eventsInTotal + } } } } diff --git a/src/api/schema/metrics/events_in.rs b/src/api/schema/metrics/events_in.rs new file mode 100644 index 0000000000000..10358effadfae --- /dev/null +++ b/src/api/schema/metrics/events_in.rs @@ -0,0 +1,96 @@ +use crate::event::{Metric, MetricValue}; +use async_graphql::Object; +use chrono::{DateTime, Utc}; + +pub struct EventsInTotal(Metric); + +impl EventsInTotal { + pub fn new(m: Metric) -> Self { + Self(m) + } + + pub fn get_timestamp(&self) -> Option> { + self.0.data.timestamp + } + + pub fn get_events_in_total(&self) -> f64 { + match self.0.data.value { + MetricValue::Counter { value } => value, + _ => 0.00, + } + } +} + +#[Object] +impl EventsInTotal { + /// Metric timestamp + pub async fn timestamp(&self) -> Option> { + self.get_timestamp() + } + + /// Total incoming events + pub async fn events_in_total(&self) -> f64 { + self.get_events_in_total() + } +} + +impl From for EventsInTotal { + fn from(m: Metric) -> Self { + Self(m) + } +} + +pub struct ComponentEventsInTotal { + name: String, + metric: Metric, +} + +impl ComponentEventsInTotal { + /// Returns a new `ComponentEventsInTotal` struct, which is a GraphQL type. The + /// component name is hoisted for clear field resolution in the resulting payload. + pub fn new(metric: Metric) -> Self { + let name = metric.tag_value("component_name").expect( + "Returned a metric without a `component_name`, which shouldn't happen. Please report.", + ); + + Self { name, metric } + } +} + +#[Object] +impl ComponentEventsInTotal { + /// Component name + async fn name(&self) -> &str { + &self.name + } + + /// Total incoming events metric + async fn metric(&self) -> EventsInTotal { + EventsInTotal::new(self.metric.clone()) + } +} + +pub struct ComponentEventsInThroughput { + name: String, + throughput: i64, +} + +impl ComponentEventsInThroughput { + /// Returns a new `ComponentEventsInThroughput`, set to the provided name/throughput values. + pub fn new(name: String, throughput: i64) -> Self { + Self { name, throughput } + } +} + +#[Object] +impl ComponentEventsInThroughput { + /// Component name + async fn name(&self) -> &str { + &self.name + } + + /// Events processed throughput + async fn throughput(&self) -> i64 { + self.throughput + } +} diff --git a/src/api/schema/metrics/events_out.rs b/src/api/schema/metrics/events_out.rs new file mode 100644 index 0000000000000..56aad46ef62b3 --- /dev/null +++ b/src/api/schema/metrics/events_out.rs @@ -0,0 +1,96 @@ +use crate::event::{Metric, MetricValue}; +use async_graphql::Object; +use chrono::{DateTime, Utc}; + +pub struct EventsOutTotal(Metric); + +impl EventsOutTotal { + pub fn new(m: Metric) -> Self { + Self(m) + } + + pub fn get_timestamp(&self) -> Option> { + self.0.data.timestamp + } + + pub fn get_events_out_total(&self) -> f64 { + match self.0.data.value { + MetricValue::Counter { value } => value, + _ => 0.00, + } + } +} + +#[Object] +impl EventsOutTotal { + /// Metric timestamp + pub async fn timestamp(&self) -> Option> { + self.get_timestamp() + } + + /// Total outgoing events + pub async fn events_out_total(&self) -> f64 { + self.get_events_out_total() + } +} + +impl From for EventsOutTotal { + fn from(m: Metric) -> Self { + Self(m) + } +} + +pub struct ComponentEventsOutTotal { + name: String, + metric: Metric, +} + +impl ComponentEventsOutTotal { + /// Returns a new `ComponentEventsOutTotal` struct, which is a GraphQL type. The + /// component name is hoisted for clear field resolution in the resulting payload. + pub fn new(metric: Metric) -> Self { + let name = metric.tag_value("component_name").expect( + "Returned a metric without a `component_name`, which shouldn't happen. Please report.", + ); + + Self { name, metric } + } +} + +#[Object] +impl ComponentEventsOutTotal { + /// Component name + async fn name(&self) -> &str { + &self.name + } + + /// Total outgoing events metric + async fn metric(&self) -> EventsOutTotal { + EventsOutTotal::new(self.metric.clone()) + } +} + +pub struct ComponentEventsOutThroughput { + name: String, + throughput: i64, +} + +impl ComponentEventsOutThroughput { + /// Returns a new `ComponentEventsOutThroughput`, set to the provided name/throughput values + pub fn new(name: String, throughput: i64) -> Self { + Self { name, throughput } + } +} + +#[Object] +impl ComponentEventsOutThroughput { + /// Component name + async fn name(&self) -> &str { + &self.name + } + + /// Events processed throughput + async fn throughput(&self) -> i64 { + self.throughput + } +} diff --git a/src/api/schema/metrics/filter.rs b/src/api/schema/metrics/filter.rs index d61b4915d0364..7807cd75623b5 100644 --- a/src/api/schema/metrics/filter.rs +++ b/src/api/schema/metrics/filter.rs @@ -1,4 +1,4 @@ -use super::{ProcessedBytesTotal, ProcessedEventsTotal}; +use super::{EventsInTotal, EventsOutTotal, ProcessedBytesTotal, ProcessedEventsTotal}; use crate::{ event::{Event, Metric, MetricValue}, metrics::{capture_metrics, get_controller, Controller}, @@ -29,6 +29,8 @@ fn sum_metrics<'a, I: IntoIterator>(metrics: I) -> Option { fn processed_events_total(&self) -> Option; fn processed_bytes_total(&self) -> Option; + fn events_in_total(&self) -> Option; + fn events_out_total(&self) -> Option; } impl<'a> MetricsFilter<'a> for Vec { @@ -43,6 +45,18 @@ impl<'a> MetricsFilter<'a> for Vec { Some(ProcessedBytesTotal::new(sum)) } + + fn events_in_total(&self) -> Option { + let sum = sum_metrics(self.iter().filter(|m| m.name() == "events_in_total"))?; + + Some(EventsInTotal::new(sum)) + } + + fn events_out_total(&self) -> Option { + let sum = sum_metrics(self.iter().filter(|m| m.name() == "events_out_total"))?; + + Some(EventsOutTotal::new(sum)) + } } impl<'a> MetricsFilter<'a> for Vec<&'a Metric> { @@ -65,6 +79,26 @@ impl<'a> MetricsFilter<'a> for Vec<&'a Metric> { Some(ProcessedBytesTotal::new(sum)) } + + fn events_in_total(&self) -> Option { + let sum = sum_metrics( + self.iter() + .filter(|m| m.name() == "events_in_total") + .copied(), + )?; + + Some(EventsInTotal::new(sum)) + } + + fn events_out_total(&self) -> Option { + let sum = sum_metrics( + self.iter() + .filter(|m| m.name() == "events_out_total") + .copied(), + )?; + + Some(EventsOutTotal::new(sum)) + } } /// Returns a stream of `Metric`s, collected at the provided millisecond interval. diff --git a/src/api/schema/metrics/mod.rs b/src/api/schema/metrics/mod.rs index f83866704f7e5..7dc56eec07e49 100644 --- a/src/api/schema/metrics/mod.rs +++ b/src/api/schema/metrics/mod.rs @@ -1,4 +1,6 @@ mod errors; +mod events_in; +mod events_out; pub mod filter; mod host; mod processed_bytes; @@ -13,6 +15,8 @@ use chrono::{DateTime, Utc}; use tokio_stream::{Stream, StreamExt}; pub use errors::{ComponentErrorsTotal, ErrorsTotal}; +pub use events_in::{ComponentEventsInThroughput, ComponentEventsInTotal, EventsInTotal}; +pub use events_out::{ComponentEventsOutThroughput, ComponentEventsOutTotal, EventsOutTotal}; pub use filter::*; pub use host::HostMetrics; pub use processed_bytes::{ @@ -112,6 +116,98 @@ impl MetricsSubscription { }) } + /// Total incoming events metrics + async fn events_in_total( + &self, + #[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32, + ) -> impl Stream { + get_metrics(interval).filter_map(|m| match m.name() { + "events_in_total" => Some(EventsInTotal::new(m)), + _ => None, + }) + } + + /// Total incoming events throughput sampled over the provided millisecond `interval` + async fn events_in_throughput( + &self, + #[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32, + ) -> impl Stream { + counter_throughput(interval, &|m| m.name() == "events_in_total") + .map(|(_, throughput)| throughput as i64) + } + + /// Total incoming component events throughput metrics over `interval` + async fn component_events_in_throughputs( + &self, + #[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32, + ) -> impl Stream> { + component_counter_throughputs(interval, &|m| m.name() == "events_in_total").map(|m| { + m.into_iter() + .map(|(m, throughput)| { + ComponentEventsInThroughput::new( + m.tag_value("component_name").unwrap(), + throughput as i64, + ) + }) + .collect() + }) + } + + /// Total incoming component event metrics over `interval` + async fn component_events_in_totals( + &self, + #[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32, + ) -> impl Stream> { + component_counter_metrics(interval, &|m| m.name() == "events_in_total") + .map(|m| m.into_iter().map(ComponentEventsInTotal::new).collect()) + } + + /// Total outgoing events metrics + async fn events_out_total( + &self, + #[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32, + ) -> impl Stream { + get_metrics(interval).filter_map(|m| match m.name() { + "events_out_total" => Some(EventsOutTotal::new(m)), + _ => None, + }) + } + + /// Total outgoing events throughput sampled over the provided millisecond `interval` + async fn events_out_throughput( + &self, + #[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32, + ) -> impl Stream { + counter_throughput(interval, &|m| m.name() == "events_out_total") + .map(|(_, throughput)| throughput as i64) + } + + /// Total outgoing component event throughput metrics over `interval` + async fn component_events_out_throughputs( + &self, + #[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32, + ) -> impl Stream> { + component_counter_throughputs(interval, &|m| m.name() == "events_out_total").map(|m| { + m.into_iter() + .map(|(m, throughput)| { + ComponentEventsOutThroughput::new( + m.tag_value("component_name").unwrap(), + throughput as i64, + ) + }) + .collect() + }) + } + + /// Total outgoing component event metrics over `interval` + async fn component_events_out_totals( + &self, + #[graphql(default = 1000, validator(IntRange(min = "10", max = "60_000")))] interval: i32, + ) -> impl Stream> { + component_counter_metrics(interval, &|m| m.name() == "events_out_total") + .map(|m| m.into_iter().map(ComponentEventsOutTotal::new).collect()) + } + /// Byte processing metrics. async fn processed_bytes_total( &self, diff --git a/src/api/schema/metrics/sink/generic.rs b/src/api/schema/metrics/sink/generic.rs index 50e034ea99a79..2391686f99bc2 100644 --- a/src/api/schema/metrics/sink/generic.rs +++ b/src/api/schema/metrics/sink/generic.rs @@ -24,4 +24,14 @@ impl GenericSinkMetrics { pub async fn processed_bytes_total(&self) -> Option { self.0.processed_bytes_total() } + + /// Total incoming events for the current sink + pub async fn events_in_total(&self) -> Option { + self.0.events_in_total() + } + + /// Total outgoing events for the current sink + pub async fn events_out_total(&self) -> Option { + self.0.events_out_total() + } } diff --git a/src/api/schema/metrics/sink/mod.rs b/src/api/schema/metrics/sink/mod.rs index 32863dde38a45..8a0cfffc27864 100644 --- a/src/api/schema/metrics/sink/mod.rs +++ b/src/api/schema/metrics/sink/mod.rs @@ -1,13 +1,15 @@ mod generic; -use super::{ProcessedBytesTotal, ProcessedEventsTotal}; +use super::{EventsInTotal, EventsOutTotal, ProcessedBytesTotal, ProcessedEventsTotal}; use crate::event::Metric; use async_graphql::Interface; #[derive(Debug, Clone, Interface)] #[graphql( field(name = "processed_events_total", type = "Option"), - field(name = "processed_bytes_total", type = "Option") + field(name = "processed_bytes_total", type = "Option"), + field(name = "events_in_total", type = "Option"), + field(name = "events_out_total", type = "Option") )] pub enum SinkMetrics { GenericSinkMetrics(generic::GenericSinkMetrics), diff --git a/src/api/schema/metrics/source/file.rs b/src/api/schema/metrics/source/file.rs index 4ce67cd2da483..1ee8b2e3f3d5e 100644 --- a/src/api/schema/metrics/source/file.rs +++ b/src/api/schema/metrics/source/file.rs @@ -43,6 +43,16 @@ impl<'a> FileSourceMetricFile<'a> { async fn processed_bytes_total(&self) -> Option { self.metrics.processed_bytes_total() } + + /// Metric indicating incoming events for the current file + async fn events_in_total(&self) -> Option { + self.metrics.events_in_total() + } + + /// Metric indicating outgoing events for the current file + async fn events_out_total(&self) -> Option { + self.metrics.events_out_total() + } } #[derive(Debug, Clone)] @@ -72,6 +82,8 @@ pub enum FileSourceMetricFilesSortFieldName { Name, ProcessedBytesTotal, ProcessedEventsTotal, + EventsInTotal, + EventsOutTotal, } impl sort::SortableByField for FileSourceMetricFile<'_> { @@ -100,6 +112,28 @@ impl sort::SortableByField for FileSourceMet .map(|m| m.get_processed_events_total() as i64) .unwrap_or(0), ), + FileSourceMetricFilesSortFieldName::EventsInTotal => Ord::cmp( + &self + .metrics + .events_in_total() + .map(|m| m.get_events_in_total() as i64) + .unwrap_or(0), + &rhs.metrics + .events_in_total() + .map(|m| m.get_events_in_total() as i64) + .unwrap_or(0), + ), + FileSourceMetricFilesSortFieldName::EventsOutTotal => Ord::cmp( + &self + .metrics + .events_out_total() + .map(|m| m.get_events_out_total() as i64) + .unwrap_or(0), + &rhs.metrics + .events_out_total() + .map(|m| m.get_events_out_total() as i64) + .unwrap_or(0), + ), } } } @@ -160,6 +194,16 @@ impl FileSourceMetrics { pub async fn processed_bytes_total(&self) -> Option { self.0.processed_bytes_total() } + + /// Total incoming events for the current file source + pub async fn events_in_total(&self) -> Option { + self.0.events_in_total() + } + + /// Total outgoing events for the current file source + pub async fn events_out_total(&self) -> Option { + self.0.events_out_total() + } } #[cfg(test)] diff --git a/src/api/schema/metrics/source/generic.rs b/src/api/schema/metrics/source/generic.rs index e17700190ad74..9655bf98cb9db 100644 --- a/src/api/schema/metrics/source/generic.rs +++ b/src/api/schema/metrics/source/generic.rs @@ -24,4 +24,14 @@ impl GenericSourceMetrics { pub async fn processed_bytes_total(&self) -> Option { self.0.processed_bytes_total() } + + /// Total incoming events for the current source + pub async fn events_in_total(&self) -> Option { + self.0.events_in_total() + } + + /// Total outgoing events for the current source + pub async fn events_out_total(&self) -> Option { + self.0.events_out_total() + } } diff --git a/src/api/schema/metrics/source/mod.rs b/src/api/schema/metrics/source/mod.rs index 15263ca6cf964..b027eda3444db 100644 --- a/src/api/schema/metrics/source/mod.rs +++ b/src/api/schema/metrics/source/mod.rs @@ -1,14 +1,16 @@ pub mod file; mod generic; -use super::{ProcessedBytesTotal, ProcessedEventsTotal}; +use super::{EventsInTotal, EventsOutTotal, ProcessedBytesTotal, ProcessedEventsTotal}; use crate::event::Metric; use async_graphql::Interface; #[derive(Debug, Clone, Interface)] #[graphql( field(name = "processed_events_total", type = "Option"), - field(name = "processed_bytes_total", type = "Option") + field(name = "processed_bytes_total", type = "Option"), + field(name = "events_in_total", type = "Option"), + field(name = "events_out_total", type = "Option") )] pub enum SourceMetrics { GenericSourceMetrics(generic::GenericSourceMetrics), diff --git a/src/api/schema/metrics/transform/generic.rs b/src/api/schema/metrics/transform/generic.rs index b1cfdfc86db3d..5432909cd2a04 100644 --- a/src/api/schema/metrics/transform/generic.rs +++ b/src/api/schema/metrics/transform/generic.rs @@ -24,4 +24,14 @@ impl GenericTransformMetrics { pub async fn processed_bytes_total(&self) -> Option { self.0.processed_bytes_total() } + + /// Total incoming events for the current transform + pub async fn events_in_total(&self) -> Option { + self.0.events_in_total() + } + + /// Total outgoing events for the current transform + pub async fn events_out_total(&self) -> Option { + self.0.events_out_total() + } } diff --git a/src/api/schema/metrics/transform/mod.rs b/src/api/schema/metrics/transform/mod.rs index 149e384e7e505..2670ed9776020 100644 --- a/src/api/schema/metrics/transform/mod.rs +++ b/src/api/schema/metrics/transform/mod.rs @@ -1,13 +1,15 @@ mod generic; -use super::{ProcessedBytesTotal, ProcessedEventsTotal}; +use super::{EventsInTotal, EventsOutTotal, ProcessedBytesTotal, ProcessedEventsTotal}; use crate::event::Metric; use async_graphql::Interface; #[derive(Debug, Clone, Interface)] #[graphql( field(name = "processed_events_total", type = "Option"), - field(name = "processed_bytes_total", type = "Option") + field(name = "processed_bytes_total", type = "Option"), + field(name = "events_in_total", type = "Option"), + field(name = "events_out_total", type = "Option") )] pub enum TransformMetrics { GenericTransformMetrics(generic::GenericTransformMetrics), diff --git a/tests/api.rs b/tests/api.rs index 65eccaf070501..c4628cf3a44d5 100644 --- a/tests/api.rs +++ b/tests/api.rs @@ -540,6 +540,55 @@ mod tests { ) } + #[test] + #[allow(clippy::float_cmp)] + /// Tests componentEventsOutTotals returns increasing metrics, ordered by + /// source -> transform -> sink + fn api_graphql_component_events_out_totals() { + metrics_test("tests::api_graphql_component_events_out_totals", async { + let conf = r#" + [api] + enabled = true + + [sources.events_out_total_batch_source] + type = "generator" + format = "shuffle" + lines = ["Random line", "And another"] + interval = 0.01 + + [sinks.events_out_total_batch_sink] + # General + type = "blackhole" + inputs = ["events_out_total_batch_source"] + print_amount = 100000 + "#; + + let topology = from_str_config(conf).await; + + tokio::time::delay_for(tokio::time::Duration::from_millis(500)).await; + + let server = api::Server::start(topology.config()); + let client = new_subscription_client(server.addr()).await; + let subscription = client.component_events_out_totals_subscription(500); + + let data = subscription + .stream() + .skip(1) + .take(1) + .map(|r| r.unwrap().data.unwrap().component_events_out_totals) + .next() + .await + .expect("Didn't return results"); + + for name in &[ + "events_out_total_batch_source", + "events_out_total_batch_sink", + ] { + assert!(data.iter().any(|d| d.name == *name)); + } + }) + } + #[test] #[allow(clippy::float_cmp)] /// Tests componentProcessedBytesTotals returns increasing metrics, ordered by @@ -921,6 +970,7 @@ mod tests { let node = &files.edges.iter().flatten().next().unwrap().as_ref().unwrap().node; assert_eq!(node.name, path); assert_eq!(node.processed_events_total.as_ref().unwrap().processed_events_total as usize, lines.len()); + assert_eq!(node.events_in_total.as_ref().unwrap().events_in_total as usize, lines.len()); } _ => panic!("not a file source"), }