[Improve] Fix potential thread-safety issues (#2043)
Co-authored-by: tomsun28 <tomsun28@outlook.com>
gjjjj0101 and tomsun28 authored Jun 14, 2024
1 parent af1f59e commit f10e5db
Showing 3 changed files with 202 additions and 80 deletions.
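The core of the fix is easiest to see as a check-then-act race: the sorted-ring map exposed higherEntry and firstEntry as two separately locked calls, so the ring could change between them. A minimal sketch of the hazard (hypothetical caller code, not from this commit):

// Each call is individually guarded by a read lock, but the pair is not atomic:
Map.Entry<Integer, Node> entry = hashCircle.higherEntry(virtualHashKey); // read lock taken and released
// <-- another thread may insert or remove a ring entry here
if (entry == null) {
    entry = hashCircle.firstEntry(); // second, independent read lock
}
// The new higherOrFirstEntry(key) below performs both lookups under one
// read lock, and the ring mutators are now synchronized as well.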
@@ -72,6 +72,19 @@ public Map.Entry<K, V> higherEntry(K key) {
}
}

/**
 * Return the entry whose key is strictly greater than the given key,
 * or the first entry when no higher key exists (ring wrap-around).
 * Both lookups run under a single read lock so the pair is consistent.
 */
public Map.Entry<K, V> higherOrFirstEntry(K key) {
readWriteLock.readLock().lock();
try {
Map.Entry<K, V> entry = super.higherEntry(key);
if (entry == null) {
return super.firstEntry();
}
return entry;
} finally {
readWriteLock.readLock().unlock();
}
}
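
A quick illustration of the wrap-around semantics, as a sketch (ring and the node values are hypothetical):

// Ring keys: 10, 20, 30
ring.put(10, nodeA);
ring.put(20, nodeB);
ring.put(30, nodeC);
ring.higherOrFirstEntry(15); // entry 20 -> nodeB (a strictly greater key exists)
ring.higherOrFirstEntry(30); // entry 10 -> nodeA (no greater key, wraps to the first entry)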

@Override
public Map.Entry<K, V> ceilingEntry(K key) {
readWriteLock.readLock().lock();
@@ -64,6 +64,43 @@ public ConsistentHash() {
dispatchJobCache = Collections.synchronizedList(new LinkedList<>());
}

/**
 * Add a virtual node to the hash ring and re-dispatch the jobs it takes over.
 * @param newNode collector node
 * @param identity virtual node identity
 */
public synchronized void addVirtualNode(Node newNode, String identity) {
int virtualHashKey = hash(identity);
hashCircle.put(virtualHashKey, newNode);
newNode.addVirtualNodeJobs(virtualHashKey, ConcurrentHashMap.newKeySet(16));
Map.Entry<Integer, Node> higherVirtualNode = hashCircle.higherOrFirstEntry(virtualHashKey);
// Reassign tasks that are routed to the higherVirtualNode virtual node
// Tasks are either on the original virtual node or on the new virtual node
Integer higherVirtualNodeKey = higherVirtualNode.getKey();
Node higherNode = higherVirtualNode.getValue();
Set<Long[]> dispatchJobs = higherNode.clearVirtualNodeJobs(higherVirtualNodeKey);
if (dispatchJobs != null && !dispatchJobs.isEmpty()) {
Set<Long[]> reDispatchJobs = ConcurrentHashMap.newKeySet(dispatchJobs.size());
Iterator<Long[]> iterator = dispatchJobs.iterator();
while (iterator.hasNext()) {
Long[] jobHash = iterator.next();
int dispatchHash = jobHash[1].intValue();
if (dispatchHash <= virtualHashKey) {
reDispatchJobs.add(jobHash);
iterator.remove();
}
}
higherNode.virtualNodeMap.put(higherVirtualNodeKey, dispatchJobs);
Set<Long> jobIds = reDispatchJobs.stream().map(item -> item[0]).collect(Collectors.toSet());
newNode.addVirtualNodeJobs(virtualHashKey, reDispatchJobs);
if (higherNode != newNode) {
higherNode.assignJobs.removeAssignJobs(jobIds);
higherNode.assignJobs.addRemovingJobs(jobIds);
newNode.assignJobs.addAddingJobs(jobIds);
}
}
}
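
The comparison dispatchHash <= virtualHashKey above is the whole redistribution rule: of the jobs currently routed to the next virtual node clockwise, only those whose dispatch hash now sorts at or before the new virtual key are re-homed. A worked example under assumed hash values:

// Ring before insert: {100 -> nodeA, 300 -> nodeB}; new virtual key 200 is added.
// Jobs previously routed to key 300 are split:
//   job with dispatchHash 150: 150 <= 200 -> re-dispatched to the new virtual node (200)
//   job with dispatchHash 250: 250  > 200 -> stays on virtual node 300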

/**
* add collector node
* @param newNode node
@@ -73,47 +110,48 @@ public void addNode(Node newNode) {
if (!CommonConstants.MODE_PRIVATE.equals(newNode.mode)) {
byte virtualNodeNum = newNode.quality == null ? VIRTUAL_NODE_DEFAULT_SIZE : newNode.quality;
for (byte i = 0; i < virtualNodeNum; i++) {
int virtualHashKey = hash(newNode.identity + i);
hashCircle.put(virtualHashKey, newNode);
newNode.addVirtualNodeJobs(virtualHashKey, ConcurrentHashMap.newKeySet(16));
Map.Entry<Integer, Node> higherVirtualNode = hashCircle.higherEntry(virtualHashKey);
if (higherVirtualNode == null) {
higherVirtualNode = hashCircle.firstEntry();
}
// Reassign tasks that are routed to the higherVirtualNode virtual node
// Tasks are either on the original virtual node or on the new virtual node
Integer higherVirtualNodeKey = higherVirtualNode.getKey();
Node higherNode = higherVirtualNode.getValue();
Set<Long[]> dispatchJobs = higherNode.clearVirtualNodeJobs(higherVirtualNodeKey);
if (dispatchJobs != null && !dispatchJobs.isEmpty()) {
Set<Long[]> reDispatchJobs = ConcurrentHashMap.newKeySet(dispatchJobs.size());
Iterator<Long[]> iterator = dispatchJobs.iterator();
while (iterator.hasNext()) {
Long[] jobHash = iterator.next();
int dispatchHash = jobHash[1].intValue();
if (dispatchHash <= virtualHashKey) {
reDispatchJobs.add(jobHash);
iterator.remove();
}
}
higherNode.virtualNodeMap.put(higherVirtualNodeKey, dispatchJobs);
Set<Long> jobIds = reDispatchJobs.stream().map(item -> item[0]).collect(Collectors.toSet());
newNode.addVirtualNodeJobs(virtualHashKey, reDispatchJobs);
if (higherNode != newNode) {
higherNode.assignJobs.removeAssignJobs(jobIds);
higherNode.assignJobs.addRemovingJobs(jobIds);
newNode.assignJobs.addAddingJobs(jobIds);
}
}
}
addVirtualNode(newNode, newNode.identity + i);
}
}
existNodeMap.put(newNode.identity, newNode);
if (!dispatchJobCache.isEmpty()) {
int size = dispatchJobCache.size();
for (int index = 0; index < size; index++) {
DispatchJob dispatchJob = dispatchJobCache.remove(0);
dispatchJob(dispatchJob.dispatchHash, dispatchJob.jobId, false);
}
dispatchJobInCache();
}

/**
 * Remove a virtual node from the hash ring and migrate its jobs.
 * @param deletedNode node being removed
 * @param virtualNodeHash virtual node hash key
 */
public synchronized void removeVirtualNode(Node deletedNode, Integer virtualNodeHash) {
Set<Long[]> removeJobHashSet = deletedNode.virtualNodeMap.get(virtualNodeHash);
// Migrate this virtual node's collection tasks to the nearest virtual node with a larger hash key
hashCircle.remove(virtualNodeHash);
if (removeJobHashSet == null || removeJobHashSet.isEmpty()) {
return;
}
Map.Entry<Integer, Node> higherVirtualEntry = hashCircle.higherOrFirstEntry(virtualNodeHash);
if (higherVirtualEntry == null || higherVirtualEntry.getValue() == deletedNode) {
higherVirtualEntry = null;
}
// collect the jobIds being migrated
Set<Long> removeJobIds = removeJobHashSet.stream().map(item -> item[0]).collect(Collectors.toSet());
deletedNode.assignJobs.removeAssignJobs(removeJobIds);
deletedNode.assignJobs.addRemovingJobs(removeJobIds);
if (higherVirtualEntry == null) {
// each entry is [jobId, dispatchHash]
removeJobHashSet.forEach(value -> {
Long jobId = value[0];
Integer dispatchHash = value[1].intValue();
if (removeJobIds.contains(jobId)) {
dispatchJobCache.add(new DispatchJob(dispatchHash, jobId));
} else {
log.error("Get job {} from removeJobMap null.", jobId);
}
});
} else {
Node higherVirtualNode = higherVirtualEntry.getValue();
higherVirtualNode.addVirtualNodeJobs(higherVirtualEntry.getKey(), removeJobHashSet);
higherVirtualNode.assignJobs.addAddingJobs(removeJobIds);
}
}
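
The two exit paths above can be summarized with assumed ring states:

// Ring {100 -> nodeA, 200 -> nodeB}, remove virtual key 100 of nodeA:
//   a higher entry on another node exists -> nodeA's jobs migrate to key 200 on nodeB.
// Ring {100 -> nodeA, 200 -> nodeA}, remove virtual key 100 of nodeA:
//   the higher entry still belongs to the node being deleted, so the jobs are
//   parked in dispatchJobCache and re-dispatched when another collector joins.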

@@ -126,51 +164,22 @@ public Node removeNode(String name) {
if (deletedNode == null) {
return null;
}
for (Map.Entry<Integer, Set<Long[]>> virtualNodeEntry : deletedNode.virtualNodeMap.entrySet()) {
Integer virtualNodeHash = virtualNodeEntry.getKey();
Set<Long[]> removeJobHashSet = virtualNodeEntry.getValue();
// Migrate the virtualNodeEntry collection task to the nearest virtual node that is larger than it
hashCircle.remove(virtualNodeHash);
if (removeJobHashSet == null || removeJobHashSet.isEmpty()) {
continue;
}
Map.Entry<Integer, Node> higherVirtualEntry = hashCircle.higherEntry(virtualNodeHash);
if (higherVirtualEntry == null) {
higherVirtualEntry = hashCircle.firstEntry();
}
if (higherVirtualEntry == null || higherVirtualEntry.getValue() == deletedNode) {
higherVirtualEntry = null;
}
// jobId
Set<Long> removeJobIds = removeJobHashSet.stream().map(item -> item[0]).collect(Collectors.toSet());
deletedNode.assignJobs.removeAssignJobs(removeJobIds);
deletedNode.assignJobs.addRemovingJobs(removeJobIds);
if (higherVirtualEntry == null) {
// jobId-dispatchHash
virtualNodeEntry.getValue().forEach(value -> {
Long jobId = value[0];
Integer dispatchHash = value[1].intValue();
if (removeJobIds.contains(jobId)) {
dispatchJobCache.add(new DispatchJob(dispatchHash, jobId));
} else {
log.error("Get job {} from removeJobMap null.", jobId);
}
});
} else {
Node higherVirtualNode = higherVirtualEntry.getValue();
higherVirtualNode.addVirtualNodeJobs(higherVirtualEntry.getKey(), removeJobHashSet);
higherVirtualNode.assignJobs.addAddingJobs(removeJobIds);
}
for (Integer virtualNodeHash : deletedNode.virtualNodeMap.keySet()) {
removeVirtualNode(deletedNode, virtualNodeHash);
}
deletedNode.destroy();
dispatchJobInCache();
return deletedNode;
}

public synchronized void dispatchJobInCache() {
if (!dispatchJobCache.isEmpty()) {
int size = dispatchJobCache.size();
for (int index = 0; index < size; index++) {
DispatchJob dispatchJob = dispatchJobCache.remove(0);
dispatchJob(dispatchJob.dispatchHash, dispatchJob.jobId, false);
}
}
return deletedNode;
}
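
Pulling the drain loop into one synchronized method matters because Collections.synchronizedList only makes individual calls atomic; the size() check followed by remove(0) is a compound action. A sketch of the interleaving that could otherwise occur (hypothetical threads):

// dispatchJobCache holds one element.
// Thread A: size() == 1, enters the loop
// Thread B: size() == 1, enters the loop
// Thread A: remove(0) succeeds
// Thread B: remove(0) throws IndexOutOfBoundsException - the element is gone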

/**
@@ -180,7 +189,7 @@ public Node removeNode(String name) {
public Map<String, Node> getAllNodes() {
return existNodeMap;
}

/**
* get node
* @param collectorName collector name
@@ -213,7 +222,7 @@ public Node dispatchJob(String dispatchKey, Long jobId) {
int dispatchHash = hash(dispatchKey);
return dispatchJob(dispatchHash, jobId, true);
}

/**
* The collector node to which the collector is assigned is obtained in advance based on the collection task information
*
@@ -250,7 +259,7 @@ public Node dispatchJob(Integer dispatchHash, Long jobId, boolean isFlushed) {
curNode.addJob(virtualKey, dispatchHash, jobId, isFlushed);
return curNode;
}

/**
* The collector node to which the collector is assigned is obtained in advance based on the collection task information
*
@@ -383,7 +392,7 @@ private synchronized void addJob(Integer virtualNodeKey, Integer dispatchHash, Long jobId, boolean isFlushed) {
Set<Long[]> virtualNodeJob = virtualNodeMap.computeIfAbsent(virtualNodeKey, k -> ConcurrentHashMap.newKeySet(16));
virtualNodeJob.add(new Long[]{jobId, dispatchHash.longValue()});
if (isFlushed) {
assignJobs.addAssignJob(jobId);
} else {
assignJobs.addAddingJob(jobId);
}
@@ -416,7 +425,7 @@ private void addVirtualNodeJobs(Integer virtualHashKey, Set<Long[]> reDispatchJobs) {
});
virtualNodeMap.put(virtualHashKey, reDispatchJobs);
}

public void removeVirtualNodeJob(Long jobId) {
if (jobId == null || virtualNodeMap == null) {
return;
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.manager.scheduler;

import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.apache.hertzbeat.common.util.SnowFlakeIdGenerator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
* Test case for {@link ConsistentHash}
*/
public class ConsistentHashTest {

private ConsistentHash consistentHash;

@BeforeEach
void setUp() {
consistentHash = new ConsistentHash();
}

@Test
void testAddNode() {
String job1 = "job1";
long jobId1 = SnowFlakeIdGenerator.generateId();
String job2 = "job2";
long jobId2 = SnowFlakeIdGenerator.generateId();
String job3 = "job3";
long jobId3 = SnowFlakeIdGenerator.generateId();
ConsistentHash.Node node1 = new ConsistentHash.Node("node1", "public", "192.168.0.1", System.currentTimeMillis(), (byte) 10);
ConsistentHash.Node node2 = new ConsistentHash.Node("node2", "public", "192.168.0.2", System.currentTimeMillis(), (byte) 10);
consistentHash.addNode(node1);
consistentHash.dispatchJob(job1, jobId1);
consistentHash.dispatchJob(job2, jobId2);
consistentHash.dispatchJob(job3, jobId3);
consistentHash.addNode(node2);
assertTrue(node2.getAssignJobs().getAddingJobs().containsAll(node1.getAssignJobs().getRemovingJobs()));
assertTrue(node2.getAssignJobs().getAddingJobs().containsAll(node2.getAssignJobs().getRemovingJobs()));
assertSame(consistentHash.getNode("node1"), node1);
assertSame(consistentHash.getNode("node2"), node2);
}

@Test
void testDispatchJob() {
String job1 = "job1";
long jobId1 = SnowFlakeIdGenerator.generateId();
ConsistentHash.Node res1 = consistentHash.dispatchJob(job1, jobId1);
assertNull(res1);
ConsistentHash.Node node1 = new ConsistentHash.Node("node1", "public", "192.168.0.1", System.currentTimeMillis(), (byte) 10);
consistentHash.addNode(node1);
String job2 = "job2";
long jobId2 = SnowFlakeIdGenerator.generateId();
ConsistentHash.Node res2 = consistentHash.dispatchJob(job2, jobId2);
assertSame(res2, node1);
assertTrue(consistentHash.getDispatchJobCache().isEmpty());
}

@Test
void testRemoveNode() {
String job1 = "job1";
long jobId1 = SnowFlakeIdGenerator.generateId();
String job2 = "job2";
long jobId2 = SnowFlakeIdGenerator.generateId();
String job3 = "job3";
long jobId3 = SnowFlakeIdGenerator.generateId();
ConsistentHash.Node node1 = new ConsistentHash.Node("node1", "public", "192.168.0.1", System.currentTimeMillis(), (byte) 10);
ConsistentHash.Node node2 = new ConsistentHash.Node("node2", "public", "192.168.0.2", System.currentTimeMillis(), (byte) 10);
consistentHash.addNode(node1);
consistentHash.addNode(node2);
consistentHash.dispatchJob(job1, jobId1);
consistentHash.dispatchJob(job2, jobId2);
consistentHash.dispatchJob(job3, jobId3);
consistentHash.removeNode(node2.getIdentity());
assertTrue(node1.getAssignJobs().getAddingJobs().containsAll(node2.getAssignJobs().getRemovingJobs()));
assertSame(consistentHash.getNode("node1"), node1);
assertNull(consistentHash.getNode("node2"));
consistentHash.removeNode(node1.getIdentity());
assertEquals(3, consistentHash.getDispatchJobCache().size());
}

}
