Skip to content

Commit

Permalink
Merge 6c3a910 into 0da7943
Browse files Browse the repository at this point in the history
  • Loading branch information
yumkam authored Oct 11, 2024
2 parents 0da7943 + 6c3a910 commit 61b1194
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 9 deletions.
2 changes: 2 additions & 0 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> KeyTypeHelper;
NActors::TActorId ParentId;
::NMonitoring::TDynamicCounterPtr TaskCounters;
google::protobuf::Any LookupSource; //provider specific data source
const NKikimr::NMiniKQL::TStructType* KeyType;
const NKikimr::NMiniKQL::TStructType* PayloadType;
Expand Down Expand Up @@ -299,6 +300,7 @@ struct IDqAsyncIoFactory : public TThrRefBase {
const THashMap<TString, TString>& SecureParams;
const THashMap<TString, TString>& TaskParams;
const NActors::TActorId& ComputeActorId;
::NMonitoring::TDynamicCounterPtr TaskCounters;
const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
Expand Down
40 changes: 34 additions & 6 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
MkqlMemoryQuota = taskCounters->GetCounter("MkqlMemoryQuota");
OutputChannelSize = taskCounters->GetCounter("OutputChannelSize");
SourceCpuTimeMs = taskCounters->GetCounter("SourceCpuTimeMs", true);
InputTransformCpuTimeMs = taskCounters->GetCounter("InputTransformCpuTimeMs", true);
}
}

Expand Down Expand Up @@ -1340,6 +1341,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
.SecureParams = secureParams,
.TaskParams = taskParams,
.ComputeActorId = this->SelfId(),
.TaskCounters = TaskCounters,
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
.Alloc = Alloc,
Expand Down Expand Up @@ -1432,11 +1434,20 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>

void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {
Y_ABORT_UNLESS(SourcesMap.FindPtr(ev->Get()->InputIndex) || InputTransformsMap.FindPtr(ev->Get()->InputIndex));
auto cpuTimeDelta = TakeSourceCpuTimeDelta();
if (SourceCpuTimeMs) {
SourceCpuTimeMs->Add(cpuTimeDelta.MilliSeconds());
{
auto cpuTimeDelta = TakeSourceCpuTimeDelta();
if (SourceCpuTimeMs) {
SourceCpuTimeMs->Add(cpuTimeDelta.MilliSeconds());
}
CpuTimeSpent += cpuTimeDelta;
}
{
auto cpuTimeDelta = TakeInputTransformCpuTimeDelta();
if (InputTransformCpuTimeMs) {
InputTransformCpuTimeMs->Add(cpuTimeDelta.MilliSeconds());
}
CpuTimeSpent += cpuTimeDelta;
}
CpuTimeSpent += cpuTimeDelta;
ContinueExecute(EResumeSource::CANewAsyncInput);
}

Expand Down Expand Up @@ -1631,6 +1642,21 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
return result;
}

// Returns the sum of GetCpuTime() over every async input registered in
// InputTransformsMap. The map key (input index) is not used here.
TDuration GetInputTransformCpuTime() const {
    TDuration total = TDuration::Zero();
    for (const auto& entry : InputTransformsMap) {
        total += entry.second.AsyncInput->GetCpuTime();
    }
    return total;
}

// Returns how much input-transform CPU time has been added since the previous
// call, and remembers the current total in InputTransformCpuTime so that the
// next call reports only the new increment.
TDuration TakeInputTransformCpuTimeDelta() {
    const TDuration currentTotal = GetInputTransformCpuTime();
    const TDuration delta = currentTotal - InputTransformCpuTime;
    InputTransformCpuTime = currentTotal;
    return delta;
}

