Client Logs Data Flow script update (#1055)
* fix client_log_exporter.py dataflow script

* remove stale file

* split long lines
rohit-dua committed Sep 30, 2019
1 parent 80cd22b commit 8de7237
Showing 2 changed files with 52 additions and 60 deletions.
6 changes: 2 additions & 4 deletions packages/analytics/README.md
@@ -60,9 +60,9 @@ A similar job runs in the `celo-testnet-production` project.

The Client Logs Exporter takes a stream of [Google Cloud Storage notifications](https://cloud.google.com/storage/docs/pubsub-notifications) published to a pub-sub topic (in `celo-testnet` project) about newly-uploaded [client logs](https://console.cloud.google.com/storage/browser/celo-org-mobile.appspot.com/logs/?project=celo-org-mobile&organizationId=54829595577) (in `celo-mobile-app` project). It processes those notifications and emits the contents of those files to StackDriver (to the log named by `--output`.)
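
For orientation, here is a minimal sketch of how such a notification payload can be read, assuming the standard JSON payload with `bucket` and `name` fields; the function name is illustrative, not the job's exact code:

```python
import json

def parse_gcs_notification(message_data):
    """Pull the bucket and object path out of a GCS pub-sub notification payload."""
    message = json.loads(message_data)  # the payload is the JSON object resource
    return message['bucket'], message['name']

# e.g. ('celo-org-mobile.appspot.com', 'logs/...') for a newly uploaded client log
```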

The job expects client logs to be located at a path `logs/$BUNDLEID/$ENV/$TIMESTAMP_$PHONE_$SUFFIX.txt`.
The job expects client logs to be located at a path `logs/$DATE/$BUNDLEID/$ENV/$TIMESTAMP_$PHONE_$SUFFIX.txt`.

Each instance of the job filters on the `$ENV` path component to match the value passed by `--env`. In this way, one Dataflow job needs to be run per environment.
Each instance of the job filters on the `$ENV` path component to match the value passed by `--env`. In this way, one Dataflow job needs to be run per environment. If no value is passed, all environments are matched.
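
Concretely, the filter boils down to checking one path component. A hedged sketch under the path layout above (the function name and example values are illustrative):

```python
def matches_env(filepath, env=None):
    """True if a logs/$DATE/$BUNDLEID/$ENV/... path matches the requested environment.

    A missing env (None) matches every environment.
    """
    parts = filepath.split('/')
    if len(parts) < 4:  # need at least logs/<date>/<bundleId>/<env>
        return False
    return env is None or parts[3] == env

# matches_env('logs/2019-09-30/org.celo.mobile/integration/1569849600_+15551234_abc.txt',
#             'integration')  # -> True
```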

The notification was created as follows:

@@ -82,10 +82,8 @@ The currently deployed job was deployed with:

```bash
python client_log_exporter.py \
--env integration \
--input projects/celo-testnet/topics/clientlogs \
--bucket celo-org-mobile.appspot.com \
--output client-logs-integration \
--streaming \
--runner=DataflowRunner \
--project celo-testnet \
106 changes: 50 additions & 56 deletions packages/analytics/client_log_exporter.py
@@ -5,6 +5,8 @@
import re
import json
import urllib2
import sys
import textwrap

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
@@ -25,95 +27,87 @@ def __init__(self, env, bucket_name, log_name, pipeline_args):


def parse_element(self, element):

if not self.gcs:
# These imports have to be nested (ugh) because the constructor and the
# main pipeline get evaluated locally when deploying remotely from
# the cmdline, and this class is only available when running on GCS
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
self.gcs = GCSFileSystem(PipelineOptions(self.pipeline_args))
self.logger = stackdriver_logging.Client().logger(self.log_name)

message = json.loads(element.data)
bucket = message['bucket']

# Only import from the bucket we are expecting.
if bucket != self.bucket_name:
return []

filepath = message['name']
logging.info('Got file: %s, %s', bucket, filepath)

logging.info('Got -: %s', message)

logline_metadata = None

# try:
# Split path component. Expecting logs/bundleId/env/
# Split path component. Expecting logs/date/bundleId/env/
path_comps = filepath.split('/')

if len(path_comps) < 3 or path_comps[2] != self.env:
if len(path_comps) < 3 or (path_comps[3] != self.env and self.env is not None):
logging.info('Skipping %s', filepath)

return []

name = path_comps[len(path_comps)-1]
if name.endswith('.txt'):
name = name[0:len(name)-4]
name_comps = name.split('_')

self.env = path_comps[3]
self.log_name = 'client-logs-%s'%(self.env) if self.log_name is None else self.log_name
logline_metadata = {
'suffix' : name_comps[2],
'bundleId': path_comps[1],
'env': path_comps[2],
'bundleId': path_comps[2],
'env': path_comps[3],
'phone': urllib2.unquote(name_comps[0]).decode('utf8'),
'filepath': filepath
}
# except:
# logging.warn("Couldn't read metadata for %s", filepath)
# return []

self.logline_metadata = logline_metadata
logging.info('Got file: %s with %s', filepath, logline_metadata)

severity_pattern = re.compile('^([A-Za-z]+)')
severity_remappings = {
'TRACE' : 'DEBUG',
'LOG' : 'DEBUG',
'WARN' : 'WARNING',
'CRIT' : 'CRITICAL'
}
if not self.gcs:
# These imports have to be nested (ugh) because the constructor and the
# main pipeline get evaluated locally when deploying remotely from
# the cmdline, and this class is only available when running on GCS
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
self.gcs = GCSFileSystem(PipelineOptions(self.pipeline_args))
self.logger = stackdriver_logging.Client().logger(self.log_name)

# Read the whole file (ugh) from GCS. Without SDoFns support in Python, that's the best
# we can do in dataflow right now.

with self.gcs.open('gs://%s/%s' % (bucket, filepath), mime_type='text/plain') as infile:

for line in infile:

# Build log element from message, and labels from metadata
log_element = dict(logline_metadata)
log_element['msg'] = line

# Try to parse out the severity from the start of the line
# And try and make sure it maps to a valid SD severity
match = severity_pattern.match(line)
if match:
log_severity = match.group(1).upper()
log_severity = severity_remappings.get(log_severity, log_severity)

try:
# Write the struct to SD using the hopefully valid severity
self.logger.log_struct(log_element, severity=log_severity)
except:
# Write the struct to SD without a severity
self.logger.log_struct(log_element)

if sys.getsizeof(line) > 1000:
lines = textwrap.wrap(line, 1000, break_long_words=False)
for text in lines:
self.writeLog(text)
else:
# Write the struct to SD without a severity
self.logger.log_struct(log_element)

self.writeLog(line)
return []

def writeLog(self, text):
severity_pattern = re.compile('^([A-Za-z]+)')
severity_remappings = {
'TRACE' : 'DEBUG',
'LOG' : 'DEBUG',
'WARN' : 'WARNING',
'CRIT' : 'CRITICAL'
}
# Build log element from message, and labels from metadata
log_element = dict(self.logline_metadata)
log_element['msg'] = text

# Try to parse out the severity from the start of the line
# And try and make sure it maps to a valid SD severity
match = severity_pattern.match(text)
if match:
log_severity = match.group(1).upper()
log_severity = severity_remappings.get(log_severity, log_severity)
try:
# Write the struct to SD using the hopefully valid severity
self.logger.log_struct(log_element, severity=log_severity)
except:
# Write the struct to SD without a severity
self.logger.log_struct(log_element)
else:
# Write the struct to SD without a severity
self.logger.log_struct(log_element)



def expand(self, pcoll):
return pcoll | 'ReadGCSNotifications' >> beam.FlatMap(self.parse_element)
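
The "split long lines" part of this change chunks oversized log lines before they are written to Stackdriver. A standalone sketch of that idea, with the 1000-byte threshold taken from the diff and everything else illustrative:

```python
import sys
import textwrap

def split_for_logging(line, limit=1000):
    """Split an oversized log line into chunks so each Stackdriver entry stays small."""
    if sys.getsizeof(line) > limit:
        # break_long_words=False avoids cutting tokens such as URLs in half
        return textwrap.wrap(line, limit, break_long_words=False)
    return [line]

for chunk in split_for_logging('a very long client log line ' * 100):
    print(chunk)
```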
