Client Logs Data Flow script update #1055

Merged (8 commits) on Sep 30, 2019
6 changes: 2 additions & 4 deletions packages/analytics/README.md
@@ -60,9 +60,9 @@ A similar job runs in the `celo-testnet-production` project.

 The Client Logs Exporter takes a stream of [Google Cloud Storage notifications](https://cloud.google.com/storage/docs/pubsub-notifications) published to a pub-sub topic (in the `celo-testnet` project) about newly-uploaded [client logs](https://console.cloud.google.com/storage/browser/celo-org-mobile.appspot.com/logs/?project=celo-org-mobile&organizationId=54829595577) (in the `celo-mobile-app` project). It processes those notifications and emits the contents of those files to Stackdriver (to the log named by `--output`).

-The job expects client logs to be located at a path `logs/$BUNDLEID/$ENV/$TIMESTAMP_$PHONE_$SUFFIX.txt`.
+The job expects client logs to be located at a path `logs/$DATE/$BUNDLEID/$ENV/$TIMESTAMP_$PHONE_$SUFFIX.txt`.

-Each instance of the job filters on the `$ENV` path component to match the value passed by `--env`. In this way, one Dataflow job needs to be run per environment.
+Each instance of the job filters on the `$ENV` path component to match the value passed by `--env`. In this way, one Dataflow job needs to be run per environment. If no value is passed, all environments are matched.

 The notification was created as follows:
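The command itself sits in a collapsed portion of this diff. Given the topic and bucket named above, the standard form of that command would presumably be (an assumption, not a line from this PR):

```bash
# Publish OBJECT_FINALIZE (and other) events for the client-logs bucket
# as JSON payloads to the clientlogs topic.
gsutil notification create \
  -t projects/celo-testnet/topics/clientlogs \
  -f json \
  gs://celo-org-mobile.appspot.com
```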

@@ -82,10 +82,8 @@ The currently deployed job was deployed with:

 ```bash
 python client_log_exporter.py \
-  --env integration \
   --input projects/celo-testnet/topics/clientlogs \
   --bucket celo-org-mobile.appspot.com \
-  --output client-logs-integration \
   --streaming \
   --runner=DataflowRunner \
   --project celo-testnet \
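To make the new path convention concrete, here is a small standalone sketch of the parsing `client_log_exporter.py` performs on each notification (the payload values below are made up for illustration; real messages carry many more fields):

```python
import json
import urllib2  # Python 2, matching the exporter; urllib.parse in Python 3

# Abridged OBJECT_FINALIZE notification payload, as published to the topic.
data = json.dumps({
    'bucket': 'celo-org-mobile.appspot.com',
    'name': 'logs/2019-09-30/org.celo.mobile/integration/1569859200_%2B15551234567_device.txt',
})

message = json.loads(data)
assert message['bucket'] == 'celo-org-mobile.appspot.com'

# logs/$DATE/$BUNDLEID/$ENV/$TIMESTAMP_$PHONE_$SUFFIX.txt
path_comps = message['name'].split('/')
date, bundle_id, env = path_comps[1], path_comps[2], path_comps[3]

name = path_comps[-1]
if name.endswith('.txt'):
    name = name[:-4]
timestamp, phone, suffix = name.split('_')
phone = urllib2.unquote(phone)  # '+15551234567'

print('%s %s %s %s' % (date, bundle_id, env, suffix))
# 2019-09-30 org.celo.mobile integration device
```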
106 changes: 50 additions & 56 deletions packages/analytics/client_log_exporter.py
@@ -5,6 +5,8 @@
 import re
 import json
 import urllib2
+import sys
+import textwrap

 import apache_beam as beam
 from apache_beam.io.gcp.pubsub import ReadFromPubSub
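The two added imports serve the new line-chunking logic further down: `textwrap.wrap` does the splitting and `sys.getsizeof` gates it. As a quick standalone illustration of the wrapping behavior the exporter relies on (a small width is used here so the effect is visible):

```python
import textwrap

line = 'short words here ' + 'x' * 25 + ' tail'
# Words are packed into chunks of at most 10 characters, but
# break_long_words=False keeps the 25-char token intact on its own chunk.
print(textwrap.wrap(line, 10, break_long_words=False))
# ['short', 'words here', 'xxxxxxxxxxxxxxxxxxxxxxxxx', 'tail']
```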
@@ -25,95 +27,87 @@ def __init__(self, env, bucket_name, log_name, pipeline_args):


     def parse_element(self, element):

-        if not self.gcs:
-            # These imports have to be nested (ugh) because the constructor and the
-            # main pipeline get evaluated locally when deploying remotely from
-            # the cmdline, and this class is only available when running on GCS
-            from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
-            self.gcs = GCSFileSystem(PipelineOptions(self.pipeline_args))
-            self.logger = stackdriver_logging.Client().logger(self.log_name)

         message = json.loads(element.data)
         bucket = message['bucket']

         # Only ingest files from the bucket we are expecting.
         if bucket != self.bucket_name:
             return []

         filepath = message['name']
-        logging.info('Got file: %s, %s', bucket, filepath)
-
-        logging.info('Got -: %s', message)
-
         logline_metadata = None

         # try:
-        # Split path component. Expecting logs/bundleId/env/
+        # Split path components. Expecting logs/date/bundleId/env/
         path_comps = filepath.split('/')

-        if len(path_comps) < 3 or path_comps[2] != self.env:
+        # Check bounds before indexing; match every environment when --env is unset.
+        if len(path_comps) < 4 or (self.env is not None and path_comps[3] != self.env):
             logging.info('Skipping %s', filepath)

             return []

         name = path_comps[len(path_comps)-1]
         if name.endswith('.txt'):
             name = name[0:len(name)-4]
         name_comps = name.split('_')

+        env = path_comps[3]
+        # When --env is unset, the log name is fixed by the first matching file.
+        self.log_name = 'client-logs-%s' % env if self.log_name is None else self.log_name
         logline_metadata = {
             'suffix': name_comps[2],
-            'bundleId': path_comps[1],
-            'env': path_comps[2],
+            'bundleId': path_comps[2],
+            'env': env,
             'phone': urllib2.unquote(name_comps[0]).decode('utf8'),
             'filepath': filepath
         }
         # except:
         #   logging.warn("Couldn't read metadata for %s", filepath)
         #   return []

+        self.logline_metadata = logline_metadata
+        logging.info('Got file: %s with %s', filepath, logline_metadata)

-        severity_pattern = re.compile('^([A-Za-z]+)')
-        severity_remappings = {
-            'TRACE' : 'DEBUG',
-            'LOG' : 'DEBUG',
-            'WARN' : 'WARNING',
-            'CRIT' : 'CRITICAL'
-        }
+        if not self.gcs:
+            # These imports have to be nested (ugh) because the constructor and the
+            # main pipeline get evaluated locally when deploying remotely from
+            # the cmdline, and this class is only available when running on GCS
+            from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+            self.gcs = GCSFileSystem(PipelineOptions(self.pipeline_args))
+            self.logger = stackdriver_logging.Client().logger(self.log_name)

         # Read the whole file (ugh) from GCS. Without Splittable DoFn support
         # in Python, that's the best we can do in Dataflow right now.
         with self.gcs.open('gs://%s/%s' % (bucket, filepath), mime_type='text/plain') as infile:

             for line in infile:

-                # Build log element from message, and labels from metadata
-                log_element = dict(logline_metadata)
-                log_element['msg'] = line
-
-                # Try to parse out the severity from the start of the line
-                # And try and make sure it maps to a valid SD severity
-                match = severity_pattern.match(line)
-                if match:
-                    log_severity = match.group(1).upper()
-                    log_severity = severity_remappings.get(log_severity, log_severity)
-
-                    try:
-                        # Write the struct to SD using the hopefully valid severity
-                        self.logger.log_struct(log_element, severity=log_severity)
-                    except:
-                        # Write the struct to SD without a severity
-                        self.logger.log_struct(log_element)
-
+                # getsizeof returns the string object's size in bytes (object
+                # overhead included), so this is an approximate length check.
+                if sys.getsizeof(line) > 1000:
+                    # wrap() packs whitespace-separated words into chunks of at
+                    # most 1000 characters; break_long_words=False keeps any
+                    # single longer token intact on its own chunk.
+                    lines = textwrap.wrap(line, 1000, break_long_words=False)
+                    for text in lines:
+                        self.writeLog(text)
                 else:
-                    # Write the struct to SD without a severity
-                    self.logger.log_struct(log_element)
+                    self.writeLog(line)

         return []

+    def writeLog(self, text):
+        severity_pattern = re.compile('^([A-Za-z]+)')
+        severity_remappings = {
+            'TRACE' : 'DEBUG',
+            'LOG' : 'DEBUG',
+            'WARN' : 'WARNING',
+            'CRIT' : 'CRITICAL'
+        }
+        # Build the log element from the message, with labels from the metadata
+        log_element = dict(self.logline_metadata)
+        log_element['msg'] = text
+
+        # Try to parse the severity from the start of the line, and check that
+        # it maps to a valid Stackdriver severity
+        match = severity_pattern.match(text)
+        if match:
+            log_severity = match.group(1).upper()
+            log_severity = severity_remappings.get(log_severity, log_severity)
+            try:
+                # Write the struct to SD using the hopefully valid severity
+                self.logger.log_struct(log_element, severity=log_severity)
+            except:
+                # Write the struct to SD without a severity
+                self.logger.log_struct(log_element)
+        else:
+            # Write the struct to SD without a severity
+            self.logger.log_struct(log_element)



     def expand(self, pcoll):
         return pcoll | 'ReadGCSNotifications' >> beam.FlatMap(self.parse_element)
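For context on how this transform is used: `expand` makes the class a Beam `PTransform` that is applied to the notification stream read from pub-sub. A minimal sketch of the wiring, assuming the enclosing class (not shown in this excerpt) is the transform and using `ClientLogExporter` as a stand-in name:

```python
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_args = ['--streaming', '--runner=DataflowRunner', '--project', 'celo-testnet']

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
    (p
     # with_attributes=True yields PubsubMessage objects, matching the
     # element.data access in parse_element above.
     | 'ReadNotifications' >> ReadFromPubSub(
         topic='projects/celo-testnet/topics/clientlogs', with_attributes=True)
     | 'ExportLogs' >> ClientLogExporter(
         env=None,                                   # match all environments
         bucket_name='celo-org-mobile.appspot.com',  # reject other buckets
         log_name=None,                              # derive client-logs-<env>
         pipeline_args=pipeline_args))
```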