From a975dc7d99f4e6a76b49f81f356c406c8cae800b Mon Sep 17 00:00:00 2001
From: Rohit <8ohit.dua@gmail.com>
Date: Thu, 19 Sep 2019 15:19:18 +0200
Subject: [PATCH 1/3] fix client-log-exported.py dataflow script

---
 packages/analytics/README.md | 6 ++--
 packages/analytics/client_log_exporter.py | 36 +++++++++--------------
 packages/analytics/t.py | 10 +++++++
 3 files changed, 26 insertions(+), 26 deletions(-)
 create mode 100644 packages/analytics/t.py

diff --git a/packages/analytics/README.md b/packages/analytics/README.md
index aaa4b9a0c44..8523eea779a 100644
--- a/packages/analytics/README.md
+++ b/packages/analytics/README.md
@@ -60,9 +60,9 @@ A similar job runs in the `celo-testnet-production` project.
 The Client Logs Exporter takes a stream of [Google Cloud Storage notifications](https://cloud.google.com/storage/docs/pubsub-notifications) published to a pub-sub topic (in `celo-testnet` project) about newly-uploaded [client logs](https://console.cloud.google.com/storage/browser/celo-org-mobile.appspot.com/logs/?project=celo-org-mobile&organizationId=54829595577) (in `celo-mobile-app` project). It processes those notifications and emits the contents of those files to StackDriver (to the log named by `--output`.)
 
-The job expects client logs to be located at a path `logs/$BUNDLEID/$ENV/$TIMESTAMP_$PHONE_$SUFFIX.txt`.
+The job expects client logs to be located at a path `logs/$DATE/$BUNDLEID/$ENV/$TIMESTAMP_$PHONE_$SUFFIX.txt`.
 
-Each instance of the job filters on the `$ENV` path component to match the value passed by `--env`. In this way, one Dataflow job needs to be run per environment.
+Each instance of the job filters on the `$ENV` path component to match the value passed by `--env`. In this way, one Dataflow job needs to be run per environment. If no value is passed, all environments are matched.
 
 The notification was created as follows:
@@ -82,10 +82,8 @@ The currently deployed job was deployed with:
 
 ```bash
 python client_log_exporter.py \
-  --env integration \
   --input projects/celo-testnet/topics/clientlogs \
   --bucket celo-org-mobile.appspot.com \
-  --output client-logs-integration \
   --streaming \
   --runner=DataflowRunner \
   --project celo-testnet \
diff --git a/packages/analytics/client_log_exporter.py b/packages/analytics/client_log_exporter.py
index 24c71f16c4f..004f0d6f4a4 100644
--- a/packages/analytics/client_log_exporter.py
+++ b/packages/analytics/client_log_exporter.py
@@ -25,47 +25,31 @@ def __init__(self, env, bucket_name, log_name, pipeline_args):
   def parse_element(self, element):
-
-    if not self.gcs:
-      # These imports have to be nested (ugh) because the constructor and the
-      # main pipeline get evaluated locally when deploying remotely from
-      # the cmdline, and this class is only available when running on GCS
-      from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
-      self.gcs = GCSFileSystem(PipelineOptions(self.pipeline_args))
-      self.logger = stackdriver_logging.Client().logger(self.log_name)
-
     message = json.loads(element.data)
     bucket = message['bucket']
-    # Only import from the bucket we are expecting.
     if bucket != self.bucket_name:
       return []
-
     filepath = message['name']
     logging.info('Got file: %s, %s', bucket, filepath)
-    logging.info('Got -: %s', message)
-    logline_metadata = None
-    # try:
-    # Split path component.
+    # Split path component. Expecting logs/date/bundleId/env/
     path_comps = filepath.split('/')
-
-    if len(path_comps) < 3 or path_comps[2] != self.env:
+    if len(path_comps) < 4 or (path_comps[3] != self.env and self.env is not None):
       logging.info('Skipping %s', filepath)
-
       return []
-
     name = path_comps[len(path_comps)-1]
     if name.endswith('.txt'):
       name = name[0:len(name)-4]
     name_comps = name.split('_')
-
+    self.env = path_comps[3]
+    self.log_name = 'client-logs-%s' % (self.env) if self.log_name is None else self.log_name
     logline_metadata = {
       'suffix' : name_comps[2],
-      'bundleId': path_comps[1],
-      'env': path_comps[2],
+      'bundleId': path_comps[2],
+      'env': path_comps[3],
       'phone': urllib2.unquote(name_comps[0]).decode('utf8'),
       'filepath': filepath
     }
@@ -83,6 +67,14 @@ def parse_element(self, element):
       'CRIT' : 'CRITICAL'
     }
 
+    if not self.gcs:
+      # These imports have to be nested (ugh) because the constructor and the
+      # main pipeline get evaluated locally when deploying remotely from
+      # the cmdline, and this class is only available when running on GCS
+      from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+      self.gcs = GCSFileSystem(PipelineOptions(self.pipeline_args))
+      self.logger = stackdriver_logging.Client().logger(self.log_name)
+
     # Read the whole file (ugh) from GCS. Without SDoFns support in Python, that's the best
     # we can do in dataflow right now.
     with self.gcs.open('gs://%s/%s' % (bucket, filepath), mime_type='text/plain') as infile:
diff --git a/packages/analytics/t.py b/packages/analytics/t.py
new file mode 100644
index 00000000000..84c7e7a6c84
--- /dev/null
+++ b/packages/analytics/t.py
@@ -0,0 +1,10 @@
+import argparse
+parser = argparse.ArgumentParser()
+parser.add_argument('-w', '--writer', help="Team Player.")
+args = parser.parse_args()
+
+
+if args.writer == 'Shubham':
+    print('Technical Author.')
+print(args.writer)
+

From 974175a8ba0d3555a7cb2109364da0f8a5a8233b Mon Sep 17 00:00:00 2001
From: Rohit <8ohit.dua@gmail.com>
Date: Thu, 19 Sep 2019 15:29:10 +0200
Subject: [PATCH 2/3] remove stale file

---
 packages/analytics/t.py | 10 ----------
 1 file changed, 10 deletions(-)
 delete mode 100644 packages/analytics/t.py

diff --git a/packages/analytics/t.py b/packages/analytics/t.py
deleted file mode 100644
index 84c7e7a6c84..00000000000
--- a/packages/analytics/t.py
+++ /dev/null
@@ -1,10 +0,0 @@
-import argparse
-parser = argparse.ArgumentParser()
-parser.add_argument('-w', '--writer', help="Team Player.")
-args = parser.parse_args()
-
-
-if args.writer == 'Shubham':
-    print('Technical Author.')
-print(args.writer)
-

From fc8c9599e8cd279249dcd6637f4f450b5ec87682 Mon Sep 17 00:00:00 2001
From: Rohit <8ohit.dua@gmail.com>
Date: Mon, 30 Sep 2019 12:53:30 +0200
Subject: [PATCH 3/3] split long lines

---
 packages/analytics/client_log_exporter.py | 72 ++++++++++++-----------
 1 file changed, 37 insertions(+), 35 deletions(-)

diff --git a/packages/analytics/client_log_exporter.py b/packages/analytics/client_log_exporter.py
index 004f0d6f4a4..58237bd2181 100644
--- a/packages/analytics/client_log_exporter.py
+++ b/packages/analytics/client_log_exporter.py
@@ -5,6 +5,8 @@
 import re
 import json
 import urllib2
+import sys
+import textwrap
 
 import apache_beam as beam
 from apache_beam.io.gcp.pubsub import ReadFromPubSub
@@ -53,20 +55,9 @@ def parse_element(self, element):
       'phone': urllib2.unquote(name_comps[0]).decode('utf8'),
       'filepath': filepath
     }
-    # except:
-    # logging.warn("Couldn't read metadata for %s", filepath)
-    # return []
-
+    self.logline_metadata = logline_metadata
     logging.info('Got file: %s with %s', filepath, logline_metadata)
 
-    severity_pattern = re.compile('^([A-Za-z]+)')
-
-    severity_remappings = {
-      'TRACE' : 'DEBUG',
-      'LOG' : 'DEBUG',
-      'WARN' : 'WARNING',
-      'CRIT' : 'CRITICAL'
-    }
-
     if not self.gcs:
       # These imports have to be nested (ugh) because the constructor and the
       # main pipeline get evaluated locally when deploying remotely from
       # the cmdline, and this class is only available when running on GCS
       from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
       self.gcs = GCSFileSystem(PipelineOptions(self.pipeline_args))
       self.logger = stackdriver_logging.Client().logger(self.log_name)
@@ -79,33 +70,44 @@ def parse_element(self, element):
     # Read the whole file (ugh) from GCS. Without SDoFns support in Python, that's the best
     # we can do in dataflow right now.
     with self.gcs.open('gs://%s/%s' % (bucket, filepath), mime_type='text/plain') as infile:
-
       for line in infile:
-
-        # Build log element from message, and labels from metadata
-        log_element = dict(logline_metadata)
-        log_element['msg'] = line
-
-        # Try to parse out the severity from the start of the line
-        # And try and make sure it maps to a valid SD severity
-        match = severity_pattern.match(line)
-        if match:
-          log_severity = match.group(1).upper()
-          log_severity = severity_remappings.get(log_severity, log_severity)
-
-          try:
-            # Write the struct to SD using the hopefully valid severity
-            self.logger.log_struct(log_element, severity=log_severity)
-          except:
-            # Write the struct to SD without a severity
-            self.logger.log_struct(log_element)
-
-        else:
-          # Write the struct to SD without a severity
-          self.logger.log_struct(log_element)
-
+        if sys.getsizeof(line) > 1000:
+          lines = textwrap.wrap(line, 1000, break_long_words=False)
+          for text in lines:
+            self.writeLog(text)
+        else:
+          self.writeLog(line)
     return []
 
+  def writeLog(self, text):
+    severity_pattern = re.compile('^([A-Za-z]+)')
+    severity_remappings = {
+      'TRACE' : 'DEBUG',
+      'LOG' : 'DEBUG',
+      'WARN' : 'WARNING',
+      'CRIT' : 'CRITICAL'
+    }
+    # Build log element from message, and labels from metadata
+    log_element = dict(self.logline_metadata)
+    log_element['msg'] = text
+
+    # Try to parse out the severity from the start of the line
+    # And try and make sure it maps to a valid SD severity
+    match = severity_pattern.match(text)
+    if match:
+      log_severity = match.group(1).upper()
+      log_severity = severity_remappings.get(log_severity, log_severity)
+      try:
+        # Write the struct to SD using the hopefully valid severity
+        self.logger.log_struct(log_element, severity=log_severity)
+      except:
+        # Write the struct to SD without a severity
+        self.logger.log_struct(log_element)
+    else:
+      # Write the struct to SD without a severity
+      self.logger.log_struct(log_element)
+
+
   def expand(self, pcoll):
     return pcoll | 'ReadGCSNotifications' >> beam.FlatMap(self.parse_element)
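
For reviewers, a standalone sketch (not part of the patches above) of how the reworked `parse_element` treats the new `logs/$DATE/$BUNDLEID/$ENV/$TIMESTAMP_$PHONE_$SUFFIX.txt` layout. The helper name, the sample path, and the omitted URL-unquoting of the phone component are illustrative assumptions, not code from the patch:

```python
# Illustrative only: mirrors the path handling in the patched parse_element,
# outside of Beam/StackDriver. describe_log_path and the sample path are
# hypothetical names, not part of the patch.
def describe_log_path(filepath, env=None):
  path_comps = filepath.split('/')
  # Expecting logs/date/bundleId/env/<timestamp>_<phone>_<suffix>.txt
  if len(path_comps) < 4 or (env is not None and path_comps[3] != env):
    return None  # the job skips paths that do not match the requested env
  name = path_comps[-1]
  if name.endswith('.txt'):
    name = name[:-4]
  name_comps = name.split('_')
  return {
    'suffix': name_comps[2],
    'bundleId': path_comps[2],
    'env': path_comps[3],
    'phone': name_comps[0],  # the real job also URL-unquotes this value
    'log_name': 'client-logs-%s' % path_comps[3],
  }

# Example with a made-up path:
# describe_log_path('logs/2019-09-19/org.celo.mobile/integration/1568900000_%2B15551234567_log.txt')
# -> bundleId 'org.celo.mobile', env 'integration', log name 'client-logs-integration'
```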
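
Likewise, a hedged sketch of the long-line handling that PATCH 3/3 moves into `writeLog`: oversized lines are wrapped before each piece is mapped onto a StackDriver severity. The function name and the plain character-length check are assumptions for illustration (the patch itself uses `sys.getsizeof`):

```python
import re
import textwrap

SEVERITY_PATTERN = re.compile('^([A-Za-z]+)')
SEVERITY_REMAPPINGS = {'TRACE': 'DEBUG', 'LOG': 'DEBUG', 'WARN': 'WARNING', 'CRIT': 'CRITICAL'}

def split_and_classify(line, width=1000):
  # Wrap oversized lines, as the patched for-loop does before calling writeLog.
  # A simple character count stands in here for the patch's sys.getsizeof check.
  pieces = textwrap.wrap(line, width, break_long_words=False) if len(line) > width else [line]
  results = []
  for text in pieces:
    severity = None
    match = SEVERITY_PATTERN.match(text)
    if match:
      token = match.group(1).upper()
      severity = SEVERITY_REMAPPINGS.get(token, token)
    results.append((severity, text))
  return results

# Example: split_and_classify('WARN low disk space')
# -> [('WARNING', 'WARN low disk space')]
```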