Commit
[Python] Make BigQuery load job names deterministic for retry resilience (#27384)

* use partition key instead of uuid for job suffix

* use pane index in job name to distinguish different load jobs in a streaming pipeline

* clarify name components

* fix affected tests
ahmedabu98 authored Jul 10, 2023
1 parent e809912 commit 88e635a
Showing 3 changed files with 44 additions and 35 deletions.
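In short, the per-partition load job name no longer embeds a fresh UUID; it is built entirely from values that are stable across retries: the destination hash, the pane index of the current firing, and the partition index. A minimal before/after sketch of the naming (the destination hash shown here is an illustrative stand-in, not Beam's actual helper):

```python
import hashlib
import uuid


def old_job_name(prefix, destination):
    # Before: a fresh UUID per attempt, so a retried bundle produces a
    # brand-new job name (and hence a duplicate load job).
    dest_hash = hashlib.md5(destination.encode()).hexdigest()  # illustrative hash
    return '%s_%s_%s' % (prefix, dest_hash, uuid.uuid4().hex)


def new_job_name(prefix, destination, pane_index, partition_key):
    # After: every component is derived from the element being processed, so
    # the same (destination, pane, partition) always yields the same name.
    dest_hash = hashlib.md5(destination.encode()).hexdigest()  # illustrative hash
    return '%s_%s_pane%s_partition%s' % (prefix, dest_hash, pane_index, partition_key)
```

Because BigQuery treats job IDs as unique within a project, a retried bundle that regenerates the same name ends up re-attaching to the original load job instead of launching a duplicate.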
21 changes: 14 additions & 7 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -86,7 +86,7 @@ def _generate_job_name(job_name, job_type, step_name):
job_name=job_name,
step_id=step_name,
job_type=job_type,
random=random.randint(0, 1000))
random=_bq_uuid())


def file_prefix_generator(
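The step-level prefix in `_generate_job_name` also moves from a small random integer to a `_bq_uuid()` suffix, which mainly reduces the chance that two concurrent pipelines pick the same prefix. Assuming `_bq_uuid()` yields a hex UUID string (an assumption about the helper, not shown in this diff), the difference is roughly:

```python
import random
import uuid

# Before: only 1001 possible suffixes, so concurrent pipelines can
# realistically collide on the same job name prefix.
old_suffix = str(random.randint(0, 1000))

# After (assumed behaviour of _bq_uuid()): a 128-bit random UUID,
# making prefix collisions practically impossible.
new_suffix = uuid.uuid4().hex
```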
@@ -657,14 +657,19 @@ def start_bundle(self):
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
self.pending_jobs = []

def process(self, element, load_job_name_prefix, *schema_side_inputs):
def process(
self,
element,
load_job_name_prefix,
pane_info=beam.DoFn.PaneInfoParam,
*schema_side_inputs):
# Each load job is assumed to have files respecting these constraints:
# 1. Total size of all files < 15 TB (Max size for load jobs)
# 2. Total no. of files in a single load job < 10,000
# This assumption means that there will always be a single load job
# triggered for each partition of files.
destination = element[0]
files = element[1]
partition_key, files = element[1]

if callable(self.schema):
schema = self.schema(destination, *schema_side_inputs)
@@ -692,8 +697,8 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs):
table_reference.projectId,
table_reference.datasetId,
table_reference.tableId))
uid = _bq_uuid()
job_name = '%s_%s_%s' % (load_job_name_prefix, destination_hash, uid)
job_name = '%s_%s_pane%s_partition%s' % (
load_job_name_prefix, destination_hash, pane_info.index, partition_key)
_LOGGER.info('Load job has %s files. Job name is %s.', len(files), job_name)

create_disposition = self.create_disposition
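The pane index is obtained through `beam.DoFn.PaneInfoParam`, a DoFn parameter that Beam populates at runtime with metadata about the current firing; in a streaming pipeline it distinguishes repeated firings of the same window so their load jobs get different, yet still deterministic, names. A self-contained sketch of the same pattern, with hypothetical names:

```python
import apache_beam as beam


class NameLoadJob(beam.DoFn):
    """Hypothetical DoFn illustrating pane-aware, deterministic job naming."""

    def process(self, element, prefix, pane_info=beam.DoFn.PaneInfoParam):
        destination, (partition_key, files) = element
        # pane_info.index increases with each firing of the window, so
        # early/late firings of the same window get distinct, stable names.
        yield '%s_%s_pane%s_partition%s' % (
            prefix, destination, pane_info.index, partition_key)
```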
@@ -799,8 +804,10 @@ def process(self, element):
else:
output_tag = PartitionFiles.SINGLE_PARTITION_TAG

for partition in partitions:
yield pvalue.TaggedOutput(output_tag, (destination, partition))
# We also pass along the index of the partition as a key, which is used
# to create a deterministic load job name
for key, partition in enumerate(partitions):
yield pvalue.TaggedOutput(output_tag, (destination, (key, partition)))


class DeleteTablesFn(beam.DoFn):
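Correspondingly, `PartitionFiles` now emits each destination's partitions keyed by their position in the list. Since partitioning the same input yields the same ordering, the index is stable across retries and safe to embed in the job name. An illustrative example of the resulting element shape:

```python
# Illustrative only: the element layout produced downstream of PartitionFiles.
destination = 'project:dataset.table0'
partitions = [['file0', 'file1'], ['file2', 'file3']]

elements = [(destination, (key, partition))
            for key, partition in enumerate(partitions)]
# [('project:dataset.table0', (0, ['file0', 'file1'])),
#  ('project:dataset.table0', (1, ['file2', 'file3']))]
```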
56 changes: 29 additions & 27 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -349,9 +349,9 @@ def test_partition(self):

def test_partition_files_dofn_file_split(self):
"""Force partitions to split based on max_files"""
multiple_partitions_result = [('destination0', ['file0', 'file1']),
('destination0', ['file2', 'file3'])]
single_partition_result = [('destination1', ['file0', 'file1'])]
multiple_partitions_result = [('destination0', (0, ['file0', 'file1'])),
('destination0', (1, ['file2', 'file3']))]
single_partition_result = [('destination1', (0, ['file0', 'file1']))]
with TestPipeline() as p:
destination_file_pairs = p | beam.Create(self._ELEMENTS, reshuffle=False)
partitioned_files = (
@@ -364,20 +364,22 @@ def test_partition_files_dofn_file_split(self):
single_partition = partitioned_files[bqfl.PartitionFiles\
.SINGLE_PARTITION_TAG]

assert_that(
multiple_partitions,
equal_to(multiple_partitions_result),
label='CheckMultiplePartitions')
assert_that(
single_partition,
equal_to(single_partition_result),
label='CheckSinglePartition')
assert_that(
multiple_partitions,
equal_to(multiple_partitions_result),
label='CheckMultiplePartitions')
assert_that(
single_partition,
equal_to(single_partition_result),
label='CheckSinglePartition')

def test_partition_files_dofn_size_split(self):
"""Force partitions to split based on max_partition_size"""
multiple_partitions_result = [('destination0', ['file0', 'file1', 'file2']),
('destination0', ['file3'])]
single_partition_result = [('destination1', ['file0', 'file1'])]
multiple_partitions_result = [
('destination0', (0, ['file0', 'file1',
'file2'])), ('destination0', (1, ['file3']))
]
single_partition_result = [('destination1', (0, ['file0', 'file1']))]
with TestPipeline() as p:
destination_file_pairs = p | beam.Create(self._ELEMENTS, reshuffle=False)
partitioned_files = (
@@ -390,14 +392,14 @@ def test_partition_files_dofn_size_split(self):
single_partition = partitioned_files[bqfl.PartitionFiles\
.SINGLE_PARTITION_TAG]

assert_that(
multiple_partitions,
equal_to(multiple_partitions_result),
label='CheckMultiplePartitions')
assert_that(
single_partition,
equal_to(single_partition_result),
label='CheckSinglePartition')
assert_that(
multiple_partitions,
equal_to(multiple_partitions_result),
label='CheckMultiplePartitions')
assert_that(
single_partition,
equal_to(single_partition_result),
label='CheckSinglePartition')


class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
@@ -584,8 +586,8 @@ def test_wait_for_load_job_completion(self, sleep_mock):
bq_client.jobs.Get.side_effect = [
job_1_waiting, job_2_done, job_1_done, job_2_done
]
partition_1 = ('project:dataset.table0', ['file0'])
partition_2 = ('project:dataset.table1', ['file1'])
partition_1 = ('project:dataset.table0', (0, ['file0']))
partition_2 = ('project:dataset.table1', (1, ['file1']))
bq_client.jobs.Insert.side_effect = [job_1, job_2]
test_job_prefix = "test_job"

@@ -627,8 +629,8 @@ def test_one_load_job_failed_after_waiting(self, sleep_mock):
bq_client.jobs.Get.side_effect = [
job_1_waiting, job_2_done, job_1_error, job_2_done
]
partition_1 = ('project:dataset.table0', ['file0'])
partition_2 = ('project:dataset.table1', ['file1'])
partition_1 = ('project:dataset.table0', (0, ['file0']))
partition_2 = ('project:dataset.table1', (1, ['file1']))
bq_client.jobs.Insert.side_effect = [job_1, job_2]
test_job_prefix = "test_job"

@@ -1041,7 +1043,7 @@ def test_bqfl_streaming_with_copy_jobs(self):

# Override these parameters to induce copy jobs
bqfl._DEFAULT_MAX_FILE_SIZE = 100
bqfl._MAXIMUM_LOAD_SIZE = 400
bqfl._MAXIMUM_LOAD_SIZE = 200

with beam.Pipeline(argv=args) as p:
stream_source = (
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -645,7 +645,7 @@ def wait_for_bq_job(self, job_reference, sleep_duration_sec=5, max_retries=0):
retry += 1
job = self.get_job(
job_reference.projectId, job_reference.jobId, job_reference.location)
logging.info('Job status: %s', job.status.state)
logging.info('Job %s status: %s', job.id, job.status.state)
if job.status.state == 'DONE' and job.status.errorResult:
raise RuntimeError(
'BigQuery job {} failed. Error Result: {}'.format(
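The `bigquery_tools.py` change simply adds the job ID to the status log line, so the deterministic job names introduced above can be matched against the polling output of `wait_for_bq_job`. A simplified sketch of that polling loop (signatures and helpers here are assumptions based on the visible diff, not the exact implementation):

```python
import logging
import time


def wait_for_bq_job(get_job, job_reference, sleep_duration_sec=5, max_retries=0):
    """Illustrative sketch: poll a BigQuery job until it completes."""
    retry = 0
    while max_retries == 0 or retry < max_retries:
        retry += 1
        job = get_job(job_reference)
        logging.info('Job %s status: %s', job.id, job.status.state)
        if job.status.state == 'DONE' and job.status.errorResult:
            raise RuntimeError(
                'BigQuery job {} failed. Error Result: {}'.format(
                    job.id, job.status.errorResult))
        if job.status.state == 'DONE':
            return job
        time.sleep(sleep_duration_sec)
    raise RuntimeError(
        'BigQuery job {} did not complete in time.'.format(job_reference.jobId))
```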
