Skip to content

Commit

Permalink
Thread safely SeriesPartitionTable (apache#12679)
Browse files Browse the repository at this point in the history
* finish

* use concurrentskiplist

* bug fix

---------

Co-authored-by: OneSizeFitQuorum <tanxinyu@apache.org>
  • Loading branch information
2 people authored and SzyWilliam committed Nov 22, 2024
1 parent 03938b6 commit 70c493e
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 22 deletions.
4 changes: 1 addition & 3 deletions .mvn/develocity.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
<buildScan>
<backgroundBuildScanUpload>#{isFalse(env['GITHUB_ACTIONS'])}</backgroundBuildScanUpload>
<publishing>
<onlyIf>
<![CDATA[authenticated]]>
</onlyIf>
<onlyIf><![CDATA[authenticated]]></onlyIf>
</publishing>
<obfuscation>
<ipAddresses>#{{'0.0.0.0'}}</ipAddresses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) {
return getConsensusManager().write(plan);
} catch (ConsensusException e) {
// The allocation might fail due to consensus error
LOGGER.error("Write DataPartition allocation result failed because: {}", status);
LOGGER.error("Write partition allocation result failed because: {}", status);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
return res;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,35 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class SeriesPartitionTable {

private final TreeMap<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionMap;
private final ConcurrentSkipListMap<TTimePartitionSlot, List<TConsensusGroupId>>
seriesPartitionMap;

public SeriesPartitionTable() {
this.seriesPartitionMap = new TreeMap<>();
this.seriesPartitionMap = new ConcurrentSkipListMap<>();
}

public SeriesPartitionTable(Map<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionMap) {
this.seriesPartitionMap = new TreeMap<>(seriesPartitionMap);
this.seriesPartitionMap = new ConcurrentSkipListMap<>(seriesPartitionMap);
}

public Map<TTimePartitionSlot, List<TConsensusGroupId>> getSeriesPartitionMap() {
return seriesPartitionMap;
}

public void putDataPartition(TTimePartitionSlot timePartitionSlot, TConsensusGroupId groupId) {
seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new ArrayList<>()).add(groupId);
seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new Vector<>()).add(groupId);
}

/**
Expand All @@ -75,7 +75,6 @@ public boolean getDataPartition(
TTimeSlotList partitionSlotList, SeriesPartitionTable seriesPartitionTable) {
AtomicBoolean result = new AtomicBoolean(true);
List<TTimePartitionSlot> partitionSlots = partitionSlotList.getTimePartitionSlots();

if (partitionSlots.isEmpty()) {
// Return all DataPartitions in one SeriesPartitionSlot
// when the queried TimePartitionSlots are empty
Expand All @@ -84,7 +83,8 @@ public boolean getDataPartition(
boolean isNeedLeftAll = partitionSlotList.isNeedLeftAll(),
isNeedRightAll = partitionSlotList.isNeedRightAll();
if (isNeedLeftAll || isNeedRightAll) {
// we need to calculate the leftMargin which contains all the time partition on the unclosed
// we need to calculate the leftMargin which contains all the time partition on the
// unclosed
// left side: (-oo, leftMargin)
// and the rightMargin which contains all the time partition on the unclosed right side:
// (rightMargin, +oo)
Expand All @@ -106,7 +106,6 @@ public boolean getDataPartition(
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

// Return the DataPartition for each match TimePartitionSlot
partitionSlots.forEach(
timePartitionSlot -> {
Expand All @@ -119,7 +118,6 @@ public boolean getDataPartition(
}
});
}

return result.get();
}

Expand Down Expand Up @@ -153,7 +151,7 @@ public TConsensusGroupId getPredecessorDataPartition(TTimePartitionSlot timePart
* @return the timePartition's corresponding dataRegionIds. return the dataRegions which
* timeslotIds are in the time range [startTimeSlotId, endTimeSlotId].
*/
List<TConsensusGroupId> getRegionId(
public List<TConsensusGroupId> getRegionId(
TTimePartitionSlot startTimeSlotId, TTimePartitionSlot endTimeSlotId) {
return seriesPartitionMap.entrySet().stream()
.filter(
Expand All @@ -164,7 +162,7 @@ List<TConsensusGroupId> getRegionId(
.collect(Collectors.toList());
}

List<TTimePartitionSlot> getTimeSlotList(
public List<TTimePartitionSlot> getTimeSlotList(
TConsensusGroupId regionId, long startTime, long endTime) {
if (regionId.getId() == -1) {
return seriesPartitionMap.keySet().stream()
Expand Down Expand Up @@ -214,14 +212,12 @@ public void createDataPartition(
public synchronized List<TTimePartitionSlot> filterUnassignedDataPartitionSlots(
List<TTimePartitionSlot> partitionSlots) {
List<TTimePartitionSlot> result = new Vector<>();

partitionSlots.forEach(
timePartitionSlot -> {
if (!seriesPartitionMap.containsKey(timePartitionSlot)) {
result.add(timePartitionSlot);
}
});

return result;
}

Expand Down Expand Up @@ -258,13 +254,11 @@ public void deserialize(ByteBuffer buffer) {
for (int i = 0; i < timePartitionSlotNum; i++) {
TTimePartitionSlot timePartitionSlot =
ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);

int consensusGroupIdNum = buffer.getInt();
List<TConsensusGroupId> consensusGroupIds = new Vector<>();
for (int j = 0; j < consensusGroupIdNum; j++) {
consensusGroupIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
}

seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
}
}
Expand All @@ -276,15 +270,13 @@ public void deserialize(InputStream inputStream, TProtocol protocol)
for (int i = 0; i < timePartitionSlotNum; i++) {
TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
timePartitionSlot.read(protocol);

int consensusGroupIdNum = ReadWriteIOUtils.readInt(inputStream);
List<TConsensusGroupId> consensusGroupIds = new Vector<>();
for (int j = 0; j < consensusGroupIdNum; j++) {
TConsensusGroupId consensusGroupId = new TConsensusGroupId();
consensusGroupId.read(protocol);
consensusGroupIds.add(consensusGroupId);
}

seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
}
}
Expand Down

0 comments on commit 70c493e

Please sign in to comment.