void FillStats(NDqProto::TDqComputeActorStats* dst, bool last) {
if (RuntimeSettings.CollectNone()) {
return;
Expand All @@ -1640,7 +1666,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
ReportEventElapsedTime();
}

dst->SetCpuTimeUs(CpuTime.MicroSeconds() + SourceCpuTime.MicroSeconds());
dst->SetCpuTimeUs(CpuTime.MicroSeconds() + SourceCpuTime.MicroSeconds() + InputTransformCpuTime.MicroSeconds());
dst->SetMaxMemoryUsage(MemoryLimits.MemoryQuotaManager->GetMaxMemorySize());

if (auto memProfileStats = GetMemoryProfileStats(); memProfileStats) {
Expand Down Expand Up @@ -1679,7 +1705,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
cpuTimeUs += CpuTime.MicroSeconds();
}
// CpuTimeUs does include SourceCpuTime
protoTask->SetCpuTimeUs(cpuTimeUs + SourceCpuTime.MicroSeconds());
protoTask->SetCpuTimeUs(cpuTimeUs + SourceCpuTime.MicroSeconds() + InputTransformCpuTime.MicroSeconds());
protoTask->SetSourceCpuTimeUs(SourceCpuTime.MicroSeconds());

ui64 ingressBytes = 0;
Expand Down Expand Up @@ -1947,6 +1973,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
TDqComputeActorMetrics MetricsReporter;
NWilson::TSpan ComputeActorSpan;
TDuration SourceCpuTime;
TDuration InputTransformCpuTime;
private:
TInstant StartTime;
bool Running = true;
Expand All @@ -1957,6 +1984,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryQuota;
::NMonitoring::TDynamicCounters::TCounterPtr OutputChannelSize;
::NMonitoring::TDynamicCounters::TCounterPtr SourceCpuTimeMs;
::NMonitoring::TDynamicCounters::TCounterPtr InputTransformCpuTimeMs;
THolder<NYql::TCounters> Stat;
TDuration CpuTimeSpent;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class TInputTransformStreamLookupBase
ui64 inputIndex,
NUdf::TUnboxedValue inputFlow,
NActors::TActorId computeActorId,
::NMonitoring::TDynamicCounterPtr taskCounters,
IDqAsyncIoFactory* factory,
NDqProto::TDqInputTransformLookupSettings&& settings,
TVector<size_t>&& lookupInputIndexes,
Expand All @@ -51,6 +52,7 @@ class TInputTransformStreamLookupBase
, InputIndex(inputIndex)
, InputFlow(std::move(inputFlow))
, ComputeActorId(std::move(computeActorId))
, TaskCounters(taskCounters)
, Factory(factory)
, Settings(std::move(settings))
, LookupInputIndexes(std::move(lookupInputIndexes))
Expand All @@ -74,6 +76,7 @@ class TInputTransformStreamLookupBase
Y_DEBUG_ABORT_UNLESS(OtherInputIndexes[i] < InputRowType->GetElementsCount());
}
Y_DEBUG_ABORT_UNLESS(LookupInputIndexes.size() == LookupKeyType->GetMembersCount());
InitMonCounters(taskCounters);
}

void Bootstrap() {
Expand All @@ -82,6 +85,7 @@ class TInputTransformStreamLookupBase
.Alloc = Alloc,
.KeyTypeHelper = KeyTypeHelper,
.ParentId = SelfId(),
.TaskCounters = TaskCounters,
.LookupSource = Settings.GetRightSource().GetLookupSource(),
.KeyType = LookupKeyType,
.PayloadType = LookupPayloadType,
Expand Down Expand Up @@ -142,6 +146,7 @@ class TInputTransformStreamLookupBase
}

void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) {
auto startCycleCount = GetCycleCountFast();
if (!KeysForLookup)
return;
auto guard = BindAllocator();
Expand All @@ -160,6 +165,11 @@ class TInputTransformStreamLookupBase
LruCache->Update(NUdf::TUnboxedValue(const_cast<NUdf::TUnboxedValue&&>(k)), std::move(v), now + CacheTtl);
}
KeysForLookup->clear();
auto deltaTime = GetCpuTimeDelta(startCycleCount);
CpuTime += deltaTime;
if (CpuTimeUs) {
CpuTimeUs->Add(deltaTime.MicroSeconds());
}
Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex});
}

Expand Down Expand Up @@ -198,13 +208,15 @@ class TInputTransformStreamLookupBase

i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
Y_UNUSED(freeSpace);
auto startCycleCount = GetCycleCountFast();
auto guard = BindAllocator();

DrainReadyQueue(batch);

