From 9df77784d0dd124d6283222de845bb7a73495b9a Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Wed, 9 Oct 2024 12:07:14 +0300 Subject: [PATCH 01/21] compute actor: pass counters to input transform --- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 3d1b8b284761..177199cf7a06 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1340,6 +1340,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped .SecureParams = secureParams, .TaskParams = taskParams, .ComputeActorId = this->SelfId(), + .TaskCounters = TaskCounters, .TypeEnv = typeEnv, .HolderFactory = holderFactory, .Alloc = Alloc, From 012981a1782e769b7db5549f2f38168db9439b38 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Wed, 9 Oct 2024 12:18:21 +0300 Subject: [PATCH 02/21] input transform: pass in task counters --- .../yql/dq/actors/compute/dq_compute_actor_async_io.h | 1 + .../input_transforms/dq_input_transform_lookup.cpp | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index e4a8754cb485..231aedc54603 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -299,6 +299,7 @@ struct IDqAsyncIoFactory : public TThrRefBase { const THashMap& SecureParams; const THashMap& TaskParams; const NActors::TActorId& ComputeActorId; + ::NMonitoring::TDynamicCounterPtr TaskCounters; const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; const NKikimr::NMiniKQL::THolderFactory& HolderFactory; std::shared_ptr Alloc; diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index be28224dbe7f..59533e1aa2b6 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -33,6 +33,7 @@ class TInputTransformStreamLookupBase ui64 inputIndex, NUdf::TUnboxedValue inputFlow, NActors::TActorId computeActorId, + ::NMonitoring::TDynamicCounterPtr taskCounters, IDqAsyncIoFactory* factory, NDqProto::TDqInputTransformLookupSettings&& settings, TVector&& lookupInputIndexes, @@ -51,6 +52,7 @@ class TInputTransformStreamLookupBase , InputIndex(inputIndex) , InputFlow(std::move(inputFlow)) , ComputeActorId(std::move(computeActorId)) + , TaskCounters(taskCounters) , Factory(factory) , Settings(std::move(settings)) , LookupInputIndexes(std::move(lookupInputIndexes)) @@ -74,6 +76,7 @@ class TInputTransformStreamLookupBase Y_DEBUG_ABORT_UNLESS(OtherInputIndexes[i] < InputRowType->GetElementsCount()); } Y_DEBUG_ABORT_UNLESS(LookupInputIndexes.size() == LookupKeyType->GetMembersCount()); + InitMonCounters(taskCounters); } void Bootstrap() { @@ -227,6 +230,9 @@ class TInputTransformStreamLookupBase return AwaitingQueue.size(); } + void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) { + } + TMaybe ExtraData() override { google::protobuf::Any result; //TODO fill me @@ -258,6 +264,7 @@ class TInputTransformStreamLookupBase ui64 InputIndex; // NYql::NDq::IDqComputeActorAsyncInput NUdf::TUnboxedValue InputFlow; const NActors::TActorId ComputeActorId; + ::NMonitoring::TDynamicCounterPtr TaskCounters; IDqAsyncIoFactory::TPtr Factory; NDqProto::TDqInputTransformLookupSettings Settings; protected: @@ -527,6 +534,7 @@ std::pair CreateInputTransformStre args.InputIndex, args.TransformInput, args.ComputeActorId, + args.TaskCounters, factory, std::move(settings), std::move(lookupKeyInputIndexes), @@ -546,6 +554,7 @@ std::pair CreateInputTransformStre args.InputIndex, args.TransformInput, args.ComputeActorId, + args.TaskCounters, factory, std::move(settings), std::move(lookupKeyInputIndexes), From 7c60f31e1d086feb39d2d9d585976af6b5faae1f Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Thu, 10 Oct 2024 13:23:16 +0300 Subject: [PATCH 03/21] stream lookup transform: add dynamic counters --- .../dq/actors/compute/dq_compute_actor_impl.h | 39 ++++++++++++++--- .../dq_input_transform_lookup.cpp | 43 +++++++++++++++++++ 2 files changed, 76 insertions(+), 6 deletions(-) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 177199cf7a06..ccab911b90f3 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -207,6 +207,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped MkqlMemoryQuota = taskCounters->GetCounter("MkqlMemoryQuota"); OutputChannelSize = taskCounters->GetCounter("OutputChannelSize"); SourceCpuTimeMs = taskCounters->GetCounter("SourceCpuTimeMs", true); + InputTransformCpuTimeMs = taskCounters->GetCounter("InputTransformCpuTimeMs", true); } } @@ -1433,11 +1434,20 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) { Y_ABORT_UNLESS(SourcesMap.FindPtr(ev->Get()->InputIndex) || InputTransformsMap.FindPtr(ev->Get()->InputIndex)); - auto cpuTimeDelta = TakeSourceCpuTimeDelta(); - if (SourceCpuTimeMs) { - SourceCpuTimeMs->Add(cpuTimeDelta.MilliSeconds()); + { + auto cpuTimeDelta = TakeSourceCpuTimeDelta(); + if (SourceCpuTimeMs) { + SourceCpuTimeMs->Add(cpuTimeDelta.MilliSeconds()); + } + CpuTimeSpent += cpuTimeDelta; + } + { + auto cpuTimeDelta = TakeInputTransformCpuTimeDelta(); + if (InputTransformCpuTimeMs) { + InputTransformCpuTimeMs->Add(cpuTimeDelta.MilliSeconds()); + } + CpuTimeSpent += cpuTimeDelta; } - CpuTimeSpent += cpuTimeDelta; ContinueExecute(EResumeSource::CANewAsyncInput); } @@ -1632,6 +1642,21 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped return result; } + TDuration GetInputTransformCpuTime() const { + auto result = TDuration::Zero(); + for (auto& [inputIndex, sourceInfo] : InputTransformsMap) { + result += sourceInfo.AsyncInput->GetCpuTime(); + } + return result; + } + + TDuration TakeInputTransformCpuTimeDelta() { + auto newInputTransformCpuTime = GetInputTransformCpuTime(); + auto result = newInputTransformCpuTime - InputTransformCpuTime; + InputTransformCpuTime = newInputTransformCpuTime; + return result; + } + void FillStats(NDqProto::TDqComputeActorStats* dst, bool last) { if (RuntimeSettings.CollectNone()) { return; @@ -1641,7 +1666,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped ReportEventElapsedTime(); } - dst->SetCpuTimeUs(CpuTime.MicroSeconds() + SourceCpuTime.MicroSeconds()); + dst->SetCpuTimeUs(CpuTime.MicroSeconds() + SourceCpuTime.MicroSeconds() + InputTransformCpuTime.MicroSeconds()); dst->SetMaxMemoryUsage(MemoryLimits.MemoryQuotaManager->GetMaxMemorySize()); if (auto memProfileStats = GetMemoryProfileStats(); memProfileStats) { @@ -1680,7 +1705,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped cpuTimeUs += CpuTime.MicroSeconds(); } // CpuTimeUs does include SourceCpuTime - protoTask->SetCpuTimeUs(cpuTimeUs + SourceCpuTime.MicroSeconds()); + protoTask->SetCpuTimeUs(cpuTimeUs + SourceCpuTime.MicroSeconds() + InputTransformCpuTime.MicroSeconds()); protoTask->SetSourceCpuTimeUs(SourceCpuTime.MicroSeconds()); ui64 ingressBytes = 0; @@ -1948,6 +1973,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped TDqComputeActorMetrics MetricsReporter; NWilson::TSpan ComputeActorSpan; TDuration SourceCpuTime; + TDuration InputTransformCpuTime; private: TInstant StartTime; bool Running = true; @@ -1958,6 +1984,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryQuota; ::NMonitoring::TDynamicCounters::TCounterPtr OutputChannelSize; ::NMonitoring::TDynamicCounters::TCounterPtr SourceCpuTimeMs; + ::NMonitoring::TDynamicCounters::TCounterPtr InputTransformCpuTimeMs; THolder Stat; TDuration CpuTimeSpent; }; diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 59533e1aa2b6..1604962160e0 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -135,10 +135,12 @@ class TInputTransformStreamLookupBase } void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) { + auto StartCycleCount = GetCycleCountFast(); auto guard = BindAllocator(); const auto now = std::chrono::steady_clock::now(); auto lookupResult = ev->Get()->Result.lock(); Y_ABORT_UNLESS(lookupResult == KeysForLookup); + auto lookupResultSize = lookupResult->size(); for (; !AwaitingQueue.empty(); AwaitingQueue.pop_front()) { auto& [lookupKey, inputOther] = AwaitingQueue.front(); auto lookupPayload = lookupResult->FindPtr(lookupKey); @@ -151,6 +153,12 @@ class TInputTransformStreamLookupBase LruCache->Update(NUdf::TUnboxedValue(const_cast(k)), std::move(v), now + CacheTtl); } KeysForLookup.reset(); + if (LookupCount) { + LookupCount->Inc(); + LookupKeys->Add(lookupResultSize); + LookupTimeMs->Add(std::chrono::duration_cast(now - LastLookupTime).count()); + } + CpuTime += GetCpuTimeDelta(StartCycleCount); Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex}); } @@ -189,6 +197,7 @@ class TInputTransformStreamLookupBase i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe&, bool& finished, i64 freeSpace) final { Y_UNUSED(freeSpace); + auto StartCycleCount = GetCycleCountFast(); auto guard = BindAllocator(); DrainReadyQueue(batch); @@ -220,17 +229,41 @@ class TInputTransformStreamLookupBase } } if (!KeysForLookup->empty()) { + LastLookupTime = now; Send(LookupSourceId, new IDqAsyncLookupSource::TEvLookupRequest(KeysForLookup)); } else { KeysForLookup.reset(); } + if (Batches) { + Batches->Inc(); + LruHits->Add(ReadyQueue.RowCount()); + LruMiss->Add(AwaitingQueue.size()); + } DrainReadyQueue(batch); } + CpuTime += GetCpuTimeDelta(StartCycleCount); finished = IsFinished(); return AwaitingQueue.size(); } void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) { + if (taskCounters) { + LruHits = taskCounters->GetCounter("StreamLookupTransformLruHits"); + LruMiss = taskCounters->GetCounter("StreamLookupTransformLruMiss"); + CpuTimeMs = taskCounters->GetCounter("StreamLookupTransformCpuTimeMs"); + Batches = taskCounters->GetCounter("StreamLookupTransformBatchCount"); + LookupCount = taskCounters->GetCounter("StreamLookupTransformCount"); + LookupKeys = taskCounters->GetCounter("StreamLookupTransformKeys"); + LookupTimeMs = taskCounters->GetCounter("StreamLookupTransformTimeMs"); + } + } + + static TDuration GetCpuTimeDelta(ui64 StartCycleCount) { + return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount)); + } + + TDuration GetCpuTime() override { + return CpuTime; } TMaybe ExtraData() override { @@ -288,6 +321,16 @@ class TInputTransformStreamLookupBase NKikimr::NMiniKQL::TUnboxedValueBatch ReadyQueue; NYql::NDq::TDqAsyncStats IngressStats; std::shared_ptr KeysForLookup; + + ::NMonitoring::TDynamicCounters::TCounterPtr LruHits; + ::NMonitoring::TDynamicCounters::TCounterPtr LruMiss; + ::NMonitoring::TDynamicCounters::TCounterPtr CpuTimeMs; + ::NMonitoring::TDynamicCounters::TCounterPtr Batches; + ::NMonitoring::TDynamicCounters::TCounterPtr LookupCount; + ::NMonitoring::TDynamicCounters::TCounterPtr LookupKeys; + ::NMonitoring::TDynamicCounters::TCounterPtr LookupTimeMs; + std::chrono::steady_clock::time_point LastLookupTime {}; + TDuration CpuTime {}; }; class TInputTransformStreamLookupWide: public TInputTransformStreamLookupBase { From ddf55708d9e4a1facae899bf229ce3d16b56cc3a Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Thu, 10 Oct 2024 13:26:05 +0300 Subject: [PATCH 04/21] generic lookup: add dynamic counters --- .../compute/dq_compute_actor_async_io.h | 1 + .../dq_input_transform_lookup.cpp | 1 + .../actors/yql_generic_lookup_actor.cpp | 48 +++++++++++++++++++ .../generic/actors/yql_generic_lookup_actor.h | 1 + .../actors/yql_generic_provider_factories.cpp | 1 + 5 files changed, 52 insertions(+) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 231aedc54603..f5b74860a836 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -266,6 +266,7 @@ struct IDqAsyncIoFactory : public TThrRefBase { std::shared_ptr Alloc; std::shared_ptr KeyTypeHelper; NActors::TActorId ParentId; + ::NMonitoring::TDynamicCounterPtr TaskCounters; google::protobuf::Any LookupSource; //provider specific data source const NKikimr::NMiniKQL::TStructType* KeyType; const NKikimr::NMiniKQL::TStructType* PayloadType; diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 1604962160e0..34a7e084e90f 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -85,6 +85,7 @@ class TInputTransformStreamLookupBase .Alloc = Alloc, .KeyTypeHelper = KeyTypeHelper, .ParentId = SelfId(), + .TaskCounters = TaskCounters, .LookupSource = Settings.GetRightSource().GetLookupSource(), .KeyType = LookupKeyType, .PayloadType = LookupPayloadType, diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp index 25c809c63d79..994d52ccb2a0 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp @@ -64,6 +64,7 @@ namespace NYql::NDq { NConnector::IClient::TPtr connectorClient, TGenericTokenProvider::TPtr tokenProvider, NActors::TActorId&& parentId, + ::NMonitoring::TDynamicCounterPtr taskCounters, std::shared_ptr alloc, std::shared_ptr keyTypeHelper, NYql::Generic::TLookupSource&& lookupSource, @@ -85,6 +86,7 @@ namespace NYql::NDq { , ColumnDestinations(CreateColumnDestination()) , MaxKeysInRequest(maxKeysInRequest) { + InitMonCounters(taskCounters); } ~TGenericLookupActor() { @@ -97,6 +99,17 @@ namespace NYql::NDq { Request.reset(); KeyTypeHelper.reset(); } + void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) { + if (taskCounters) { + Count = taskCounters->GetCounter("GenericLookupCount"); + Keys = taskCounters->GetCounter("GenericLookupKeys"); + ResultChunks = taskCounters->GetCounter("GenericLookupResultChunks"); + ResultRows = taskCounters->GetCounter("GenericLookupResultRows"); + ResultBytes = taskCounters->GetCounter("GenericLookupResultBytes"); + AnswerTime = taskCounters->GetCounter("GenericLookupAnswerTimeMs"); + CpuTime = taskCounters->GetCounter("GenericLookupCpuTimeUs"); + } + } public: void Bootstrap() { @@ -209,13 +222,24 @@ namespace NYql::NDq { } private: + static TDuration GetCpuTimeDelta(ui64 StartCycleCount) { + return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount)); + } + void CreateRequest(std::shared_ptr request) { if (!request) { return; } + auto StartCycleCount = GetCycleCountFast(); + SentTime = TInstant::Now(); YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys"; Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest); + if (Count) { + Count->Inc(); + Keys->Add(request->size()); + } + Request = std::move(request); NConnector::NApi::TListSplitsRequest splitRequest; @@ -237,6 +261,8 @@ namespace NYql::NDq { SendError(actorSystem, selfId, result.Status); } }); + if (CpuTime) + CpuTime->Add(GetCpuTimeDelta(StartCycleCount).MicroSeconds()); } void ReadNextData() { @@ -264,7 +290,15 @@ namespace NYql::NDq { } void ProcessReceivedData(const NConnector::NApi::TReadSplitsResponse& resp) { + auto StartCycleCount = GetCycleCountFast(); Y_ABORT_UNLESS(resp.payload_case() == NConnector::NApi::TReadSplitsResponse::PayloadCase::kArrowIpcStreaming); + if (ResultChunks) { + ResultChunks->Inc(); + if (resp.has_stats()) { + ResultRows->Add(resp.stats().rows()); + ResultBytes->Add(resp.stats().bytes()); + } + } auto guard = Guard(*Alloc); NKikimr::NArrow::NSerialization::TSerializerContainer deser = NKikimr::NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer(); // todo move to class' member const auto& data = deser->Deserialize(resp.arrow_ipc_streaming()); @@ -290,12 +324,16 @@ namespace NYql::NDq { *v = std::move(output); // duplicates will be overwritten } } + if (CpuTime) + CpuTime->Add(GetCpuTimeDelta(StartCycleCount).MicroSeconds()); } void FinalizeRequest() { YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request->size() << " keys"; auto guard = Guard(*Alloc); auto ev = new IDqAsyncLookupSource::TEvLookupResult(Request); + if (AnswerTime) + AnswerTime->Add((TInstant::Now() - SentTime).MilliSeconds()); Request.reset(); TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev)); LookupResult = {}; @@ -398,12 +436,21 @@ namespace NYql::NDq { std::shared_ptr Request; NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult; + ::NMonitoring::TDynamicCounters::TCounterPtr Count; + ::NMonitoring::TDynamicCounters::TCounterPtr Keys; + ::NMonitoring::TDynamicCounters::TCounterPtr ResultRows; + ::NMonitoring::TDynamicCounters::TCounterPtr ResultBytes; + ::NMonitoring::TDynamicCounters::TCounterPtr ResultChunks; + ::NMonitoring::TDynamicCounters::TCounterPtr AnswerTime; + ::NMonitoring::TDynamicCounters::TCounterPtr CpuTime; + TInstant SentTime; }; std::pair CreateGenericLookupActor( NConnector::IClient::TPtr connectorClient, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NActors::TActorId parentId, + ::NMonitoring::TDynamicCounterPtr taskCounters, std::shared_ptr alloc, std::shared_ptr keyTypeHelper, NYql::Generic::TLookupSource&& lookupSource, @@ -419,6 +466,7 @@ namespace NYql::NDq { connectorClient, std::move(tokenProvider), std::move(parentId), + taskCounters, alloc, keyTypeHelper, std::move(lookupSource), diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h index 9f8c0c268f23..128964b1553f 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h @@ -15,6 +15,7 @@ namespace NYql::NDq { NConnector::IClient::TPtr connectorClient, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NActors::TActorId parentId, + ::NMonitoring::TDynamicCounterPtr taskCounters, std::shared_ptr alloc, std::shared_ptr keyTypeHelper, NYql::Generic::TLookupSource&& lookupSource, diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp index a7422cd018df..7aaa837d961b 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp @@ -22,6 +22,7 @@ namespace NYql::NDq { genericClient, credentialsFactory, std::move(args.ParentId), + args.TaskCounters, args.Alloc, args.KeyTypeHelper, std::move(lookupSource), From 322e48c8219db6b60f903b20692c21dbe7570286 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Thu, 10 Oct 2024 13:27:04 +0300 Subject: [PATCH 05/21] tests/tools/fq_runner/kikimr_metrics: avoid failure on missing "value" --- ydb/tests/tools/fq_runner/kikimr_metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/tests/tools/fq_runner/kikimr_metrics.py b/ydb/tests/tools/fq_runner/kikimr_metrics.py index 006e95119738..08825289171b 100644 --- a/ydb/tests/tools/fq_runner/kikimr_metrics.py +++ b/ydb/tests/tools/fq_runner/kikimr_metrics.py @@ -31,7 +31,7 @@ def find_sensors(self, labels, key_label): continue v = lbls.get(key_label, None) if v is not None: - result[v] = s["value"] + result[v] = s.get("value", None) return result def collect_non_zeros(self): From 49679837c0416ccb4c22b9cf22bd19506f369ae0 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Thu, 10 Oct 2024 13:28:50 +0300 Subject: [PATCH 06/21] tests/fq/generic/streaming/test_join: dump counters in debug mode --- ydb/tests/fq/generic/streaming/test_join.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/ydb/tests/fq/generic/streaming/test_join.py b/ydb/tests/fq/generic/streaming/test_join.py index 61fa4210180b..95a5b2cb9823 100644 --- a/ydb/tests/fq/generic/streaming/test_join.py +++ b/ydb/tests/fq/generic/streaming/test_join.py @@ -519,6 +519,15 @@ def test_streamlookup( messages_ctr = Counter(map(freeze, map(json.loads, map(itemgetter(1), messages)))) assert read_data_ctr == messages_ctr + if DEBUG: + for node_index in kikimr.compute_plane.kikimr_cluster.nodes: + sensors = kikimr.compute_plane.get_sensors(node_index, "dq_tasks").find_sensors(labels={"operation": query_id}, key_label="sensor") + for k in sensors: + for prefix in ("GenericLookup", "StreamLookupTransform", "InputTransform"): + if k.startswith(prefix): + print(f'node[{node_index}].operation[{query_id}].{k} = {sensors[k]}', file=sys.stderr) + break + fq_client.abort_query(query_id) fq_client.wait_query(query_id) From d81e08535dd70c94c566119d43d90141875990d2 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Thu, 10 Oct 2024 13:48:45 +0300 Subject: [PATCH 07/21] streamlookup: fix batch counting --- .../input_transforms/dq_input_transform_lookup.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 34a7e084e90f..22f08d078a75 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -229,17 +229,17 @@ class TInputTransformStreamLookupBase KeysForLookup->emplace(std::move(key), NUdf::TUnboxedValue{}); } } + if (Batches && (!KeysForLookup->empty() || !ReadyQueue.RowCount())) { + Batches->Inc(); + LruHits->Add(ReadyQueue.RowCount()); + LruMiss->Add(AwaitingQueue.size()); + } if (!KeysForLookup->empty()) { LastLookupTime = now; Send(LookupSourceId, new IDqAsyncLookupSource::TEvLookupRequest(KeysForLookup)); } else { KeysForLookup.reset(); } - if (Batches) { - Batches->Inc(); - LruHits->Add(ReadyQueue.RowCount()); - LruMiss->Add(AwaitingQueue.size()); - } DrainReadyQueue(batch); } CpuTime += GetCpuTimeDelta(StartCycleCount); From 4522de1d56d9bcbd6d6763936df5228488fd63ea Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Thu, 10 Oct 2024 14:24:58 +0300 Subject: [PATCH 08/21] fix genericlookup ut --- .../providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp b/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp index e85bdfa01e14..619f10a99bcf 100644 --- a/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp +++ b/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp @@ -176,6 +176,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { connectorMock, std::make_shared(), edge, + nullptr, alloc, keyTypeHelper, std::move(lookupSourceSettings), From 49c4fdc4c85db62ad7add585cadba4a58a9d7273 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Thu, 10 Oct 2024 14:25:18 +0300 Subject: [PATCH 09/21] fix python style --- ydb/tests/fq/generic/streaming/test_join.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ydb/tests/fq/generic/streaming/test_join.py b/ydb/tests/fq/generic/streaming/test_join.py index 95a5b2cb9823..85a6906e2cde 100644 --- a/ydb/tests/fq/generic/streaming/test_join.py +++ b/ydb/tests/fq/generic/streaming/test_join.py @@ -521,7 +521,9 @@ def test_streamlookup( if DEBUG: for node_index in kikimr.compute_plane.kikimr_cluster.nodes: - sensors = kikimr.compute_plane.get_sensors(node_index, "dq_tasks").find_sensors(labels={"operation": query_id}, key_label="sensor") + sensors = kikimr.compute_plane.get_sensors(node_index, "dq_tasks").find_sensors( + labels={"operation": query_id}, key_label="sensor" + ) for k in sensors: for prefix in ("GenericLookup", "StreamLookupTransform", "InputTransform"): if k.startswith(prefix): From 11d659e67bd2e0a3c8a550c9515810f1c8bbeab9 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Thu, 10 Oct 2024 14:30:16 +0300 Subject: [PATCH 10/21] Remove duplicated counters StreamLookupTransformCount is same as GenericLookupCount StreamLookupTransformKeys is same as GenericLookupKeys StreamLookupTransformTimeMs is almost same as GenericLookupAnswerTimeMs (plus expenses on actor message passing, and it expected to be small) --- .../input_transforms/dq_input_transform_lookup.cpp | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 22f08d078a75..1883b5cb0bb9 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -154,11 +154,6 @@ class TInputTransformStreamLookupBase LruCache->Update(NUdf::TUnboxedValue(const_cast(k)), std::move(v), now + CacheTtl); } KeysForLookup.reset(); - if (LookupCount) { - LookupCount->Inc(); - LookupKeys->Add(lookupResultSize); - LookupTimeMs->Add(std::chrono::duration_cast(now - LastLookupTime).count()); - } CpuTime += GetCpuTimeDelta(StartCycleCount); Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex}); } @@ -253,9 +248,6 @@ class TInputTransformStreamLookupBase LruMiss = taskCounters->GetCounter("StreamLookupTransformLruMiss"); CpuTimeMs = taskCounters->GetCounter("StreamLookupTransformCpuTimeMs"); Batches = taskCounters->GetCounter("StreamLookupTransformBatchCount"); - LookupCount = taskCounters->GetCounter("StreamLookupTransformCount"); - LookupKeys = taskCounters->GetCounter("StreamLookupTransformKeys"); - LookupTimeMs = taskCounters->GetCounter("StreamLookupTransformTimeMs"); } } @@ -327,9 +319,6 @@ class TInputTransformStreamLookupBase ::NMonitoring::TDynamicCounters::TCounterPtr LruMiss; ::NMonitoring::TDynamicCounters::TCounterPtr CpuTimeMs; ::NMonitoring::TDynamicCounters::TCounterPtr Batches; - ::NMonitoring::TDynamicCounters::TCounterPtr LookupCount; - ::NMonitoring::TDynamicCounters::TCounterPtr LookupKeys; - ::NMonitoring::TDynamicCounters::TCounterPtr LookupTimeMs; std::chrono::steady_clock::time_point LastLookupTime {}; TDuration CpuTime {}; }; From d359dd3132c7125a8ebaa29e2497636fed7fb45c Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Thu, 10 Oct 2024 14:42:09 +0300 Subject: [PATCH 11/21] fix build (unused variable) --- .../yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 1883b5cb0bb9..e6794e26fab8 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -141,7 +141,6 @@ class TInputTransformStreamLookupBase const auto now = std::chrono::steady_clock::now(); auto lookupResult = ev->Get()->Result.lock(); Y_ABORT_UNLESS(lookupResult == KeysForLookup); - auto lookupResultSize = lookupResult->size(); for (; !AwaitingQueue.empty(); AwaitingQueue.pop_front()) { auto& [lookupKey, inputOther] = AwaitingQueue.front(); auto lookupPayload = lookupResult->FindPtr(lookupKey); From 2c8e95685cfebcfe8fac5b382e57daaef6627601 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Thu, 10 Oct 2024 15:00:52 +0300 Subject: [PATCH 12/21] steamlookup: remove now-unused member --- .../dq/actors/input_transforms/dq_input_transform_lookup.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index e6794e26fab8..8f30eeb625a8 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -229,7 +229,6 @@ class TInputTransformStreamLookupBase LruMiss->Add(AwaitingQueue.size()); } if (!KeysForLookup->empty()) { - LastLookupTime = now; Send(LookupSourceId, new IDqAsyncLookupSource::TEvLookupRequest(KeysForLookup)); } else { KeysForLookup.reset(); @@ -318,7 +317,6 @@ class TInputTransformStreamLookupBase ::NMonitoring::TDynamicCounters::TCounterPtr LruMiss; ::NMonitoring::TDynamicCounters::TCounterPtr CpuTimeMs; ::NMonitoring::TDynamicCounters::TCounterPtr Batches; - std::chrono::steady_clock::time_point LastLookupTime {}; TDuration CpuTime {}; }; From 69ba94a70e42209caebc770ea49440ae6a52d62f Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Thu, 10 Oct 2024 19:07:08 +0300 Subject: [PATCH 13/21] Add forgotten StreamLookupCpuTimeUs --- .../input_transforms/dq_input_transform_lookup.cpp | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 8f30eeb625a8..6294a99fbf45 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -153,7 +153,10 @@ class TInputTransformStreamLookupBase LruCache->Update(NUdf::TUnboxedValue(const_cast(k)), std::move(v), now + CacheTtl); } KeysForLookup.reset(); - CpuTime += GetCpuTimeDelta(StartCycleCount); + auto deltaTime = GetCpuTimeDelta(StartCycleCount); + CpuTime += deltaTime; + if (CpuTimeUs) + CpuTimeUs->Add(deltaTime.MicroSeconds()); Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex}); } @@ -235,7 +238,10 @@ class TInputTransformStreamLookupBase } DrainReadyQueue(batch); } - CpuTime += GetCpuTimeDelta(StartCycleCount); + auto deltaTime = GetCpuTimeDelta(StartCycleCount); + CpuTime += deltaTime; + if (CpuTimeUs) + CpuTimeUs->Add(deltaTime.MicroSeconds()); finished = IsFinished(); return AwaitingQueue.size(); } @@ -244,7 +250,7 @@ class TInputTransformStreamLookupBase if (taskCounters) { LruHits = taskCounters->GetCounter("StreamLookupTransformLruHits"); LruMiss = taskCounters->GetCounter("StreamLookupTransformLruMiss"); - CpuTimeMs = taskCounters->GetCounter("StreamLookupTransformCpuTimeMs"); + CpuTimeUs = taskCounters->GetCounter("StreamLookupTransformCpuTimeUs"); Batches = taskCounters->GetCounter("StreamLookupTransformBatchCount"); } } @@ -315,7 +321,7 @@ class TInputTransformStreamLookupBase ::NMonitoring::TDynamicCounters::TCounterPtr LruHits; ::NMonitoring::TDynamicCounters::TCounterPtr LruMiss; - ::NMonitoring::TDynamicCounters::TCounterPtr CpuTimeMs; + ::NMonitoring::TDynamicCounters::TCounterPtr CpuTimeUs; ::NMonitoring::TDynamicCounters::TCounterPtr Batches; TDuration CpuTime {}; }; From 861b5a133554954a40b3094703501fac8149808f Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 11 Oct 2024 13:17:48 +0300 Subject: [PATCH 14/21] fix review comments --- .../input_transforms/dq_input_transform_lookup.cpp | 12 ++++++------ .../generic/actors/yql_generic_lookup_actor.cpp | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 9dc27ca45b45..d3174f03a6a6 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -146,7 +146,7 @@ class TInputTransformStreamLookupBase } void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) { - auto StartCycleCount = GetCycleCountFast(); + auto startCycleCount = GetCycleCountFast(); if (!KeysForLookup) return; auto guard = BindAllocator(); @@ -165,7 +165,7 @@ class TInputTransformStreamLookupBase LruCache->Update(NUdf::TUnboxedValue(const_cast(k)), std::move(v), now + CacheTtl); } KeysForLookup->clear(); - auto deltaTime = GetCpuTimeDelta(StartCycleCount); + auto deltaTime = GetCpuTimeDelta(startCycleCount); CpuTime += deltaTime; if (CpuTimeUs) CpuTimeUs->Add(deltaTime.MicroSeconds()); @@ -207,7 +207,7 @@ class TInputTransformStreamLookupBase i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe&, bool& finished, i64 freeSpace) final { Y_UNUSED(freeSpace); - auto StartCycleCount = GetCycleCountFast(); + auto startCycleCount = GetCycleCountFast(); auto guard = BindAllocator(); DrainReadyQueue(batch); @@ -247,7 +247,7 @@ class TInputTransformStreamLookupBase } DrainReadyQueue(batch); } - auto deltaTime = GetCpuTimeDelta(StartCycleCount); + auto deltaTime = GetCpuTimeDelta(startCycleCount); CpuTime += deltaTime; if (CpuTimeUs) CpuTimeUs->Add(deltaTime.MicroSeconds()); @@ -264,8 +264,8 @@ class TInputTransformStreamLookupBase } } - static TDuration GetCpuTimeDelta(ui64 StartCycleCount) { - return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount)); + static TDuration GetCpuTimeDelta(ui64 startCycleCount) { + return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - startCycleCount)); } TDuration GetCpuTime() override { diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp index c8c2aa763f98..c28085d60bb9 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp @@ -228,15 +228,15 @@ namespace NYql::NDq { } private: - static TDuration GetCpuTimeDelta(ui64 StartCycleCount) { - return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount)); + static TDuration GetCpuTimeDelta(ui64 startCycleCount) { + return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - startCycleCount)); } void CreateRequest(std::shared_ptr request) { if (!request) { return; } - auto StartCycleCount = GetCycleCountFast(); + auto startCycleCount = GetCycleCountFast(); SentTime = TInstant::Now(); YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys"; Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest); @@ -268,7 +268,7 @@ namespace NYql::NDq { } }); if (CpuTime) - CpuTime->Add(GetCpuTimeDelta(StartCycleCount).MicroSeconds()); + CpuTime->Add(GetCpuTimeDelta(startCycleCount).MicroSeconds()); } void ReadNextData() { @@ -296,7 +296,7 @@ namespace NYql::NDq { } void ProcessReceivedData(const NConnector::NApi::TReadSplitsResponse& resp) { - auto StartCycleCount = GetCycleCountFast(); + auto startCycleCount = GetCycleCountFast(); Y_ABORT_UNLESS(resp.payload_case() == NConnector::NApi::TReadSplitsResponse::PayloadCase::kArrowIpcStreaming); if (ResultChunks) { ResultChunks->Inc(); @@ -331,7 +331,7 @@ namespace NYql::NDq { } } if (CpuTime) - CpuTime->Add(GetCpuTimeDelta(StartCycleCount).MicroSeconds()); + CpuTime->Add(GetCpuTimeDelta(startCycleCount).MicroSeconds()); } void FinalizeRequest() { From 712f4d932d0c4c0ea000f6877cc5788e4ce5c2bf Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 11 Oct 2024 13:23:09 +0300 Subject: [PATCH 15/21] fix review comments --- .../input_transforms/dq_input_transform_lookup.cpp | 8 +++++--- .../generic/actors/yql_generic_lookup_actor.cpp | 9 ++++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index d3174f03a6a6..cdbc72ebdcc3 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -167,8 +167,9 @@ class TInputTransformStreamLookupBase KeysForLookup->clear(); auto deltaTime = GetCpuTimeDelta(startCycleCount); CpuTime += deltaTime; - if (CpuTimeUs) + if (CpuTimeUs) { CpuTimeUs->Add(deltaTime.MicroSeconds()); + } Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex}); } @@ -249,8 +250,9 @@ class TInputTransformStreamLookupBase } auto deltaTime = GetCpuTimeDelta(startCycleCount); CpuTime += deltaTime; - if (CpuTimeUs) + if (CpuTimeUs) { CpuTimeUs->Add(deltaTime.MicroSeconds()); + } finished = IsFinished(); return AwaitingQueue.size(); } @@ -332,7 +334,7 @@ class TInputTransformStreamLookupBase ::NMonitoring::TDynamicCounters::TCounterPtr LruMiss; ::NMonitoring::TDynamicCounters::TCounterPtr CpuTimeUs; ::NMonitoring::TDynamicCounters::TCounterPtr Batches; - TDuration CpuTime {}; + TDuration CpuTime; }; class TInputTransformStreamLookupWide: public TInputTransformStreamLookupBase { diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp index c28085d60bb9..cfd765a65dee 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp @@ -267,8 +267,9 @@ namespace NYql::NDq { SendError(actorSystem, selfId, result.Status); } }); - if (CpuTime) + if (CpuTime) { CpuTime->Add(GetCpuTimeDelta(startCycleCount).MicroSeconds()); + } } void ReadNextData() { @@ -330,16 +331,18 @@ namespace NYql::NDq { *v = std::move(output); // duplicates will be overwritten } } - if (CpuTime) + if (CpuTime) { CpuTime->Add(GetCpuTimeDelta(startCycleCount).MicroSeconds()); + } } void FinalizeRequest() { YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request->size() << " keys"; auto guard = Guard(*Alloc); auto ev = new IDqAsyncLookupSource::TEvLookupResult(Request); - if (AnswerTime) + if (AnswerTime) { AnswerTime->Add((TInstant::Now() - SentTime).MilliSeconds()); + } Request.reset(); TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev)); LookupResult = {}; From 7faa3792706f793aebb9e86b8092fd6b696f34c4 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 11 Oct 2024 13:33:00 +0300 Subject: [PATCH 16/21] fix review comments (reduce indentation) --- .../dq_input_transform_lookup.cpp | 11 ++++++----- .../generic/actors/yql_generic_lookup_actor.cpp | 17 +++++++++-------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index cdbc72ebdcc3..5f4b916b53ff 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -258,12 +258,13 @@ class TInputTransformStreamLookupBase } void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) { - if (taskCounters) { - LruHits = taskCounters->GetCounter("StreamLookupTransformLruHits"); - LruMiss = taskCounters->GetCounter("StreamLookupTransformLruMiss"); - CpuTimeUs = taskCounters->GetCounter("StreamLookupTransformCpuTimeUs"); - Batches = taskCounters->GetCounter("StreamLookupTransformBatchCount"); + if (!taskCounters) { + return; } + LruHits = taskCounters->GetCounter("StreamLookupTransformLruHits"); + LruMiss = taskCounters->GetCounter("StreamLookupTransformLruMiss"); + CpuTimeUs = taskCounters->GetCounter("StreamLookupTransformCpuTimeUs"); + Batches = taskCounters->GetCounter("StreamLookupTransformBatchCount"); } static TDuration GetCpuTimeDelta(ui64 startCycleCount) { diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp index cfd765a65dee..efdddea50561 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp @@ -100,15 +100,16 @@ namespace NYql::NDq { KeyTypeHelper.reset(); } void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) { - if (taskCounters) { - Count = taskCounters->GetCounter("GenericLookupCount"); - Keys = taskCounters->GetCounter("GenericLookupKeys"); - ResultChunks = taskCounters->GetCounter("GenericLookupResultChunks"); - ResultRows = taskCounters->GetCounter("GenericLookupResultRows"); - ResultBytes = taskCounters->GetCounter("GenericLookupResultBytes"); - AnswerTime = taskCounters->GetCounter("GenericLookupAnswerTimeMs"); - CpuTime = taskCounters->GetCounter("GenericLookupCpuTimeUs"); + if (!taskCounters) { + return; } + Count = taskCounters->GetCounter("GenericLookupCount"); + Keys = taskCounters->GetCounter("GenericLookupKeys"); + ResultChunks = taskCounters->GetCounter("GenericLookupResultChunks"); + ResultRows = taskCounters->GetCounter("GenericLookupResultRows"); + ResultBytes = taskCounters->GetCounter("GenericLookupResultBytes"); + AnswerTime = taskCounters->GetCounter("GenericLookupAnswerTimeMs"); + CpuTime = taskCounters->GetCounter("GenericLookupCpuTimeUs"); } public: From 91b6dac2cbd2cd4b7c4e05ebdd4ea8dd9ac9f3ca Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 11 Oct 2024 13:42:58 +0300 Subject: [PATCH 17/21] fix review (shorten labels) --- .../actors/input_transforms/dq_input_transform_lookup.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 5f4b916b53ff..597052aff037 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -261,10 +261,10 @@ class TInputTransformStreamLookupBase if (!taskCounters) { return; } - LruHits = taskCounters->GetCounter("StreamLookupTransformLruHits"); - LruMiss = taskCounters->GetCounter("StreamLookupTransformLruMiss"); - CpuTimeUs = taskCounters->GetCounter("StreamLookupTransformCpuTimeUs"); - Batches = taskCounters->GetCounter("StreamLookupTransformBatchCount"); + LruHits = taskCounters->GetCounter("StreamLookupLruHits"); + LruMiss = taskCounters->GetCounter("StreamLookupLruMiss"); + CpuTimeUs = taskCounters->GetCounter("StreamLookupCpuTimeUs"); + Batches = taskCounters->GetCounter("StreamLookupBatches"); } static TDuration GetCpuTimeDelta(ui64 startCycleCount) { From d73c9db91f8731008674c11639c66dd8aaaa01dd Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 11 Oct 2024 13:46:11 +0300 Subject: [PATCH 18/21] fixup! fix review (shorten labels) --- ydb/tests/fq/generic/streaming/test_join.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/tests/fq/generic/streaming/test_join.py b/ydb/tests/fq/generic/streaming/test_join.py index 85a6906e2cde..3d7dbe2d3cfd 100644 --- a/ydb/tests/fq/generic/streaming/test_join.py +++ b/ydb/tests/fq/generic/streaming/test_join.py @@ -525,7 +525,7 @@ def test_streamlookup( labels={"operation": query_id}, key_label="sensor" ) for k in sensors: - for prefix in ("GenericLookup", "StreamLookupTransform", "InputTransform"): + for prefix in ("GenericLookup", "StreamLookup", "InputTransform"): if k.startswith(prefix): print(f'node[{node_index}].operation[{query_id}].{k} = {sensors[k]}', file=sys.stderr) break From 2769269ad72816dd966a2933941e17608823ca7c Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 11 Oct 2024 13:50:53 +0300 Subject: [PATCH 19/21] Fix indentation --- .../dq/actors/input_transforms/dq_input_transform_lookup.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 597052aff037..aeb000403516 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -214,8 +214,8 @@ class TInputTransformStreamLookupBase DrainReadyQueue(batch); if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && KeysForLookup->empty()) { - NUdf::TUnboxedValue* inputRowItems; - NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems); + NUdf::TUnboxedValue* inputRowItems; + NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems); const auto now = std::chrono::steady_clock::now(); LruCache->Prune(now); while ( From 868e55dbe905fd0d801364ebfa3dd7ca745cce98 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 11 Oct 2024 13:51:19 +0300 Subject: [PATCH 20/21] Verify invariants (!NDEBUG) --- .../yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index aeb000403516..0578f8736a8e 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -214,6 +214,7 @@ class TInputTransformStreamLookupBase DrainReadyQueue(batch); if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && KeysForLookup->empty()) { + Y_DEBUG_ABORT_UNLESS(AwaitingQueue.empty()); NUdf::TUnboxedValue* inputRowItems; NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems); const auto now = std::chrono::steady_clock::now(); From 6c3a9102bc1450a1ac3e46bf1e83cb6bb6c90b11 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 11 Oct 2024 15:48:40 +0300 Subject: [PATCH 21/21] yet another counter rename round --- .../input_transforms/dq_input_transform_lookup.cpp | 8 ++++---- .../generic/actors/yql_generic_lookup_actor.cpp | 14 +++++++------- ydb/tests/fq/generic/streaming/test_join.py | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 0578f8736a8e..e061e7b3ce61 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -262,10 +262,10 @@ class TInputTransformStreamLookupBase if (!taskCounters) { return; } - LruHits = taskCounters->GetCounter("StreamLookupLruHits"); - LruMiss = taskCounters->GetCounter("StreamLookupLruMiss"); - CpuTimeUs = taskCounters->GetCounter("StreamLookupCpuTimeUs"); - Batches = taskCounters->GetCounter("StreamLookupBatches"); + LruHits = taskCounters->GetCounter("LookupHits"); + LruMiss = taskCounters->GetCounter("LookupMiss"); + CpuTimeUs = taskCounters->GetCounter("LookupCpuUs"); + Batches = taskCounters->GetCounter("LookupBatches"); } static TDuration GetCpuTimeDelta(ui64 startCycleCount) { diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp index efdddea50561..720c84c1bee0 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp @@ -103,13 +103,13 @@ namespace NYql::NDq { if (!taskCounters) { return; } - Count = taskCounters->GetCounter("GenericLookupCount"); - Keys = taskCounters->GetCounter("GenericLookupKeys"); - ResultChunks = taskCounters->GetCounter("GenericLookupResultChunks"); - ResultRows = taskCounters->GetCounter("GenericLookupResultRows"); - ResultBytes = taskCounters->GetCounter("GenericLookupResultBytes"); - AnswerTime = taskCounters->GetCounter("GenericLookupAnswerTimeMs"); - CpuTime = taskCounters->GetCounter("GenericLookupCpuTimeUs"); + Count = taskCounters->GetCounter("LookupSrcReqs"); + Keys = taskCounters->GetCounter("LookupSrcKeys"); + ResultChunks = taskCounters->GetCounter("LookupSrcChunks"); + ResultRows = taskCounters->GetCounter("LookupSrcRows"); + ResultBytes = taskCounters->GetCounter("LookupSrcBytes"); + AnswerTime = taskCounters->GetCounter("LookupSrcAnswerMs"); + CpuTime = taskCounters->GetCounter("LookupSrcCpuUs"); } public: diff --git a/ydb/tests/fq/generic/streaming/test_join.py b/ydb/tests/fq/generic/streaming/test_join.py index 3d7dbe2d3cfd..0f8cb0c4c0c1 100644 --- a/ydb/tests/fq/generic/streaming/test_join.py +++ b/ydb/tests/fq/generic/streaming/test_join.py @@ -525,7 +525,7 @@ def test_streamlookup( labels={"operation": query_id}, key_label="sensor" ) for k in sensors: - for prefix in ("GenericLookup", "StreamLookup", "InputTransform"): + for prefix in ("Lookup", "InputTransform"): if k.startswith(prefix): print(f'node[{node_index}].operation[{query_id}].{k} = {sensors[k]}', file=sys.stderr) break