Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread safely SeriesPartitionTable #12679

Merged
merged 5 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) {
return getConsensusManager().write(plan);
} catch (ConsensusException e) {
// The allocation might fail due to consensus error
LOGGER.error("Write DataPartition allocation result failed because: {}", status);
LOGGER.error("Write partition allocation result failed because: {}", status);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
return res;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,26 @@
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class SeriesPartitionTable {

private final TreeMap<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionMap;
private final ConcurrentSkipListMap<TTimePartitionSlot, List<TConsensusGroupId>>
seriesPartitionMap;

public SeriesPartitionTable() {
this.seriesPartitionMap = new TreeMap<>();
this.seriesPartitionMap = new ConcurrentSkipListMap<>();
}

public SeriesPartitionTable(Map<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionMap) {
this.seriesPartitionMap = new TreeMap<>(seriesPartitionMap);
this.seriesPartitionMap = new ConcurrentSkipListMap<>(seriesPartitionMap);
}

public Map<TTimePartitionSlot, List<TConsensusGroupId>> getSeriesPartitionMap() {
return seriesPartitionMap;
return new TreeMap<>(seriesPartitionMap);
}

public void putDataPartition(TTimePartitionSlot timePartitionSlot, TConsensusGroupId groupId) {
Expand All @@ -75,7 +77,6 @@ public boolean getDataPartition(
TTimeSlotList partitionSlotList, SeriesPartitionTable seriesPartitionTable) {
AtomicBoolean result = new AtomicBoolean(true);
List<TTimePartitionSlot> partitionSlots = partitionSlotList.getTimePartitionSlots();

if (partitionSlots.isEmpty()) {
// Return all DataPartitions in one SeriesPartitionSlot
// when the queried TimePartitionSlots are empty
Expand All @@ -84,7 +85,8 @@ public boolean getDataPartition(
boolean isNeedLeftAll = partitionSlotList.isNeedLeftAll(),
isNeedRightAll = partitionSlotList.isNeedRightAll();
if (isNeedLeftAll || isNeedRightAll) {
// we need to calculate the leftMargin which contains all the time partition on the unclosed
// we need to calculate the leftMargin which contains all the time partition on the
// unclosed
// left side: (-oo, leftMargin)
// and the rightMargin which contains all the time partition on the unclosed right side:
// (rightMargin, +oo)
Expand All @@ -106,7 +108,6 @@ public boolean getDataPartition(
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

// Return the DataPartition for each match TimePartitionSlot
partitionSlots.forEach(
timePartitionSlot -> {
Expand All @@ -119,7 +120,6 @@ public boolean getDataPartition(
}
});
}

return result.get();
}

Expand Down Expand Up @@ -153,7 +153,7 @@ public TConsensusGroupId getPredecessorDataPartition(TTimePartitionSlot timePart
* @return the timePartition's corresponding dataRegionIds. return the dataRegions which
* timeslotIds are in the time range [startTimeSlotId, endTimeSlotId].
*/
List<TConsensusGroupId> getRegionId(
public List<TConsensusGroupId> getRegionId(
TTimePartitionSlot startTimeSlotId, TTimePartitionSlot endTimeSlotId) {
return seriesPartitionMap.entrySet().stream()
.filter(
Expand All @@ -164,7 +164,7 @@ List<TConsensusGroupId> getRegionId(
.collect(Collectors.toList());
}

List<TTimePartitionSlot> getTimeSlotList(
public List<TTimePartitionSlot> getTimeSlotList(
TConsensusGroupId regionId, long startTime, long endTime) {
if (regionId.getId() == -1) {
return seriesPartitionMap.keySet().stream()
Expand Down Expand Up @@ -214,14 +214,12 @@ public void createDataPartition(
public synchronized List<TTimePartitionSlot> filterUnassignedDataPartitionSlots(
List<TTimePartitionSlot> partitionSlots) {
List<TTimePartitionSlot> result = new Vector<>();

partitionSlots.forEach(
timePartitionSlot -> {
if (!seriesPartitionMap.containsKey(timePartitionSlot)) {
result.add(timePartitionSlot);
}
});

return result;
}

Expand Down Expand Up @@ -258,13 +256,11 @@ public void deserialize(ByteBuffer buffer) {
for (int i = 0; i < timePartitionSlotNum; i++) {
TTimePartitionSlot timePartitionSlot =
ThriftCommonsSerDeUtils.deserializeTTimePartitionSlot(buffer);

int consensusGroupIdNum = buffer.getInt();
List<TConsensusGroupId> consensusGroupIds = new Vector<>();
for (int j = 0; j < consensusGroupIdNum; j++) {
consensusGroupIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
}

seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
}
}
Expand All @@ -276,15 +272,13 @@ public void deserialize(InputStream inputStream, TProtocol protocol)
for (int i = 0; i < timePartitionSlotNum; i++) {
TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
timePartitionSlot.read(protocol);

int consensusGroupIdNum = ReadWriteIOUtils.readInt(inputStream);
List<TConsensusGroupId> consensusGroupIds = new Vector<>();
for (int j = 0; j < consensusGroupIdNum; j++) {
TConsensusGroupId consensusGroupId = new TConsensusGroupId();
consensusGroupId.read(protocol);
consensusGroupIds.add(consensusGroupId);
}

seriesPartitionMap.put(timePartitionSlot, consensusGroupIds);
}
}
Expand Down
Loading