From 54b16e0416ada85dd2904b2d2bee7650a1d27b36 Mon Sep 17 00:00:00 2001 From: Ke Wang Date: Fri, 16 Aug 2024 23:10:08 -0700 Subject: [PATCH] Refactor WriterOptions processConfigs function (#10762) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/10762 Merge processSessionConfigs and processHiveConnectorConfigs as one function processConfigs. In processConfigs(), it prioritize getting config from session property, if not found, use config from connector config. Reviewed By: xiaoxmeng Differential Revision: D61304850 fbshipit-source-id: 19b2a66958559cb54f96e4dcc73480ccd81d57c8 --- velox/connectors/hive/HiveDataSink.cpp | 3 +-- velox/dwio/common/Options.h | 5 +++-- velox/dwio/dwrf/writer/Writer.cpp | 5 +++++ velox/dwio/dwrf/writer/Writer.h | 4 ++++ velox/dwio/parquet/writer/Writer.cpp | 28 ++++++++++--------------- velox/dwio/parquet/writer/Writer.h | 5 +++-- velox/exec/fuzzer/PrestoQueryRunner.cpp | 4 +++- 7 files changed, 30 insertions(+), 24 deletions(-) diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index d20f144276e8..a185ca20e9d4 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -693,8 +693,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { // through insertTableHandle) // 2. Otherwise, acquire user defined session properties. // 3. Lastly, acquire general hive connector configs. - options->processSessionConfigs(*connectorSessionProperties); - options->processHiveConnectorConfigs(*hiveConfig_->config()); + options->processConfigs(*hiveConfig_->config(), *connectorSessionProperties); // Only overwrite options in case they were not already provided. if (options->schema == nullptr) { diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index a3aaa8ff3b28..a695e30b45b9 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -620,8 +620,9 @@ struct WriterOptions { // WriterOption implementations should provide this function to specify how to // process format-specific session and connector configs. - virtual void processSessionConfigs(const config::ConfigBase&) {} - virtual void processHiveConnectorConfigs(const config::ConfigBase&) {} + virtual void processConfigs( + const config::ConfigBase& connectorConfig, + const config::ConfigBase& session) {}; virtual ~WriterOptions() = default; }; diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index 1312ee3b9c40..2c38455f9611 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -32,6 +32,11 @@ using facebook::velox::common::testutil::TestValue; namespace facebook::velox::dwrf { + +void WriterOptions::processConfigs( + const config::ConfigBase& connectorConfig, + const config::ConfigBase& session) {}; + namespace { dwio::common::StripeProgress getStripeProgress(const WriterContext& context) { diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 9dbe2252e0b3..5056faa3942a 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -51,6 +51,10 @@ struct WriterOptions : public dwio::common::WriterOptions { WriterContext& context, const velox::dwio::common::TypeWithId& type)> columnWriterFactory; + + void processConfigs( + const config::ConfigBase& connectorConfig, + const config::ConfigBase& session) override; }; class Writer : public dwio::common::Writer { diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index 0a92aa916e02..c27fd6b7b548 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -435,28 +435,22 @@ std::optional getTimestampTimeZone( } // namespace -void WriterOptions::processSessionConfigs(const config::ConfigBase& config) { +void WriterOptions::processConfigs( + const config::ConfigBase& connectorConfig, + const config::ConfigBase& session) { if (!parquetWriteTimestampUnit) { parquetWriteTimestampUnit = - getTimestampUnit(config, kParquetSessionWriteTimestampUnit); + getTimestampUnit(session, kParquetSessionWriteTimestampUnit).has_value() + ? getTimestampUnit(session, kParquetSessionWriteTimestampUnit) + : getTimestampUnit(connectorConfig, kParquetSessionWriteTimestampUnit); } - - if (!parquetWriteTimestampTimeZone) { - parquetWriteTimestampTimeZone = - getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone); - } -} - -void WriterOptions::processHiveConnectorConfigs( - const config::ConfigBase& config) { - if (!parquetWriteTimestampUnit) { - parquetWriteTimestampUnit = - getTimestampUnit(config, kParquetHiveConnectorWriteTimestampUnit); - } - if (!parquetWriteTimestampTimeZone) { parquetWriteTimestampTimeZone = - getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone); + getTimestampTimeZone(session, core::QueryConfig::kSessionTimezone) + .has_value() + ? getTimestampTimeZone(session, core::QueryConfig::kSessionTimezone) + : getTimestampTimeZone( + connectorConfig, core::QueryConfig::kSessionTimezone); } } diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index e3da877be4f5..2b0940304e0d 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -123,8 +123,9 @@ struct WriterOptions : public dwio::common::WriterOptions { "hive.parquet.writer.timestamp-unit"; // Process hive connector and session configs. - void processSessionConfigs(const config::ConfigBase& config) override; - void processHiveConnectorConfigs(const config::ConfigBase& config) override; + void processConfigs( + const config::ConfigBase& connectorConfig, + const config::ConfigBase& session) override; }; // Writes Velox vectors into a DataSink using Arrow Parquet writer. diff --git a/velox/exec/fuzzer/PrestoQueryRunner.cpp b/velox/exec/fuzzer/PrestoQueryRunner.cpp index 599552b7ab81..05e3e712fcfb 100644 --- a/velox/exec/fuzzer/PrestoQueryRunner.cpp +++ b/velox/exec/fuzzer/PrestoQueryRunner.cpp @@ -17,6 +17,7 @@ #include "velox/exec/fuzzer/PrestoQueryRunner.h" #include // @manual #include +#include #include #include "velox/common/base/Fs.h" #include "velox/common/encode/Base64.h" @@ -44,7 +45,8 @@ void writeToFile( memory::MemoryPool* pool) { VELOX_CHECK_GT(data.size(), 0); - auto options = std::make_shared(); + std::shared_ptr options = + std::make_shared(); options->schema = data[0]->type(); options->memoryPool = pool;