diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index e4a8754cb485..f5b74860a836 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -266,6 +266,7 @@ struct IDqAsyncIoFactory : public TThrRefBase { std::shared_ptr Alloc; std::shared_ptr KeyTypeHelper; NActors::TActorId ParentId; + ::NMonitoring::TDynamicCounterPtr TaskCounters; google::protobuf::Any LookupSource; //provider specific data source const NKikimr::NMiniKQL::TStructType* KeyType; const NKikimr::NMiniKQL::TStructType* PayloadType; @@ -299,6 +300,7 @@ struct IDqAsyncIoFactory : public TThrRefBase { const THashMap& SecureParams; const THashMap& TaskParams; const NActors::TActorId& ComputeActorId; + ::NMonitoring::TDynamicCounterPtr TaskCounters; const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; const NKikimr::NMiniKQL::THolderFactory& HolderFactory; std::shared_ptr Alloc; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 3d1b8b284761..ccab911b90f3 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -207,6 +207,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped MkqlMemoryQuota = taskCounters->GetCounter("MkqlMemoryQuota"); OutputChannelSize = taskCounters->GetCounter("OutputChannelSize"); SourceCpuTimeMs = taskCounters->GetCounter("SourceCpuTimeMs", true); + InputTransformCpuTimeMs = taskCounters->GetCounter("InputTransformCpuTimeMs", true); } } @@ -1340,6 +1341,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped .SecureParams = secureParams, .TaskParams = taskParams, .ComputeActorId = this->SelfId(), + .TaskCounters = TaskCounters, .TypeEnv = typeEnv, .HolderFactory = holderFactory, .Alloc = Alloc, @@ -1432,11 +1434,20 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) { Y_ABORT_UNLESS(SourcesMap.FindPtr(ev->Get()->InputIndex) || InputTransformsMap.FindPtr(ev->Get()->InputIndex)); - auto cpuTimeDelta = TakeSourceCpuTimeDelta(); - if (SourceCpuTimeMs) { - SourceCpuTimeMs->Add(cpuTimeDelta.MilliSeconds()); + { + auto cpuTimeDelta = TakeSourceCpuTimeDelta(); + if (SourceCpuTimeMs) { + SourceCpuTimeMs->Add(cpuTimeDelta.MilliSeconds()); + } + CpuTimeSpent += cpuTimeDelta; + } + { + auto cpuTimeDelta = TakeInputTransformCpuTimeDelta(); + if (InputTransformCpuTimeMs) { + InputTransformCpuTimeMs->Add(cpuTimeDelta.MilliSeconds()); + } + CpuTimeSpent += cpuTimeDelta; } - CpuTimeSpent += cpuTimeDelta; ContinueExecute(EResumeSource::CANewAsyncInput); } @@ -1631,6 +1642,21 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped return result; } + TDuration GetInputTransformCpuTime() const { + auto result = TDuration::Zero(); + for (auto& [inputIndex, sourceInfo] : InputTransformsMap) { + result += sourceInfo.AsyncInput->GetCpuTime(); + } + return result; + } + + TDuration TakeInputTransformCpuTimeDelta() { + auto newInputTransformCpuTime = GetInputTransformCpuTime(); + auto result = newInputTransformCpuTime - InputTransformCpuTime; + InputTransformCpuTime = newInputTransformCpuTime; + return result; + } + void FillStats(NDqProto::TDqComputeActorStats* dst, bool last) { if (RuntimeSettings.CollectNone()) { return; @@ -1640,7 +1666,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped ReportEventElapsedTime(); } - dst->SetCpuTimeUs(CpuTime.MicroSeconds() + SourceCpuTime.MicroSeconds()); + dst->SetCpuTimeUs(CpuTime.MicroSeconds() + SourceCpuTime.MicroSeconds() + InputTransformCpuTime.MicroSeconds()); dst->SetMaxMemoryUsage(MemoryLimits.MemoryQuotaManager->GetMaxMemorySize()); if (auto memProfileStats = GetMemoryProfileStats(); memProfileStats) { @@ -1679,7 +1705,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped cpuTimeUs += CpuTime.MicroSeconds(); } // CpuTimeUs does include SourceCpuTime - protoTask->SetCpuTimeUs(cpuTimeUs + SourceCpuTime.MicroSeconds()); + protoTask->SetCpuTimeUs(cpuTimeUs + SourceCpuTime.MicroSeconds() + InputTransformCpuTime.MicroSeconds()); protoTask->SetSourceCpuTimeUs(SourceCpuTime.MicroSeconds()); ui64 ingressBytes = 0; @@ -1947,6 +1973,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped TDqComputeActorMetrics MetricsReporter; NWilson::TSpan ComputeActorSpan; TDuration SourceCpuTime; + TDuration InputTransformCpuTime; private: TInstant StartTime; bool Running = true; @@ -1957,6 +1984,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryQuota; ::NMonitoring::TDynamicCounters::TCounterPtr OutputChannelSize; ::NMonitoring::TDynamicCounters::TCounterPtr SourceCpuTimeMs; + ::NMonitoring::TDynamicCounters::TCounterPtr InputTransformCpuTimeMs; THolder Stat; TDuration CpuTimeSpent; }; diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 4c2e7b2c38f4..e061e7b3ce61 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -33,6 +33,7 @@ class TInputTransformStreamLookupBase ui64 inputIndex, NUdf::TUnboxedValue inputFlow, NActors::TActorId computeActorId, + ::NMonitoring::TDynamicCounterPtr taskCounters, IDqAsyncIoFactory* factory, NDqProto::TDqInputTransformLookupSettings&& settings, TVector&& lookupInputIndexes, @@ -51,6 +52,7 @@ class TInputTransformStreamLookupBase , InputIndex(inputIndex) , InputFlow(std::move(inputFlow)) , ComputeActorId(std::move(computeActorId)) + , TaskCounters(taskCounters) , Factory(factory) , Settings(std::move(settings)) , LookupInputIndexes(std::move(lookupInputIndexes)) @@ -74,6 +76,7 @@ class TInputTransformStreamLookupBase Y_DEBUG_ABORT_UNLESS(OtherInputIndexes[i] < InputRowType->GetElementsCount()); } Y_DEBUG_ABORT_UNLESS(LookupInputIndexes.size() == LookupKeyType->GetMembersCount()); + InitMonCounters(taskCounters); } void Bootstrap() { @@ -82,6 +85,7 @@ class TInputTransformStreamLookupBase .Alloc = Alloc, .KeyTypeHelper = KeyTypeHelper, .ParentId = SelfId(), + .TaskCounters = TaskCounters, .LookupSource = Settings.GetRightSource().GetLookupSource(), .KeyType = LookupKeyType, .PayloadType = LookupPayloadType, @@ -142,6 +146,7 @@ class TInputTransformStreamLookupBase } void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) { + auto startCycleCount = GetCycleCountFast(); if (!KeysForLookup) return; auto guard = BindAllocator(); @@ -160,6 +165,11 @@ class TInputTransformStreamLookupBase LruCache->Update(NUdf::TUnboxedValue(const_cast(k)), std::move(v), now + CacheTtl); } KeysForLookup->clear(); + auto deltaTime = GetCpuTimeDelta(startCycleCount); + CpuTime += deltaTime; + if (CpuTimeUs) { + CpuTimeUs->Add(deltaTime.MicroSeconds()); + } Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex}); } @@ -198,13 +208,15 @@ class TInputTransformStreamLookupBase i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe&, bool& finished, i64 freeSpace) final { Y_UNUSED(freeSpace); + auto startCycleCount = GetCycleCountFast(); auto guard = BindAllocator(); DrainReadyQueue(batch); if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && KeysForLookup->empty()) { - NUdf::TUnboxedValue* inputRowItems; - NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems); + Y_DEBUG_ABORT_UNLESS(AwaitingQueue.empty()); + NUdf::TUnboxedValue* inputRowItems; + NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems); const auto now = std::chrono::steady_clock::now(); LruCache->Prune(now); while ( @@ -227,15 +239,43 @@ class TInputTransformStreamLookupBase KeysForLookup->emplace(std::move(key), NUdf::TUnboxedValue{}); } } + if (Batches && (!KeysForLookup->empty() || !ReadyQueue.RowCount())) { + Batches->Inc(); + LruHits->Add(ReadyQueue.RowCount()); + LruMiss->Add(AwaitingQueue.size()); + } if (!KeysForLookup->empty()) { Send(LookupSourceId, new IDqAsyncLookupSource::TEvLookupRequest(KeysForLookup)); } DrainReadyQueue(batch); } + auto deltaTime = GetCpuTimeDelta(startCycleCount); + CpuTime += deltaTime; + if (CpuTimeUs) { + CpuTimeUs->Add(deltaTime.MicroSeconds()); + } finished = IsFinished(); return AwaitingQueue.size(); } + void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) { + if (!taskCounters) { + return; + } + LruHits = taskCounters->GetCounter("LookupHits"); + LruMiss = taskCounters->GetCounter("LookupMiss"); + CpuTimeUs = taskCounters->GetCounter("LookupCpuUs"); + Batches = taskCounters->GetCounter("LookupBatches"); + } + + static TDuration GetCpuTimeDelta(ui64 startCycleCount) { + return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - startCycleCount)); + } + + TDuration GetCpuTime() override { + return CpuTime; + } + TMaybe ExtraData() override { google::protobuf::Any result; //TODO fill me @@ -267,6 +307,7 @@ class TInputTransformStreamLookupBase ui64 InputIndex; // NYql::NDq::IDqComputeActorAsyncInput NUdf::TUnboxedValue InputFlow; const NActors::TActorId ComputeActorId; + ::NMonitoring::TDynamicCounterPtr TaskCounters; IDqAsyncIoFactory::TPtr Factory; NDqProto::TDqInputTransformLookupSettings Settings; protected: @@ -290,6 +331,12 @@ class TInputTransformStreamLookupBase NKikimr::NMiniKQL::TUnboxedValueBatch ReadyQueue; NYql::NDq::TDqAsyncStats IngressStats; std::shared_ptr KeysForLookup; + + ::NMonitoring::TDynamicCounters::TCounterPtr LruHits; + ::NMonitoring::TDynamicCounters::TCounterPtr LruMiss; + ::NMonitoring::TDynamicCounters::TCounterPtr CpuTimeUs; + ::NMonitoring::TDynamicCounters::TCounterPtr Batches; + TDuration CpuTime; }; class TInputTransformStreamLookupWide: public TInputTransformStreamLookupBase { @@ -536,6 +583,7 @@ std::pair CreateInputTransformStre args.InputIndex, args.TransformInput, args.ComputeActorId, + args.TaskCounters, factory, std::move(settings), std::move(lookupKeyInputIndexes), @@ -555,6 +603,7 @@ std::pair CreateInputTransformStre args.InputIndex, args.TransformInput, args.ComputeActorId, + args.TaskCounters, factory, std::move(settings), std::move(lookupKeyInputIndexes), diff --git a/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp b/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp index e85bdfa01e14..619f10a99bcf 100644 --- a/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp +++ b/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp @@ -176,6 +176,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { connectorMock, std::make_shared(), edge, + nullptr, alloc, keyTypeHelper, std::move(lookupSourceSettings), diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp index d5cfd19f3336..720c84c1bee0 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp @@ -64,6 +64,7 @@ namespace NYql::NDq { NConnector::IClient::TPtr connectorClient, TGenericTokenProvider::TPtr tokenProvider, NActors::TActorId&& parentId, + ::NMonitoring::TDynamicCounterPtr taskCounters, std::shared_ptr alloc, std::shared_ptr keyTypeHelper, NYql::Generic::TLookupSource&& lookupSource, @@ -85,6 +86,7 @@ namespace NYql::NDq { , ColumnDestinations(CreateColumnDestination()) , MaxKeysInRequest(maxKeysInRequest) { + InitMonCounters(taskCounters); } ~TGenericLookupActor() { @@ -97,6 +99,18 @@ namespace NYql::NDq { Request.reset(); KeyTypeHelper.reset(); } + void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) { + if (!taskCounters) { + return; + } + Count = taskCounters->GetCounter("LookupSrcReqs"); + Keys = taskCounters->GetCounter("LookupSrcKeys"); + ResultChunks = taskCounters->GetCounter("LookupSrcChunks"); + ResultRows = taskCounters->GetCounter("LookupSrcRows"); + ResultBytes = taskCounters->GetCounter("LookupSrcBytes"); + AnswerTime = taskCounters->GetCounter("LookupSrcAnswerMs"); + CpuTime = taskCounters->GetCounter("LookupSrcCpuUs"); + } public: void Bootstrap() { @@ -215,13 +229,24 @@ namespace NYql::NDq { } private: + static TDuration GetCpuTimeDelta(ui64 startCycleCount) { + return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - startCycleCount)); + } + void CreateRequest(std::shared_ptr request) { if (!request) { return; } + auto startCycleCount = GetCycleCountFast(); + SentTime = TInstant::Now(); YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys"; Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest); + if (Count) { + Count->Inc(); + Keys->Add(request->size()); + } + Request = std::move(request); NConnector::NApi::TListSplitsRequest splitRequest; @@ -243,6 +268,9 @@ namespace NYql::NDq { SendError(actorSystem, selfId, result.Status); } }); + if (CpuTime) { + CpuTime->Add(GetCpuTimeDelta(startCycleCount).MicroSeconds()); + } } void ReadNextData() { @@ -270,7 +298,15 @@ namespace NYql::NDq { } void ProcessReceivedData(const NConnector::NApi::TReadSplitsResponse& resp) { + auto startCycleCount = GetCycleCountFast(); Y_ABORT_UNLESS(resp.payload_case() == NConnector::NApi::TReadSplitsResponse::PayloadCase::kArrowIpcStreaming); + if (ResultChunks) { + ResultChunks->Inc(); + if (resp.has_stats()) { + ResultRows->Add(resp.stats().rows()); + ResultBytes->Add(resp.stats().bytes()); + } + } auto guard = Guard(*Alloc); NKikimr::NArrow::NSerialization::TSerializerContainer deser = NKikimr::NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer(); // todo move to class' member const auto& data = deser->Deserialize(resp.arrow_ipc_streaming()); @@ -296,12 +332,18 @@ namespace NYql::NDq { *v = std::move(output); // duplicates will be overwritten } } + if (CpuTime) { + CpuTime->Add(GetCpuTimeDelta(startCycleCount).MicroSeconds()); + } } void FinalizeRequest() { YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request->size() << " keys"; auto guard = Guard(*Alloc); auto ev = new IDqAsyncLookupSource::TEvLookupResult(Request); + if (AnswerTime) { + AnswerTime->Add((TInstant::Now() - SentTime).MilliSeconds()); + } Request.reset(); TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev)); LookupResult = {}; @@ -404,12 +446,21 @@ namespace NYql::NDq { std::shared_ptr Request; NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult; + ::NMonitoring::TDynamicCounters::TCounterPtr Count; + ::NMonitoring::TDynamicCounters::TCounterPtr Keys; + ::NMonitoring::TDynamicCounters::TCounterPtr ResultRows; + ::NMonitoring::TDynamicCounters::TCounterPtr ResultBytes; + ::NMonitoring::TDynamicCounters::TCounterPtr ResultChunks; + ::NMonitoring::TDynamicCounters::TCounterPtr AnswerTime; + ::NMonitoring::TDynamicCounters::TCounterPtr CpuTime; + TInstant SentTime; }; std::pair CreateGenericLookupActor( NConnector::IClient::TPtr connectorClient, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NActors::TActorId parentId, + ::NMonitoring::TDynamicCounterPtr taskCounters, std::shared_ptr alloc, std::shared_ptr keyTypeHelper, NYql::Generic::TLookupSource&& lookupSource, @@ -425,6 +476,7 @@ namespace NYql::NDq { connectorClient, std::move(tokenProvider), std::move(parentId), + taskCounters, alloc, keyTypeHelper, std::move(lookupSource), diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h index 9f8c0c268f23..128964b1553f 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h @@ -15,6 +15,7 @@ namespace NYql::NDq { NConnector::IClient::TPtr connectorClient, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NActors::TActorId parentId, + ::NMonitoring::TDynamicCounterPtr taskCounters, std::shared_ptr alloc, std::shared_ptr keyTypeHelper, NYql::Generic::TLookupSource&& lookupSource, diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp index a7422cd018df..7aaa837d961b 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp @@ -22,6 +22,7 @@ namespace NYql::NDq { genericClient, credentialsFactory, std::move(args.ParentId), + args.TaskCounters, args.Alloc, args.KeyTypeHelper, std::move(lookupSource), diff --git a/ydb/tests/fq/generic/streaming/test_join.py b/ydb/tests/fq/generic/streaming/test_join.py index 61fa4210180b..0f8cb0c4c0c1 100644 --- a/ydb/tests/fq/generic/streaming/test_join.py +++ b/ydb/tests/fq/generic/streaming/test_join.py @@ -519,6 +519,17 @@ def test_streamlookup( messages_ctr = Counter(map(freeze, map(json.loads, map(itemgetter(1), messages)))) assert read_data_ctr == messages_ctr + if DEBUG: + for node_index in kikimr.compute_plane.kikimr_cluster.nodes: + sensors = kikimr.compute_plane.get_sensors(node_index, "dq_tasks").find_sensors( + labels={"operation": query_id}, key_label="sensor" + ) + for k in sensors: + for prefix in ("Lookup", "InputTransform"): + if k.startswith(prefix): + print(f'node[{node_index}].operation[{query_id}].{k} = {sensors[k]}', file=sys.stderr) + break + fq_client.abort_query(query_id) fq_client.wait_query(query_id) diff --git a/ydb/tests/tools/fq_runner/kikimr_metrics.py b/ydb/tests/tools/fq_runner/kikimr_metrics.py index 006e95119738..08825289171b 100644 --- a/ydb/tests/tools/fq_runner/kikimr_metrics.py +++ b/ydb/tests/tools/fq_runner/kikimr_metrics.py @@ -31,7 +31,7 @@ def find_sensors(self, labels, key_label): continue v = lbls.get(key_label, None) if v is not None: - result[v] = s["value"] + result[v] = s.get("value", None) return result def collect_non_zeros(self):