Skip to content

Commit

Permalink
chore: Refactor data loading libraries
Browse files Browse the repository at this point in the history
Bug: 341257656
Change-Id: Iffa61ce96d36a2031cf1e03e31f91ef35f2429bf
GitOrigin-RevId: 1164215eb94d5ef7ad599551355dfeb87c6df57f
  • Loading branch information
lx3-g authored and copybara-github committed Aug 19, 2024
1 parent 985215d commit 5b76a35
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 39 deletions.
4 changes: 3 additions & 1 deletion public/data_loading/csv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ package(default_visibility = [
cc_library(
name = "constants",
hdrs = ["constants.h"],
deps = [],
deps = [
"@com_google_riegeli//riegeli/csv:csv_record",
],
)

cc_library(
Expand Down
18 changes: 9 additions & 9 deletions public/data_loading/csv/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
#ifndef TOOLS_DATA_CLI_CSV_CONSTANTS_H_
#define TOOLS_DATA_CLI_CSV_CONSTANTS_H_

#include <array>
#include <string_view>

#include "riegeli/csv/csv_record.h"

namespace kv_server {

inline constexpr std::string_view kUpdateMutationType = "update";
Expand Down Expand Up @@ -50,16 +51,15 @@ inline constexpr std::string_view kVersionColumn = "version";
inline constexpr std::string_view kLogicalShardColumn = "logical_shard";
inline constexpr std::string_view kPhysicalShardColumn = "physical_shard";

inline constexpr std::array<std::string_view, 5> kKeyValueMutationRecordHeader =
{kKeyColumn, kLogicalCommitTimeColumn, kMutationTypeColumn, kValueColumn,
kValueTypeColumn};
inline constexpr riegeli::CsvHeaderConstant kKeyValueMutationRecordHeader = {
kKeyColumn, kLogicalCommitTimeColumn, kMutationTypeColumn, kValueColumn,
kValueTypeColumn};

inline constexpr std::array<std::string_view, 5>
kUserDefinedFunctionsConfigHeader = {kCodeSnippetColumn, kHandlerNameColumn,
kLogicalCommitTimeColumn,
kLanguageColumn, kVersionColumn};
inline constexpr riegeli::CsvHeaderConstant kUserDefinedFunctionsConfigHeader =
{kCodeSnippetColumn, kHandlerNameColumn, kLogicalCommitTimeColumn,
kLanguageColumn, kVersionColumn};

inline constexpr std::array<std::string_view, 2> kShardMappingRecordHeader = {
inline constexpr riegeli::CsvHeaderConstant kShardMappingRecordHeader = {
kLogicalShardColumn, kPhysicalShardColumn};

} // namespace kv_server
Expand Down
16 changes: 5 additions & 11 deletions public/data_loading/csv/csv_delta_record_stream_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include <string>
#include <utility>
#include <vector>

#include "absl/log/log.h"
#include "public/data_loading/csv/constants.h"
Expand Down Expand Up @@ -116,26 +115,21 @@ riegeli::CsvReaderBase::Options GetRecordReaderOptions(
riegeli::CsvReaderBase::Options reader_options;
reader_options.set_field_separator(options.field_separator);

std::vector<std::string_view> header;
riegeli::CsvHeader header;
switch (options.record_type) {
case Record::KeyValueMutationRecord:
header =
std::vector<std::string_view>(kKeyValueMutationRecordHeader.begin(),
kKeyValueMutationRecordHeader.end());
header = *kKeyValueMutationRecordHeader;
break;
case Record::UserDefinedFunctionsConfig:
header = std::vector<std::string_view>(
kUserDefinedFunctionsConfigHeader.begin(),
kUserDefinedFunctionsConfigHeader.end());
header = *kUserDefinedFunctionsConfigHeader;
break;
case Record::ShardMappingRecord:
header = std::vector<std::string_view>(kShardMappingRecordHeader.begin(),
kShardMappingRecordHeader.end());
header = *kShardMappingRecordHeader;
break;
default:
LOG(ERROR) << "Unable to set CSV reader header";
}
reader_options.set_required_header(riegeli::CsvHeader(std::move(header)));
reader_options.set_required_header(std::move(header));
return reader_options;
}
} // namespace internal
Expand Down
10 changes: 4 additions & 6 deletions public/data_loading/csv/csv_delta_record_stream_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ absl::StatusOr<riegeli::CsvRecord> MakeCsvRecordWithKVMutation(
const auto record =
std::get<KeyValueMutationRecordStruct>(data_record.record);

riegeli::CsvHeader header(kKeyValueMutationRecordHeader);
riegeli::CsvRecord csv_record(header);
riegeli::CsvRecord csv_record(*kKeyValueMutationRecordHeader);
csv_record[kKeyColumn] = record.key;
absl::StatusOr<ValueStruct> value = GetRecordValue(
record.value, std::string(1, value_separator), csv_encoding);
Expand Down Expand Up @@ -158,8 +157,8 @@ absl::StatusOr<riegeli::CsvRecord> MakeCsvRecordWithUdfConfig(
const auto udf_config =
std::get<UserDefinedFunctionsConfigStruct>(data_record.record);

riegeli::CsvHeader header(kUserDefinedFunctionsConfigHeader);
riegeli::CsvRecord csv_record(header);
riegeli::CsvRecord csv_record(*kUserDefinedFunctionsConfigHeader);

csv_record[kCodeSnippetColumn] = udf_config.code_snippet;
csv_record[kHandlerNameColumn] = udf_config.handler_name;
csv_record[kLogicalCommitTimeColumn] =
Expand All @@ -182,8 +181,7 @@ absl::StatusOr<riegeli::CsvRecord> MakeCsvRecordWithShardMapping(
}
const auto shard_mapping_struct =
std::get<ShardMappingRecordStruct>(data_record.record);
riegeli::CsvHeader header(kShardMappingRecordHeader);
riegeli::CsvRecord csv_record(header);
riegeli::CsvRecord csv_record(*kShardMappingRecordHeader);
csv_record[kLogicalShardColumn] =
absl::StrCat(shard_mapping_struct.logical_shard);
csv_record[kPhysicalShardColumn] =
Expand Down
17 changes: 5 additions & 12 deletions public/data_loading/csv/csv_delta_record_stream_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#define PUBLIC_DATA_LOADING_CSV_CSV_DELTA_RECORD_STREAM_WRITER_H_

#include <utility>
#include <vector>

#include "absl/status/status.h"
#include "absl/status/statusor.h"
Expand Down Expand Up @@ -107,25 +106,19 @@ riegeli::CsvWriterBase::Options GetRecordWriterOptions(
const typename CsvDeltaRecordStreamWriter<DestStreamT>::Options& options) {
riegeli::CsvWriterBase::Options writer_options;
writer_options.set_field_separator(options.field_separator);
std::vector<std::string_view> header;
riegeli::CsvHeader header;
switch (options.record_type) {
case DataRecordType::kKeyValueMutationRecord:
header =
std::vector<std::string_view>(kKeyValueMutationRecordHeader.begin(),
kKeyValueMutationRecordHeader.end());
header = *kKeyValueMutationRecordHeader;
break;
case DataRecordType::kUserDefinedFunctionsConfig:
header = std::vector<std::string_view>(
kUserDefinedFunctionsConfigHeader.begin(),
kUserDefinedFunctionsConfigHeader.end());
header = *kUserDefinedFunctionsConfigHeader;
break;
case DataRecordType::kShardMappingRecord:
header = std::vector<std::string_view>(kShardMappingRecordHeader.begin(),
kShardMappingRecordHeader.end());
header = *kShardMappingRecordHeader;
break;
}
riegeli::CsvHeader header_opt(std::move(header));
writer_options.set_header(std::move(header_opt));
writer_options.set_header(std::move(header));
return writer_options;
}
} // namespace internal
Expand Down

0 comments on commit 5b76a35

Please sign in to comment.