Skip to content

Commit

Permalink
loki_out: add stuctured_metadata_map_keys
Browse files Browse the repository at this point in the history
* Adds stuctured_metadata_map_keys config to dynamically populate stuctured_metadata from a map

Signed-off-by: Greg Eales <0x006EA1E5@gmail.com>
  • Loading branch information
0x006EA1E5 committed Oct 29, 2024
1 parent e7c3e93 commit e9fee7d
Show file tree
Hide file tree
Showing 3 changed files with 318 additions and 22 deletions.
135 changes: 130 additions & 5 deletions plugins/out_loki/loki.c
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,13 @@ static void flb_loki_kv_exit(struct flb_loki *ctx)
mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* unlink and destroy */
mk_list_del(&kv->_head);
flb_loki_kv_destroy(kv);
}
mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_map_keys_list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* unlink and destroy */
mk_list_del(&kv->_head);
flb_loki_kv_destroy(kv);
Expand Down Expand Up @@ -416,6 +423,87 @@ static void pack_kv(struct flb_loki *ctx,
}
}

/* Similar to pack_kv above, except will only use msgpack_objects of type
* MSGPACK_OBJECT_MAP, and will iterate over the keys adding each entry as a separate
* item. Non-string map values are serialised to JSON, as Loki requires all values to be
* strings.
*/
static void pack_maps(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
msgpack_object *map,
struct flb_mp_map_header *mh,
struct mk_list *list)
{
struct mk_list *head;
struct flb_loki_kv *kv;

msgpack_object *start_key;
msgpack_object *out_key;
msgpack_object *out_val;

msgpack_object_map accessed_map;
uint32_t accessed_map_index;
msgpack_object_kv accessed_map_kv;

char *accessed_map_val_json;

mk_list_foreach(head, list) {
/* get the flb_loki_kv for this iteration of the loop */
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* record accessor key/value pair */
// TODO what if kv->ra_val != NULL?
if (kv->ra_key != NULL && kv->ra_val == NULL) {

/* try to get the value for the record accessor */
if (flb_ra_get_kv_pair(kv->ra_key, *map, &start_key, &out_key, &out_val)
== MSGPACK_UNPACK_CONTINUE) {

/* we require the value to be a map, or it doesn't make sense as this is
* adding a map's key / values */
if (out_val->type != MSGPACK_OBJECT_MAP || out_val->via.map.size <= 0) {
flb_plg_debug(ctx->ins, "No valid map data found for key %s",
kv->ra_key->pattern);
} else {
accessed_map = out_val->via.map;

/* for each entry in the accessed map... */
for (accessed_map_index = 0; accessed_map_index < accessed_map.size;
++accessed_map_index) {

/* get the entry */
accessed_map_kv = accessed_map.ptr[accessed_map_index];

/* Pack the key and value */
flb_mp_map_header_append(mh);

pack_label_key(mp_pck, (char*) accessed_map_kv.key.via.str.ptr,
accessed_map_kv.key.via.str.size);
/* Does this need optimising? For example, to handle bool as
* non-JSON? */
if (accessed_map_kv.val.type == MSGPACK_OBJECT_STR) {
msgpack_pack_str_with_body(mp_pck,
accessed_map_kv.val.via.str.ptr,
accessed_map_kv.val.via.str.size);
}
/* convert value to JSON, as Loki expects a string value */
else {
accessed_map_val_json = flb_msgpack_to_json_str(1024,
&accessed_map_kv.val);
if (accessed_map_val_json) {
msgpack_pack_str_with_body(mp_pck, accessed_map_val_json,
strlen(accessed_map_val_json));
flb_free(accessed_map_val_json);
}
}
}
}
}
}
}
}

static flb_sds_t pack_structured_metadata(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
Expand All @@ -424,7 +512,13 @@ static flb_sds_t pack_structured_metadata(struct flb_loki *ctx,
struct flb_mp_map_header mh;
/* Initialize dynamic map header */
flb_mp_map_header_init(&mh, mp_pck);
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list);
if (ctx->structured_metadata_map_keys) {
pack_maps(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_map_keys_list);
}
// explicit structured_metadata entries override structured_metadata_map_keys entries
if (ctx->structured_metadata) {
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list);
}
flb_mp_map_header_end(&mh);
return 0;
}
Expand Down Expand Up @@ -788,6 +882,7 @@ static int parse_labels(struct flb_loki *ctx)

