Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix etcd calls exception handling and adjust failure detection timeout #18554

Merged
merged 4 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -878,17 +878,22 @@ public WorkerClusterView getCachedWorkers(GetWorkerListType type) throws IOExcep
synchronized (mCachedWorkerClusterView) {
if (mCachedWorkerClusterView.get() == null || mCachedWorkerClusterView.get().isEmpty()
|| mWorkerRefreshPolicy.attempt()) {
switch (type) {
case ALL:
mCachedWorkerClusterView.set(getAllWorkers());
break;
case LIVE:
mCachedWorkerClusterView.set(getLiveWorkers());
break;
case LOST:
mCachedWorkerClusterView.set(getLostWorkers());
break;
default:
try {
switch (type) {
case ALL:
mCachedWorkerClusterView.set(getAllWorkers());
break;
case LIVE:
mCachedWorkerClusterView.set(getLiveWorkers());
break;
case LOST:
mCachedWorkerClusterView.set(getLostWorkers());
break;
default:
}
} catch (Throwable th) {
LOG.warn("Got exception while trying to refresh {} worker list, ex:{},"
+ " using the last updated worker cluster view.", type, th.getMessage());
}
}
return mCachedWorkerClusterView.get();
Expand Down
10 changes: 10 additions & 0 deletions dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -4842,6 +4842,14 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.ALL)
.build();
public static final PropertyKey WORKER_FAILURE_DETECTION_TIMEOUT =
durationBuilder(Name.WORKER_FAILURE_DETECTION_TIMEOUT)
.setDefaultValue("2min")
.setDescription("The timeout to consider a worker failure in membership"
+ " failure detection.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.ALL)
.build();
public static final PropertyKey WORKER_STATIC_MEMBERSHIP_MANAGER_CONFIG_FILE =
stringBuilder(Name.WORKER_STATIC_MEMBERSHIP_MANAGER_CONFIG_FILE)
.setDefaultValue(format("${%s}/workers", Name.CONF_DIR))
Expand Down Expand Up @@ -8211,6 +8219,8 @@ public static final class Name {
"alluxio.worker.ufs.instream.cache.max.size";
public static final String WORKER_MEMBERSHIP_MANAGER_TYPE =
"alluxio.worker.membership.manager.type";
public static final String WORKER_FAILURE_DETECTION_TIMEOUT =
"alluxio.worker.failure.detection.timeout";
public static final String WORKER_STATIC_MEMBERSHIP_MANAGER_CONFIG_FILE =
"alluxio.worker.static.membership.manager.config.file";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ public class AlluxioEtcdClient {
private static final Logger LOG = LoggerFactory.getLogger(AlluxioEtcdClient.class);
private static final Lock INSTANCE_LOCK = new ReentrantLock();
public static final String BASE_PATH = "/ServiceDiscovery";
public static final long DEFAULT_LEASE_TTL_IN_SEC = 2L;
public static final long DEFAULT_TIMEOUT_IN_SEC = 2L;
public static final int RETRY_TIMES = 3;
private static final int RETRY_SLEEP_IN_MS = 100;
private static final int MAX_RETRY_SLEEP_IN_MS = 500;
public static final long DEFAULT_LEASE_TTL_IN_SEC = 5L;
public static final long DEFAULT_TIMEOUT_IN_SEC = 5L;
public static final int RETRY_TIMES = 5;
private static final int RETRY_SLEEP_IN_MS = 500;
private static final int MAX_RETRY_SLEEP_IN_MS = 5000;
@GuardedBy("INSTANCE_LOCK")
@Nullable
private static volatile AlluxioEtcdClient sAlluxioEtcdClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public void join(WorkerInfo workerInfo) throws IOException {
LOG.info("Try joining on etcd for worker:{} ", workerInfo);
WorkerServiceEntity entity =
new WorkerServiceEntity(workerInfo.getIdentity(), workerInfo.getAddress());
entity.setLeaseTTLInSec(
mConf.getDuration(PropertyKey.WORKER_FAILURE_DETECTION_TIMEOUT).getSeconds());
String pathOnRing = new StringBuffer()
.append(getRingPathPrefix())
.append(entity.getServiceEntityName()).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ public ServiceDiscoveryRecipe(AlluxioEtcdClient client, String pathPrefix) {
* update of the DefaultServiceEntity fields(lease,revision num) is guarded by
* lock within DefaultServiceEntity instance.
* @param service
* @param force
* @param forceOverwriteOldLease
* @throws IOException
*/
private void newLeaseInternal(DefaultServiceEntity service, boolean force) throws IOException {
private void newLeaseInternal(DefaultServiceEntity service,
boolean forceOverwriteOldLease) throws IOException {
try (LockResource lockResource = new LockResource(service.getLock())) {
if (!force && service.getLease() != null
if (!forceOverwriteOldLease && service.getLease() != null
&& !mAlluxioEtcdClient.isLeaseExpired(service.getLease())) {
LOG.info("Lease attached with service:{} is not expired, bail from here.",
service.getServiceEntityName());
Expand All @@ -101,13 +102,15 @@ private void newLeaseInternal(DefaultServiceEntity service, boolean force) throw
.append(MembershipManager.PATH_SEPARATOR)
.append(path).toString();
try {
AlluxioEtcdClient.Lease oldLease = service.getLease();
AlluxioEtcdClient.Lease lease = mAlluxioEtcdClient.createLease(
service.getLeaseTTLInSec(), service.getLeaseTimeoutInSec(), TimeUnit.SECONDS);
Txn txn = mAlluxioEtcdClient.getEtcdClient().getKVClient().txn();
ByteSequence keyToPut = ByteSequence.from(fullPath, StandardCharsets.UTF_8);
ByteSequence valToPut = ByteSequence.from(service.serialize());
// Always overwrite the key
CompletableFuture<TxnResponse> txnResponseFut = txn
.If(new Cmp(keyToPut, Cmp.Op.EQUAL, CmpTarget.version(0L)))
.If()
.Then(Op.put(keyToPut, valToPut, PutOption.newBuilder()
.withLeaseId(lease.mLeaseId).build()))
.Then(Op.get(keyToPut, GetOption.DEFAULT))
Expand All @@ -118,9 +121,6 @@ private void newLeaseInternal(DefaultServiceEntity service, boolean force) throw
txnResponse.getGetResponses().stream().map(
r -> kvs.addAll(r.getKvs())).collect(Collectors.toList());
if (!txnResponse.isSucceeded()) {
if (!kvs.isEmpty()) {
throw new AlreadyExistsException("Same service kv pair already exists.");
}
throw new IOException("Failed to new a lease for service:" + service.toString());
}
Preconditions.checkState(!kvs.isEmpty(), "No such service entry found.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package alluxio.membership;

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerState;

Expand All @@ -32,7 +33,7 @@
*/
public class ServiceRegistryMembershipManager implements MembershipManager {
private static final Logger LOG = LoggerFactory.getLogger(ServiceRegistryMembershipManager.class);

private final AlluxioConfiguration mConf;
private AlluxioEtcdClient mAlluxioEtcdClient;

/**
Expand Down Expand Up @@ -60,6 +61,7 @@ public ServiceRegistryMembershipManager(AlluxioConfiguration conf) {
*/
public ServiceRegistryMembershipManager(AlluxioConfiguration conf,
AlluxioEtcdClient alluxioEtcdClient) {
mConf = conf;
mAlluxioEtcdClient = alluxioEtcdClient;
}

Expand All @@ -68,6 +70,8 @@ public void join(WorkerInfo workerInfo) throws IOException {
LOG.info("Try joining Service Registry for worker:{} ", workerInfo);
WorkerServiceEntity entity =
new WorkerServiceEntity(workerInfo.getIdentity(), workerInfo.getAddress());
entity.setLeaseTTLInSec(
mConf.getDuration(PropertyKey.WORKER_FAILURE_DETECTION_TIMEOUT).getSeconds());
mAlluxioEtcdClient.mServiceDiscovery.registerAndStartSync(entity);
LOG.info("register to service registry for worker:{} ", workerInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import alluxio.exception.status.AlreadyExistsException;
import alluxio.exception.status.InvalidArgumentException;
import alluxio.util.CommonUtils;
import alluxio.util.FormatUtils;
import alluxio.util.WaitForOptions;
import alluxio.wire.WorkerIdentity;
import alluxio.wire.WorkerIdentityTestUtils;
Expand Down Expand Up @@ -54,11 +55,16 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public class MembershipManagerTest {
private static final Network NETWORK = Network.newNetwork();
private static final int ETCD_PORT = 2379;
private static final String WORKER_FAILURE_TIMEOUT = "15sec";
private static final long WORKER_FAILURE_TIMEOUT_IN_MILLIS =
FormatUtils.parseTimeSize(WORKER_FAILURE_TIMEOUT);

@Rule
public TemporaryFolder mFolder = new TemporaryFolder();

Expand All @@ -68,11 +74,11 @@ public class MembershipManagerTest {
/*
@BeforeClass
public static void init() {
PropertyConfigurator.configure("alluxio/conf/log4j.properties");
PropertyConfigurator.configure("/Users/lucyge/Documents/github/alluxio/conf/log4j.properties");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notice that the absolute path contains your own information

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah good catch thanks!

Properties props = new Properties();
props.setProperty(PropertyKey.LOGGER_TYPE.toString(), "Console");
}
*/
*/

@ClassRule
public static final GenericContainer<?> ETCD_CONTAINER =
Expand Down Expand Up @@ -117,6 +123,7 @@ public static void afterAll() {

@Before
public void before() throws IOException {
Configuration.set(PropertyKey.WORKER_FAILURE_DETECTION_TIMEOUT, WORKER_FAILURE_TIMEOUT);
List<String> strs = getHealthyAlluxioEtcdClient().getChildren("/")
.stream().map(kv -> kv.getKey().toString(StandardCharsets.UTF_8))
.collect(Collectors.toList());
Expand Down Expand Up @@ -242,7 +249,7 @@ public void testEtcdMembership(MembershipManager membershipManager) throws Excep
throw new RuntimeException(
String.format("Unexpected error while getting failed members: %s", e));
}
}, WaitForOptions.defaults().setTimeoutMs(TimeUnit.SECONDS.toMillis(10)));
}, WaitForOptions.defaults().setTimeoutMs(WORKER_FAILURE_TIMEOUT_IN_MILLIS + 1000));
List<WorkerInfo> expectedFailedList = new ArrayList<>();
expectedFailedList.add(new WorkerInfo(wkr2).setState(WorkerState.LOST));
Assert.assertEquals(expectedFailedList,
Expand Down Expand Up @@ -308,7 +315,7 @@ public void testServiceRegistryMembershipManager() throws Exception {
throw new RuntimeException(
String.format("Unexpected error while getting failed members: %s", e));
}
}, WaitForOptions.defaults().setTimeoutMs(TimeUnit.SECONDS.toMillis(10)));
}, WaitForOptions.defaults().setTimeoutMs(WORKER_FAILURE_TIMEOUT_IN_MILLIS + 1000));
Assert.assertTrue(Lists.newArrayList(membershipManager.getFailedMembers()).isEmpty());
List<WorkerInfo> actualLiveMembers = membershipManager.getLiveMembers().stream()
.sorted(Comparator.comparing(w -> w.getAddress().getHost()))
Expand All @@ -324,6 +331,8 @@ public void testServiceRegistryMembershipManager() throws Exception {

@Test
public void testFlakyNetwork() throws Exception {
System.out.println("WORKER_FAILURE_TIMEOUT is configured to be "
+ WORKER_FAILURE_TIMEOUT);
MembershipManager membershipManager = getToxicEtcdMemberMgr();
WorkerInfo wkr1 = new WorkerInfo()
.setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
Expand All @@ -347,13 +356,34 @@ public void testFlakyNetwork() throws Exception {
throw new RuntimeException(
String.format("Unexpected error while getting live members: %s", e));
}
}, WaitForOptions.defaults().setTimeoutMs(TimeUnit.SECONDS.toMillis(10)));
}, WaitForOptions.defaults().setTimeoutMs(WORKER_FAILURE_TIMEOUT_IN_MILLIS + 1000));

MembershipManager healthyMgr = getHealthyEtcdMemberMgr();
System.out.println("All Node Status:\n" + healthyMgr.showAllMembers());
System.out.println("Induce 10 sec latency upstream to etcd...");
System.out.println(String.format("Induce %d sec latency upstream to etcd...",
TimeUnit.MILLISECONDS.toSeconds(WORKER_FAILURE_TIMEOUT_IN_MILLIS / 2)));
/* For induced latency lower than WORKER_FAILURE_TIMEOUT_IN_MILLIS, we shouldn't see
that the worker is considered failed. */
sEtcdProxy.toxics()
.latency("latency", ToxicDirection.UPSTREAM, WORKER_FAILURE_TIMEOUT_IN_MILLIS / 2);
// assert that we will never see any worker considered FAIL.
Assert.assertThrows(TimeoutException.class, () ->
CommonUtils.waitFor("Workers network errored but not considered fail",
() -> {
try {
return !healthyMgr.getFailedMembers().isEmpty();
} catch (IOException e) {
throw new RuntimeException(
String.format("Unexpected error while getting failed members: %s", e));
}
}, WaitForOptions.defaults().setTimeoutMs(TimeUnit.SECONDS.toMillis(10))));
System.out.println("All Node Status:\n" + healthyMgr.showAllMembers());
System.out.println(String.format(
"Remove latency toxics and induce %d sec latency upstream to etcd...",
TimeUnit.MILLISECONDS.toSeconds(WORKER_FAILURE_TIMEOUT_IN_MILLIS + 10000)));
sEtcdProxy.toxics().get("latency").remove();
sEtcdProxy.toxics()
.latency("latency", ToxicDirection.UPSTREAM, 10000);
.latency("latency", ToxicDirection.UPSTREAM, WORKER_FAILURE_TIMEOUT_IN_MILLIS + 10000);
CommonUtils.waitFor("Workers network errored",
() -> {
try {
Expand All @@ -362,7 +392,8 @@ public void testFlakyNetwork() throws Exception {
throw new RuntimeException(
String.format("Unexpected error while getting failed members: %s", e));
}
}, WaitForOptions.defaults().setTimeoutMs(TimeUnit.SECONDS.toMillis(10)));
}, WaitForOptions.defaults().setTimeoutMs(WORKER_FAILURE_TIMEOUT_IN_MILLIS + 10000));

System.out.println("All Node Status:\n" + healthyMgr.showAllMembers());
System.out.println("Remove latency toxics...");
sEtcdProxy.toxics().get("latency").remove();
Expand All @@ -374,7 +405,7 @@ public void testFlakyNetwork() throws Exception {
throw new RuntimeException(
String.format("Unexpected error while getting failed members: %s", e));
}
}, WaitForOptions.defaults().setTimeoutMs(TimeUnit.SECONDS.toMillis(10)));
}, WaitForOptions.defaults().setTimeoutMs(WORKER_FAILURE_TIMEOUT_IN_MILLIS));
System.out.println("All Node Status:\n" + healthyMgr.showAllMembers());
}

Expand Down Expand Up @@ -454,7 +485,7 @@ public void testSameWorkerIdentityConflict() throws Exception {
// IGNORE
return false;
}
}, WaitForOptions.defaults().setTimeoutMs(5000));
}, WaitForOptions.defaults().setTimeoutMs(WORKER_FAILURE_TIMEOUT_IN_MILLIS + 1000));
try {
membershipManager.join(wkr2);
} catch (IOException ex) {
Expand Down Expand Up @@ -497,7 +528,7 @@ public void testOptionalHttpPortChangeInWorkerAddress() throws Exception {
// IGNORE
return false;
}
}, WaitForOptions.defaults().setTimeoutMs(5000));
}, WaitForOptions.defaults().setTimeoutMs(WORKER_FAILURE_TIMEOUT_IN_MILLIS + 1000));

