Skip to content

Commit

Permalink
Merge UUID related fixes for export (#6697)
Browse files Browse the repository at this point in the history
Co-authored-by: Semyon Danilov <senya@ydb.tech>
  • Loading branch information
dcherednik and SammyVimes committed Jul 16, 2024
1 parent 5ec8b8c commit 0abd28e
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 1 deletion.
27 changes: 27 additions & 0 deletions ydb/core/io_formats/cell_maker/cell_maker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/library/binary_json/write.h>
#include <ydb/library/dynumber/dynumber.h>
#include <ydb/library/uuid/uuid.h>

#include <ydb/library/yql/minikql/dom/yson.h>
#include <ydb/library/yql/minikql/dom/json.h>
Expand Down Expand Up @@ -132,6 +133,21 @@ namespace {
return true;
}

// Parse target for UUID cells: one 16-byte buffer that can be addressed either
// as eight 16-bit words (Array) or as two 64-bit halves (Halves).
// TryParse fills Array; UuidToStringBuf reads Halves back.
// NOTE(review): writing one union member and reading another relies on
// compiler-supported type punning rather than on an ISO C++ guarantee.
struct TUuidHolder {
union {
ui16 Array[8];
ui64 Halves[2];
} Buf;
};

// TryParse specialization for UUID text.
// Hands `value` to NUuid::ParseUuidToArray, which fills result.Buf.Array, and
// reports that helper's outcome: true on success, false otherwise.
// The trailing `false` argument is forwarded as-is; its meaning is defined by
// NUuid::ParseUuidToArray.
template <>
bool TryParse(TStringBuf value, TUuidHolder& result) {
    return NUuid::ParseUuidToArray(value, result.Buf.Array, false);
}

// Callable that turns a parsed value of type T (taken by const reference)
// into the representation U that is handed on by the cell makers below.
template <typename T, typename U>
using TConverter = std::function<U(const T&)>;

Expand Down Expand Up @@ -171,6 +187,14 @@ namespace {
return v.Str;
}

// Converts a parsed UUID into its 16-byte binary form.
//
// The bytes are produced by NUuid::UuidHalfsToBytes from the two 64-bit halves
// of `uuid` (Halves[1] is passed first, Halves[0] second).
//
// The result is a non-owning TStringBuf, so the bytes must outlive this call.
// They are therefore written into a thread-local buffer instead of a stack
// array: a view over a local variable would dangle the moment the function
// returns. The returned view stays valid until the next call on the same
// thread, so the caller has to copy the bytes before converting another UUID.
TStringBuf UuidToStringBuf(const TUuidHolder& uuid) {
    static thread_local char uuidBuf[16];

    NUuid::UuidHalfsToBytes(uuidBuf, sizeof(uuidBuf), uuid.Buf.Halves[1], uuid.Buf.Halves[0]);

    return TStringBuf(uuidBuf, sizeof(uuidBuf));
}

template <typename T, typename U = T>
struct TCellMaker {
static bool Make(TCell& c, TStringBuf v, TMemoryPool& pool, TString& err, TConverter<T, U> conv = &Implicit<T, U>) {
Expand Down Expand Up @@ -297,6 +321,8 @@ bool MakeCell(TCell& cell, TStringBuf value, NScheme::TTypeInfo type, TMemoryPoo
return TCellMaker<NYql::NDecimal::TInt128, std::pair<ui64, ui64>>::Make(cell, value, pool, err, &Int128ToPair);
case NScheme::NTypeIds::Pg:
return TCellMaker<NPg::TConvertResult, TStringBuf>::Make(cell, value, pool, err, &PgToStringBuf, type.GetTypeDesc());
case NScheme::NTypeIds::Uuid:
return TCellMaker<TUuidHolder, TStringBuf>::Make(cell, value, pool, err, &UuidToStringBuf);
default:
return false;
}
Expand Down Expand Up @@ -390,6 +416,7 @@ bool CheckCellValue(const TCell& cell, NScheme::TTypeInfo type) {
case NScheme::NTypeIds::JsonDocument: // checked at parsing time
case NScheme::NTypeIds::DyNumber: // checked at parsing time
case NScheme::NTypeIds::Pg: // checked at parsing time
case NScheme::NTypeIds::Uuid: // checked at parsing time
return true;
case NScheme::NTypeIds::Date:
return cell.AsValue<ui16>() < NUdf::MAX_DATE;
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,12 @@ class TxPlanSerializer {
}
}

if (auto literal = key.Maybe<TCoUuid>()) {
TStringStream out;
NUuid::UuidBytesToString(literal.Cast().Literal().Value().Data(), out);
return out.Str();
}

if (auto literal = key.Maybe<TCoDataCtor>()) {
return literal.Cast().Literal().StringValue();
}
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,12 @@ void FillLiteralProtoImpl(const NNodes::TCoDataCtor& literal, TProto& proto) {
protoValue.SetHi128(*reinterpret_cast<ui64*>(p + 8));
break;
}
case EDataSlot::Uuid: {
const ui64* uuidData = reinterpret_cast<const ui64*>(value.Data());
protoValue.SetLow128(uuidData[0]);
protoValue.SetHi128(uuidData[1]);
break;
}

default:
YQL_ENSURE(false, "Unexpected type slot " << slot);
Expand Down Expand Up @@ -738,6 +744,12 @@ void FillLiteralProto(const NNodes::TCoDataCtor& literal, Ydb::TypedValue& proto
protoValue.set_high_128(*reinterpret_cast<ui64*>(p + 8));
break;
}
case EDataSlot::Uuid: {
const ui64* uuidData = reinterpret_cast<const ui64*>(value.Data());
protoValue.set_low_128(uuidData[0]);
protoValue.set_high_128(uuidData[1]);
break;
}

default:
YQL_ENSURE(false, "Unexpected type slot " << slot);
Expand Down
88 changes: 88 additions & 0 deletions ydb/core/kqp/ut/yql/kqp_yql_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,94 @@ Y_UNIT_TEST_SUITE(KqpYql) {
}
}

// Creates a table whose primary key is a Uuid column, inserts one row per
// entry of `testUuids` (val = 0, 1, 2, ...) and then checks that every row can
// be fetched back with an equality predicate on the key.
Y_UNIT_TEST(TestUuidPrimaryKeyPrefixSearch) {
    NKikimrConfig::TAppConfig appConfig;
    appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
    auto setting = NKikimrKqp::TKqpSetting();
    auto serverSettings = TKikimrSettings()
        .SetAppConfig(appConfig)
        .SetEnableUuidAsPrimaryKey(true)
        .SetKqpSettings({setting});
    TKikimrRunner kikimr(serverSettings.SetWithSampleTables(false));

    auto db = kikimr.GetTableClient();
    auto session = db.CreateSession().GetValueSync().GetSession();

    TVector<TString> testUuids = {
        "5b99a330-04ef-4f1a-9b64-ba6d5f44eafe",
        "afcbef30-9ac3-481a-aa6a-8d9b785dbb0a",
        "b91cd23b-861c-4cc1-9119-801a4dac1cb9",
        "65df9ecc-a97d-47b2-ae56-3c023da6ee8c",
    };

    // Schema: Uuid primary key plus an int payload.
    {
        const auto query = Q_(R"(
CREATE TABLE test(
key uuid NOT NULL,
val int,
PRIMARY KEY (key)
);
)");
        auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
        UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
    }
    // Fill the table; the payload is the index of the UUID in `testUuids`.
    {
        int val = 0;
        for (const auto& uuid : testUuids) {
            // `val` is a signed int, so it is formatted with %d (not %u).
            const auto query = Sprintf("\
INSERT INTO test (key, val)\n\
VALUES (Uuid(\"%s\"), %d);\n\
", uuid.Data(), val++);
            auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
            UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
        }
    }
    // Look every key up again and expect exactly one row carrying its index.
    {
        int val = 0;
        for (const auto& uuid : testUuids) {
            const auto query = Sprintf("SELECT * FROM test WHERE key=Uuid(\"%s\");", uuid.Data());
            auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
            UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

            TResultSetParser parser(result.GetResultSetParser(0));
            UNIT_ASSERT(parser.TryNextRow());
            UNIT_ASSERT_VALUES_EQUAL(parser.ColumnParser("val").GetOptionalInt32().GetRef(), val++);
            UNIT_ASSERT_VALUES_EQUAL(parser.RowsCount(), 1);
        }
    }
}

// Creates a table whose Uuid column declares a DEFAULT literal and checks that
// an INSERT which omits that column is accepted.
// NOTE(review): only the success of the INSERT is asserted; the stored value
// is not read back and compared with the default.
Y_UNIT_TEST(TestUuidDefaultColumn) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetEnableUuidAsPrimaryKey(true)
.SetKqpSettings({setting});
TKikimrRunner kikimr(serverSettings.SetWithSampleTables(false));

auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

// Schema: int primary key, NOT NULL uuid column with a DEFAULT value.
{
const auto query = Q_(R"(
CREATE TABLE test(
key int NOT NULL,
val uuid NOT NULL DEFAULT Uuid("65df9ecc-a97d-47b2-ae56-3c023da6ee8c"),
PRIMARY KEY (key)
);
)");
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
// Insert a row that supplies only the key, leaving `val` to its default.
{
const auto query = "INSERT INTO test (key) VALUES (0);";
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
}

Y_UNIT_TEST(UuidPrimaryKeyBulkUpsert) {
auto settings = TKikimrSettings()
.SetEnableUuidAsPrimaryKey(true)
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/datashard/export_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,15 @@ bool PgToStream(TStringBuf data, void* typeDesc, IOutputStream& out, TString& er
return true;
}

bool UuidToStream(const std::pair<ui64, ui64>& loHi, IOutputStream& out, TString& err) {
Y_UNUSED(err);

NYdb::TUuidValue uuid(loHi.first, loHi.second);

out << uuid.ToString();

return true;
}

} // NDataShard
} // NKikimr
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/export_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ TString DyNumberToString(TStringBuf data);
bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TString& err);
bool DyNumberToStream(TStringBuf data, IOutputStream& out, TString& err);
bool PgToStream(TStringBuf data, void* typeDesc, IOutputStream& out, TString& err);
// Writes the textual form of the UUID built from loHi.first / loHi.second to `out`.
bool UuidToStream(const std::pair<ui64, ui64>& loHi, IOutputStream& out, TString& err);

} // NDataShard
} // NKikimr
3 changes: 3 additions & 0 deletions ydb/core/tx/datashard/export_s3_buffer_raw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
case NScheme::NTypeIds::Pg:
serialized = PgToStream(cell.AsBuf(), column.Type.GetTypeDesc(), out, ErrorString);
break;
case NScheme::NTypeIds::Uuid:
serialized = UuidToStream(cell.AsValue<std::pair<ui64, ui64>>(), out, ErrorString);
break;
default:
Y_ABORT("Unsupported type");
}
Expand Down
24 changes: 24 additions & 0 deletions ydb/core/tx/schemeshard/ut_backup/ut_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,30 @@ Y_UNIT_TEST_SUITE(TBackupTests) {
});
}

