Skip to content

Commit

Permalink
grace_join: fix batch mode performance (#6705)
Browse files Browse the repository at this point in the history
  • Loading branch information
yumkam authored Jul 16, 2024
1 parent c02bfea commit 98a74be
Show file tree
Hide file tree
Showing 3 changed files with 294 additions and 76 deletions.
94 changes: 54 additions & 40 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
if( hasMoreRightTuples )
RightTableBatch_ = true;

auto table1Batch = LeftTableBatch_;
auto table2Batch = RightTableBatch_;

JoinTable1 = &t1;
JoinTable2 = &t2;

Expand All @@ -333,13 +336,11 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef

if ( JoinKind == EJoinKind::Right || JoinKind == EJoinKind::RightOnly || JoinKind == EJoinKind::RightSemi ) {
std::swap(JoinTable1, JoinTable2);
std::swap(table1Batch, table2Batch);
}

ui64 tuplesFound = 0;

std::vector<ui64, TMKQLAllocator<ui64, EMemorySubPool::Temporary>> joinSlots;
ui64 reservedSize = 6 * (DefaultTupleBytes * DefaultTuplesNum) / sizeof(ui64);
joinSlots.reserve( reservedSize );
std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds, EMemorySubPool::Temporary>> joinResults;


Expand All @@ -361,9 +362,10 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
bool table2HasKeyStringColumns = (JoinTable2->NumberOfKeyStringColumns != 0);
bool table1HasKeyIColumns = (JoinTable1->NumberOfKeyIColumns != 0);
bool table2HasKeyIColumns = (JoinTable2->NumberOfKeyIColumns != 0);
bool swapTables = tuplesNum2 > tuplesNum1 && !table1Batch || table2Batch;


if (tuplesNum2 > tuplesNum1) {
if (swapTables) {
std::swap(bucket1, bucket2);
std::swap(headerSize1, headerSize2);
std::swap(nullsSize1, nullsSize2);
Expand All @@ -373,7 +375,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
std::swap(tuplesNum1, tuplesNum2);
}

if (tuplesNum2 == 0)
if (tuplesNum2 == 0 || tuplesNum1 == 0)
continue;

ui64 slotSize = headerSize2 + 1;
Expand All @@ -384,10 +386,15 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
slotSize = slotSize + avgStringsSize;
}


ui64 nSlots = (3 * tuplesNum2 + 1) | 1;
joinSlots.clear();
joinSlots.resize(nSlots*slotSize, 0);
ui64 &nSlots = bucket2->NSlots;
auto &joinSlots = bucket2->JoinSlots;
bool initHashTable = false;

if (!nSlots) {
nSlots = (3 * tuplesNum2 + 1) | 1;
joinSlots.resize(nSlots*slotSize, 0);
initHashTable = true;
}

auto firstSlot = [begin = joinSlots.begin(), slotSize, nSlots](auto hash) {
ui64 slotNum = hash % nSlots;
Expand All @@ -401,35 +408,37 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
return it;
};

ui32 tuple2Idx = 0;
auto it2 = bucket2->KeyIntVals.begin();
for (ui64 keysValSize = headerSize2; it2 != bucket2->KeyIntVals.end(); it2 += keysValSize, ++tuple2Idx) {
if ( table2HasKeyStringColumns || table2HasKeyIColumns) {
keysValSize = headerSize2 + *(it2 + headerSize2 - 1) ;
}
if (initHashTable) {
ui32 tuple2Idx = 0;
auto it2 = bucket2->KeyIntVals.begin();
for (ui64 keysValSize = headerSize2; it2 != bucket2->KeyIntVals.end(); it2 += keysValSize, ++tuple2Idx) {
if ( table2HasKeyStringColumns || table2HasKeyIColumns) {
keysValSize = headerSize2 + *(it2 + headerSize2 - 1) ;
}

ui64 hash = *it2;
ui64 * nullsPtr = it2+1;
if (HasBitSet(nullsPtr, 1))
continue;
ui64 hash = *it2;
ui64 * nullsPtr = it2+1;
if (HasBitSet(nullsPtr, 1))
continue;

auto slotIt = firstSlot(hash);
auto slotIt = firstSlot(hash);

for (; *slotIt != 0; slotIt = nextSlot(slotIt))
{
}
for (; *slotIt != 0; slotIt = nextSlot(slotIt))
{
}

if (keysValSize <= slotSize - 1)
{
std::copy_n(it2, keysValSize, slotIt);
}
else
{
std::copy_n(it2, headerSize2, slotIt);
if (keysValSize <= slotSize - 1)
{
std::copy_n(it2, keysValSize, slotIt);
}
else
{
std::copy_n(it2, headerSize2, slotIt);

*(slotIt + headerSize2) = it2 + headerSize2 - bucket2->KeyIntVals.begin();
*(slotIt + headerSize2) = it2 + headerSize2 - bucket2->KeyIntVals.begin();
}
slotIt[slotSize - 1] = tuple2Idx;
}
slotIt[slotSize - 1] = tuple2Idx;
}


Expand Down Expand Up @@ -462,7 +471,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
if (*slotIt != hash)
continue;

tuple2Idx = slotIt[slotSize - 1];
auto tuple2Idx = slotIt[slotSize - 1];

if (table1HasKeyIColumns || !(keysValSize - nullsSize1 <= slotSize - 1 - nullsSize2)) {
// 2nd condition cannot be true unless HasKeyStringColumns or HasKeyIColumns, hence size at the end of header is present
Expand Down Expand Up @@ -505,16 +514,18 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef

tuplesFound++;
JoinTuplesIds joinIds;
joinIds.id1 = tuple1Idx;
joinIds.id2 = tuple2Idx;
if (JoinTable2->TableBucketsStats[bucket].TuplesNum > JoinTable1->TableBucketsStats[bucket].TuplesNum)
{
std::swap(joinIds.id1, joinIds.id2);
}
joinIds.id1 = swapTables ? tuple2Idx : tuple1Idx;
joinIds.id2 = swapTables ? tuple1Idx : tuple2Idx;
joinResults.emplace_back(joinIds);
}
}

if (!hasMoreLeftTuples && !hasMoreRightTuples) {
joinSlots.clear();
joinSlots.shrink_to_fit();
nSlots = 0;
}

std::sort(joinResults.begin(), joinResults.end(), [](JoinTuplesIds a, JoinTuplesIds b)
{
if (a.id1 < b.id1) return true;
Expand Down Expand Up @@ -560,7 +571,6 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
}

}

}

HasMoreLeftTuples_ = hasMoreLeftTuples;
Expand Down Expand Up @@ -1111,6 +1121,8 @@ void TTable::ClearBucket(ui64 bucket) {
tb.InterfaceOffsets.clear();
tb.JoinIds.clear();
tb.RightIds.clear();
tb.JoinSlots.clear();
tb.NSlots = 0;

TTableBucketStats & tbs = TableBucketsStats[bucket];
tbs.TuplesNum = 0;
Expand All @@ -1128,6 +1140,7 @@ void TTable::ShrinkBucket(ui64 bucket) {
tb.InterfaceOffsets.shrink_to_fit();
tb.JoinIds.shrink_to_fit();
tb.RightIds.shrink_to_fit();
tb.JoinSlots.shrink_to_fit();
}

void TTable::InitializeBucketSpillers(ISpiller::TPtr spiller) {
Expand All @@ -1138,6 +1151,7 @@ void TTable::InitializeBucketSpillers(ISpiller::TPtr spiller) {

ui64 TTable::GetSizeOfBucket(ui64 bucket) const {
return TableBuckets[bucket].KeyIntVals.size() * sizeof(ui64)
+ TableBuckets[bucket].JoinSlots.size() * sizeof(ui64)
+ TableBuckets[bucket].DataIntVals.size() * sizeof(ui64)
+ TableBuckets[bucket].StringsValues.size()
+ TableBuckets[bucket].StringsOffsets.size() * sizeof(ui32)
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ struct TTableBucket {
std::set<ui32> AllLeftMatchedIds; // All row ids of left join table which have matching rows in right table. To process streaming join mode.
std::set<ui32> AllRightMatchedIds; // All row ids of right join table which have matching rows in left table. To process streaming join mode.

std::vector<ui64, TMKQLAllocator<ui64>> JoinSlots; // Hashtable
ui64 NSlots = 0; // Hashtable
};

struct TTableBucketStats {
Expand Down
Loading

0 comments on commit 98a74be

Please sign in to comment.