[BEAM-6291] Generic BigQuery schema load tests metrics #7614

Merged
42 changes: 21 additions & 21 deletions sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
@@ -20,17 +20,20 @@
* project (optional) - the gcp project in case of saving
metrics in Big Query (in case of Dataflow Runner
it is required to specify project of runner),
* metrics_namespace (optional) - name of BigQuery table where metrics
* publish_to_big_query - if metrics should be published in big query,
* metrics_namespace (optional) - name of BigQuery dataset where metrics
will be stored,
in case of lack of any of both options metrics won't be saved
* input_options - options for Synthetic Sources
* metrics_table (optional) - name of BigQuery table where metrics
will be stored,
* input_options - options for Synthetic Sources,
* co_input_options - options for Synthetic Sources.

Example test run on DirectRunner:

python setup.py nosetests \
--test-pipeline-options="
--project=big-query-project
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=co_gbk
--input_options='{
@@ -58,6 +61,7 @@
--staging_location=gs://...
--temp_location=gs://...
--sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=co_gbk
--input_options='{
@@ -84,25 +88,23 @@

import json
import logging
import os
import unittest

import apache_beam as beam
from apache_beam.testing import synthetic_pipeline
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
from apache_beam.testing.test_pipeline import TestPipeline

try:
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
from google.cloud import bigquery as bq
except ImportError:
bq = None

INPUT_TAG = 'pc1'
CO_INPUT_TAG = 'pc2'
RUNTIME_LABEL = 'runtime'
load_test_enabled = False
if os.environ.get('LOAD_TEST_ENABLED') == 'true':
load_test_enabled = True


@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed')
@unittest.skipIf(not load_test_enabled, 'Enabled only for phrase triggering.')
class CoGroupByKeyTest(unittest.TestCase):

def parseTestPipelineOptions(self, options):
@@ -122,30 +124,28 @@ def parseTestPipelineOptions(self, options):
}

def setUp(self):
self.pipeline = TestPipeline(is_integration_test=True)
self.pipeline = TestPipeline()
self.input_options = json.loads(self.pipeline.get_option('input_options'))
self.co_input_options = json.loads(
self.pipeline.get_option('co_input_options'))

self.metrics_monitor = self.pipeline.get_option('publish_to_big_query')
metrics_project_id = self.pipeline.get_option('project')
self.metrics_namespace = self.pipeline.get_option('metrics_table')
metrics_dataset = self.pipeline.get_option('metrics_dataset')
self.metrics_monitor = None
check = metrics_project_id and self.metrics_namespace and metrics_dataset\
is not None
if check:
measured_values = [{'name': RUNTIME_LABEL,
'type': 'FLOAT',
'mode': 'REQUIRED'}]
if not self.metrics_monitor:
logging.info('Metrics will not be collected')
elif check:
self.metrics_monitor = MetricsMonitor(
project_name=metrics_project_id,
table=self.metrics_namespace,
dataset=metrics_dataset,
schema_map=measured_values
)
else:
logging.error('One or more of parameters for collecting metrics '
'are empty. Metrics will not be collected')
raise ValueError('One or more of parameters for collecting metrics '
'are empty.')

class _Ungroup(beam.DoFn):
def process(self, element):
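All three files converge on the same setUp pattern: publish_to_big_query is the switch that turns metric publication on, and the project, metrics_dataset and metrics_table parameters are only validated once it is set. The sketch below pulls that pattern out of the diff into a standalone form; the class name ExampleLoadTest is hypothetical, and the helpers are the ones imported at the top of each test file.

import json
import logging
import unittest

from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
from apache_beam.testing.test_pipeline import TestPipeline

RUNTIME_LABEL = 'runtime'


class ExampleLoadTest(unittest.TestCase):
  # Hypothetical class name; each real load test (CoGroupByKeyTest,
  # CombineTest, GroupByKeyTest) repeats this setUp shape.

  def setUp(self):
    self.pipeline = TestPipeline()
    self.input_options = json.loads(self.pipeline.get_option('input_options'))

    # publish_to_big_query gates everything else; without it the test still
    # runs, it just skips metric publication.
    self.metrics_monitor = self.pipeline.get_option('publish_to_big_query')
    metrics_project_id = self.pipeline.get_option('project')
    self.metrics_namespace = self.pipeline.get_option('metrics_table')
    metrics_dataset = self.pipeline.get_option('metrics_dataset')

    check = metrics_project_id and self.metrics_namespace and metrics_dataset \
        is not None
    if not self.metrics_monitor:
      logging.info('Metrics will not be collected')
    elif check:
      self.metrics_monitor = MetricsMonitor(
          project_name=metrics_project_id,
          table=self.metrics_namespace,
          dataset=metrics_dataset,
          schema_map=[{'name': RUNTIME_LABEL,
                       'type': 'FLOAT',
                       'mode': 'REQUIRED'}])
    else:
      # Publishing was requested but the BigQuery target is underspecified.
      raise ValueError('One or more of parameters for collecting metrics '
                       'are empty.')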
38 changes: 20 additions & 18 deletions sdks/python/apache_beam/testing/load_tests/combine_test.py
@@ -20,16 +20,19 @@
* project (optional) - the gcp project in case of saving
metrics in Big Query (in case of Dataflow Runner
it is required to specify project of runner),
* metrics_namespace (optional) - name of BigQuery table where metrics
* publish_to_big_query - if metrics should be published in big query,
* metrics_namespace (optional) - name of BigQuery dataset where metrics
will be stored,
* metrics_table (optional) - name of BigQuery table where metrics
will be stored,
in case of lack of any of both options metrics won't be saved
* input_options - options for Synthetic Sources.

Example test run on DirectRunner:

python setup.py nosetests \
--test-pipeline-options="
--project=big-query-project
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=combine
--input_options='{
@@ -51,6 +54,7 @@
--staging_location=gs://...
--temp_location=gs://...
--sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=combine
--input_options='{
@@ -69,23 +73,21 @@

import json
import logging
import os
import unittest

import apache_beam as beam
from apache_beam.testing import synthetic_pipeline
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
from apache_beam.testing.test_pipeline import TestPipeline

try:
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
from google.cloud import bigquery as bq
except ImportError:
bq = None

RUNTIME_LABEL = 'runtime'
load_test_enabled = False
if os.environ.get('LOAD_TEST_ENABLED') == 'true':
load_test_enabled = True


@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed')
@unittest.skipIf(not load_test_enabled, 'Enabled only for phrase triggering.')
class CombineTest(unittest.TestCase):
def parseTestPipelineOptions(self):
return {
@@ -104,26 +106,26 @@ def parseTestPipelineOptions(self):
}

def setUp(self):
self.pipeline = TestPipeline(is_integration_test=True)
self.pipeline = TestPipeline()
self.input_options = json.loads(self.pipeline.get_option('input_options'))

self.metrics_monitor = self.pipeline.get_option('publish_to_big_query')
metrics_project_id = self.pipeline.get_option('project')
self.metrics_namespace = self.pipeline.get_option('metrics_table')
metrics_dataset = self.pipeline.get_option('metrics_dataset')
self.metrics_monitor = None
check = metrics_project_id and self.metrics_namespace and metrics_dataset \
is not None
if check:
schema = [{'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'}]
if not self.metrics_monitor:
logging.info('Metrics will not be collected')
elif check:
self.metrics_monitor = MetricsMonitor(
project_name=metrics_project_id,
table=self.metrics_namespace,
dataset=metrics_dataset,
schema_map=schema
)
else:
logging.error('One or more of parameters for collecting metrics '
'are empty. Metrics will not be collected')
raise ValueError('One or more of parameters for collecting metrics '
'are empty.')

class _GetElement(beam.DoFn):
def process(self, element):
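For context on the generic schema in the PR title: each schema_map entry carries a name/type/mode triple, which is exactly what a BigQuery column needs. The snippet below sketches that correspondence with the google-cloud-bigquery client; it is an assumption about what MetricsMonitor does internally, since the helper's implementation is not part of this diff.

# Sketch only: assumes MetricsMonitor translates schema_map entries into
# BigQuery columns roughly like this. The helper's internals are not shown
# in this PR.
from google.cloud import bigquery as bq

schema_map = [{'name': 'runtime', 'type': 'FLOAT', 'mode': 'REQUIRED'}]
schema = [
    bq.SchemaField(field['name'], field['type'], mode=field['mode'])
    for field in schema_map
]
# A table created from this schema (for example python_load_tests.combine)
# would then hold one REQUIRED FLOAT 'runtime' column per published row.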
39 changes: 21 additions & 18 deletions sdks/python/apache_beam/testing/load_tests/group_by_key_test.py
@@ -20,16 +20,19 @@
* project (optional) - the gcp project in case of saving
metrics in Big Query (in case of Dataflow Runner
it is required to specify project of runner),
* metrics_namespace (optional) - name of BigQuery table where metrics
* publish_to_big_query - if metrics should be published in big query,
* metrics_namespace (optional) - name of BigQuery dataset where metrics
will be stored,
* metrics_table (optional) - name of BigQuery table where metrics
will be stored,
in case of lack of any of both options metrics won't be saved
* input_options - options for Synthetic Sources.

Example test run on DirectRunner:

python setup.py nosetests \
--test-pipeline-options="
--project=big-query-project
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=gbk
--input_options='{
@@ -51,6 +54,7 @@
--staging_location=gs://...
--temp_location=gs://...
--sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=gbk
--input_options='{
@@ -69,23 +73,21 @@

import json
import logging
import os
import unittest

import apache_beam as beam
from apache_beam.testing import synthetic_pipeline
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
from apache_beam.testing.test_pipeline import TestPipeline

try:
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsMonitor
from google.cloud import bigquery as bq
except ImportError:
bq = None

RUNTIME_LABEL = 'runtime'
load_test_enabled = False
if os.environ.get('LOAD_TEST_ENABLED') == 'true':
load_test_enabled = True


@unittest.skipIf(bq is None, 'BigQuery for storing metrics not installed')
@unittest.skipIf(not load_test_enabled, 'Enabled only for phrase triggering.')
class GroupByKeyTest(unittest.TestCase):
def parseTestPipelineOptions(self):
return {
@@ -104,26 +106,27 @@ def parseTestPipelineOptions(self):
}

def setUp(self):
self.pipeline = TestPipeline(is_integration_test=True)
self.pipeline = TestPipeline()
self.input_options = json.loads(self.pipeline.get_option('input_options'))

self.metrics_monitor = self.pipeline.get_option('publish_to_big_query')
metrics_project_id = self.pipeline.get_option('project')
self.metrics_namespace = self.pipeline.get_option('metrics_table')
metrics_dataset = self.pipeline.get_option('metrics_dataset')
self.metrics_monitor = None

check = metrics_project_id and self.metrics_namespace and metrics_dataset \
is not None
if check:
schema = [{'name': RUNTIME_LABEL, 'type': 'FLOAT', 'mode': 'REQUIRED'}]
if not self.metrics_monitor:
logging.info('Metrics will not be collected')
elif check:
self.metrics_monitor = MetricsMonitor(
project_name=metrics_project_id,
table=self.metrics_namespace,
dataset=metrics_dataset,
schema_map=schema
)
else:
logging.error('One or more of parameters for collecting metrics '
'are empty. Metrics will not be collected')
raise ValueError('One or more of parameters for collecting metrics '
'are empty.')

def testGroupByKey(self):
with self.pipeline as p:
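The last hunk above stops at the start of testGroupByKey, so the diff does not show how the monitor is consumed. The sketch below shows the typical shape of such a test body in these load tests, using the imports already at the top of group_by_key_test.py; it assumes MeasureTime is a DoFn that records runtime under the given namespace and that MetricsMonitor exposes a send_metrics(result) call, neither of which is visible in this diff.

# Sketch of a test body on a class set up as above (a method of the test
# class, not a standalone function). The 'Ungroup' step and the send_metrics
# call are assumptions for illustration.
def testGroupByKey(self):
  with self.pipeline as p:
    # pylint: disable=expression-not-assigned
    (p
     | beam.io.Read(synthetic_pipeline.SyntheticSource(
         self.parseTestPipelineOptions()))
     | 'Measure time: start' >> beam.ParDo(
         MeasureTime(self.metrics_namespace))
     | 'GroupByKey' >> beam.GroupByKey()
     | 'Ungroup' >> beam.FlatMap(lambda kv: [(kv[0], v) for v in kv[1]])
     | 'Measure time: end' >> beam.ParDo(
         MeasureTime(self.metrics_namespace)))

    result = p.run()
    result.wait_until_finish()
    if self.metrics_monitor is not None:
      # Assumed API: push the collected runtime metrics to BigQuery.
      self.metrics_monitor.send_metrics(result)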