[BEAM-6291] Generic BigQuery schema load tests metrics (#7614)
* [BEAM-6291] Generic schema for BQ load tests

* [BEAM-6291] Added pipeline option to check if metrics are required

* [BEAM-6291] Common environment variable to disable load tests.

* [BEAM-6291] Refactored BQ and metrics load test utils. Added missing documentation and pipeline option.
kkucharc authored and pabloem committed Feb 5, 2019
1 parent 228a7d5 commit 09996b6
Showing 6 changed files with 198 additions and 168 deletions.
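
The commit messages above describe a new publish_to_big_query pipeline option that replaces the old LOAD_TEST_ENABLED environment-variable gate in each test's setUp. The sketch below condenses that setUp logic into a single standalone helper so the control flow can be read in one place; the helper name setup_metrics_monitor is not part of the commit, and the try/except around the BigQuery import that appears in the diffs is omitted for brevity.

import logging

from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor

RUNTIME_LABEL = 'runtime'


def setup_metrics_monitor(pipeline):
  """Hypothetical helper mirroring the setUp logic added in this commit."""
  publish_to_bq = pipeline.get_option('publish_to_big_query')
  metrics_project_id = pipeline.get_option('project')
  metrics_namespace = pipeline.get_option('metrics_table')
  metrics_dataset = pipeline.get_option('metrics_dataset')

  if not publish_to_bq:
    # Publishing was not requested; the load test still runs, but no
    # metrics are written to BigQuery.
    logging.info('Metrics will not be collected')
    return None

  if metrics_project_id and metrics_namespace and metrics_dataset is not None:
    # Single REQUIRED float column holding the measured runtime.
    schema = [{'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'}]
    return MetricsMonitor(
        project_name=metrics_project_id,
        table=metrics_namespace,
        dataset=metrics_dataset,
        schema_map=schema)

  raise ValueError('One or more of parameters for collecting metrics '
                   'are empty.')

In the committed files this logic lives directly in each setUp, where self.metrics_monitor holds either the raw option value or the constructed monitor; the helper form above is only a condensed view of that control flow.
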
42 changes: 21 additions & 21 deletions sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
@@ -20,17 +20,20 @@
* project (optional) - the gcp project in case of saving
metrics in Big Query (in case of Dataflow Runner
it is required to specify project of runner),
* metrics_namespace (optional) - name of BigQuery table where metrics
* publish_to_big_query - if metrics should be published in big query,
* metrics_namespace (optional) - name of BigQuery dataset where metrics
will be stored,
in case of lack of any of both options metrics won't be saved
* input_options - options for Synthetic Sources
* metrics_table (optional) - name of BigQuery table where metrics
will be stored,
* input_options - options for Synthetic Sources,
* co_input_options - options for Synthetic Sources.
Example test run on DirectRunner:
python setup.py nosetests \
--test-pipeline-options="
--project=big-query-project
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=co_gbk
--input_options='{
@@ -58,6 +61,7 @@
--staging_location=gs://...
--temp_location=gs://...
--sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=co_gbk
--input_options='{
@@ -84,25 +88,23 @@

import json
import logging
import os
import unittest

import apache_beam as beam
from apache_beam.testing import synthetic_pipeline
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
from apache_beam.testing.test_pipeline import TestPipeline

try:
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
from google.cloud import bigquery as bq
except ImportError:
bq = None

INPUT_TAG = 'pc1'
CO_INPUT_TAG = 'pc2'
RUNTIME_LABEL = 'runtime'
load_test_enabled = False
if os.environ.get('LOAD_TEST_ENABLED') == 'true':
load_test_enabled = True


@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed')
@unittest.skipIf(not load_test_enabled, 'Enabled only for phrase triggering.')
class CoGroupByKeyTest(unittest.TestCase):

def parseTestPipelineOptions(self, options):
@@ -122,30 +124,28 @@ def parseTestPipelineOptions(self, options):
}

def setUp(self):
self.pipeline = TestPipeline(is_integration_test=True)
self.pipeline = TestPipeline()
self.input_options = json.loads(self.pipeline.get_option('input_options'))
self.co_input_options = json.loads(
self.pipeline.get_option('co_input_options'))

self.metrics_monitor = self.pipeline.get_option('publish_to_big_query')
metrics_project_id = self.pipeline.get_option('project')
self.metrics_namespace = self.pipeline.get_option('metrics_table')
metrics_dataset = self.pipeline.get_option('metrics_dataset')
self.metrics_monitor = None
check = metrics_project_id and self.metrics_namespace and metrics_dataset\
is not None
if check:
measured_values = [{'name': RUNTIME_LABEL,
'type': 'FLOAT',
'mode': 'REQUIRED'}]
if not self.metrics_monitor:
logging.info('Metrics will not be collected')
elif check:
self.metrics_monitor = MetricsMonitor(
project_name=metrics_project_id,
table=self.metrics_namespace,
dataset=metrics_dataset,
schema_map=measured_values
)
else:
logging.error('One or more of parameters for collecting metrics '
'are empty. Metrics will not be collected')
raise ValueError('One or more of parameters for collecting metrics '
'are empty.')

class _Ungroup(beam.DoFn):
def process(self, element):
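
The pipeline body of this test is collapsed in the view above, so the following is only a rough sketch of how the newly imported MeasureTime DoFn is typically attached to a synthetic-source read in these load tests. The function name, the transform labels, and the MeasureTime(namespace) constructor argument are assumptions rather than lines taken from this diff.

import apache_beam as beam
from apache_beam.testing import synthetic_pipeline
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime


def read_and_measure(p, input_options, metrics_namespace):
  # Read from the synthetic source described by input_options and record
  # runtime through the MeasureTime DoFn under the given metrics namespace.
  return (
      p
      | 'Read synthetic' >> beam.io.Read(
          synthetic_pipeline.SyntheticSource(input_options))
      | 'Measure time' >> beam.ParDo(MeasureTime(metrics_namespace)))
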
38 changes: 20 additions & 18 deletions sdks/python/apache_beam/testing/load_tests/combine_test.py
@@ -20,16 +20,19 @@
* project (optional) - the gcp project in case of saving
metrics in Big Query (in case of Dataflow Runner
it is required to specify project of runner),
* metrics_namespace (optional) - name of BigQuery table where metrics
* publish_to_big_query - if metrics should be published in big query,
* metrics_namespace (optional) - name of BigQuery dataset where metrics
will be stored,
* metrics_table (optional) - name of BigQuery table where metrics
will be stored,
in case of lack of any of both options metrics won't be saved
* input_options - options for Synthetic Sources.
Example test run on DirectRunner:
python setup.py nosetests \
--test-pipeline-options="
--project=big-query-project
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=combine
--input_options='{
@@ -51,6 +54,7 @@
--staging_location=gs://...
--temp_location=gs://...
--sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=combine
--input_options='{
@@ -69,23 +73,21 @@

import json
import logging
import os
import unittest

import apache_beam as beam
from apache_beam.testing import synthetic_pipeline
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
from apache_beam.testing.test_pipeline import TestPipeline

try:
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
from google.cloud import bigquery as bq
except ImportError:
bq = None

RUNTIME_LABEL = 'runtime'
load_test_enabled = False
if os.environ.get('LOAD_TEST_ENABLED') == 'true':
load_test_enabled = True


@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed')
@unittest.skipIf(not load_test_enabled, 'Enabled only for phrase triggering.')
class CombineTest(unittest.TestCase):
def parseTestPipelineOptions(self):
return {
@@ -104,26 +106,26 @@ def parseTestPipelineOptions(self):
}

def setUp(self):
self.pipeline = TestPipeline(is_integration_test=True)
self.pipeline = TestPipeline()
self.input_options = json.loads(self.pipeline.get_option('input_options'))

self.metrics_monitor = self.pipeline.get_option('publish_to_big_query')
metrics_project_id = self.pipeline.get_option('project')
self.metrics_namespace = self.pipeline.get_option('metrics_table')
metrics_dataset = self.pipeline.get_option('metrics_dataset')
self.metrics_monitor = None
check = metrics_project_id and self.metrics_namespace and metrics_dataset \
is not None
if check:
schema = [{'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'}]
if not self.metrics_monitor:
logging.info('Metrics will not be collected')
elif check:
self.metrics_monitor = MetricsMonitor(
project_name=metrics_project_id,
table=self.metrics_namespace,
dataset=metrics_dataset,
schema_map=schema
)
else:
logging.error('One or more of parameters for collecting metrics '
'are empty. Metrics will not be collected')
raise ValueError('One or more of parameters for collecting metrics '
'are empty.')

class _GetElement(beam.DoFn):
def process(self, element):
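
The commit title advertises a generic BigQuery schema: schema_map is an ordinary list of BigQuery field descriptors, so a test is not limited to the single runtime column shown in these diffs. The extra total_bytes_count column below is purely illustrative, and how MetricsMonitor populates a column it does not measure is an assumption; only the runtime field appears in this commit. The project, dataset, and table names are taken from the docstring example above.

from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor

# The second column is illustrative only and is not part of this commit.
schema = [
    {'name': 'runtime', 'type': 'FLOAT', 'mode': 'REQUIRED'},
    {'name': 'total_bytes_count', 'type': 'INTEGER', 'mode': 'NULLABLE'},
]

monitor = MetricsMonitor(
    project_name='big-query-project',  # --project
    table='combine',                   # --metrics_table
    dataset='python_load_tests',       # --metrics_dataset
    schema_map=schema)
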
39 changes: 21 additions & 18 deletions sdks/python/apache_beam/testing/load_tests/group_by_key_test.py
@@ -20,16 +20,19 @@
* project (optional) - the gcp project in case of saving
metrics in Big Query (in case of Dataflow Runner
it is required to specify project of runner),
* metrics_namespace (optional) - name of BigQuery table where metrics
* publish_to_big_query - if metrics should be published in big query,
* metrics_namespace (optional) - name of BigQuery dataset where metrics
will be stored,
* metrics_table (optional) - name of BigQuery table where metrics
will be stored,
in case of lack of any of both options metrics won't be saved
* input_options - options for Synthetic Sources.
Example test run on DirectRunner:
python setup.py nosetests \
--test-pipeline-options="
--project=big-query-project
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=gbk
--input_options='{
@@ -51,6 +54,7 @@
--staging_location=gs://...
--temp_location=gs://...
--sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=gbk
--input_options='{
@@ -69,23 +73,21 @@

import json
import logging
import os
import unittest

import apache_beam as beam
from apache_beam.testing import synthetic_pipeline
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
from apache_beam.testing.test_pipeline import TestPipeline

try:
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
from google.cloud import bigquery as bq
except ImportError:
bq = None

RUNTIME_LABEL = 'runtime'
load_test_enabled = False
if os.environ.get('LOAD_TEST_ENABLED') == 'true':
load_test_enabled = True


@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed')
@unittest.skipIf(not load_test_enabled, 'Enabled only for phrase triggering.')
class GroupByKeyTest(unittest.TestCase):
def parseTestPipelineOptions(self):
return {
@@ -104,26 +106,27 @@ def parseTestPipelineOptions(self):
}

def setUp(self):
self.pipeline = TestPipeline(is_integration_test=True)
self.pipeline = TestPipeline()
self.input_options = json.loads(self.pipeline.get_option('input_options'))

self.metrics_monitor = self.pipeline.get_option('publish_to_big_query')
metrics_project_id = self.pipeline.get_option('project')
self.metrics_namespace = self.pipeline.get_option('metrics_table')
metrics_dataset = self.pipeline.get_option('metrics_dataset')
self.metrics_monitor = None

check = metrics_project_id and self.metrics_namespace and metrics_dataset \
is not None
if check:
schema = [{'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'}]
if not self.metrics_monitor:
logging.info('Metrics will not be collected')
elif check:
self.metrics_monitor = MetricsMonitor(
project_name=metrics_project_id,
table=self.metrics_namespace,
dataset=metrics_dataset,
schema_map=schema
)
else:
logging.error('One or more of parameters for collecting metrics '
'are empty. Metrics will not be collected')
raise ValueError('One or more of parameters for collecting metrics '
'are empty.')

def testGroupByKey(self):
with self.pipeline as p:
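
One detail of the validation expression shared by all three setUp methods: because 'is not None' binds more tightly than 'and', only metrics_dataset gets an explicit None check, while the project and namespace are plain truthiness checks. A boolean-equivalent, more explicit form (not part of the commit; the function name is illustrative) is:

def should_publish_metrics(metrics_project_id, metrics_namespace,
                           metrics_dataset):
  # Boolean-equivalent to the 'check' expression used in each setUp:
  # 'is not None' applies only to the dataset argument.
  return (bool(metrics_project_id)
          and bool(metrics_namespace)
          and metrics_dataset is not None)
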