From 5b76a35c100416b3077769474d34638019be0466 Mon Sep 17 00:00:00 2001 From: Alexander Tretyakov Date: Mon, 19 Aug 2024 18:09:40 +0000 Subject: [PATCH] chore: Refactor data loading libraries Bug: 341257656 Change-Id: Iffa61ce96d36a2031cf1e03e31f91ef35f2429bf GitOrigin-RevId: 1164215eb94d5ef7ad599551355dfeb87c6df57f --- public/data_loading/csv/BUILD.bazel | 4 +++- public/data_loading/csv/constants.h | 18 +++++++++--------- .../csv/csv_delta_record_stream_reader.h | 16 +++++----------- .../csv/csv_delta_record_stream_writer.cc | 10 ++++------ .../csv/csv_delta_record_stream_writer.h | 17 +++++------------ 5 files changed, 26 insertions(+), 39 deletions(-) diff --git a/public/data_loading/csv/BUILD.bazel b/public/data_loading/csv/BUILD.bazel index 396907f0..6e4a67f1 100644 --- a/public/data_loading/csv/BUILD.bazel +++ b/public/data_loading/csv/BUILD.bazel @@ -21,7 +21,9 @@ package(default_visibility = [ cc_library( name = "constants", hdrs = ["constants.h"], - deps = [], + deps = [ + "@com_google_riegeli//riegeli/csv:csv_record", + ], ) cc_library( diff --git a/public/data_loading/csv/constants.h b/public/data_loading/csv/constants.h index 1f39da43..1f54f78b 100644 --- a/public/data_loading/csv/constants.h +++ b/public/data_loading/csv/constants.h @@ -17,9 +17,10 @@ #ifndef TOOLS_DATA_CLI_CSV_CONSTANTS_H_ #define TOOLS_DATA_CLI_CSV_CONSTANTS_H_ -#include #include +#include "riegeli/csv/csv_record.h" + namespace kv_server { inline constexpr std::string_view kUpdateMutationType = "update"; @@ -50,16 +51,15 @@ inline constexpr std::string_view kVersionColumn = "version"; inline constexpr std::string_view kLogicalShardColumn = "logical_shard"; inline constexpr std::string_view kPhysicalShardColumn = "physical_shard"; -inline constexpr std::array kKeyValueMutationRecordHeader = - {kKeyColumn, kLogicalCommitTimeColumn, kMutationTypeColumn, kValueColumn, - kValueTypeColumn}; +inline constexpr riegeli::CsvHeaderConstant kKeyValueMutationRecordHeader = { + kKeyColumn, kLogicalCommitTimeColumn, kMutationTypeColumn, kValueColumn, + kValueTypeColumn}; -inline constexpr std::array - kUserDefinedFunctionsConfigHeader = {kCodeSnippetColumn, kHandlerNameColumn, - kLogicalCommitTimeColumn, - kLanguageColumn, kVersionColumn}; +inline constexpr riegeli::CsvHeaderConstant kUserDefinedFunctionsConfigHeader = + {kCodeSnippetColumn, kHandlerNameColumn, kLogicalCommitTimeColumn, + kLanguageColumn, kVersionColumn}; -inline constexpr std::array kShardMappingRecordHeader = { +inline constexpr riegeli::CsvHeaderConstant kShardMappingRecordHeader = { kLogicalShardColumn, kPhysicalShardColumn}; } // namespace kv_server diff --git a/public/data_loading/csv/csv_delta_record_stream_reader.h b/public/data_loading/csv/csv_delta_record_stream_reader.h index 31ca6d1b..5c179a0f 100644 --- a/public/data_loading/csv/csv_delta_record_stream_reader.h +++ b/public/data_loading/csv/csv_delta_record_stream_reader.h @@ -19,7 +19,6 @@ #include #include -#include #include "absl/log/log.h" #include "public/data_loading/csv/constants.h" @@ -116,26 +115,21 @@ riegeli::CsvReaderBase::Options GetRecordReaderOptions( riegeli::CsvReaderBase::Options reader_options; reader_options.set_field_separator(options.field_separator); - std::vector header; + riegeli::CsvHeader header; switch (options.record_type) { case Record::KeyValueMutationRecord: - header = - std::vector(kKeyValueMutationRecordHeader.begin(), - kKeyValueMutationRecordHeader.end()); + header = *kKeyValueMutationRecordHeader; break; case Record::UserDefinedFunctionsConfig: - header = std::vector( - kUserDefinedFunctionsConfigHeader.begin(), - kUserDefinedFunctionsConfigHeader.end()); + header = *kUserDefinedFunctionsConfigHeader; break; case Record::ShardMappingRecord: - header = std::vector(kShardMappingRecordHeader.begin(), - kShardMappingRecordHeader.end()); + header = *kShardMappingRecordHeader; break; default: LOG(ERROR) << "Unable to set CSV reader header"; } - reader_options.set_required_header(riegeli::CsvHeader(std::move(header))); + reader_options.set_required_header(std::move(header)); return reader_options; } } // namespace internal diff --git a/public/data_loading/csv/csv_delta_record_stream_writer.cc b/public/data_loading/csv/csv_delta_record_stream_writer.cc index e94f622d..f5fa93a7 100644 --- a/public/data_loading/csv/csv_delta_record_stream_writer.cc +++ b/public/data_loading/csv/csv_delta_record_stream_writer.cc @@ -116,8 +116,7 @@ absl::StatusOr MakeCsvRecordWithKVMutation( const auto record = std::get(data_record.record); - riegeli::CsvHeader header(kKeyValueMutationRecordHeader); - riegeli::CsvRecord csv_record(header); + riegeli::CsvRecord csv_record(*kKeyValueMutationRecordHeader); csv_record[kKeyColumn] = record.key; absl::StatusOr value = GetRecordValue( record.value, std::string(1, value_separator), csv_encoding); @@ -158,8 +157,8 @@ absl::StatusOr MakeCsvRecordWithUdfConfig( const auto udf_config = std::get(data_record.record); - riegeli::CsvHeader header(kUserDefinedFunctionsConfigHeader); - riegeli::CsvRecord csv_record(header); + riegeli::CsvRecord csv_record(*kUserDefinedFunctionsConfigHeader); + csv_record[kCodeSnippetColumn] = udf_config.code_snippet; csv_record[kHandlerNameColumn] = udf_config.handler_name; csv_record[kLogicalCommitTimeColumn] = @@ -182,8 +181,7 @@ absl::StatusOr MakeCsvRecordWithShardMapping( } const auto shard_mapping_struct = std::get(data_record.record); - riegeli::CsvHeader header(kShardMappingRecordHeader); - riegeli::CsvRecord csv_record(header); + riegeli::CsvRecord csv_record(*kShardMappingRecordHeader); csv_record[kLogicalShardColumn] = absl::StrCat(shard_mapping_struct.logical_shard); csv_record[kPhysicalShardColumn] = diff --git a/public/data_loading/csv/csv_delta_record_stream_writer.h b/public/data_loading/csv/csv_delta_record_stream_writer.h index 849e7fc6..4f4cc39f 100644 --- a/public/data_loading/csv/csv_delta_record_stream_writer.h +++ b/public/data_loading/csv/csv_delta_record_stream_writer.h @@ -18,7 +18,6 @@ #define PUBLIC_DATA_LOADING_CSV_CSV_DELTA_RECORD_STREAM_WRITER_H_ #include -#include #include "absl/status/status.h" #include "absl/status/statusor.h" @@ -107,25 +106,19 @@ riegeli::CsvWriterBase::Options GetRecordWriterOptions( const typename CsvDeltaRecordStreamWriter::Options& options) { riegeli::CsvWriterBase::Options writer_options; writer_options.set_field_separator(options.field_separator); - std::vector header; + riegeli::CsvHeader header; switch (options.record_type) { case DataRecordType::kKeyValueMutationRecord: - header = - std::vector(kKeyValueMutationRecordHeader.begin(), - kKeyValueMutationRecordHeader.end()); + header = *kKeyValueMutationRecordHeader; break; case DataRecordType::kUserDefinedFunctionsConfig: - header = std::vector( - kUserDefinedFunctionsConfigHeader.begin(), - kUserDefinedFunctionsConfigHeader.end()); + header = *kUserDefinedFunctionsConfigHeader; break; case DataRecordType::kShardMappingRecord: - header = std::vector(kShardMappingRecordHeader.begin(), - kShardMappingRecordHeader.end()); + header = *kShardMappingRecordHeader; break; } - riegeli::CsvHeader header_opt(std::move(header)); - writer_options.set_header(std::move(header_opt)); + writer_options.set_header(std::move(header)); return writer_options; } } // namespace internal