clp-s: Update core functionality to prepare for generic parser support #355

Merged
merged 51 commits into from
May 13, 2024
Changes from 35 commits
Commits
3e0435c
Refactor clp-s NodeType enum and add StructuredArray NodeType
gibber9809 Feb 21, 2024
5b8c97c
Implement compression path for naive array structurization
gibber9809 Mar 22, 2024
d5440c1
Fix bugs causing AST to be displayed incorrectly using print() debug …
gibber9809 Mar 27, 2024
1f3b1f3
Use bithacks to encode unstructured object delimiters using a single …
gibber9809 Mar 28, 2024
959f7a8
Add docstrings to new methods in Schema.hpp
gibber9809 Mar 28, 2024
7ea6eb4
Record the number ordered and unordered entries in each schema
gibber9809 Mar 31, 2024
8bc1356
Get most of the way towards decompression for structurized arrays
gibber9809 Apr 2, 2024
87502f3
Fix trivial bug related to marshalling empty structured arrays
gibber9809 Apr 3, 2024
f6b8aa5
Fix bug where keys for string columns for objects inside structured a…
gibber9809 Apr 3, 2024
86d7a8d
Add nearly correct implementation of serialization for structurized j…
gibber9809 Apr 3, 2024
8cf5f0c
Fix trivial bug
gibber9809 Apr 4, 2024
624f4c4
Track node depth in schema tree
gibber9809 Apr 5, 2024
29952c8
Implement decompression for structured arrays
gibber9809 Apr 5, 2024
994946f
Make ColumnDescriptor clean up sequences of multiple wildcard tokens …
gibber9809 Apr 7, 2024
c2ea3b6
Rewrite column resolution to increase understandability, and make Sch…
gibber9809 Apr 7, 2024
cd812e5
Fix edge case where pure wildcard flag is sometimes not set on Column…
gibber9809 Apr 8, 2024
be178be
Fix bug in Column Resolution
gibber9809 Apr 8, 2024
456ab68
Add comment documenting known broken case for column resolution
gibber9809 Apr 8, 2024
8ed2f83
Support structured arrays in last stage of search
gibber9809 Apr 8, 2024
f686756
Rename first object in hierarchy from 'root' to the empty string to b…
gibber9809 Apr 10, 2024
7007c8a
Simplify unordered object marshalling code and slightly improve perfo…
gibber9809 Apr 10, 2024
7ee691e
Remove unused include
gibber9809 Apr 10, 2024
5c7e6f9
Implement optimization to reuse same schema reader instead of allocat…
gibber9809 Apr 10, 2024
36496b5
Lazily initialize data-structures related to serialization to reduce …
gibber9809 Apr 11, 2024
a0ac519
Fix performance bug causing slow loading of large SchemaMap
gibber9809 Apr 11, 2024
c67b85b
Use vector<SchemaNode> instead of vector<std::shared_ptr<SchemaNode>>…
gibber9809 Apr 11, 2024
acf8b92
Introduce hint for ZstdDecompressor that the compressor will be reuse…
gibber9809 Apr 11, 2024
edd32b2
Rework column readers to use unaligned views into shared buffer to av…
gibber9809 Apr 12, 2024
8c1f015
Remove unused struct
gibber9809 Apr 12, 2024
7d1e236
Get rid of hack related to values inside of arrays during json serial…
gibber9809 Apr 12, 2024
61a302c
Fix [[]] -> []] serialization bug
gibber9809 Apr 12, 2024
3ec2ac6
Improve memory safety for column readers using shared table buffer
gibber9809 Apr 12, 2024
e09ae8e
Code cleanup
gibber9809 Apr 15, 2024
952a97f
Minor cleanup
gibber9809 Apr 15, 2024
262ac9e
Remove code specific to array-structurization support
gibber9809 Apr 15, 2024
ba7b844
Apply suggestions from code review
gibber9809 Apr 19, 2024
be045e2
Address more review comments
gibber9809 Apr 19, 2024
96596e6
Add missing entry in CMakeLists.txt
gibber9809 Apr 22, 2024
e673487
Merge remote-tracking branch 'upstream/main' into clp-core-generic-pa…
gibber9809 Apr 24, 2024
d5218e1
Fix bug with uninitialized local schema tree during marshalling
gibber9809 Apr 24, 2024
33fe38f
Fix performance bug with wildcard search on large number of schemas
gibber9809 Apr 28, 2024
870da2f
Merge remote-tracking branch 'upstream/main' into clp-core-generic-pa…
gibber9809 May 1, 2024
f37b57c
Apply suggestions from code review
gibber9809 May 8, 2024
dd66174
Address review comments
gibber9809 May 8, 2024
ac1d5b1
Minor style change
gibber9809 May 9, 2024
2374dfa
Improve docstrings for Span and UnalignedSpan classes
gibber9809 May 10, 2024
ab35d8c
Rename UnalignedSpan UnalignedMemSpan
gibber9809 May 10, 2024
0800800
Upgrade build to use c++20
gibber9809 May 10, 2024
36c80a5
Replace uses of our custom Span class with std::span
gibber9809 May 10, 2024
8dba752
Fix build issue on MacOS
gibber9809 May 12, 2024
21c5611
Merge remote-tracking branch 'upstream/main' into clp-core-generic-pa…
gibber9809 May 12, 2024
158 changes: 114 additions & 44 deletions components/core/src/clp_s/ArchiveReader.cpp
@@ -41,6 +41,7 @@ void ArchiveReader::read_metadata() {
         int32_t schema_id;
         uint64_t num_messages;
         size_t table_offset;
+        size_t in_memory_size;
 
         if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id);
             ErrorCodeSuccess != error)
@@ -60,7 +61,13 @@ void ArchiveReader::read_metadata() {
             throw OperationFailed(error, __FILENAME__, __LINE__);
         }
 
-        m_id_to_table_metadata[schema_id] = {num_messages, table_offset};
+        if (auto error = m_table_metadata_decompressor.try_read_numeric_value(in_memory_size);
+            ErrorCodeSuccess != error)
+        {
+            throw OperationFailed(error, __FILENAME__, __LINE__);
+        }
+
+        m_id_to_table_metadata[schema_id] = {num_messages, table_offset, in_memory_size};
         m_schema_ids.push_back(schema_id);
     }
     m_table_metadata_decompressor.close();
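
Editor's sketch, not part of the PR: the hunk above extends each table metadata record with a fourth numeric field, in_memory_size. The code below illustrates that record layout (schema_id, num_messages, offset, in_memory_size) with a plain byte buffer in place of the Zstd streams. TableMetadata, write_record, read_records and the two numeric helpers are hypothetical names, and reading until the buffer is exhausted is an assumption; the actual reader may use a stored record count that is outside the visible diff.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for the per-table metadata kept by the reader.
struct TableMetadata {
    uint64_t num_messages;
    size_t offset;
    size_t in_memory_size;  // size of the table once decompressed
};

// Appends the raw bytes of a numeric value to the buffer.
template <typename T>
void write_numeric_value(std::vector<char>& buf, T value) {
    char const* bytes = reinterpret_cast<char const*>(&value);
    buf.insert(buf.end(), bytes, bytes + sizeof(T));
}

// Reads a numeric value at `pos` and advances `pos`; throws if the buffer is truncated.
template <typename T>
T read_numeric_value(std::vector<char> const& buf, size_t& pos) {
    if (pos > buf.size() || buf.size() - pos < sizeof(T)) {
        throw std::runtime_error("truncated table metadata");
    }
    T value;
    std::memcpy(&value, buf.data() + pos, sizeof(T));
    pos += sizeof(T);
    return value;
}

// Writes one record in the field order used by the diff.
void write_record(std::vector<char>& buf, int32_t schema_id, TableMetadata const& metadata) {
    write_numeric_value(buf, schema_id);
    write_numeric_value(buf, metadata.num_messages);
    write_numeric_value(buf, metadata.offset);
    write_numeric_value(buf, metadata.in_memory_size);
}

// Assumption: records are read until the buffer is exhausted.
std::map<int32_t, TableMetadata> read_records(std::vector<char> const& buf) {
    std::map<int32_t, TableMetadata> id_to_metadata;
    size_t pos = 0;
    while (pos < buf.size()) {
        auto schema_id = read_numeric_value<int32_t>(buf, pos);
        auto num_messages = read_numeric_value<uint64_t>(buf, pos);
        auto offset = read_numeric_value<size_t>(buf, pos);
        auto in_memory_size = read_numeric_value<size_t>(buf, pos);
        id_to_metadata[schema_id] = {num_messages, offset, in_memory_size};
    }
    return id_to_metadata;
}
```

Because the fields are read back positionally, a reader that expects four fields per record would misread metadata written with only three, so archives written before this change would presumably need to be regenerated.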
@@ -74,7 +81,7 @@ void ArchiveReader::read_dictionaries_and_metadata() {
     read_metadata();
 }
 
-std::unique_ptr<SchemaReader> ArchiveReader::read_table(
+SchemaReader& ArchiveReader::read_table(
         int32_t schema_id,
         bool should_extract_timestamp,
         bool should_marshal_records
@@ -85,93 +92,156 @@ std::unique_ptr<SchemaReader> ArchiveReader::read_table(
         throw OperationFailed(ErrorCodeFileNotFound, __FILENAME__, __LINE__);
     }
 
-    auto schema_reader
+    auto& schema_reader
             = create_schema_reader(schema_id, should_extract_timestamp, should_marshal_records);
 
     m_tables_file_reader.try_seek_from_begin(m_id_to_table_metadata[schema_id].offset);
     m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity);
-    schema_reader->load(m_tables_decompressor);
-    m_tables_decompressor.close();
+    schema_reader.load(m_tables_decompressor, m_id_to_table_metadata[schema_id].in_memory_size);
+    m_tables_decompressor.close_for_reuse();
     return schema_reader;
 }

-BaseColumnReader*
-ArchiveReader::append_reader_column(std::unique_ptr<SchemaReader>& reader, int32_t column_id) {
+BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int32_t column_id) {
     BaseColumnReader* column_reader = nullptr;
-    auto node = m_schema_tree->get_node(column_id);
-    std::string key_name = node->get_key_name();
-    switch (node->get_type()) {
-        case NodeType::INTEGER:
-            column_reader = new Int64ColumnReader(key_name, column_id);
-            break;
-        case NodeType::FLOAT:
-            column_reader = new FloatColumnReader(key_name, column_id);
+    auto const& node = m_schema_tree->get_node(column_id);
+    switch (node.get_type()) {
+        case NodeType::Integer:
+            column_reader = new Int64ColumnReader(column_id);
             break;
-        case NodeType::CLPSTRING:
-            column_reader = new ClpStringColumnReader(key_name, column_id, m_var_dict, m_log_dict);
+        case NodeType::Float:
+            column_reader = new FloatColumnReader(column_id);
             break;
-        case NodeType::VARSTRING:
-            column_reader = new VariableStringColumnReader(key_name, column_id, m_var_dict);
+        case NodeType::ClpString:
+            column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_log_dict);
             break;
-        case NodeType::BOOLEAN:
-            column_reader = new BooleanColumnReader(key_name, column_id);
+        case NodeType::VarString:
+            column_reader = new VariableStringColumnReader(column_id, m_var_dict);
             break;
-        case NodeType::ARRAY:
-            column_reader = new ClpStringColumnReader(
-                    key_name,
-                    column_id,
-                    m_var_dict,
-                    m_array_dict,
-                    true
-            );
+        case NodeType::Boolean:
+            column_reader = new BooleanColumnReader(column_id);
             break;
-        case NodeType::DATESTRING:
-            column_reader = new DateStringColumnReader(key_name, column_id, m_timestamp_dict);
+        case NodeType::UnstructuredArray:
+            column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_array_dict, true);
             break;
-        case NodeType::OBJECT:
-        case NodeType::NULLVALUE:
-            reader->append_column(column_id);
+        case NodeType::DateString:
+            column_reader = new DateStringColumnReader(column_id, m_timestamp_dict);
             break;
-        case NodeType::UNKNOWN:
+        // No need to push columns without associated object readers into the SchemaReader.
+        case NodeType::Object:
+        case NodeType::NullValue:
+        case NodeType::Unknown:
             break;
     }
 
     if (column_reader) {
-        reader->append_column(column_reader);
+        reader.append_column(column_reader);
     }
     return column_reader;
 }

-std::unique_ptr<SchemaReader> ArchiveReader::create_schema_reader(
+void ArchiveReader::append_unordered_reader_columns(
+        SchemaReader& reader,
+        NodeType unordered_object_type,
+        Span<int32_t> schema_ids,
+        bool should_marshal_records
+) {
+    int32_t mst_subtree_root_node_id = INT32_MAX;
+    size_t object_readers_begin = reader.get_next_column_reader_position();
+    for (int32_t column_id : schema_ids) {
+        if (Schema::schema_entry_is_unordered_object(column_id)) {
+            continue;
+        }
+        BaseColumnReader* column_reader = nullptr;
+        auto const& node = m_schema_tree->get_node(column_id);
+        if (INT32_MAX == mst_subtree_root_node_id) {
+            mst_subtree_root_node_id = m_schema_tree->find_matching_subtree_root_in_subtree(
+                    -1,
+                    column_id,
+                    unordered_object_type
+            );
+        }
+        switch (node.get_type()) {
+            case NodeType::Integer:
+                column_reader = new Int64ColumnReader(column_id);
+                break;
+            case NodeType::Float:
+                column_reader = new FloatColumnReader(column_id);
+                break;
+            case NodeType::ClpString:
+                column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_log_dict);
+                break;
+            case NodeType::VarString:
+                column_reader = new VariableStringColumnReader(column_id, m_var_dict);
+                break;
+            case NodeType::Boolean:
+                column_reader = new BooleanColumnReader(column_id);
+                break;
+            // UnstructuredArray and DateString currently aren't supported as part of any unordered
+            // object, so we disregard them here
+            case NodeType::UnstructuredArray:
+            case NodeType::DateString:
+            // No need to push columns without associated object readers into the SchemaReader.
+            case NodeType::Object:
+            case NodeType::NullValue:
+            case NodeType::Unknown:
+                break;
+        }
+
+        if (column_reader) {
+            reader.append_unordered_column(column_reader);
+        }
+    }
+
+    if (should_marshal_records) {
+        reader.mark_unordered_object(object_readers_begin, mst_subtree_root_node_id, schema_ids);
Review comment (Contributor):

Do you want to change object_readers_begin to something like ...begin_pos or ...begin_offset?

+    }
+}
+
+SchemaReader& ArchiveReader::create_schema_reader(
         int32_t schema_id,
         bool should_extract_timestamp,
         bool should_marshal_records
 ) {
-    auto reader = std::make_unique<SchemaReader>(
+    auto& schema = (*m_schema_map)[schema_id];
+    m_schema_reader.reset(
             m_schema_tree,
             schema_id,
+            schema.get_ordered_schema_view(),
             m_id_to_table_metadata[schema_id].num_messages,
             should_marshal_records
     );
     auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids();
 
-    for (int32_t column_id : (*m_schema_map)[reader->get_schema_id()]) {
-        BaseColumnReader* column_reader = append_reader_column(reader, column_id);
+    for (size_t i = 0; i < schema.size(); ++i) {
+        int32_t column_id = schema[i];
+        if (Schema::schema_entry_is_unordered_object(column_id)) {
+            size_t length = Schema::get_unordered_object_length(column_id);
+            append_unordered_reader_columns(
+                    m_schema_reader,
+                    Schema::get_unordered_object_type(column_id),
+                    schema.get_view(i + 1, length),
+                    should_marshal_records
+            );
+            i += length;
+            continue;
+        }
+        BaseColumnReader* column_reader = append_reader_column(m_schema_reader, column_id);
 
         if (should_extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0)
         {
-            reader->mark_column_as_timestamp(column_reader);
+            m_schema_reader.mark_column_as_timestamp(column_reader);
         }
     }
-    return reader;
+    return m_schema_reader;
 }

 void ArchiveReader::store(FileWriter& writer) {
     std::string message;
 
     for (auto& [id, table_metadata] : m_id_to_table_metadata) {
-        auto schema_reader = read_table(id, false, true);
-        while (schema_reader->get_next_message(message)) {
+        auto& schema_reader = read_table(id, false, true);
+        while (schema_reader.get_next_message(message)) {
             writer.write(message.c_str(), message.length());
         }
     }
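
Editor's sketch, not part of the PR: the new loop in create_schema_reader walks a flat schema in which a marker entry introduces a run of `length` entries belonging to an unordered object, then skips that run with `i += length`. The code below reproduces only that walk. The marker encoding (flag in the top bit, length in the low 24 bits) is invented for illustration; the real Schema helpers are not shown in this diff. All names here (make_unordered_marker, is_unordered_marker, unordered_length, split_schema, SplitSchema) are hypothetical, and nested markers inside a run are not handled.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// ASSUMPTION: made-up marker layout, not the encoding used by clp-s.
constexpr uint32_t cMarkerFlag = 0x80000000u;
constexpr uint32_t cLengthMask = 0x00FFFFFFu;

// Builds a marker entry announcing that `length` entries follow.
int32_t make_unordered_marker(size_t length) {
    return static_cast<int32_t>(cMarkerFlag | (static_cast<uint32_t>(length) & cLengthMask));
}

bool is_unordered_marker(int32_t entry) {
    return 0 != (static_cast<uint32_t>(entry) & cMarkerFlag);
}

size_t unordered_length(int32_t entry) {
    return static_cast<uint32_t>(entry) & cLengthMask;
}

struct SplitSchema {
    std::vector<int32_t> ordered;                        // plain column ids, in order
    std::vector<std::vector<int32_t>> unordered_groups;  // one group per marker
};

// Walks the flat schema the same way as the loop in the diff: a marker at index i
// owns entries [i + 1, i + 1 + length), which are consumed as one group.
SplitSchema split_schema(std::vector<int32_t> const& schema) {
    SplitSchema result;
    for (size_t i = 0; i < schema.size(); ++i) {
        int32_t entry = schema[i];
        if (is_unordered_marker(entry)) {
            size_t length = unordered_length(entry);
            if (length > schema.size() - (i + 1)) {
                throw std::out_of_range("unordered object runs past end of schema");
            }
            auto begin = schema.begin() + static_cast<std::ptrdiff_t>(i + 1);
            result.unordered_groups.emplace_back(begin, begin + static_cast<std::ptrdiff_t>(length));
            i += length;  // the loop's ++i then steps past the last entry of the run
            continue;
        }
        result.ordered.push_back(entry);
    }
    return result;
}
```

For example, a schema of {1, 2, marker(2), 5, 6, 3} yields the ordered ids 1, 2, 3 and a single unordered group containing 5 and 6.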
22 changes: 18 additions & 4 deletions components/core/src/clp_s/ArchiveReader.hpp
@@ -11,6 +11,7 @@
 #include "ReaderUtils.hpp"
 #include "SchemaReader.hpp"
 #include "TimestampDictionaryReader.hpp"
+#include "Utils.hpp"
 
 namespace clp_s {
 class ArchiveReader {
@@ -87,7 +88,7 @@ class ArchiveReader {
      * @param should_marshal_records
      * @return the schema reader
      */
-    std::unique_ptr<SchemaReader>
+    SchemaReader&
     read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records);
 
     std::shared_ptr<VariableDictionaryReader> get_variable_dictionary() { return m_var_dict; }
@@ -128,7 +129,7 @@ class ArchiveReader {
      * @param should_extract_timestamp
      * @param should_marshal_records
      */
gibber9809 marked this conversation as resolved.
-    std::unique_ptr<SchemaReader> create_schema_reader(
+    SchemaReader& create_schema_reader(

Review comment (Contributor):

Can you add the return value in the description?

             int32_t schema_id,
             bool should_extract_timestamp,
             bool should_marshal_records
@@ -139,8 +140,20 @@ class ArchiveReader {
      * @param reader
      * @param column_id
      */
-    BaseColumnReader*
-    append_reader_column(std::unique_ptr<SchemaReader>& reader, int32_t column_id);
+    BaseColumnReader* append_reader_column(SchemaReader& reader, int32_t column_id);
+
+    /**
+     * Appends columns for the entire schema of an unordered object.
+     * @param reader
+     * @param column_id
gibber9809 marked this conversation as resolved.
+     * @param should_marshal_records
+     */
+    void append_unordered_reader_columns(
+            SchemaReader& reader,
+            NodeType unordered_object_type,
+            Span<int32_t> schema_ids,
+            bool should_marshal_records
+    );
 
     bool m_is_open;
     std::string m_archive_path;
@@ -159,6 +172,7 @@ class ArchiveReader {
     FileReader m_table_metadata_file_reader;
     ZstdDecompressor m_tables_decompressor;
     ZstdDecompressor m_table_metadata_decompressor;
+    SchemaReader m_schema_reader{nullptr, -1, {nullptr, 0}, 0, false};
 };
 } // namespace clp_s
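
Editor's sketch, not part of the PR: with this change read_table returns a reference to the single m_schema_reader member, which is reset for each table instead of being allocated anew. One consequence worth spelling out is that every reference returned by read_table aliases the same object, so a caller needs to finish with one table before requesting the next. The classes below are hypothetical, heavily simplified stand-ins (SchemaReaderSketch, ArchiveReaderSketch) that show only this aliasing behaviour.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-in for clp_s::SchemaReader.
class SchemaReaderSketch {
public:
    // Re-targets the reader at another schema; clear() leaves the vector's capacity
    // in place, so repeated resets avoid reallocating.
    void reset(int32_t schema_id) {
        m_schema_id = schema_id;
        m_column_ids.clear();
    }

    void append_column(int32_t column_id) { m_column_ids.push_back(column_id); }

    int32_t get_schema_id() const { return m_schema_id; }

    size_t get_num_columns() const { return m_column_ids.size(); }

private:
    int32_t m_schema_id{-1};
    std::vector<int32_t> m_column_ids;
};

// Hypothetical stand-in for the archive reader: it owns exactly one schema reader
// and hands out a reference to it.
class ArchiveReaderSketch {
public:
    SchemaReaderSketch& read_table(int32_t schema_id, std::vector<int32_t> const& column_ids) {
        m_schema_reader.reset(schema_id);
        for (int32_t column_id : column_ids) {
            m_schema_reader.append_column(column_id);
        }
        return m_schema_reader;
    }

private:
    SchemaReaderSketch m_schema_reader;
};
```

In this sketch, a reference obtained from read_table(1, ...) reports schema id 2 after a later call to read_table(2, ...), because both calls return the same object. The store loop in ArchiveReader.cpp is compatible with that constraint, since it drains each table before reading the next.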

31 changes: 18 additions & 13 deletions components/core/src/clp_s/ArchiveWriter.cpp
@@ -92,32 +92,35 @@ size_t ArchiveWriter::get_data_size() {
 
 void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const& schema) {
     for (int32_t id : schema) {
-        auto node = m_schema_tree.get_node(id);
-        switch (node->get_type()) {
-            case NodeType::INTEGER:
+        if (Schema::schema_entry_is_unordered_object(id)) {
+            continue;
+        }
+        auto const& node = m_schema_tree.get_node(id);
+        switch (node.get_type()) {
+            case NodeType::Integer:
                 writer->append_column(new Int64ColumnWriter(id));
                 break;
-            case NodeType::FLOAT:
+            case NodeType::Float:
                 writer->append_column(new FloatColumnWriter(id));
                 break;
-            case NodeType::CLPSTRING:
+            case NodeType::ClpString:
                 writer->append_column(new ClpStringColumnWriter(id, m_var_dict, m_log_dict));
                 break;
-            case NodeType::VARSTRING:
+            case NodeType::VarString:
                 writer->append_column(new VariableStringColumnWriter(id, m_var_dict));
                 break;
-            case NodeType::BOOLEAN:
+            case NodeType::Boolean:
                 writer->append_column(new BooleanColumnWriter(id));
                 break;
-            case NodeType::ARRAY:
+            case NodeType::UnstructuredArray:
                 writer->append_column(new ClpStringColumnWriter(id, m_var_dict, m_array_dict));
                 break;
-            case NodeType::DATESTRING:
+            case NodeType::DateString:
                 writer->append_column(new DateStringColumnWriter(id));
                 break;
-            case NodeType::OBJECT:
-            case NodeType::NULLVALUE:
-            case NodeType::UNKNOWN:
+            case NodeType::Object:
+            case NodeType::NullValue:
+            case NodeType::Unknown:
                 break;
         }
     }
@@ -141,9 +144,11 @@ size_t ArchiveWriter::store_tables() {
         m_table_metadata_compressor.write_numeric_value(m_tables_file_writer.get_pos());
 
         m_tables_compressor.open(m_tables_file_writer, m_compression_level);
-        i.second->store(m_tables_compressor);
+        size_t table_image_size = i.second->store(m_tables_compressor);
Review comment (Contributor):

Can we change the name?

Reply (Contributor Author, @gibber9809, Apr 19, 2024):

Sure, how about "uncompressed_size"?

Reply (Contributor):

Sounds good to me.

         m_tables_compressor.close();
         delete i.second;
+
+        m_table_metadata_compressor.write_numeric_value(table_image_size);
     }
     m_table_metadata_compressor.close();
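
Editor's sketch, not part of the PR: the writer now records each table's uncompressed size, and the reader passes it to SchemaReader::load. Going by the commit titled "Rework column readers to use unaligned views into shared buffer", a plausible use is to decompress the whole table into one buffer of that size and let column readers view slices of it. The code below illustrates that idea only. TableBuffer and UnalignedInt64View are hypothetical names, and the column layout is invented; the PR's actual classes (later named UnalignedMemSpan per the commit list) are not visible in this excerpt.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>

// Hypothetical read-only view over int64 values that may start at an unaligned
// address. Values are copied out with memcpy instead of dereferencing a cast
// pointer, which avoids undefined behaviour on unaligned access.
class UnalignedInt64View {
public:
    UnalignedInt64View(char const* begin, size_t count) : m_begin(begin), m_count(count) {}

    int64_t operator[](size_t i) const {
        int64_t value;
        std::memcpy(&value, m_begin + i * sizeof(int64_t), sizeof(int64_t));
        return value;
    }

    size_t size() const { return m_count; }

private:
    char const* m_begin;
    size_t m_count;
};

// Hypothetical buffer sized once from the recorded uncompressed size.
class TableBuffer {
public:
    explicit TableBuffer(size_t in_memory_size) : m_bytes(in_memory_size) {}

    char* data() { return m_bytes.data(); }

    size_t size() const { return m_bytes.size(); }

    // Returns a view of `count` int64 values starting `offset` bytes into the buffer.
    UnalignedInt64View int64_column(size_t offset, size_t count) const {
        if (offset > m_bytes.size() || count > (m_bytes.size() - offset) / sizeof(int64_t)) {
            throw std::out_of_range("column does not fit in table buffer");
        }
        return UnalignedInt64View(m_bytes.data() + offset, count);
    }

private:
    std::vector<char> m_bytes;
};
```

A view stays valid only while the TableBuffer it points into is alive and unchanged, which is the kind of lifetime concern the commit "Improve memory safety for column readers using shared table buffer" appears to address.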