// Runs the Backup helper for a table that has a Uuid value column, passing the
// codec name obtained from ToString(Codec).
// NOTE(review): presumably Y_UNIT_TEST_WITH_COMPRESSION instantiates this body
// once per compression codec — confirm against the macro definition.
Y_UNIT_TEST_WITH_COMPRESSION(BackupUuidColumn) {
TTestBasicRuntime runtime;

// The lambda is handed to Backup together with the table description; it
// writes the row that is expected to be backed up.
Backup(runtime, ToString(Codec), R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Uuid" }
KeyColumnNames: ["key"]
)", [](TTestBasicRuntime& runtime) {
// Write one row: key = 1, value = the 16-character Uuid literal below.
NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, TTestTxConfig::FakeHiveTablets, Sprintf(R"(
(
(let key '( '('key (Uint32 '%d) ) ) )
(let row '( '('value (Uuid '"%s") ) ) )
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", 1, "0000111122223333", "Table"), result, error);

// The write must succeed and leave no error text behind.
UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
});
}

template<ECompressionCodec Codec>
void ShouldSucceedOnLargeData(ui32 minWriteBatchSize, const std::pair<ui32, ui32>& expectedResult) {
TTestBasicRuntime runtime;
Expand Down
71 changes: 70 additions & 1 deletion ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,76 @@ value {
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, txId, "/MyRoot", Sprintf(R"(
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_prefix: "Backup1"
destination_path: "/MyRoot/Restored"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetImport(runtime, txId, "/MyRoot");
}

Y_UNIT_TEST(ExportImportUuid) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableTablePgTypes(true));
ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Uuid" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

{
TString tablePath = "/MyRoot/Table";
int partitionIdx = 0;

auto tableDesc = DescribePath(runtime, tablePath, true, true);
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
UNIT_ASSERT(partitionIdx < tablePartitions.size());
const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId();

NKikimrMiniKQL::TResult result;
TString error;
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, datashardTabletId, Sprintf(R"(
(
(let key '( '('key (Uint32 '%d) ) ) )
(let row '( '('value (Uuid '"%s") ) ) )
(return (AsList (UpdateRow '__user__%s key row) ))
)
)", 1, "0123456789012345", "Table"), result, error);

UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
UNIT_ASSERT_VALUES_EQUAL(error, "");
}

TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Table"
destination_prefix: "Backup1"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
Expand Down
6 changes: 6 additions & 0 deletions ydb/library/uuid/uuid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ static void WriteHex(ui16 bytes, IOutputStream& out, bool reverseBytes = false)
}
}

// Formats the raw bytes of a UUID held in `in` as text and writes it to `out`
// via UuidToString.
//
// At most sizeof(dw) (16) bytes of `in` are consumed. When `in` is shorter,
// only the bytes that actually exist are copied and the remainder of the
// buffer stays zero, so a short or empty string can no longer make memcpy read
// past the end of the string's storage.
//
// NOTE(review): a TString constructed from a bare `const char*` ends at the
// first NUL byte, so binary UUIDs should reach this function with an explicit
// length — confirm at the call sites.
void UuidBytesToString(TString in, IOutputStream& out) {
    ui16 dw[8] = {};
    const size_t available = in.size() < sizeof(dw) ? in.size() : sizeof(dw);
    std::memcpy(dw, in.Data(), available);
    NUuid::UuidToString(dw, out);
}

void UuidToString(ui16 dw[8], IOutputStream& out) {
WriteHex(dw[1], out);
WriteHex(dw[0], out);
Expand Down
1 change: 1 addition & 0 deletions ydb/library/uuid/uuid.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace NUuid {

static constexpr ui32 UUID_LEN = 16;

// Formats the leading 16 bytes of `in` (raw UUID bytes) as text into `out`;
// `in` is expected to hold at least 16 bytes.
void UuidBytesToString(TString in, IOutputStream& out);
void UuidToString(ui16 dw[8], IOutputStream& out);
void UuidHalfsToByteString(ui64 low, ui64 hi, IOutputStream& out);

Expand Down

0 comments on commit 0abd28e

Please sign in to comment.