Skip to content

Commit

Permalink
Ydb stable 23-3-13
Browse files Browse the repository at this point in the history
x-stable-origin-commit: 7103e48a006e617d91bf3587e6daa7c3927b7b82
  • Loading branch information
dcherednik committed Oct 12, 2023
1 parent 052aa97 commit f0615a5
Show file tree
Hide file tree
Showing 13 changed files with 413 additions and 40 deletions.
104 changes: 104 additions & 0 deletions ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,110 @@ Y_UNIT_TEST_SUITE(KqpIndexMetadata) {
}
}

void TestNoReadFromMainTableBeforeJoin(bool UseExtractPredicates) {
using namespace NYql;
using namespace NYql::NNodes;

TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetPredicateExtract20(UseExtractPredicates);
settings.SetAppConfig(appConfig);

TKikimrRunner kikimr(settings);

auto& server = kikimr.GetTestServer();
auto gateway = GetIcGateway(server);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

{
const TString createTableSql(R"(
--!syntax_v1
CREATE TABLE `/Root/tg` (
id Utf8, b Utf8, am Decimal(22, 9), cur Utf8, pa_id Utf8, system_date Timestamp, status Utf8, type Utf8, product Utf8,
PRIMARY KEY (b, id),
INDEX tg_index GLOBAL SYNC ON (`b`, `pa_id`, `system_date`, `id`)
COVER(status, type, product, am)
);)");
auto result = session.ExecuteSchemeQuery(createTableSql).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

// core optimizer should inject CoExtractMembert over KqlReadTableIndex with columns set based on ORDER BY
// after that KqlReadTableIndex has all columns present in index and should be rewriten in to index read
// limit must be applied in to this (index read) stage.
// As result we have limited numbers of lookups from main table

{
const TString query(Q1_(R"(
--!syntax_v1
DECLARE $b_1 AS Text;
DECLARE $pa_id_1 AS Text;
DECLARE $b_2 AS Text;
DECLARE $constant_param_1 AS Timestamp;
DECLARE $constant_param_2 AS Text;
DECLARE $type_1 AS List<Text>;
DECLARE $status_1 AS Text;
DECLARE $am_1 AS Decimal(22, 9);
SELECT *
FROM tg
WHERE (`tg`.`b` = $b_1) AND (`tg`.`id` IN (
SELECT `id`
FROM (
SELECT *
FROM `/Root/tg` VIEW tg_index AS tg
WHERE (`tg`.`pa_id` = $pa_id_1)
AND (`tg`.`b` = $b_2)
AND ((`tg`.`system_date`, `tg`.`id`) <= ($constant_param_1, $constant_param_2))
AND (`tg`.`type` NOT IN $type_1)
AND (`tg`.`status` <> $status_1)
AND (`tg`.`am` <> $am_1)
ORDER BY
`tg`.`b` DESC,
`tg`.`pa_id` DESC,
`tg`.`system_date` DESC,
`tg`.`id` DESC
LIMIT 11)
))
ORDER BY
`tg`.`system_date` DESC,
`tg`.`id` DESC
)"));

auto explainResult = session.ExplainDataQuery(query).GetValueSync();
UNIT_ASSERT_C(explainResult.IsSuccess(), explainResult.GetIssues().ToString());

Cerr << explainResult.GetAst() << Endl;
UNIT_ASSERT_C(explainResult.GetAst().Contains("'('\"Reverse\")"), explainResult.GetAst());
UNIT_ASSERT_C(explainResult.GetAst().Contains("'('\"Sorted\")"), explainResult.GetAst());

NJson::TJsonValue plan;
NJson::ReadJsonTree(explainResult.GetPlan(), &plan, true);
UNIT_ASSERT(ValidatePlanNodeIds(plan));
Cerr << plan << Endl;
auto mainTableAccess = CountPlanNodesByKv(plan, "Table", "tg");
UNIT_ASSERT_VALUES_EQUAL(mainTableAccess, 1);

auto indexTableAccess = CountPlanNodesByKv(plan, "Table", "tg/tg_index/indexImplTable");
UNIT_ASSERT_VALUES_EQUAL(indexTableAccess, 1);

auto filterOnIndex = CountPlanNodesByKv(plan, "Node Type", "Limit-Filter-TablePointLookup");
UNIT_ASSERT_VALUES_EQUAL(filterOnIndex, 1);

auto limitFilterNode = FindPlanNodeByKv(plan, "Node Type", "Limit-Filter-TablePointLookup");
auto val = FindPlanNodes(limitFilterNode, "Limit");
UNIT_ASSERT_VALUES_EQUAL(val.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(val[0], "11");
}
}

Y_UNIT_TEST_TWIN(TestNoReadFromMainTableBeforeJoin, ExtractPredicate) {
TestNoReadFromMainTableBeforeJoin(ExtractPredicate);
}

Y_UNIT_TEST(HandleWriteOnlyIndex) {
using namespace NYql;
using namespace NYql::NNodes;
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3871,10 +3871,6 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}

Y_UNIT_TEST_TWIN(ComplexLookupLimit, NewPredicateExtract) {
if (NewPredicateExtract) {
return;
}

TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetPredicateExtract20(NewPredicateExtract);
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/protos/tx.proto
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ message TEvUpdatedLastStep {
optional fixed64 LastStep = 3;
}

// Time cast to coordinators
// Notifies coordinator that clients are waiting for some specific steps
message TEvRequirePlanSteps {
optional fixed64 CoordinatorID = 1;
repeated uint64 PlanSteps = 2 [packed = true];
}

// coordinator to mediator
message TCoordinatorTransaction {
repeated uint64 AffectedSet = 1; // read and write set - joined and then filtered for concrete mediator
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,19 @@ namespace NKikimr::NFlatTxCoordinator {
}
}

void TTxCoordinator::Handle(TEvTxProxy::TEvRequirePlanSteps::TPtr& ev) {
auto* msg = ev->Get();
for (ui64 step : msg->Record.GetPlanSteps()) {
// Note: we could schedule an exact volatile step here in the future
step = AlignPlanStep(step);
// Note: this is not a sibling step, but it behaves similar enough
// so we reuse the same queue here.
if (step > VolatileState.LastPlanned && PendingSiblingSteps.insert(step).second) {
SchedulePlanTickExact(step);
}
}
}

void TTxCoordinator::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
auto* msg = ev->Get();
if (auto* state = Siblings.FindPtr(msg->TabletId); state && state->Subscribed) {
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/coordinator/coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,12 @@ void TTxCoordinator::SchedulePlanTickAligned(ui64 next) {
return;
}

SchedulePlanTickExact(AlignPlanStep(next));
}

ui64 TTxCoordinator::AlignPlanStep(ui64 step) {
const ui64 resolution = Config.Resolution;
SchedulePlanTickExact((next + resolution - 1) / resolution * resolution);
return ((step + resolution - 1) / resolution * resolution);
}

void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/coordinator/coordinator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat
void SubscribeToSibling(TSiblingState& state);
void UnsubscribeFromSiblings();
void Handle(TEvTxProxy::TEvUpdatedLastStep::TPtr &ev);
void Handle(TEvTxProxy::TEvRequirePlanSteps::TPtr &ev);
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev);

void Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx);
Expand All @@ -674,6 +675,7 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat
void SchedulePlanTick();
void SchedulePlanTickExact(ui64 next);
void SchedulePlanTickAligned(ui64 next);
ui64 AlignPlanStep(ui64 step);
bool RestoreMediatorInfo(TTabletId mediatorId, TVector<TAutoPtr<TMediatorStep>> &planned, TTransactionContext &txc, /*TKeyBuilder &kb, */THashMap<TTxId,TVector<TTabletId>> &pushToAffected) const;

void TryInitMonCounters(const TActorContext &ctx);
Expand Down Expand Up @@ -737,6 +739,7 @@ class TTxCoordinator : public TActor<TTxCoordinator>, public TTabletExecutedFlat
HFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
HFunc(TEvPrivate::TEvRestoredProcessingParams, Handle);
hFunc(TEvTxProxy::TEvUpdatedLastStep, Handle);
hFunc(TEvTxProxy::TEvRequirePlanSteps, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
)

Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2285,9 +2285,17 @@ void TDataShard::CheckMediatorStateRestored() {
// writes before the restart, and conversely don't accidentally read any
// data that is definitely not replied yet.
if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step;
const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
SnapshotManager.PromoteImmediateWriteEdgeReplied(
Min(edge, SnapshotManager.GetImmediateWriteEdge()));
// Try to ensure writes become visible sooner rather than later
if (edge.Step < writeStep) {
if (MediatorTimeCastWaitingSteps.insert(writeStep).second) {
Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), writeStep));
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << writeStep << " from mediator time cast");
}
}
}

