[BEAM-6291] Refactored BQ and Metrics load tests utils. Added missing documentation and pipeline option.
kkucharc committed Jan 30, 2019
1 parent de395da commit ff8d527
Showing 6 changed files with 77 additions and 72 deletions.
20 changes: 13 additions & 7 deletions sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
@@ -20,17 +20,20 @@
* project (optional) - the gcp project in case of saving
metrics in Big Query (in case of Dataflow Runner
it is required to specify project of runner),
* metrics_namespace (optional) - name of BigQuery table where metrics
* publish_to_big_query - whether metrics should be published to BigQuery,
* metrics_namespace (optional) - name of BigQuery dataset where metrics
will be stored,
in case of lack of any of both options metrics won't be saved
* input_options - options for Synthetic Sources
* metrics_table (optional) - name of BigQuery table where metrics
will be stored,
* input_options - options for Synthetic Sources,
* co_input_options - options for Synthetic Sources.
Example test run on DirectRunner:
python setup.py nosetests \
--test-pipeline-options="
--project=big-query-project
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=co_gbk
--input_options='{
@@ -58,6 +61,7 @@
--staging_location=gs://...
--temp_location=gs://...
--sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=co_gbk
--input_options='{
@@ -125,21 +129,23 @@ def setUp(self):
self.co_input_options = json.loads(
self.pipeline.get_option('co_input_options'))

self.metrics_monitor = self.pipeline.get_option('publish_to_big_query')
metrics_project_id = self.pipeline.get_option('project')
self.metrics_namespace = self.pipeline.get_option('metrics_table')
metrics_dataset = self.pipeline.get_option('metrics_dataset')
self.metrics_monitor = None
check = metrics_project_id and self.metrics_namespace and metrics_dataset\
is not None
if check:
if not self.metrics_monitor:
logging.info('Metrics will not be collected')
elif check:
self.metrics_monitor = MetricsMonitor(
project_name=metrics_project_id,
table=self.metrics_namespace,
dataset=metrics_dataset,
)
else:
logging.error('One or more of parameters for collecting metrics '
'are empty. Metrics will not be collected')
raise ValueError('One or more of parameters for collecting metrics '
'are empty.')

class _Ungroup(beam.DoFn):
def process(self, element):
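For context, the following is a minimal sketch (not the committed code) of how the new publish_to_big_query flag is consumed, assuming self.pipeline is a TestPipeline as in the other load tests and that get_option returns the raw option string, or None when the flag was not passed, so a plain truthiness check is enough to switch metric collection on or off:

import logging

from apache_beam.testing.test_pipeline import TestPipeline

pipeline = TestPipeline()  # options are taken from --test-pipeline-options
publish_to_bq = pipeline.get_option('publish_to_big_query')  # e.g. 'true' or None

if not publish_to_bq:
  # A missing flag (None) or an empty value disables publishing entirely.
  logging.info('Metrics will not be collected')

Note that the check expression above evaluates as metrics_project_id and self.metrics_namespace and (metrics_dataset is not None), since is not binds tighter than and; with publishing enabled, an empty project, namespace or dataset therefore falls through to the ValueError branch.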
8 changes: 6 additions & 2 deletions sdks/python/apache_beam/testing/load_tests/combine_test.py
@@ -20,16 +20,19 @@
* project (optional) - the gcp project in case of saving
metrics in Big Query (in case of Dataflow Runner
it is required to specify project of runner),
* metrics_namespace (optional) - name of BigQuery table where metrics
* publish_to_big_query - whether metrics should be published to BigQuery,
* metrics_namespace (optional) - name of BigQuery dataset where metrics
will be stored,
* metrics_table (optional) - name of BigQuery table where metrics
will be stored,
in case of lack of any of both options metrics won't be saved
* input_options - options for Synthetic Sources.
Example test run on DirectRunner:
python setup.py nosetests \
--test-pipeline-options="
--project=big-query-project
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=combine
--input_options='{
@@ -51,6 +54,7 @@
--staging_location=gs://...
--temp_location=gs://...
--sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=combine
--input_options='{
sdks/python/apache_beam/testing/load_tests/group_by_key_test.py
@@ -20,16 +20,19 @@
* project (optional) - the gcp project in case of saving
metrics in Big Query (in case of Dataflow Runner
it is required to specify project of runner),
* metrics_namespace (optional) - name of BigQuery table where metrics
* publish_to_big_query - whether metrics should be published to BigQuery,
* metrics_namespace (optional) - name of BigQuery dataset where metrics
will be stored,
* metrics_table (optional) - name of BigQuery table where metrics
will be stored,
in case of lack of any of both options metrics won't be saved
* input_options - options for Synthetic Sources.
Example test run on DirectRunner:
python setup.py nosetests \
--test-pipeline-options="
--project=big-query-project
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=gbk
--input_options='{
@@ -51,6 +54,7 @@
--staging_location=gs://...
--temp_location=gs://...
--sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=gbk
--input_options='{
@@ -109,7 +113,6 @@ def setUp(self):
metrics_project_id = self.pipeline.get_option('project')
self.metrics_namespace = self.pipeline.get_option('metrics_table')
metrics_dataset = self.pipeline.get_option('metrics_dataset')
self.metrics_monitor = None

check = metrics_project_id and self.metrics_namespace and metrics_dataset \
is not None
sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -48,7 +48,7 @@
NotFound = None

RUNTIME_METRIC = 'runtime'
COUNTER_LABEL = "total_bytes_count"
COUNTER_LABEL = 'total_bytes_count'

ID_LABEL = 'test_id'
SUBMIT_TIMESTAMP_LABEL = 'timestamp'
@@ -57,31 +57,24 @@

SCHEMA = [
{'name': ID_LABEL,
'type': 'STRING',
'field_type': 'STRING',
'mode': 'REQUIRED'
},
{'name': SUBMIT_TIMESTAMP_LABEL,
'type': 'TIMESTAMP',
'field_type': 'TIMESTAMP',
'mode': 'REQUIRED'
},
{'name': METRICS_TYPE_LABEL,
'type': 'STRING',
'field_type': 'STRING',
'mode': 'REQUIRED'
},
{'name': VALUE_LABEL,
'type': 'FLOAT',
'field_type': 'FLOAT',
'mode': 'REQUIRED'
}
]


def get_schema_field(schema_field):
return SchemaField(
name=schema_field['name'],
field_type=schema_field['type'],
mode=schema_field['mode'])


def get_element_by_schema(schema_name, insert_list):
for element in insert_list:
if element['label'] == schema_name:
@@ -99,20 +92,8 @@ def __init__(self, project_name, table, dataset):

self._get_or_create_table(schema, dataset)

def match_and_save(self, results_lists):
list_of_tuples = []
for x in results_lists:
list_of_tuples += [self._match_inserts_by_schema(x)]
self._insert_data(list_of_tuples)

def _match_inserts_by_schema(self, insert_list):
result_tuple = ()
for name in self._schema_names:
result_tuple += (get_element_by_schema(name, insert_list),)
return result_tuple

def _insert_data(self, list_of_tuples):
outputs = self._bq_client.insert_rows(self._bq_table, rows=list_of_tuples)
def match_and_save(self, rows):
outputs = self._bq_client.insert_rows(self._bq_table, rows)
if len(outputs) > 0:
for output in outputs:
errors = output['errors']
@@ -146,7 +127,7 @@ def _get_or_create_table(self, bq_schemas, dataset):
self._bq_table = self._bq_client.create_table(table)

def _prepare_schema(self):
return [get_schema_field(row) for row in SCHEMA]
return [SchemaField(**row) for row in SCHEMA]

def _get_schema_names(self):
return [schema['name'] for schema in SCHEMA]
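
As a hedged illustration of why the schema keys were renamed from 'type' to 'field_type': they now line up with the keyword arguments of google.cloud.bigquery's SchemaField, so each SCHEMA row can be splatted straight into the constructor and the dedicated get_schema_field helper goes away. The row below is copied from SCHEMA; the rest is demonstration only.

from google.cloud.bigquery import SchemaField

row = {'name': 'test_id', 'field_type': 'STRING', 'mode': 'REQUIRED'}
field = SchemaField(**row)  # equivalent to the removed get_schema_field(row)

assert field.name == 'test_id'
assert field.field_type == 'STRING'
assert field.mode == 'REQUIRED'

The same alignment is what lets the simplified match_and_save hand plain dicts keyed by column name directly to Client.insert_rows, which accepts dictionaries for a table with a known schema and returns one error mapping per failed row.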
@@ -159,42 +140,44 @@ def __init__(self, project_name, table, dataset):

def send_metrics(self, result):
metrics = result.metrics().query()
timestamp = {'label': SUBMIT_TIMESTAMP_LABEL, 'value': time.time()}
test_id = {'label': ID_LABEL, 'value': uuid.uuid4().hex}

insert_dicts = self._prepare_all_metrics(metrics)

self.bq.match_and_save(insert_dicts)

def _prepare_all_metrics(self, metrics):
common_metric_rows = {SUBMIT_TIMESTAMP_LABEL: time.time(),
ID_LABEL: uuid.uuid4().hex
}
insert_rows = []

counters = metrics['counters']
counters_list = []
if len(counters) > 0:
counters_list = self._prepare_counter_metrics(counters)
insert_rows += self._prepare_counter_metrics(counters, common_metric_rows)

distributions = metrics['distributions']
dist_list = []
if len(distributions) > 0:
dist_list = self._prepare_runtime_metrics(distributions)
insert_rows += self._prepare_runtime_metrics(distributions,
common_metric_rows)

metrics_list = [test_id] + [timestamp]
insert_list = []
values = [dist_list] + [counters_list]
for value in values:
if value:
insert_list.append(metrics_list + value)
return insert_rows

self.bq.match_and_save(insert_list)

def _prepare_counter_metrics(self, counters):
def _prepare_counter_metrics(self, counters, common_metric_rows):
for counter in counters:
logging.info("Counter: %s", counter)
counters_list = []
for counter in counters:
counter_commited = counter.committed
counter_label = str(counter.key.metric.name)
counters_list.extend([
{'label': VALUE_LABEL, 'value': counter_commited},
{'label': METRICS_TYPE_LABEL, 'value': counter_label}])
dict({VALUE_LABEL: counter_commited,
METRICS_TYPE_LABEL: counter_label
},
**common_metric_rows)])

return counters_list

def _prepare_runtime_metrics(self, distributions):
def _prepare_runtime_metrics(self, distributions, common_metric_rows):
min_values = []
max_values = []
for dist in distributions:
@@ -209,8 +192,9 @@ def _prepare_runtime_metrics(self, distributions):
runtime_in_s = max_value - min_value
logging.info("Runtime: %s", runtime_in_s)
runtime_in_s = float(runtime_in_s)
return [{'label': VALUE_LABEL, 'value': runtime_in_s},
{'label': METRICS_TYPE_LABEL, 'value': RUNTIME_METRIC}]
return [dict({VALUE_LABEL: runtime_in_s,
METRICS_TYPE_LABEL: RUNTIME_METRIC
}, **common_metric_rows)]
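
To make the refactored row building concrete, here is a small sketch of the dict(..., **common_metric_rows) pattern used above: metric-specific columns are merged with the shared test id and submission timestamp so every insert becomes one flat dict matching the four-column SCHEMA. The string values of METRICS_TYPE_LABEL and VALUE_LABEL are not shown in this diff, so the 'metric' and 'value' names below are assumptions.

import time
import uuid

ID_LABEL = 'test_id'
SUBMIT_TIMESTAMP_LABEL = 'timestamp'
METRICS_TYPE_LABEL = 'metric'  # assumed value of the constant
VALUE_LABEL = 'value'          # assumed value of the constant

common_metric_rows = {SUBMIT_TIMESTAMP_LABEL: time.time(),
                      ID_LABEL: uuid.uuid4().hex}

# One counter row: metric-specific columns first, shared columns merged in.
counter_row = dict({VALUE_LABEL: 1024.0,
                    METRICS_TYPE_LABEL: 'total_bytes_count'},
                   **common_metric_rows)

print(counter_row)
# {'value': 1024.0, 'metric': 'total_bytes_count', 'timestamp': ..., 'test_id': '...'}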


class MeasureTime(beam.DoFn):
@@ -237,4 +221,5 @@ def repl(*args):
for i in range(len(value)):
counter.inc(i)
return f(*args)

return repl
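
Since the body of MeasureTime is elided in the hunk above, the following is only a plausible sketch of the shape such a DoFn needs so that _prepare_runtime_metrics can report max - min of the recorded distribution values as the runtime; the committed implementation may differ.

import time

import apache_beam as beam
from apache_beam.metrics.metric import Metrics

RUNTIME_METRIC = 'runtime'


class MeasureTime(beam.DoFn):
  def __init__(self, namespace):
    self.namespace = namespace
    self.runtime = Metrics.distribution(self.namespace, RUNTIME_METRIC)

  def start_bundle(self):
    # Record a timestamp when each bundle starts...
    self.runtime.update(int(time.time()))

  def finish_bundle(self):
    # ...and when it finishes; max - min over all bundles approximates runtime.
    self.runtime.update(int(time.time()))

  def process(self, element):
    yield element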
11 changes: 8 additions & 3 deletions sdks/python/apache_beam/testing/load_tests/pardo_test.py
@@ -21,11 +21,13 @@
* project (optional) - the gcp project in case of saving
metrics in Big Query (in case of Dataflow Runner
it is required to specify project of runner),
* metrics_namespace (optional) - name of BigQuery table where metrics
* publish_to_big_query - whether metrics should be published to BigQuery,
* metrics_namespace (optional) - name of BigQuery dataset where metrics
will be stored,
* metrics_table (optional) - name of BigQuery table where metrics
will be stored,
in case of lack of any of both options metrics won't be saved
* output (optional) - destination to save output, in case of no option
output won't be written
output won't be written,
* input_options - options for Synthetic Sources.
Example test run on DirectRunner:
@@ -35,6 +37,7 @@
--number_of_counter_operations=1000
--output=gs://...
--project=big-query-project
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=pardo
--input_options='{
@@ -58,6 +61,7 @@
--sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
--output=gs://...
--number_of_counter_operations=1000
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=pardo
--input_options='{
@@ -120,6 +124,7 @@ def setUp(self):
metrics_project_id = self.pipeline.get_option('project')
self.metrics_namespace = self.pipeline.get_option('metrics_table')
metrics_dataset = self.pipeline.get_option('metrics_dataset')

check = metrics_project_id and self.metrics_namespace and metrics_dataset \
is not None
if not self.metrics_monitor:
22 changes: 12 additions & 10 deletions sdks/python/apache_beam/testing/load_tests/sideinput_test.py
@@ -21,18 +21,18 @@
* project (optional) - the gcp project in case of saving
metrics in Big Query (in case of Dataflow Runner
it is required to specify project of runner),
* metrics_table (optional) - name of BigQuery table where metrics
* publish_to_big_query - whether metrics should be published to BigQuery,
* metrics_namespace (optional) - name of BigQuery dataset where metrics
will be stored,
in case of lack of any of both options metrics won't be saved
* metrics_dataset (optional) - name of BigQuery dataset where metrics
* metrics_table (optional) - name of BigQuery table where metrics
will be stored,
in case of lack of all three options metrics won't be saved
* input_options - options for Synthetic Sources.
To run test on DirectRunner
python setup.py nosetests \
--project=big-query-project
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=side_input
--test-pipeline-options="
@@ -54,6 +54,7 @@
--test-pipeline-options="
--runner=TestDataflowRunner
--project=...
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=side_input
--staging_location=gs://...
@@ -134,23 +135,24 @@ def setUp(self):
self.iterations = 1
self.iterations = int(self.iterations)

self.metrics_monitor = self.pipeline.get_option('publish_to_big_query')
metrics_project_id = self.pipeline.get_option('project')
self.metrics_namespace = self.pipeline.get_option('metrics_table')
if not self.metrics_namespace:
self.metrics_namespace = self.__class__.__name__
metrics_dataset = self.pipeline.get_option('metrics_dataset')
self.metrics_monitor = None

check = metrics_project_id and self.metrics_namespace and metrics_dataset \
is not None
if check:
if not self.metrics_monitor:
logging.info('Metrics will not be collected')
elif check:
self.metrics_monitor = MetricsMonitor(
project_name=metrics_project_id,
table=self.metrics_namespace,
dataset=metrics_dataset,
)
else:
logging.error('One or more of parameters for collecting metrics '
'are empty. Metrics will not be collected')
raise ValueError('One or more of parameters for collecting metrics '
'are empty.')

def testSideInput(self):
def join_fn(element, side_input, iterations):
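
The body of join_fn is elided here, so the snippet below is a purely hypothetical illustration of the side-input pattern this test exercises: the side PCollection is handed to a FlatMap as a side input and re-iterated a configurable number of times per main-input element. All names and data are made up.

import apache_beam as beam


def join_fn(element, side_input, iterations):
  results = []
  for _ in range(iterations):
    for side in side_input:
      results.append((element, side))
  return results


with beam.Pipeline() as pipeline:
  main = pipeline | 'Main input' >> beam.Create([1, 2, 3])
  side = pipeline | 'Side input' >> beam.Create(['a', 'b'])
  joined = (main
            | 'Join with side input' >> beam.FlatMap(
                join_fn, beam.pvalue.AsIter(side), 1))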
