Skip to content

Commit

Permalink
fix wrong isolation level (#6568)
Browse files Browse the repository at this point in the history
  • Loading branch information
VPolka committed Jul 29, 2024
1 parent 4381e8c commit eab45d1
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 14 deletions.
12 changes: 0 additions & 12 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -487,18 +487,6 @@ class TKqpQueryState : public TNonCopyable {
PrepareCurrentStatement();
}

void PrepareStatementTransaction(NKqpProto::TKqpPhyTx_EType txType) {
if (!HasTxControl()) {
switch (txType) {
case NKqpProto::TKqpPhyTx::TYPE_SCHEME:
TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED;
break;
default:
TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
}
}
}

// validate the compiled query response and ensure that all table versions are not
// changed since the last compilation.
bool EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigateKeySetResult& response);
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1097,11 +1097,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

bool ExecutePhyTx(const TKqpPhyTxHolder::TConstPtr& tx, bool commit) {
if (tx) {
QueryState->PrepareStatementTransaction(tx->GetType());
switch (tx->GetType()) {
case NKqpProto::TKqpPhyTx::TYPE_SCHEME:
YQL_ENSURE(tx->StagesSize() == 0);
if (QueryState->HasTxControl() && QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED) {
if (QueryState->HasTxControl() && !QueryState->HasImplicitTx() && QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"Scheme operations cannot be executed inside transaction");
return true;
Expand Down
182 changes: 182 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2313,6 +2313,10 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
ALTER TABLE TestDdlDml2 DROP COLUMN Value2;
UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (2, "2");
SELECT * FROM TestDdlDml2;
CREATE TABLE TestDdlDml33 (
Key Uint64,
PRIMARY KEY (Key)
);
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
Expand All @@ -2327,6 +2331,13 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(0)));

result = db.ExecuteQuery(R"(
SELECT * FROM TestDdlDml33;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));

result = db.ExecuteQuery(R"(
CREATE TABLE TestDdlDml4 (
Key Uint64,
Expand Down Expand Up @@ -2621,6 +2632,177 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}

Y_UNIT_TEST(CheckIsolationLevelFroPerStatementMode) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
appConfig.MutableTableServiceConfig()->SetEnableAstCache(true);
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetKqpSettings({setting});

TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetQueryClient();
auto tableClient = kikimr.GetTableClient();
auto session = tableClient.CreateSession().GetValueSync().GetSession();

{
// 1 ddl statement
auto result = db.ExecuteQuery(R"(
CREATE TABLE Test1 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());

NYdb::NTable::TDescribeTableResult describe = session.DescribeTable("/Root/Test1").GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetStatus(), EStatus::SUCCESS);
}

{
// 2 ddl statements
auto result = db.ExecuteQuery(R"(
CREATE TABLE Test2 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
CREATE TABLE Test3 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 0);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());

NYdb::NTable::TDescribeTableResult describe1 = session.DescribeTable("/Root/Test2").GetValueSync();
UNIT_ASSERT_EQUAL(describe1.GetStatus(), EStatus::SUCCESS);
NYdb::NTable::TDescribeTableResult describe2 = session.DescribeTable("/Root/Test3").GetValueSync();
UNIT_ASSERT_EQUAL(describe2.GetStatus(), EStatus::SUCCESS);
}

{
// 1 dml statement
auto result = db.ExecuteQuery(R"(
SELECT * FROM Test1;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
}

{
// 2 dml statements
auto result = db.ExecuteQuery(R"(
SELECT * FROM Test2;
SELECT * FROM Test3;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
}

{
// 1 ddl 1 dml statements
auto result = db.ExecuteQuery(R"(
CREATE TABLE Test4 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
SELECT * FROM Test4;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
NYdb::NTable::TDescribeTableResult describe = session.DescribeTable("/Root/Test4").GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetStatus(), EStatus::SUCCESS);
}

{
// 1 dml 1 ddl statements
auto result = db.ExecuteQuery(R"(
SELECT * FROM Test4;
CREATE TABLE Test5 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
NYdb::NTable::TDescribeTableResult describe = session.DescribeTable("/Root/Test5").GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetStatus(), EStatus::SUCCESS);
}

{
// 1 ddl 1 dml 1 ddl 1 dml statements
auto result = db.ExecuteQuery(R"(
CREATE TABLE Test6 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
SELECT * FROM Test6;
CREATE TABLE Test7 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
SELECT * FROM Test7;
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
NYdb::NTable::TDescribeTableResult describe1 = session.DescribeTable("/Root/Test6").GetValueSync();
UNIT_ASSERT_EQUAL(describe1.GetStatus(), EStatus::SUCCESS);
NYdb::NTable::TDescribeTableResult describe2 = session.DescribeTable("/Root/Test7").GetValueSync();
UNIT_ASSERT_EQUAL(describe2.GetStatus(), EStatus::SUCCESS);
}

{
// 1 dml 1 ddl 1 dml 1 ddl statements
auto result = db.ExecuteQuery(R"(
SELECT * FROM Test7;
CREATE TABLE Test8 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
SELECT * FROM Test8;
CREATE TABLE Test9 (
Key Uint64,
Value1 String,
Value2 String,
PRIMARY KEY (Key)
);
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
NYdb::NTable::TDescribeTableResult describe1 = session.DescribeTable("/Root/Test8").GetValueSync();
UNIT_ASSERT_EQUAL(describe1.GetStatus(), EStatus::SUCCESS);
NYdb::NTable::TDescribeTableResult describe2 = session.DescribeTable("/Root/Test9").GetValueSync();
UNIT_ASSERT_EQUAL(describe2.GetStatus(), EStatus::SUCCESS);
}
}

Y_UNIT_TEST(TableSink_ReplaceFromSelectOlap) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
Expand Down

0 comments on commit eab45d1

Please sign in to comment.