if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && KeysForLookup->empty()) {
NUdf::TUnboxedValue* inputRowItems;
NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems);
Y_DEBUG_ABORT_UNLESS(AwaitingQueue.empty());
NUdf::TUnboxedValue* inputRowItems;
NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems);
const auto now = std::chrono::steady_clock::now();
LruCache->Prune(now);
while (
Expand All @@ -227,15 +239,43 @@ class TInputTransformStreamLookupBase
KeysForLookup->emplace(std::move(key), NUdf::TUnboxedValue{});
}
}
if (Batches && (!KeysForLookup->empty() || !ReadyQueue.RowCount())) {
Batches->Inc();
LruHits->Add(ReadyQueue.RowCount());
LruMiss->Add(AwaitingQueue.size());
}
if (!KeysForLookup->empty()) {
Send(LookupSourceId, new IDqAsyncLookupSource::TEvLookupRequest(KeysForLookup));
}
DrainReadyQueue(batch);
}
auto deltaTime = GetCpuTimeDelta(startCycleCount);
CpuTime += deltaTime;
if (CpuTimeUs) {
CpuTimeUs->Add(deltaTime.MicroSeconds());
}
finished = IsFinished();
return AwaitingQueue.size();
}

// Looks up the lookup-related monitoring counters (hits, misses, CPU
// microseconds, batches) in the supplied task counter group.
// When taskCounters is null nothing is bound and the counter members keep
// their default (empty) state, so users of these members must null-check them.
void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) {
    if (taskCounters) {
        LruHits = taskCounters->GetCounter("LookupHits");
        LruMiss = taskCounters->GetCounter("LookupMiss");
        CpuTimeUs = taskCounters->GetCounter("LookupCpuUs");
        Batches = taskCounters->GetCounter("LookupBatches");
    }
}

// Converts the number of cycles elapsed since startCycleCount (a value
// previously obtained from GetCycleCountFast()) into a TDuration, using
// NHPTimer::GetSeconds for the cycles-to-seconds conversion.
static TDuration GetCpuTimeDelta(ui64 startCycleCount) {
    const ui64 elapsedCycles = GetCycleCountFast() - startCycleCount;
    return TDuration::Seconds(NHPTimer::GetSeconds(elapsedCycles));
}

// Returns the CPU time accumulated so far in the CpuTime member, which is
// incremented by the measured delta in Handle(TEvLookupResult) and in
// GetAsyncInputData.
TDuration GetCpuTime() override {
return CpuTime;
}

TMaybe<google::protobuf::Any> ExtraData() override {
google::protobuf::Any result;
//TODO fill me
Expand Down Expand Up @@ -267,6 +307,7 @@ class TInputTransformStreamLookupBase
ui64 InputIndex; // NYql::NDq::IDqComputeActorAsyncInput
NUdf::TUnboxedValue InputFlow;
const NActors::TActorId ComputeActorId;
::NMonitoring::TDynamicCounterPtr TaskCounters;
IDqAsyncIoFactory::TPtr Factory;
NDqProto::TDqInputTransformLookupSettings Settings;
protected:
Expand All @@ -290,6 +331,12 @@ class TInputTransformStreamLookupBase
NKikimr::NMiniKQL::TUnboxedValueBatch ReadyQueue;
NYql::NDq::TDqAsyncStats IngressStats;
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> KeysForLookup;

::NMonitoring::TDynamicCounters::TCounterPtr LruHits;
::NMonitoring::TDynamicCounters::TCounterPtr LruMiss;
::NMonitoring::TDynamicCounters::TCounterPtr CpuTimeUs;
::NMonitoring::TDynamicCounters::TCounterPtr Batches;
TDuration CpuTime;
};

class TInputTransformStreamLookupWide: public TInputTransformStreamLookupBase {
Expand Down Expand Up @@ -536,6 +583,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateInputTransformStre
args.InputIndex,
args.TransformInput,
args.ComputeActorId,
args.TaskCounters,
factory,
std::move(settings),
std::move(lookupKeyInputIndexes),
Expand All @@ -555,6 +603,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateInputTransformStre
args.InputIndex,
args.TransformInput,
args.ComputeActorId,
args.TaskCounters,
factory,
std::move(settings),
std::move(lookupKeyInputIndexes),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
connectorMock,
std::make_shared<NYql::NTestCreds::TSecuredServiceAccountCredentialsFactory>(),
edge,
nullptr,
alloc,
keyTypeHelper,
std::move(lookupSourceSettings),
Expand Down
Loading

0 comments on commit 61b1194

Please sign in to comment.