flb_loki_kv_init(&ctx->labels_list);
flb_loki_kv_init(&ctx->structured_metadata_list);
flb_loki_kv_init(&ctx->structured_metadata_map_keys_list);

if (ctx->structured_metadata) {
ret = parse_kv(ctx, ctx->structured_metadata, &ctx->structured_metadata_list, &ra_used);
Expand All @@ -796,6 +891,26 @@ static int parse_labels(struct flb_loki *ctx)
}
}

/* Append structured metadata map keys set in the configuration */
if (ctx->structured_metadata_map_keys) {
mk_list_foreach(head, ctx->structured_metadata_map_keys) {
entry = mk_list_entry(head, struct flb_slist_entry, _head);
if (entry->str[0] != '$') {
flb_plg_error(ctx->ins,
"invalid structured metadata map key, the name must start with '$'");
return -1;
}

ret = flb_loki_kv_append(ctx, &ctx->structured_metadata_map_keys_list, entry->str, NULL);
if (ret == -1) {
return -1;
}
else if (ret > 0) {
ra_used++;
}
}
}

if (ctx->labels) {
ret = parse_kv(ctx, ctx->labels, &ctx->labels_list, &ra_used);
if (ret == -1) {
Expand Down Expand Up @@ -971,6 +1086,7 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins,
ctx->ins = ins;
flb_loki_kv_init(&ctx->labels_list);
flb_loki_kv_init(&ctx->structured_metadata_list);
flb_loki_kv_init(&ctx->structured_metadata_map_keys_list);

/* Register context with plugin instance */
flb_output_set_context(ins, ctx);
Expand Down Expand Up @@ -1539,12 +1655,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);
msgpack_pack_array(&mp_pck, ctx->structured_metadata ||
ctx->structured_metadata_map_keys ? 3 : 2);

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
if (ctx->structured_metadata) {
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, NULL);
}
}
Expand Down Expand Up @@ -1575,12 +1692,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
msgpack_pack_str_body(&mp_pck, "values", 6);
msgpack_pack_array(&mp_pck, 1);

msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);
msgpack_pack_array(&mp_pck, ctx->structured_metadata ||
ctx->structured_metadata_map_keys ? 3 : 2);

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
if (ctx->structured_metadata) {
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, log_event.body);
}
}
Expand Down Expand Up @@ -1905,6 +2023,13 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata),
"optional structured metadata fields for API requests."
},

{
FLB_CONFIG_MAP_CLIST, "structured_metadata_map_keys", NULL,
0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata_map_keys),
"optional structured metadata fields, as derived dynamically from configured maps "
"keys, for API requests."
},

{
FLB_CONFIG_MAP_BOOL, "auto_kubernetes_labels", "false",
Expand Down
16 changes: 9 additions & 7 deletions plugins/out_loki/loki.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ struct flb_loki {
struct mk_list *labels;
struct mk_list *label_keys;
struct mk_list *structured_metadata;
struct mk_list *structured_metadata_map_keys;
struct mk_list *remove_keys;

flb_sds_t label_map_path;
Expand All @@ -87,13 +88,14 @@ struct flb_loki {
char *tcp_host;
int out_line_format;
int out_drop_single_key;
int ra_used; /* number of record accessor label keys */
struct flb_record_accessor *ra_k8s; /* kubernetes record accessor */
struct mk_list labels_list; /* list of flb_loki_kv nodes */
struct mk_list structured_metadata_list; /* list of flb_loki_kv nodes */
struct mk_list remove_keys_derived; /* remove_keys with label RAs */
struct flb_mp_accessor *remove_mpa; /* remove_keys multi-pattern accessor */
struct flb_record_accessor *ra_tenant_id_key; /* dynamic tenant id key */
int ra_used; /* number of record accessor label keys */
struct flb_record_accessor *ra_k8s; /* kubernetes record accessor */
struct mk_list labels_list; /* list of flb_loki_kv nodes */
struct mk_list structured_metadata_list; /* list of flb_loki_kv nodes */
struct mk_list structured_metadata_map_keys_list; /* list of flb_loki_kv nodes */
struct mk_list remove_keys_derived; /* remove_keys with label RAs */
struct flb_mp_accessor *remove_mpa; /* remove_keys multi-pattern accessor */
struct flb_record_accessor *ra_tenant_id_key; /* dynamic tenant id key */

struct cfl_list dynamic_tenant_list;
pthread_mutex_t dynamic_tenant_list_lock;
Expand Down
Loading

0 comments on commit e9fee7d

Please sign in to comment.