diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 3d0db805cbf29..bcffac528a195 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -95,7 +95,13 @@ def tearDown(self): os.remove(path) self._temp_files = [] - def _write_data(self, directory, prefix, codec, count, sync_interval): + def _write_data( + self, + directory=None, + prefix=None, + codec=None, + count=None, + sync_interval=None): raise NotImplementedError def _write_pattern(self, num_files, return_filenames=False): diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 92d55929309fe..c854e5879722c 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -257,8 +257,7 @@ def _read_from_internal_buffer(self, read_fn): self._read_buffer.seek(0, os.SEEK_END) # Allow future writes. return result - def read(self, num_bytes): - # type: (int) -> bytes + def read(self, num_bytes: Optional[int] = None) -> bytes: if not self._decompressor: raise ValueError('decompressor not initialized') diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py index 7b975bbe8feb1..0099d462af986 100644 --- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py +++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py @@ -216,7 +216,7 @@ def __init__( 'InfluxDB') self.filters = filters - def publish_metrics(self, result, extra_metrics: dict): + def publish_metrics(self, result, extra_metrics: Optional[dict] = None): metric_id = uuid.uuid4().hex metrics = result.metrics().query(self.filters) @@ -227,13 +227,16 @@ def publish_metrics(self, result, extra_metrics: dict): # a list of dictionaries matching the schema. insert_dicts = self._prepare_all_metrics(metrics, metric_id) - insert_dicts += self._prepare_extra_metrics(extra_metrics, metric_id) + insert_dicts += self._prepare_extra_metrics(metric_id, extra_metrics) if len(insert_dicts) > 0: for publisher in self.publishers: publisher.publish(insert_dicts) - def _prepare_extra_metrics(self, extra_metrics: dict, metric_id: str): + def _prepare_extra_metrics( + self, metric_id: str, extra_metrics: Optional[dict] = None): ts = time.time() + if not extra_metrics: + extra_metrics = {} return [ Metric(ts, metric_id, v, label=k).as_dict() for k, v in extra_metrics.items()