Skip to content

Commit

Permalink
Added support for the List/Set type (#5914)
Browse files Browse the repository at this point in the history
* Delete logs directory

* Delete data directory

* Delete etc directory

* Merged local changes into New_ListSet

* Merged local changes into New_ListSet

* Merged local changes into New_ListSet

* Merged local changes into New_ListSet

* Delete share/resources directory

* Merged local changes into New_ListSet

* Merged local changes into New_ListSet

* New changes into New_ListSet

* New changes into New_ListSet

* New changes into New_ListSet

* New changes into New_ListSet

* New changes into New_ListSet

* New changes into New_ListSet

* New changes into New_ListSet

* Latest changes into New_ListSet

* Latest changes into New_ListSet

* Latest changes into New_ListSet

* Latest changes into New_ListSet

* Latest changes into New_ListSet

* Latest changes into New_ListSet

* Latest changes into New_ListSet

---------

Co-authored-by: yuxuan.wang <48981679+Salieri-004@users.noreply.github.com>
  • Loading branch information
YZW00 and Salieri-004 authored Oct 25, 2024
1 parent b011aa2 commit 6c09e83
Show file tree
Hide file tree
Showing 27 changed files with 2,003 additions and 65 deletions.
97 changes: 96 additions & 1 deletion src/codec/RowReaderV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,44 @@
*
* This source code is licensed under Apache 2.0 License.
*/

#include "codec/RowReaderV2.h"

namespace nebula {

using nebula::cpp2::PropertyType;

template <typename T, typename Container>
Value extractIntOrFloat(const folly::StringPiece& data, size_t& offset) {
int32_t containerOffset;
memcpy(reinterpret_cast<void*>(&containerOffset), data.data() + offset, sizeof(int32_t));
if (static_cast<size_t>(containerOffset) >= data.size()) {
LOG(ERROR) << "Container offset out of bounds. Offset: " << containerOffset
<< ", Data size: " << data.size();
return Value::kNullValue;
}
int32_t containerSize;
memcpy(reinterpret_cast<void*>(&containerSize), data.data() + containerOffset, sizeof(int32_t));
containerOffset += sizeof(int32_t);
Container container;
for (int32_t i = 0; i < containerSize; ++i) {
T value;
if (static_cast<size_t>(containerOffset + sizeof(T)) > data.size()) {
LOG(ERROR) << "Reading beyond data bounds. Attempting to read at offset: " << containerOffset
<< ", Data size: " << data.size();
return Value::kNullValue;
}
memcpy(reinterpret_cast<void*>(&value), data.data() + containerOffset, sizeof(T));
containerOffset += sizeof(T);

if constexpr (std::is_same_v<Container, List>) {
container.values.emplace_back(Value(value));
} else if constexpr (std::is_same_v<Container, Set>) {
container.values.insert(Value(value));
}
}
return Value(std::move(container));
}

bool RowReaderV2::resetImpl(meta::NebulaSchemaProvider const* schema, folly::StringPiece row) {
schema_ = schema;
data_ = row;
Expand Down Expand Up @@ -206,6 +237,70 @@ Value RowReaderV2::getValueByIndex(const int64_t index) const {
}
return std::move(geogRet).value();
}
case PropertyType::LIST_STRING: {
int32_t listOffset;
memcpy(reinterpret_cast<void*>(&listOffset), &data_[offset], sizeof(int32_t));
if (static_cast<size_t>(listOffset) >= data_.size()) {
LOG(ERROR) << "List offset out of bounds for LIST_STRING.";
return Value::kNullValue;
}
int32_t listSize;
memcpy(reinterpret_cast<void*>(&listSize), &data_[listOffset], sizeof(int32_t));
listOffset += sizeof(int32_t);

List list;
for (int32_t i = 0; i < listSize; ++i) {
int32_t strLen;
memcpy(reinterpret_cast<void*>(&strLen), &data_[listOffset], sizeof(int32_t));
listOffset += sizeof(int32_t);
if (static_cast<size_t>(listOffset + strLen) > data_.size()) {
LOG(ERROR) << "String length out of bounds for LIST_STRING.";
return Value::kNullValue;
}
std::string str(&data_[listOffset], strLen);
listOffset += strLen;
list.values.emplace_back(str);
}
return Value(std::move(list));
}
case PropertyType::LIST_INT:
return nebula::extractIntOrFloat<int32_t, List>(data_, offset);
case PropertyType::LIST_FLOAT:
return nebula::extractIntOrFloat<float, List>(data_, offset);
case PropertyType::SET_STRING: {
int32_t setOffset;
memcpy(reinterpret_cast<void*>(&setOffset), &data_[offset], sizeof(int32_t));
if (static_cast<size_t>(setOffset) >= data_.size()) {
LOG(ERROR) << "Set offset out of bounds for SET_STRING.";
return Value::kNullValue;
}
int32_t setSize;
memcpy(reinterpret_cast<void*>(&setSize), &data_[setOffset], sizeof(int32_t));
setOffset += sizeof(int32_t);

Set set;
std::unordered_set<std::string> uniqueStrings;
for (int32_t i = 0; i < setSize; ++i) {
int32_t strLen;
memcpy(reinterpret_cast<void*>(&strLen), &data_[setOffset], sizeof(int32_t));
setOffset += sizeof(int32_t);
if (static_cast<size_t>(setOffset + strLen) > data_.size()) {
LOG(ERROR) << "String length out of bounds for SET_STRING.";
return Value::kNullValue;
}
std::string str(&data_[setOffset], strLen);
setOffset += strLen;
uniqueStrings.insert(std::move(str));
}
for (const auto& str : uniqueStrings) {
set.values.insert(Value(str));
}
return Value(std::move(set));
}
case PropertyType::SET_INT:
return nebula::extractIntOrFloat<int32_t, Set>(data_, offset);
case PropertyType::SET_FLOAT:
return nebula::extractIntOrFloat<float, Set>(data_, offset);
case PropertyType::UNKNOWN:
break;
}
Expand Down
163 changes: 162 additions & 1 deletion src/codec/RowWriterV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,80 @@ namespace nebula {

using nebula::cpp2::PropertyType;

// Function used to identify the data type written
WriteResult writeItem(const Value& item, Value::Type valueType, std::string& buffer) {
switch (valueType) {
case Value::Type::STRING: {
std::string str = item.getStr();
int32_t strLen = str.size();
buffer.append(reinterpret_cast<const char*>(&strLen), sizeof(int32_t));
buffer.append(str.data(), strLen);
break;
}
case Value::Type::INT: {
int32_t intVal = item.getInt();
buffer.append(reinterpret_cast<const char*>(&intVal), sizeof(int32_t));
break;
}
case Value::Type::FLOAT: {
float floatVal = item.getFloat();
buffer.append(reinterpret_cast<const char*>(&floatVal), sizeof(float));
break;
}
default:
LOG(ERROR) << "Unsupported value type: " << static_cast<int>(valueType);
return WriteResult::TYPE_MISMATCH;
}
return WriteResult::SUCCEEDED;
}

// Function used to identify List data types (List<string>, List<int>, List<float>)
template <typename List>
WriteResult writeList(const List& container, Value::Type valueType, std::string& buffer) {
int32_t listSize = container.values.size();
if (listSize == 0) {
return WriteResult::SUCCEEDED;
}
for (const auto& item : container.values) {
if (item.type() != valueType) {
LOG(ERROR) << "Type mismatch: Expected " << static_cast<int>(valueType) << " but got "
<< static_cast<int>(item.type());
return WriteResult::TYPE_MISMATCH;
}
}
for (const auto& item : container.values) {
auto result = writeItem(item, valueType, buffer);
if (result != WriteResult::SUCCEEDED) {
return result;
}
}
return WriteResult::SUCCEEDED;
}

// Function used to identify Set data types (Set<string>, Set<int>, Set<float>)
template <typename Set>
WriteResult writeSet(const Set& container, Value::Type valueType, std::string& buffer) {
for (const auto& item : container.values) {
if (item.type() != valueType) {
LOG(ERROR) << "Type mismatch: Expected " << static_cast<int>(valueType) << " but got "
<< static_cast<int>(item.type());
return WriteResult::TYPE_MISMATCH;
}
}
std::unordered_set<Value> serialized;
for (const auto& item : container.values) {
if (serialized.find(item) != serialized.end()) {
continue;
}
auto result = writeItem(item, valueType, buffer);
if (result != WriteResult::SUCCEEDED) {
return result;
}
serialized.insert(item);
}
return WriteResult::SUCCEEDED;
}

RowWriterV2::RowWriterV2(const meta::NebulaSchemaProvider* schema)
: schema_(schema), numNullBytes_(0), approxStrLen_(0), finished_(false), outOfSpaceStr_(false) {
CHECK(!!schema_);
Expand Down Expand Up @@ -138,6 +212,12 @@ RowWriterV2::RowWriterV2(RowReaderWrapper& reader) : RowWriterV2(reader.getSchem
case Value::Type::DURATION:
set(i, v.moveDuration());
break;
case Value::Type::LIST:
set(i, v.moveList());
break;
case Value::Type::SET:
set(i, v.moveSet());
break;
default:
LOG(FATAL) << "Invalid data: " << v << ", type: " << v.typeName();
isSet_[i] = false;
Expand Down Expand Up @@ -226,11 +306,14 @@ WriteResult RowWriterV2::setValue(ssize_t index, const Value& val) {
return write(index, val.getGeography());
case Value::Type::DURATION:
return write(index, val.getDuration());
case Value::Type::LIST:
return write(index, val.getList());
case Value::Type::SET:
return write(index, val.getSet());
default:
return WriteResult::TYPE_MISMATCH;
}
}

WriteResult RowWriterV2::setValue(const std::string& name, const Value& val) {
CHECK(!finished_) << "You have called finish()";
int64_t index = schema_->getFieldIndex(name);
Expand Down Expand Up @@ -821,6 +904,78 @@ WriteResult RowWriterV2::write(ssize_t index, const Geography& v) {
return write(index, folly::StringPiece(wkb), true);
}

WriteResult RowWriterV2::write(ssize_t index, const List& list) {
auto field = schema_->field(index);
auto offset = headerLen_ + numNullBytes_ + field->offset();
int32_t listSize = list.size();
int32_t listOffset = buf_.size();
if (listSize > kMaxArraySize) {
LOG(ERROR) << "List size exceeds the maximum allowed length of " << kMaxArraySize;
return WriteResult::OUT_OF_RANGE;
}
if (isSet_[index]) {
outOfSpaceStr_ = true;
}
buf_.append(reinterpret_cast<const char*>(&listSize), sizeof(int32_t));
Value::Type valueType;
if (field->type() == PropertyType::LIST_STRING) {
valueType = Value::Type::STRING;
} else if (field->type() == PropertyType::LIST_INT) {
valueType = Value::Type::INT;
} else if (field->type() == PropertyType::LIST_FLOAT) {
valueType = Value::Type::FLOAT;
} else {
LOG(ERROR) << "Unsupported list type: " << static_cast<int>(field->type());
return WriteResult::TYPE_MISMATCH;
}
auto result = writeList(list, valueType, buf_);
if (result != WriteResult::SUCCEEDED) {
return result;
}
memcpy(&buf_[offset], reinterpret_cast<void*>(&listOffset), sizeof(int32_t));
if (field->nullable()) {
clearNullBit(field->nullFlagPos());
}
isSet_[index] = true;
return WriteResult::SUCCEEDED;
}

WriteResult RowWriterV2::write(ssize_t index, const Set& set) {
auto field = schema_->field(index);
auto offset = headerLen_ + numNullBytes_ + field->offset();
int32_t setSize = set.size();
int32_t setOffset = buf_.size();
if (setSize > kMaxArraySize) {
LOG(ERROR) << "Set size exceeds the maximum allowed length of " << kMaxArraySize;
return WriteResult::OUT_OF_RANGE;
}
if (isSet_[index]) {
outOfSpaceStr_ = true;
}
buf_.append(reinterpret_cast<const char*>(&setSize), sizeof(int32_t));
Value::Type valueType;
if (field->type() == PropertyType::SET_STRING) {
valueType = Value::Type::STRING;
} else if (field->type() == PropertyType::SET_INT) {
valueType = Value::Type::INT;
} else if (field->type() == PropertyType::SET_FLOAT) {
valueType = Value::Type::FLOAT;
} else {
LOG(ERROR) << "Unsupported set type: " << static_cast<int>(field->type());
return WriteResult::TYPE_MISMATCH;
}
auto result = writeSet(set, valueType, buf_);
if (result != WriteResult::SUCCEEDED) {
return result;
}
memcpy(&buf_[offset], reinterpret_cast<void*>(&setOffset), sizeof(int32_t));
if (field->nullable()) {
clearNullBit(field->nullFlagPos());
}
isSet_[index] = true;
return WriteResult::SUCCEEDED;
}

WriteResult RowWriterV2::checkUnsetFields() {
DefaultValueContext expCtx;
for (size_t i = 0; i < schema_->getNumFields(); i++) {
Expand Down Expand Up @@ -868,6 +1023,12 @@ WriteResult RowWriterV2::checkUnsetFields() {
case Value::Type::DURATION:
r = write(i, defVal.getDuration());
break;
case Value::Type::LIST:
r = write(i, defVal.getList());
break;
case Value::Type::SET:
r = write(i, defVal.getSet());
break;
default:
LOG(FATAL) << "Unsupported default value type: " << defVal.typeName()
<< ", default value: " << defVal
Expand Down
8 changes: 8 additions & 0 deletions src/codec/RowWriterV2.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ class RowWriterV2 {
size_t numNullBytes_;
size_t approxStrLen_;
bool finished_;
// Limit array length
static constexpr int32_t kMaxArraySize = 65535;

// When outOfSpaceStr_ is true, variant length string fields
// could hold an index, referring to the strings in the strList_
Expand Down Expand Up @@ -263,6 +265,12 @@ class RowWriterV2 {
WriteResult write(ssize_t index, const Duration& v);

WriteResult write(ssize_t index, const Geography& v);
// Supports storing ordered lists of strings, integers, and floats,
// including LIST_STRING, LIST_INT, and LIST_FLOAT.
WriteResult write(ssize_t index, const List& list);
// Supports storing unordered sets of strings, integers, and floats,
// including SET_STRING, SET_INT, and SET_FLOAT
WriteResult write(ssize_t index, const Set& set);
};

} // namespace nebula
Expand Down
14 changes: 14 additions & 0 deletions src/common/datatypes/List.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ struct List {
}
explicit List(const std::vector<Value>& l) : values(l) {}

// Template static factory method to create a list with specific types
template <typename T>
static List createFromVector(const std::vector<T>& items);

bool empty() const {
return values.empty();
}
Expand Down Expand Up @@ -102,6 +106,16 @@ inline std::ostream& operator<<(std::ostream& os, const List& l) {
return os << l.toString();
}

// Define using template static factory method
template <typename T>
inline List List::createFromVector(const std::vector<T>& items) {
std::vector<Value> values;
values.reserve(items.size());
for (const auto& item : items) {
values.emplace_back(Value(item));
}
return List(std::move(values));
}
} // namespace nebula

namespace std {
Expand Down
14 changes: 14 additions & 0 deletions src/common/datatypes/Set.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ struct Set {
values = std::move(value);
}

// Template Static Factory Method Declaration
template <typename T>
static Set createFromVector(const std::vector<T>& items);

void clear() {
values.clear();
}
Expand Down Expand Up @@ -68,6 +72,16 @@ struct Set {
inline std::ostream& operator<<(std::ostream& os, const Set& s) {
return os << s.toString();
}

// define using template static factory method
template <typename T>
inline Set Set::createFromVector(const std::vector<T>& items) {
std::unordered_set<Value> values;
for (const auto& item : items) {
values.emplace(Value(item));
}
return Set(std::move(values));
}
} // namespace nebula

namespace std {
Expand Down
Loading

0 comments on commit 6c09e83

Please sign in to comment.