Skip to content

Commit

Permalink
time partition
Browse files (browse the repository at this point in the history)
  • Loading branch information
Wiwi-Creator committed Sep 18, 2024
1 parent 430fe61 commit aab820f
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 10 deletions.
3 changes: 3 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/embulk-output-bigquery.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions lib/embulk/output/bigquery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ def self.configure(config, schema, task_count)
'payload_column' => config.param('payload_column', :string, :default => nil),
'payload_column_index' => config.param('payload_column_index', :integer, :default => nil),

'description' => config.param('description', :string, :default => nil),

'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => nil),
'timeout_sec' => config.param('timeout_sec', :integer, :default => nil), # google-api-ruby-client < v0.11.0
'send_timeout_sec' => config.param('send_timeout_sec', :integer, :default => nil), # google-api-ruby-client >= v0.11.0
Expand All @@ -89,6 +87,7 @@ def self.configure(config, schema, task_count)
'ignore_unknown_values' => config.param('ignore_unknown_values', :bool, :default => false),
'allow_quoted_newlines' => config.param('allow_quoted_newlines', :bool, :default => false),
'time_partitioning' => config.param('time_partitioning', :hash, :default => nil),
'range_partitioning' => config.param('range_partitioning', :hash, :default => nil),
'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
'schema_update_options' => config.param('schema_update_options', :array, :default => nil),

Expand Down
27 changes: 19 additions & 8 deletions lib/embulk/output/bigquery/bigquery_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def load_from_gcs(object_uris, table)
opts = {}

Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
response = with_network_retry { client.insert_job(@project, body, **opts) }
response = with_network_retry { client.insert_job(@project, body, opts) }
unless @task['is_skip_job_result_check']
response = wait_load('Load', response)
end
Expand Down Expand Up @@ -222,7 +222,7 @@ def load(path, table, write_disposition: 'WRITE_APPEND')
# },
}
Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
response = with_network_retry { client.insert_job(@project, body, **opts) }
response = with_network_retry { client.insert_job(@project, body, opts) }
if @task['is_skip_job_result_check']
response
else
Expand Down Expand Up @@ -278,7 +278,7 @@ def copy(source_table, destination_table, destination_dataset = nil, write_dispo

opts = {}
Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
response = with_network_retry { client.insert_job(@project, body, **opts) }
response = with_network_retry { client.insert_job(@project, body, opts) }
wait_load('Copy', response)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Expand Down Expand Up @@ -372,7 +372,7 @@ def create_dataset(dataset = nil, reference: nil)
end
opts = {}
Embulk.logger.debug { "embulk-output-bigquery: insert_dataset(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" }
with_network_retry { client.insert_dataset(@project, body, **opts) }
with_network_retry { client.insert_dataset(@project, body, opts) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 409 && /Already Exists:/ =~ e.message
# ignore 'Already Exists' error
Expand Down Expand Up @@ -410,6 +410,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
dataset ||= @dataset
options ||= {}
options['time_partitioning'] ||= @task['time_partitioning']
options['range_partitioning'] ||= @task['range_partitioning']
if Helper.has_partition_decorator?(table)
options['time_partitioning'] ||= {'type' => 'DAY'}
table = Helper.chomp_partition_decorator(table)
Expand All @@ -420,7 +421,6 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
table_reference: {
table_id: table,
},
description: @task['description'],
schema: {
fields: fields,
}
Expand All @@ -434,6 +434,17 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
}
end

if options['range_partitioning']
body[:range_partitioning] = {
field: options['range_partitioning']['field'],
}
body[:range_partitioning][:range] = {
start: options['range_partitioning']['range']['start'],
end: options['range_partitioning']['range']['end'],
interval: options['range_partitioning']['range']['interval'],
}
end

options['clustering'] ||= @task['clustering']
if options['clustering']
body[:clustering] = {
Expand All @@ -447,8 +458,8 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
end

opts = {}
Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@destination_project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" }
with_network_retry { client.insert_table(@destination_project, dataset, body, **opts) }
Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" }
with_network_retry { client.insert_table(@project, dataset, body, opts) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 409 && /Already Exists:/ =~ e.message
# ignore 'Already Exists' error
Expand All @@ -457,7 +468,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)

response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
"embulk-output-bigquery: insert_table(#{@destination_project}, #{dataset}, #{@location_for_log}, #{body}, #{opts}), response:#{response}"
"embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts}), response:#{response}"
}
raise Error, "failed to create table #{@destination_project}:#{dataset}.#{table} in #{@location_for_log}, response:#{response}"
end
Expand Down

0 comments on commit aab820f

Please sign in to comment.