diff --git a/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py b/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py index 8b3c026b6738b..4ebbf8597fa61 100644 --- a/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py +++ b/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py @@ -20,10 +20,12 @@ * project (optional) - the gcp project in case of saving metrics in Big Query (in case of Dataflow Runner it is required to specify project of runner), -* metrics_namespace (optional) - name of BigQuery table where metrics +* publish_to_big_query - if metrics should be published in big query, +* metrics_namespace (optional) - name of BigQuery dataset where metrics will be stored, -in case of lack of any of both options metrics won't be saved -* input_options - options for Synthetic Sources +* metrics_table (optional) - name of BigQuery table where metrics +will be stored, +* input_options - options for Synthetic Sources, * co_input_options - options for Synthetic Sources. Example test run on DirectRunner: @@ -31,6 +33,7 @@ python setup.py nosetests \ --test-pipeline-options=" --project=big-query-project + --publish_to_big_query=true --metrics_dataset=python_load_tests --metrics_table=co_gbk --input_options='{ @@ -58,6 +61,7 @@ --staging_location=gs://... --temp_location=gs://... --sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz + --publish_to_big_query=true --metrics_dataset=python_load_tests --metrics_table=co_gbk --input_options='{ @@ -84,25 +88,23 @@ import json import logging +import os import unittest import apache_beam as beam from apache_beam.testing import synthetic_pipeline +from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime +from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor from apache_beam.testing.test_pipeline import TestPipeline -try: - from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime - from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor - from google.cloud import bigquery as bq -except ImportError: - bq = None - INPUT_TAG = 'pc1' CO_INPUT_TAG = 'pc2' -RUNTIME_LABEL = 'runtime' +load_test_enabled = False +if os.environ.get('LOAD_TEST_ENABLED') == 'true': + load_test_enabled = True -@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed') +@unittest.skipIf(not load_test_enabled, 'Enabled only for phrase triggering.') class CoGroupByKeyTest(unittest.TestCase): def parseTestPipelineOptions(self, options): @@ -122,30 +124,28 @@ def parseTestPipelineOptions(self, options): } def setUp(self): - self.pipeline = TestPipeline(is_integration_test=True) + self.pipeline = TestPipeline() self.input_options = json.loads(self.pipeline.get_option('input_options')) self.co_input_options = json.loads( self.pipeline.get_option('co_input_options')) + self.metrics_monitor = self.pipeline.get_option('publish_to_big_query') metrics_project_id = self.pipeline.get_option('project') self.metrics_namespace = self.pipeline.get_option('metrics_table') metrics_dataset = self.pipeline.get_option('metrics_dataset') - self.metrics_monitor = None check = metrics_project_id and self.metrics_namespace and metrics_dataset\ is not None - if check: - measured_values = [{'name': RUNTIME_LABEL, - 'type': 'FLOAT', - 'mode': 'REQUIRED'}] + if not self.metrics_monitor: + logging.info('Metrics will not be collected') + elif check: self.metrics_monitor = MetricsMonitor( project_name=metrics_project_id, table=self.metrics_namespace, 
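# A minimal, standalone sketch of the skip gate these tests switch to: instead
# of skipping when google.cloud.bigquery cannot be imported, they now skip
# unless the LOAD_TEST_ENABLED environment variable is set to 'true' by the
# phrase trigger. The test class below is a hypothetical placeholder used only
# to show the pattern.
import os
import unittest

load_test_enabled = os.environ.get('LOAD_TEST_ENABLED') == 'true'


@unittest.skipIf(not load_test_enabled, 'Enabled only for phrase triggering.')
class ExampleGatedLoadTest(unittest.TestCase):
  def test_gate(self):
    # Reached only when the environment opts this suite in.
    self.assertTrue(load_test_enabled)


if __name__ == '__main__':
  unittest.main()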
dataset=metrics_dataset, - schema_map=measured_values ) else: - logging.error('One or more of parameters for collecting metrics ' - 'are empty. Metrics will not be collected') + raise ValueError('One or more of parameters for collecting metrics ' + 'are empty.') class _Ungroup(beam.DoFn): def process(self, element): diff --git a/sdks/python/apache_beam/testing/load_tests/combine_test.py b/sdks/python/apache_beam/testing/load_tests/combine_test.py index 8214d619e3d41..fb2e05cc9b04c 100644 --- a/sdks/python/apache_beam/testing/load_tests/combine_test.py +++ b/sdks/python/apache_beam/testing/load_tests/combine_test.py @@ -20,9 +20,11 @@ * project (optional) - the gcp project in case of saving metrics in Big Query (in case of Dataflow Runner it is required to specify project of runner), -* metrics_namespace (optional) - name of BigQuery table where metrics +* publish_to_big_query - if metrics should be published in big query, +* metrics_namespace (optional) - name of BigQuery dataset where metrics +will be stored, +* metrics_table (optional) - name of BigQuery table where metrics will be stored, -in case of lack of any of both options metrics won't be saved * input_options - options for Synthetic Sources. Example test run on DirectRunner: @@ -30,6 +32,7 @@ python setup.py nosetests \ --test-pipeline-options=" --project=big-query-project + --publish_to_big_query=true --metrics_dataset=python_load_tests --metrics_table=combine --input_options='{ @@ -51,6 +54,7 @@ --staging_location=gs://... --temp_location=gs://... --sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz + --publish_to_big_query=true --metrics_dataset=python_load_tests --metrics_table=combine --input_options='{ @@ -69,23 +73,21 @@ import json import logging +import os import unittest import apache_beam as beam from apache_beam.testing import synthetic_pipeline +from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime +from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor from apache_beam.testing.test_pipeline import TestPipeline -try: - from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime - from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor - from google.cloud import bigquery as bq -except ImportError: - bq = None - -RUNTIME_LABEL = 'runtime' +load_test_enabled = False +if os.environ.get('LOAD_TEST_ENABLED') == 'true': + load_test_enabled = True -@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed') +@unittest.skipIf(not load_test_enabled, 'Enabled only for phrase triggering.') class CombineTest(unittest.TestCase): def parseTestPipelineOptions(self): return { @@ -104,26 +106,26 @@ def parseTestPipelineOptions(self): } def setUp(self): - self.pipeline = TestPipeline(is_integration_test=True) + self.pipeline = TestPipeline() self.input_options = json.loads(self.pipeline.get_option('input_options')) + self.metrics_monitor = self.pipeline.get_option('publish_to_big_query') metrics_project_id = self.pipeline.get_option('project') self.metrics_namespace = self.pipeline.get_option('metrics_table') metrics_dataset = self.pipeline.get_option('metrics_dataset') - self.metrics_monitor = None check = metrics_project_id and self.metrics_namespace and metrics_dataset \ is not None - if check: - schema = [{'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'}] + if not self.metrics_monitor: + logging.info('Metrics will not be collected') + elif check: self.metrics_monitor = MetricsMonitor( 
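# Every setUp() in these modules now makes the same three-way decision around
# publish_to_big_query. A condensed sketch of that branch as a helper; the
# function name build_metrics_monitor is illustrative, while the option names
# (publish_to_big_query, project, metrics_table, metrics_dataset) are the ones
# the diff reads from TestPipeline.
import logging

from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor


def build_metrics_monitor(pipeline):
  # get_option() returns the raw option string, so the flag effectively means
  # "present" rather than a parsed boolean.
  publish = pipeline.get_option('publish_to_big_query')
  project = pipeline.get_option('project')
  table = pipeline.get_option('metrics_table')
  dataset = pipeline.get_option('metrics_dataset')
  if not publish:
    # Flag absent: run the pipeline but collect no metrics.
    logging.info('Metrics will not be collected')
    return None
  if project and table and dataset:
    return MetricsMonitor(project_name=project, table=table, dataset=dataset)
  # Publishing was requested but the target is incomplete: fail fast instead
  # of silently dropping metrics as the previous logging.error branch did.
  raise ValueError('One or more of parameters for collecting metrics '
                   'are empty.')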
project_name=metrics_project_id, table=self.metrics_namespace, dataset=metrics_dataset, - schema_map=schema ) else: - logging.error('One or more of parameters for collecting metrics ' - 'are empty. Metrics will not be collected') + raise ValueError('One or more of parameters for collecting metrics ' + 'are empty.') class _GetElement(beam.DoFn): def process(self, element): diff --git a/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py b/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py index 4382fb6210398..8eb4f54dd6ca4 100644 --- a/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py +++ b/sdks/python/apache_beam/testing/load_tests/group_by_key_test.py @@ -20,9 +20,11 @@ * project (optional) - the gcp project in case of saving metrics in Big Query (in case of Dataflow Runner it is required to specify project of runner), -* metrics_namespace (optional) - name of BigQuery table where metrics +* publish_to_big_query - if metrics should be published in big query, +* metrics_namespace (optional) - name of BigQuery dataset where metrics +will be stored, +* metrics_table (optional) - name of BigQuery table where metrics will be stored, -in case of lack of any of both options metrics won't be saved * input_options - options for Synthetic Sources. Example test run on DirectRunner: @@ -30,6 +32,7 @@ python setup.py nosetests \ --test-pipeline-options=" --project=big-query-project + --publish_to_big_query=true --metrics_dataset=python_load_tests --metrics_table=gbk --input_options='{ @@ -51,6 +54,7 @@ --staging_location=gs://... --temp_location=gs://... --sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz + --publish_to_big_query=true --metrics_dataset=python_load_tests --metrics_table=gbk --input_options='{ @@ -69,23 +73,21 @@ import json import logging +import os import unittest import apache_beam as beam from apache_beam.testing import synthetic_pipeline +from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime +from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor from apache_beam.testing.test_pipeline import TestPipeline -try: - from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime - from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor - from google.cloud import bigquery as bq -except ImportError: - bq = None - -RUNTIME_LABEL = 'runtime' +load_test_enabled = False +if os.environ.get('LOAD_TEST_ENABLED') == 'true': + load_test_enabled = True -@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed') +@unittest.skipIf(not load_test_enabled, 'Enabled only for phrase triggering.') class GroupByKeyTest(unittest.TestCase): def parseTestPipelineOptions(self): return { @@ -104,26 +106,27 @@ def parseTestPipelineOptions(self): } def setUp(self): - self.pipeline = TestPipeline(is_integration_test=True) + self.pipeline = TestPipeline() self.input_options = json.loads(self.pipeline.get_option('input_options')) + self.metrics_monitor = self.pipeline.get_option('publish_to_big_query') metrics_project_id = self.pipeline.get_option('project') self.metrics_namespace = self.pipeline.get_option('metrics_table') metrics_dataset = self.pipeline.get_option('metrics_dataset') - self.metrics_monitor = None + check = metrics_project_id and self.metrics_namespace and metrics_dataset \ is not None - if check: - schema = [{'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'}] + if not self.metrics_monitor: + logging.info('Metrics will not be collected') + elif 
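# A hedged sketch of how the monitor created in setUp() and the MeasureTime
# DoFn meet at runtime: the pipeline stamps wall-clock times on each bundle,
# and after the run the monitor reads the PipelineResult and writes one
# BigQuery row per metric. The pipeline shape and the helper name run_gbk are
# illustrative only; the real testGroupByKey body is not changed by this diff.
import apache_beam as beam
from apache_beam.testing import synthetic_pipeline
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.test_pipeline import TestPipeline


def run_gbk(input_spec, namespace, metrics_monitor):
  p = TestPipeline()
  _ = (p
       | 'Read synthetic' >> beam.io.Read(
           synthetic_pipeline.SyntheticSource(input_spec))
       | 'Measure time' >> beam.ParDo(MeasureTime(namespace))
       | 'GroupByKey' >> beam.GroupByKey())
  result = p.run()
  result.wait_until_finish()
  if metrics_monitor is not None:
    # Extracts the runtime distribution (and any counters) from the result
    # and pushes them to BigQuery in the flat schema described below.
    metrics_monitor.send_metrics(result)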
check: self.metrics_monitor = MetricsMonitor( project_name=metrics_project_id, table=self.metrics_namespace, dataset=metrics_dataset, - schema_map=schema ) else: - logging.error('One or more of parameters for collecting metrics ' - 'are empty. Metrics will not be collected') + raise ValueError('One or more of parameters for collecting metrics ' + 'are empty.') def testGroupByKey(self): with self.pipeline as p: diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py index a1f867eaaf52c..d89a41d7cec6a 100644 --- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py +++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py @@ -17,12 +17,23 @@ """ Utility functions used for integrating Metrics API into load tests pipelines. + +Metrics are send to BigQuery in following format: +test_id | submit_timestamp | metric_type | value + +The 'test_id' is common for all metrics for one run. +Currently it is possible to have following metrics types: +* runtime +* total_bytes_count + + """ from __future__ import absolute_import import logging import time +import uuid import apache_beam as beam from apache_beam.metrics import Metrics @@ -36,49 +47,60 @@ SchemaField = None NotFound = None -RUNTIME_LABEL = 'runtime' -SUBMIT_TIMESTAMP_LABEL = 'submit_timestamp' - - -def _get_schema_field(schema_field): - return SchemaField( - name=schema_field['name'], - field_type=schema_field['type'], - mode=schema_field['mode']) +RUNTIME_METRIC = 'runtime' +COUNTER_LABEL = 'total_bytes_count' + +ID_LABEL = 'test_id' +SUBMIT_TIMESTAMP_LABEL = 'timestamp' +METRICS_TYPE_LABEL = 'metric' +VALUE_LABEL = 'value' + +SCHEMA = [ + {'name': ID_LABEL, + 'field_type': 'STRING', + 'mode': 'REQUIRED' + }, + {'name': SUBMIT_TIMESTAMP_LABEL, + 'field_type': 'TIMESTAMP', + 'mode': 'REQUIRED' + }, + {'name': METRICS_TYPE_LABEL, + 'field_type': 'STRING', + 'mode': 'REQUIRED' + }, + {'name': VALUE_LABEL, + 'field_type': 'FLOAT', + 'mode': 'REQUIRED' + } +] + + +def get_element_by_schema(schema_name, insert_list): + for element in insert_list: + if element['label'] == schema_name: + return element['value'] class BigQueryClient(object): - def __init__(self, project_name, table, dataset, schema_map): + def __init__(self, project_name, table, dataset): self._namespace = table self._bq_client = bigquery.Client(project=project_name) - schema = self._parse_schema(schema_map) - self._schema_names = self._get_schema_names(schema) - schema = self._prepare_schema(schema) + self._schema_names = self._get_schema_names() + schema = self._prepare_schema() self._get_or_create_table(schema, dataset) - def match_and_save(self, result_list): - rows_tuple = tuple(self._match_inserts_by_schema(result_list)) - self._insert_data(rows_tuple) - - def _match_inserts_by_schema(self, insert_list): - for name in self._schema_names: - yield self._get_element_by_schema(name, insert_list) - - def _get_element_by_schema(self, schema_name, insert_list): - for metric in insert_list: - if metric['label'] == schema_name: - return metric['value'] - return None - - def _insert_data(self, rows_tuple): - errors = self._bq_client.insert_rows(self._bq_table, rows=[rows_tuple]) - if len(errors) > 0: - for err in errors: - logging.error(err['message']) - raise ValueError('Unable save rows in BigQuery.') + def match_and_save(self, rows): + outputs = self._bq_client.insert_rows(self._bq_table, rows) + if len(outputs) > 0: + for output in outputs: + errors = 
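# Two details of the new flat metrics layout are easy to miss. First, the
# SCHEMA entries use the key 'field_type' (not 'type') so each dict can be
# expanded directly into a BigQuery SchemaField via SchemaField(**row).
# Second, every metric of a run becomes its own row sharing one test_id. A
# standalone sketch assuming google-cloud-bigquery is installed; the numeric
# values are placeholders.
import time
import uuid

from google.cloud import bigquery

SCHEMA = [
    {'name': 'test_id', 'field_type': 'STRING', 'mode': 'REQUIRED'},
    {'name': 'timestamp', 'field_type': 'TIMESTAMP', 'mode': 'REQUIRED'},
    {'name': 'metric', 'field_type': 'STRING', 'mode': 'REQUIRED'},
    {'name': 'value', 'field_type': 'FLOAT', 'mode': 'REQUIRED'},
]
fields = [bigquery.SchemaField(**row) for row in SCHEMA]

run_id = uuid.uuid4().hex
now = time.time()
example_rows = [
    {'test_id': run_id, 'timestamp': now, 'metric': 'runtime', 'value': 57.5},
    {'test_id': run_id, 'timestamp': now,
     'metric': 'total_bytes_count', 'value': 1048576.0},
]
# A BigQueryClient built for dataset 'python_load_tests' and table 'gbk'
# would insert both rows for this run via insert_rows().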
output['errors'] + for err in errors: + logging.error(err['message']) + raise ValueError( + 'Unable save rows in BigQuery: {}'.format(err['message'])) def _get_dataset(self, dataset_name): bq_dataset_ref = self._bq_client.dataset(dataset_name) @@ -104,53 +126,58 @@ def _get_or_create_table(self, bq_schemas, dataset): table = bigquery.Table(table_ref, schema=bq_schemas) self._bq_table = self._bq_client.create_table(table) - def _parse_schema(self, schema_map): - return [{'name': SUBMIT_TIMESTAMP_LABEL, - 'type': 'TIMESTAMP', - 'mode': 'REQUIRED'}] + schema_map + def _prepare_schema(self): + return [SchemaField(**row) for row in SCHEMA] - def _prepare_schema(self, schemas): - return [_get_schema_field(schema) for schema in schemas] - - def _get_schema_names(self, schemas): - return [schema['name'] for schema in schemas] + def _get_schema_names(self): + return [schema['name'] for schema in SCHEMA] class MetricsMonitor(object): - def __init__(self, project_name, table, dataset, schema_map): + def __init__(self, project_name, table, dataset): if project_name is not None: - self.bq = BigQueryClient(project_name, table, dataset, schema_map) + self.bq = BigQueryClient(project_name, table, dataset) def send_metrics(self, result): metrics = result.metrics().query() + + insert_dicts = self._prepare_all_metrics(metrics) + + self.bq.match_and_save(insert_dicts) + + def _prepare_all_metrics(self, metrics): + common_metric_rows = {SUBMIT_TIMESTAMP_LABEL: time.time(), + ID_LABEL: uuid.uuid4().hex + } + insert_rows = [] + counters = metrics['counters'] - counters_list = [] if len(counters) > 0: - counters_list = self._prepare_counter_metrics(counters) + insert_rows += self._prepare_counter_metrics(counters, common_metric_rows) distributions = metrics['distributions'] - dist_list = [] if len(distributions) > 0: - dist_list = self._prepare_runtime_metrics(distributions) + insert_rows += self._prepare_runtime_metrics(distributions, + common_metric_rows) - timestamp = {'label': SUBMIT_TIMESTAMP_LABEL, 'value': time.time()} + return insert_rows - insert_list = [timestamp] + dist_list + counters_list - self.bq.match_and_save(insert_list) - - def _prepare_counter_metrics(self, counters): + def _prepare_counter_metrics(self, counters, common_metric_rows): for counter in counters: logging.info("Counter: %s", counter) counters_list = [] for counter in counters: counter_commited = counter.committed counter_label = str(counter.key.metric.name) - counters_list.append( - {'label': counter_label, 'value': counter_commited}) + counters_list.extend([ + dict({VALUE_LABEL: counter_commited, + METRICS_TYPE_LABEL: counter_label + }, + **common_metric_rows)]) return counters_list - def _prepare_runtime_metrics(self, distributions): + def _prepare_runtime_metrics(self, distributions, common_metric_rows): min_values = [] max_values = [] for dist in distributions: @@ -165,13 +192,15 @@ def _prepare_runtime_metrics(self, distributions): runtime_in_s = max_value - min_value logging.info("Runtime: %s", runtime_in_s) runtime_in_s = float(runtime_in_s) - return [{'label': RUNTIME_LABEL, 'value': runtime_in_s}] + return [dict({VALUE_LABEL: runtime_in_s, + METRICS_TYPE_LABEL: RUNTIME_METRIC + }, **common_metric_rows)] class MeasureTime(beam.DoFn): def __init__(self, namespace): self.namespace = namespace - self.runtime = Metrics.distribution(self.namespace, RUNTIME_LABEL) + self.runtime = Metrics.distribution(self.namespace, RUNTIME_METRIC) def start_bundle(self): self.runtime.update(time.time()) @@ -183,15 +212,14 @@ def process(self, 
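# Runtime is derived rather than measured directly: MeasureTime pushes
# time.time() into a single Beam distribution per namespace from start_bundle
# and finish_bundle, and _prepare_runtime_metrics reports the spread of that
# distribution, i.e. the latest maximum minus the earliest minimum across all
# bundles. A self-contained illustration of the arithmetic; FakeCommitted only
# mimics the .min/.max of a committed distribution, and the values are made
# up.
class FakeCommitted(object):
  def __init__(self, min_value, max_value):
    self.min = min_value
    self.max = max_value


bundle_results = [FakeCommitted(1000.0, 1010.0), FakeCommitted(1002.0, 1057.5)]
runtime_in_s = (max(r.max for r in bundle_results)
                - min(r.min for r in bundle_results))
print(runtime_in_s)  # 57.5, reported as a single 'runtime' metric row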
element): yield element -def count_bytes(counter_name): - def layer(f): - def repl(*args): - namespace = args[2] - counter = Metrics.counter(namespace, counter_name) - element = args[1] - _, value = element - for i in range(len(value)): - counter.inc(i) - return f(*args) - return repl - return layer +def count_bytes(f): + def repl(*args): + namespace = args[2] + counter = Metrics.counter(namespace, COUNTER_LABEL) + element = args[1] + _, value = element + for i in range(len(value)): + counter.inc(i) + return f(*args) + + return repl diff --git a/sdks/python/apache_beam/testing/load_tests/pardo_test.py b/sdks/python/apache_beam/testing/load_tests/pardo_test.py index 0f2b2c1b64ba2..ce1dc59be5731 100644 --- a/sdks/python/apache_beam/testing/load_tests/pardo_test.py +++ b/sdks/python/apache_beam/testing/load_tests/pardo_test.py @@ -21,11 +21,13 @@ * project (optional) - the gcp project in case of saving metrics in Big Query (in case of Dataflow Runner it is required to specify project of runner), -* metrics_namespace (optional) - name of BigQuery table where metrics +* publish_to_big_query - if metrics should be published in big query, +* metrics_namespace (optional) - name of BigQuery dataset where metrics +will be stored, +* metrics_table (optional) - name of BigQuery table where metrics will be stored, -in case of lack of any of both options metrics won't be saved * output (optional) - destination to save output, in case of no option -output won't be written +output won't be written, * input_options - options for Synthetic Sources. Example test run on DirectRunner: @@ -35,6 +37,7 @@ --number_of_counter_operations=1000 --output=gs://... --project=big-query-project + --publish_to_big_query=true --metrics_dataset=python_load_tests --metrics_table=pardo --input_options='{ @@ -58,6 +61,7 @@ --sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz --output=gs://... 
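# count_bytes is no longer a decorator factory that takes a counter name; it
# now wraps process() directly and always increments the shared
# 'total_bytes_count' counter. A minimal usage sketch; _CountingDoFn is
# illustrative, but it follows the call convention the wrapper assumes, namely
# process(self, element, namespace, ...) with element being a (key, value)
# pair.
import apache_beam as beam

from apache_beam.testing.load_tests.load_test_metrics_utils import count_bytes


class _CountingDoFn(beam.DoFn):
  @count_bytes
  def process(self, element, namespace, is_returning):
    if is_returning:
      yield element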
--number_of_counter_operations=1000 + --publish_to_big_query=true --metrics_dataset=python_load_tests --metrics_table=pardo --input_options='{ @@ -76,24 +80,21 @@ import json import logging +import os import unittest import apache_beam as beam from apache_beam.testing import synthetic_pipeline +from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime +from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor from apache_beam.testing.test_pipeline import TestPipeline -try: - from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime - from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor - from google.cloud import bigquery as bq -except ImportError: - bq = None - -COUNTER_LABEL = "total_bytes_count" -RUNTIME_LABEL = 'runtime' +load_test_enabled = False +if os.environ.get('LOAD_TEST_ENABLED') == 'true': + load_test_enabled = True -@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed') +@unittest.skipIf(not load_test_enabled, 'Enabled only for phrase triggering.') class ParDoTest(unittest.TestCase): def parseTestPipelineOptions(self): return {'numRecords': self.input_options.get('num_records'), @@ -113,37 +114,37 @@ def parseTestPipelineOptions(self): } def setUp(self): - self.pipeline = TestPipeline(is_integration_test=True) + self.pipeline = TestPipeline() self.output = self.pipeline.get_option('output') self.iterations = self.pipeline.get_option('number_of_counter_operations') self.input_options = json.loads(self.pipeline.get_option('input_options')) + self.metrics_monitor = self.pipeline.get_option('publish_to_big_query') metrics_project_id = self.pipeline.get_option('project') self.metrics_namespace = self.pipeline.get_option('metrics_table') metrics_dataset = self.pipeline.get_option('metrics_dataset') - self.metrics_monitor = None - if metrics_project_id and self.metrics_namespace is not None: - measured_values = [ - {'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'}, - {'name': COUNTER_LABEL, 'type': 'INTEGER', 'mode': 'REQUIRED'} - ] + + check = metrics_project_id and self.metrics_namespace and metrics_dataset \ + is not None + if not self.metrics_monitor: + logging.info('Metrics will not be collected') + elif check: self.metrics_monitor = MetricsMonitor( project_name=metrics_project_id, table=self.metrics_namespace, dataset=metrics_dataset, - schema_map=measured_values ) else: - logging.error('One or more of parameters for collecting metrics ' - 'are empty. 
Metrics will not be collected') + raise ValueError('One or more of parameters for collecting metrics ' + 'are empty.') def testParDo(self): class _GetElement(beam.DoFn): from apache_beam.testing.load_tests.load_test_metrics_utils import count_bytes - @count_bytes(COUNTER_LABEL) + @count_bytes def process(self, element, namespace, is_returning): if is_returning: yield element diff --git a/sdks/python/apache_beam/testing/load_tests/sideinput_test.py b/sdks/python/apache_beam/testing/load_tests/sideinput_test.py index 435f44c02eb4d..bbd4ebd000eda 100644 --- a/sdks/python/apache_beam/testing/load_tests/sideinput_test.py +++ b/sdks/python/apache_beam/testing/load_tests/sideinput_test.py @@ -21,18 +21,18 @@ * project (optional) - the gcp project in case of saving metrics in Big Query (in case of Dataflow Runner it is required to specify project of runner), -* metrics_table (optional) - name of BigQuery table where metrics +* publish_to_big_query - if metrics should be published in big query, +* metrics_namespace (optional) - name of BigQuery dataset where metrics will be stored, -in case of lack of any of both options metrics won't be saved -* metrics_dataset (optional) - name of BigQuery dataset where metrics +* metrics_table (optional) - name of BigQuery table where metrics will be stored, -in case of lack of all three options metrics won't be saved * input_options - options for Synthetic Sources. To run test on DirectRunner python setup.py nosetests \ --project=big-query-project + --publish_to_big_query=true --metrics_dataset=python_load_tests --metrics_table=side_input --test-pipeline-options=" @@ -54,6 +54,7 @@ --test-pipeline-options=" --runner=TestDataflowRunner --project=... + --publish_to_big_query=true --metrics_dataset=python_load_tests --metrics_table=side_input --staging_location=gs://... @@ -91,10 +92,8 @@ if os.environ.get('LOAD_TEST_ENABLED') == 'true': load_test_enabled = True -RUNTIME_LABEL = 'runtime' - -@unittest.skipIf(not load_test_enabled, 'Enabled only for phase triggering.') +@unittest.skipIf(not load_test_enabled, 'Enabled only for phrase triggering.') class SideInputTest(unittest.TestCase): def _parseTestPipelineOptions(self): return { @@ -136,27 +135,24 @@ def setUp(self): self.iterations = 1 self.iterations = int(self.iterations) + self.metrics_monitor = self.pipeline.get_option('publish_to_big_query') metrics_project_id = self.pipeline.get_option('project') self.metrics_namespace = self.pipeline.get_option('metrics_table') - if not self.metrics_namespace: - self.metrics_namespace = self.__class__.__name__ metrics_dataset = self.pipeline.get_option('metrics_dataset') - self.metrics_monitor = None + check = metrics_project_id and self.metrics_namespace and metrics_dataset \ is not None - if check: - measured_values = [ - {'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'}, - ] + if not self.metrics_monitor: + logging.info('Metrics will not be collected') + elif check: self.metrics_monitor = MetricsMonitor( project_name=metrics_project_id, table=self.metrics_namespace, dataset=metrics_dataset, - schema_map=measured_values ) else: - logging.error('One or more of parameters for collecting metrics ' - 'are empty. Metrics will not be collected') + raise ValueError('One or more of parameters for collecting metrics ' + 'are empty.') def testSideInput(self): def join_fn(element, side_input, iterations):