// set the http server port and rejoin
workerNetAddress.setHttpServerPort(1021);
Expand Down Expand Up @@ -554,7 +585,7 @@ public void testDecommission() throws Exception {
throw new RuntimeException(
String.format("Unexpected error while getting failed members: %s", e));
}
}, WaitForOptions.defaults().setTimeoutMs(TimeUnit.SECONDS.toMillis(10)));
}, WaitForOptions.defaults().setTimeoutMs(WORKER_FAILURE_TIMEOUT_IN_MILLIS + 1000));
membershipManager.decommission(wkr2);
Assert.assertFalse(membershipManager.getAllMembers()
.getWorkerById(wkr2.getIdentity()).isPresent());
Expand All @@ -575,7 +606,7 @@ public void testDecommission() throws Exception {
throw new RuntimeException(
String.format("Unexpected error while getting live members: %s", e));
}
}, WaitForOptions.defaults().setTimeoutMs(TimeUnit.SECONDS.toMillis(10)));
}, WaitForOptions.defaults().setTimeoutMs(WORKER_FAILURE_TIMEOUT_IN_MILLIS + 1000));
Optional<WorkerInfo> newWkr2Entity = membershipManager.getAllMembers().getWorkerById(wkr2Id);
Assert.assertTrue(newWkr2Entity.isPresent());
wkr2Replacement.setState(WorkerState.LIVE);
Expand Down
Loading