Trouble Decoding logs from Kinesis #366

Closed
jscottcronin opened this issue May 27, 2020 · 4 comments

jscottcronin commented May 27, 2020

I'm having trouble properly decoding the Flagr logs from Kinesis stream.

Expected Behavior

Upon a POST request to the Flagr evaluation endpoint, I was expecting the JSON payload in Kinesis to look like:

{
   "payload":{
      "evalContext":{
         "enableDebug":true,
         "entityContext":{
            "hello":"world"
         },
         "entityID":"a1234",
         "entityType":"kinesis",
         "flagID":1,
         "flagKey":"kzv4okmdqyskoxner"
      },
      "evalDebugLog":{
         "segmentDebugLogs":[
            {
               "msg":"matched all constraints. rollout yes. {BucketNum:201 DistributionArray:{VariantIDs:[19 20] PercentsAccumulated:[500 1000]} VariantID:19 RolloutPercent:100}",
               "segmentID":10
            }
         ]
      },
      "flagID":1,
      "flagKey":"kzv4okmdqyskoxner",
      "flagSnapshotID":145,
      "segmentID":10,
      "timestamp":"2020-05-27T21:05:20Z",
      "variantAttachment":{

      },
      "variantID":19,
      "variantKey":"on"
   }
}

Current Behavior

Instead, I get back a bytes object (in Python) with gibberish characters at the beginning and end, but a properly serialized JSON string in between:

b'\xf3\x89\x9a\xc2\n\x05a1234\x1a\xad\x04\x08\x00\x1a\xa8\x04{"payload":{"evalContext":{"enableDebug":true,"entityContext":{"hello":"world"},"entityID":"a1234","entityType":"kinesis","flagID":1,"flagKey":"kzv4okmdqyskoxner"},"evalDebugLog":{"segmentDebugLogs":[{"msg":"matched all constraints. rollout yes. {BucketNum:201 DistributionArray:{VariantIDs:[19 20] PercentsAccumulated:[500 1000]} VariantID:19 RolloutPercent:100}","segmentID":10}]},"flagID":1,"flagKey":"kzv4okmdqyskoxner","flagSnapshotID":145,"segmentID":10,"timestamp":"2020-05-27T21:05:20Z","variantAttachment":{},"variantID":19,"variantKey":"on"}}\x08\xfb\xec\xbd\r\xf7/\xde\x0f@\x08\xd4\x81\x06S\x17'

I also used awscli to watch, in real time, a base64-encoded log come through Kinesis. When I try to decode the base64_string in Python with base64.b64decode(base64_string), I also get the gibberish characters before and after the valid serialized JSON. This indicates to me that I am not understanding how the JSON is encoded into its base64 representation for Kinesis.
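
For reference, a rough sketch of that inspection (base64_string is a placeholder for the Data value returned by aws kinesis get-records):

import base64
import json

# Placeholder: paste the base64-encoded Data value from `aws kinesis get-records` here
base64_string = '...'

raw = base64.b64decode(base64_string)

# The decoded record is not plain JSON: there are binary bytes before the
# first '{' and after the last '}', matching the output shown above
print(raw[:20])

# Crude peek at the JSON body in between (works for this sample, but is not
# a reliable way to parse the record in general)
body = raw[raw.find(b'{'):raw.rfind(b'}') + 1]
print(json.loads(body))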

Possible Solution

Steps to Reproduce (for bugs)

  1. Set up environment variables and build the service:
FLAGR_RECORDER_ENABLED=true
FLAGR_RECORDER_TYPE=kinesis
FLAGR_RECORDER_KINESIS_STREAM_NAME=flagr-kinesis-wip
FLAGR_RECORDER_FRAME_OUTPUT_MODE=payload_raw_json
AWS_REGION=us-east-1

Terraform Kinesis Setup:

resource "aws_kinesis_stream" "flagr_stream" {
  name             = "flagr-kinesis-${terraform.workspace}"
  shard_count      = 1
  retention_period = 48

  shard_level_metrics = [
    "IncomingBytes",
    "OutgoingBytes",
  ]

  tags = local.default_tags
}
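
Not required for the repro, but for anyone not using Terraform, roughly the same stream could be created with boto3 (stream name, retention, and shard metrics mirror the config above):

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Single-shard stream, mirroring the Terraform resource above
kinesis.create_stream(StreamName='flagr-kinesis-wip', ShardCount=1)

# Wait until the stream is ACTIVE before adjusting retention and metrics
kinesis.get_waiter('stream_exists').wait(StreamName='flagr-kinesis-wip')

# 48h retention (the default is 24h)
kinesis.increase_stream_retention_period(
    StreamName='flagr-kinesis-wip',
    RetentionPeriodHours=48,
)

# Shard-level metrics from the Terraform config
kinesis.enable_enhanced_monitoring(
    StreamName='flagr-kinesis-wip',
    ShardLevelMetrics=['IncomingBytes', 'OutgoingBytes'],
)
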
  2. Create an Experiment Flag with data logging turned on for Kinesis

  3. Using Python, create a consumer of the Kinesis messages:

import time

import boto3

session = boto3.Session()
client = session.client(
    'kinesis'
)

stream = 'flagr-kinesis-wip'

# Get an iterator for the first (and only) shard, starting at the latest record
stream_details = client.describe_stream(StreamName=stream)
shard_id = stream_details['StreamDescription']['Shards'][0]['ShardId']
response = client.get_shard_iterator(
    StreamName=stream,
    ShardId=shard_id,
    ShardIteratorType='LATEST'
)
shard_iterator = response['ShardIterator']

# Poll the shard and print the raw Data payload of each record
while True:
    response = client.get_records(ShardIterator=shard_iterator, Limit=1)
    shard_iterator = response['NextShardIterator']
    for record in response['Records']:
        if 'Data' in record and len(record['Data']) > 0:
            print(record['Data'])
    time.sleep(1)  # stay under the GetRecords call limit
  4. Using the Evaluation UI, POST a request to Flagr
  5. View the message from the Kinesis consumer:
b'\xf3\x89\x9a\xc2\n\x05a1234\x1a\xad\x04\x08\x00\x1a\xa8\x04{"payload":{"evalContext":{"enableDebug":true,"entityContext":{"hello":"world"},"entityID":"a1234","entityType":"kinesis","flagID":1,"flagKey":"kzv4okmdqyskoxner"},"evalDebugLog":{"segmentDebugLogs":[{"msg":"matched all constraints. rollout yes. {BucketNum:201 DistributionArray:{VariantIDs:[19 20] PercentsAccumulated:[500 1000]} VariantID:19 RolloutPercent:100}","segmentID":10}]},"flagID":1,"flagKey":"kzv4okmdqyskoxner","flagSnapshotID":145,"segmentID":10,"timestamp":"2020-05-27T21:05:20Z","variantAttachment":{},"variantID":19,"variantKey":"on"}}\x08\xfb\xec\xbd\r\xf7/\xde\x0f@\x08\xd4\x81\x06S\x17'

Context

This is affecting our ability to stand up the service and try it out at our org. Once we have our data pipelines hooked up for analytics, we can then try it out for feature flags and A/B testing.

Your Environment

  • Version used (flagr version 1.1.8):
  • Server type and version:
  • Operating System and version (uname -a):
  • Link to your project:

marceloboeira commented Jun 1, 2020

@jscottcronin thanks for the issue. I can jump into it, but it seems quite similar to another issue reported a while back, and since the implementation hasn't changed since then, it might be worth checking out.

Could you check the encoding config of your kinesis stream?

More info: #267

Let me know if that has any impact on it; otherwise we can debug it further!

jscottcronin commented Jun 1, 2020

@marceloboeira Thanks for the suggestion. #267 looks similar, though I'm unable to find the encoding within the AWS UI for my personal setup.

I ended up resolving this issue by attaching Firehose to the Kinesis stream. Firehose now dumps the messages onto S3, and the message format looks good. Potentially the issue could have been in how I was reading from the stream with Python, but at this point I am not going to investigate any longer.

Thanks for the suggestion!

@danielfsousa

I got the same weird characters using the AWS SDK to read records from the Kinesis Data Stream. This happens because Flagr aggregates the logs before sending them to Kinesis (controlled by RecorderKinesisAggregateBatchCount and RecorderKinesisAggregateBatchSize), and the AWS SDK does not deaggregate the records automatically.

I had to use the kinesis-aggregation library to deaggregate the records.
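
A rough sketch of how that could plug into the consumer loop from the issue description (assuming the Python package from that repo, published as aws_kinesis_agg; whether iter_deaggregate_records accepts boto3 get_records output directly, and the exact shape of the records it yields, should be verified against the library's README):

import base64
import time

import boto3
from aws_kinesis_agg.deaggregator import iter_deaggregate_records

client = boto3.client('kinesis')
stream = 'flagr-kinesis-wip'

shard_id = client.describe_stream(StreamName=stream)['StreamDescription']['Shards'][0]['ShardId']
shard_iterator = client.get_shard_iterator(
    StreamName=stream, ShardId=shard_id, ShardIteratorType='LATEST'
)['ShardIterator']

while True:
    response = client.get_records(ShardIterator=shard_iterator, Limit=10)
    shard_iterator = response['NextShardIterator']
    # Expand each KPL-aggregated Kinesis record into the individual user
    # records that Flagr batched together (assumed yield shape: Lambda-style
    # dicts with base64-encoded data under ['kinesis']['data'])
    for user_record in iter_deaggregate_records(response['Records']):
        data = base64.b64decode(user_record['kinesis']['data'])
        print(data.decode('utf-8'))
    time.sleep(1)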

@jscottcronin

@danielfsousa Interesting. I ended up solving this by adding AWS Firehose, applying a Lambda function, and dumping the results in CSV format on S3, which automatically flows into our data warehouse. The Lambda function I applied looks like:

import base64
from collections import OrderedDict
import csv
import io
import json


def convert_json_to_flat(json_record):
    """Flatten JSON Flagr record to ordered dict"""
    payload = json_record.get('payload', {})
    eval_context = payload.get('evalContext', {})

    return OrderedDict([
        ('raw', json.dumps(json_record)),
        ('error_msg', json_record.get('errorMessage', None)),

        ('eval_context_entity_id', eval_context.get('entityID', None)),
        ('eval_context_entity_context', json.dumps(eval_context.get('entityContext', {}))),
        ('eval_context_flag_id', eval_context.get('flagID', None)),
        ('eval_context_flag_key', eval_context.get('flagKey', None)),

        ('eval_debug_log', json.dumps(payload.get('evalDebugLog', {}))),

        ('flag_id', payload.get('flagID', None)),
        ('flag_key', payload.get('flagKey', None)),
        ('flag_snapshot_id', payload.get('flagSnapshotID', None)),
        ('segment_id', payload.get('segmentID', None)),
        ('timestamp', payload.get('timestamp', None)),
        ('variant_attachment', json.dumps(payload.get('variantAttachment', {}))),
        ('variant_id', payload.get('variantID', None)),
        ('variant_key', payload.get('variantKey', None)),
    ])


def convert_flat_to_csv_string(flat_record):
    """CSV record in string format"""
    delimiter = u'\u001F'
    fn = io.StringIO()

    csv_writer = csv.DictWriter(fn, fieldnames=flat_record.keys(), delimiter=delimiter)
    csv_writer.writerow(flat_record)
    csv_string = fn.getvalue()
    return csv_string


def lambda_handler(event, context):
    """converts each record in event to csv_string and encodes to base64"""
    output = []

    for record in event['records']:
        json_record = json.loads(base64.b64decode(record['data']).decode('utf-8'))
        flat_record = convert_json_to_flat(json_record)
        csv_string = convert_flat_to_csv_string(flat_record)
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(csv_string.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
    return {'records': output}
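
For anyone adapting this, a quick local smoke test could look like the following; the event shape mirrors a Kinesis Data Firehose transformation event, and the record content is a made-up Flagr payload:

import base64
import json

# Minimal Firehose-style transformation event with one made-up Flagr record
sample_payload = {
    'payload': {
        'evalContext': {
            'entityID': 'a1234',
            'entityContext': {'hello': 'world'},
            'flagID': 1,
            'flagKey': 'kzv4okmdqyskoxner',
        },
        'flagID': 1,
        'flagKey': 'kzv4okmdqyskoxner',
        'variantID': 19,
        'variantKey': 'on',
        'timestamp': '2020-05-27T21:05:20Z',
    }
}
event = {
    'records': [{
        'recordId': 'test-record-1',
        'data': base64.b64encode(json.dumps(sample_payload).encode('utf-8')).decode('utf-8'),
    }]
}

# Run the handler locally and decode the transformed CSV row it returns
result = lambda_handler(event, None)
print(base64.b64decode(result['records'][0]['data']).decode('utf-8'))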