MediatorStateWaiting = false;
Expand Down
91 changes: 91 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3773,6 +3773,97 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"{ items { uint32_value: 3 } items { uint32_value: 3 } }");
}

Y_UNIT_TEST(ReadIteratorLocalSnapshotThenWrite) {
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);

TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetDomainPlanResolution(100)
.SetAppConfig(app);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;
auto opts = TShardedTableOptions()
.Shards(1)
.Columns({
{"key", "Uint32", true, false},
{"value", "Uint32", false, false}});
CreateShardedTable(server, sender, "/Root", "table-1", opts);
CreateShardedTable(server, sender, "/Root", "table-2", opts);

// Perform a snapshot read, this will persist "reads from snapshots" flag
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, Q_(R"(
SELECT key, value
FROM `/Root/table-1`
UNION ALL
SELECT key, value
FROM `/Root/table-2`
)")),
"");

// Insert rows using a single-shard write
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)"));

// Wait until mediator goes idle
size_t timecastUpdates = 0;
auto observer = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
switch (ev->GetTypeRewrite()) {
case TEvMediatorTimecast::TEvUpdate::EventType: {
++timecastUpdates;
break;
}
case TEvDataShard::TEvRead::EventType: {
auto* msg = ev->Get<TEvDataShard::TEvRead>();
msg->Record.SetMaxRowsInResult(1);
break;
}
}
return TTestActorRuntime::EEventAction::PROCESS;
};
auto prevObserverFunc = runtime.SetObserverFunc(observer);

auto waitFor = [&](const auto& condition, const TString& description) {
if (!condition()) {
Cerr << "... waiting for " << description << Endl;
TDispatchOptions options;
options.CustomFinalCondition = [&]() {
return condition();
};
runtime.DispatchEvents(options);
UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
}
};

waitFor([&]{ return timecastUpdates >= 3; }, "at least 3 timecast updates");

UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, Q_(R"(
SELECT key, value
FROM `/Root/table-1`
ORDER BY key
)")),
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 3 } }");

auto start = runtime.GetCurrentTime();
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33)"));
auto duration = runtime.GetCurrentTime() - start;
UNIT_ASSERT_C(duration <= TDuration::MilliSeconds(200), "UPSERT takes too much time: " << duration);
}

}

} // namespace NKikimr
Loading

0 comments on commit f0615a5

Please sign in to comment.