Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add http server port in the worker net address #18499

Merged
merged 6 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,19 @@ public void join(WorkerInfo workerInfo) throws IOException {
"Existing WorkerServiceEntity for path:%s corrupted",
pathOnRing));
}
throw new AlreadyExistsException(
String.format("Some other member with same id registered on the ring, bail."
+ "Conflicting worker addr:%s, worker identity:%s."
+ "Different workers can't assume same worker identity in non-k8s env,"
+ "clean local worker identity settings to continue.",
existingEntity.get().getWorkerNetAddress().toString(),
existingEntity.get().getIdentity()));
if (existingEntity.get().equalsIgnoringOptionalFields(entity)) {
// Same entity but potentially with new optional fields,
// update the original etcd-stored worker information
mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity));
} else {
throw new AlreadyExistsException(
String.format("Some other member with same id registered on the ring, bail."
+ "Conflicting worker addr:%s, worker identity:%s."
+ "Different workers can't assume same worker identity in non-k8s env,"
+ "clean local worker identity settings to continue.",
existingEntity.get().getWorkerNetAddress().toString(),
existingEntity.get().getIdentity()));
}
}
}
} catch (InterruptedException | ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ private static List<WorkerInfo> parseWorkerAddresses(
.setContainerHost(Configuration.global()
.getOrDefault(PropertyKey.WORKER_CONTAINER_HOSTNAME, ""))
.setRpcPort(conf.getInt(PropertyKey.WORKER_RPC_PORT))
.setWebPort(conf.getInt(PropertyKey.WORKER_WEB_PORT));
.setWebPort(conf.getInt(PropertyKey.WORKER_WEB_PORT))
.setHttpServerPort(conf.getInt(PropertyKey.WORKER_HTTP_SERVER_PORT));
//data port, these are initialized from configuration for client to deduce the
//workeraddr related info, on worker side, it will be corrected by join().
InetSocketAddress inetAddr;
Expand Down Expand Up @@ -126,7 +127,8 @@ public void join(WorkerInfo worker) throws IOException {
.setDomainSocketPath(addr.getDomainSocketPath())
.setNettyDataPort(addr.getNettyDataPort())
.setWebPort(addr.getWebPort())
.setSecureRpcPort(addr.getSecureRpcPort()));
.setSecureRpcPort(addr.getSecureRpcPort())
.setHttpServerPort(addr.getHttpServerPort()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,35 @@ public void deserialize(byte[] buf) {
.create();
gson.fromJson(new InputStreamReader(new ByteArrayInputStream(buf)), WorkerServiceEntity.class);
}

/**
* A customized equality comparison which ignores optional fields such as mHttpServerPort in the
* WorkerNetAddress.
*
* @param o The object to be compared with this WorkerServiceEntity for equality
* @return true if the specified object is equal to this WorkerServiceEntity by ignoring optional
* fields; false otherwise
*/
public boolean equalsIgnoringOptionalFields(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WorkerServiceEntity anotherO = (WorkerServiceEntity) o;
return mIdentity.equals(anotherO.mIdentity)
&& getServiceEntityName().equals(anotherO.getServiceEntityName())
&& equalsIgnoringHttpServerPort(anotherO.getWorkerNetAddress());
}

private boolean equalsIgnoringHttpServerPort(WorkerNetAddress other) {
return mAddress.getHost().equals(other.getHost())
&& mAddress.getContainerHost().equals(other.getContainerHost())
&& mAddress.getSecureRpcPort() == other.getSecureRpcPort()
&& mAddress.getRpcPort() == other.getRpcPort()
&& mAddress.getDataPort() == other.getDataPort()
&& mAddress.getWebPort() == other.getWebPort()
&& mAddress.getDomainSocketPath().equals(other.getDomainSocketPath());
}
}
29 changes: 27 additions & 2 deletions dora/core/common/src/main/java/alluxio/wire/WorkerNetAddress.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public final class WorkerNetAddress implements Serializable {
@Expose
@com.google.gson.annotations.SerializedName("DomainSocketPath")
private String mDomainSocketPath = "";
@Expose
@com.google.gson.annotations.SerializedName("HttpServerPort")
// Optional field - skipped in the customized equality comparison
private int mHttpServerPort;

/**
* Creates a new instance of {@link WorkerNetAddress}.
Expand All @@ -77,6 +81,7 @@ public WorkerNetAddress(WorkerNetAddress copyFrom) {
mNettyDataPort = copyFrom.mNettyDataPort;
mWebPort = copyFrom.mWebPort;
mDomainSocketPath = copyFrom.mDomainSocketPath;
mHttpServerPort = copyFrom.mHttpServerPort;
}

/**
Expand Down Expand Up @@ -143,6 +148,14 @@ public String getDomainSocketPath() {
return mDomainSocketPath;
}

/**
* @return the http server port
*/
@ApiModelProperty(value = "Port of the worker's http server for rest apis")
public int getHttpServerPort() {
return mHttpServerPort;
}

/**
* @param host the host to use
* @return the worker net address
Expand Down Expand Up @@ -217,6 +230,15 @@ public WorkerNetAddress setDomainSocketPath(String domainSocketPath) {
return this;
}

/**
* @param httpServerPort the http server port to use
* @return the worker net address
*/
public WorkerNetAddress setHttpServerPort(int httpServerPort) {
mHttpServerPort = httpServerPort;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -232,13 +254,14 @@ public boolean equals(Object o) {
&& mRpcPort == that.mRpcPort
&& mDataPort == that.mDataPort
&& mWebPort == that.mWebPort
&& mDomainSocketPath.equals(that.mDomainSocketPath);
&& mDomainSocketPath.equals(that.mDomainSocketPath)
&& mHttpServerPort == that.mHttpServerPort;
}

@Override
public int hashCode() {
return Objects.hashCode(mSecureRpcPort, mHost, mContainerHost, mDataPort, mRpcPort, mWebPort,
mDomainSocketPath);
mDomainSocketPath, mHttpServerPort);
}

/**
Expand All @@ -254,6 +277,7 @@ public String dumpMainInfo() {
.add("dataPort", mDataPort)
.add("webPort", mWebPort)
.add("domainSocketPath", mDomainSocketPath)
.add("httpServerPort", mHttpServerPort)
.toString();
}

Expand All @@ -267,6 +291,7 @@ public String toString() {
.add("webPort", mWebPort)
.add("domainSocketPath", mDomainSocketPath)
.add("secureRpcPort", mSecureRpcPort)
.add("httpServerPort", mHttpServerPort)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,32 @@ public void testSerializationWorkerServiceEntity() throws Exception {
deserialized.deserialize(jsonBytes);
Assert.assertEquals(deserialized, entity);
}

@Test
public void testEqualsIgnoringOptionalFields() throws Exception {
final WorkerNetAddress workerNetAddress1 = new WorkerNetAddress()
.setHost("worker1").setContainerHost("containerhostname1")
.setRpcPort(1000).setDataPort(1001).setWebPort(1011)
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(1021);
final WorkerNetAddress workerNetAddress2 = new WorkerNetAddress()
.setHost("worker1").setContainerHost("containerhostname1")
.setRpcPort(1000).setDataPort(1001).setWebPort(1011)
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(2021);
final WorkerNetAddress workerNetAddress3 = new WorkerNetAddress()
.setHost("worker3").setContainerHost("containerhostname1")
.setRpcPort(1000).setDataPort(1001).setWebPort(1011)
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(1021);
final WorkerIdentity identity = WorkerIdentity.fromProto(
alluxio.grpc.WorkerIdentity.newBuilder()
.setIdentifier(ByteString.copyFrom(Longs.toByteArray(1L)))
.setVersion(0)
.build());
WorkerServiceEntity entity1 = new WorkerServiceEntity(identity, workerNetAddress1);
WorkerServiceEntity entity2 = new WorkerServiceEntity(identity, workerNetAddress2);
WorkerServiceEntity entity3 = new WorkerServiceEntity(identity, workerNetAddress3);

Assert.assertTrue(entity1.equalsIgnoringOptionalFields(entity2));
Assert.assertFalse(entity2.equalsIgnoringOptionalFields(entity3));
Assert.assertFalse(entity3.equalsIgnoringOptionalFields(entity1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public void copyConstructor() throws IllegalAccessException {
.setNettyDataPort(1)
.setSecureRpcPort(1)
.setWebPort(1)
.setDomainSocketPath("path");
.setDomainSocketPath("path")
.setHttpServerPort(1);
WorkerNetAddress copied = new WorkerNetAddress(original);
// copied instance should contain exactly the same content
checkEquality(original, copied);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,9 @@ public WorkerNetAddress getAddress() {
if (mNettyDataTransmissionEnable) {
workerNetAddress.setNettyDataPort(getNettyDataLocalPort());
}
if (Configuration.getBoolean(PropertyKey.WORKER_HTTP_SERVER_ENABLED)) {
workerNetAddress.setHttpServerPort(Configuration.getInt(PropertyKey.WORKER_HTTP_SERVER_PORT));
}
return workerNetAddress;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,19 +200,19 @@ public void testEtcdMembership(MembershipManager membershipManager) throws Excep
.setAddress(new WorkerNetAddress()
.setHost("worker1").setContainerHost("containerhostname1")
.setRpcPort(1000).setDataPort(1001).setWebPort(1011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(1021));
WorkerInfo wkr2 = new WorkerInfo()
.setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
.setAddress(new WorkerNetAddress()
.setHost("worker2").setContainerHost("containerhostname2")
.setRpcPort(2000).setDataPort(2001).setWebPort(2011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(2021));
WorkerInfo wkr3 = new WorkerInfo()
.setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
.setAddress(new WorkerNetAddress()
.setHost("worker3").setContainerHost("containerhostname3")
.setRpcPort(3000).setDataPort(3001).setWebPort(3011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(3031));
membershipManager.join(wkr1);
membershipManager.join(wkr2);
membershipManager.join(wkr3);
Expand Down Expand Up @@ -260,13 +260,13 @@ public void testFlakyNetwork() throws Exception {
.setAddress(new WorkerNetAddress()
.setHost("worker-1").setContainerHost("containerhostname1")
.setRpcPort(29999).setDataPort(29997).setWebPort(30000)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(30001));
WorkerInfo wkr2 = new WorkerInfo()
.setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
.setAddress(new WorkerNetAddress()
.setHost("worker-2").setContainerHost("containerhostname2")
.setRpcPort(29999).setDataPort(29997).setWebPort(30000)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(30001));
membershipManager.join(wkr1);
membershipManager.join(wkr2);
CommonUtils.waitFor("Workers joined",
Expand Down Expand Up @@ -326,19 +326,19 @@ public void testStaticMembership() throws Exception {
.setAddress(new WorkerNetAddress()
.setHost("worker1").setContainerHost("containerhostname1")
.setRpcPort(1000).setDataPort(1001).setWebPort(1011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(1021));
WorkerInfo wkr2 = new WorkerInfo()
.setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
.setAddress(new WorkerNetAddress()
.setHost("worker2").setContainerHost("containerhostname2")
.setRpcPort(2000).setDataPort(2001).setWebPort(2011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(2021));
WorkerInfo wkr3 = new WorkerInfo()
.setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
.setAddress(new WorkerNetAddress()
.setHost("worker3").setContainerHost("containerhostname3")
.setRpcPort(3000).setDataPort(3001).setWebPort(3011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(3021));
membershipManager.join(wkr1);
membershipManager.join(wkr2);
membershipManager.join(wkr3);
Expand Down Expand Up @@ -367,13 +367,13 @@ public void testSameWorkerIdentityConflict() throws Exception {
.setAddress(new WorkerNetAddress()
.setHost("worker1").setContainerHost("containerhostname1")
.setRpcPort(1000).setDataPort(1001).setWebPort(1011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(1021));
WorkerInfo wkr2 = new WorkerInfo()
.setIdentity(workerIdentity1)
.setAddress(new WorkerNetAddress()
.setHost("worker2").setContainerHost("containerhostname2")
.setRpcPort(2000).setDataPort(2001).setWebPort(2011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(2021));
membershipManager.join(wkr1);
// bring wrk1 down and join wrk2 with a same worker identity.
membershipManager.stopHeartBeat(wkr1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can u add a test to have a worker join without http port set and afterwards join with http port set , it should be allowed to join and the list of workers should be unchanged.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test testWorkerHttpServerPorts. Could you take a review?

Expand Down Expand Up @@ -401,4 +401,44 @@ public void testSameWorkerIdentityConflict() throws Exception {
Assert.assertTrue(curWorkerInfo.isPresent());
Assert.assertEquals(wkr2.getAddress(), curWorkerInfo.get().getAddress());
}

@Test
public void testOptionalHttpPortChangeInWorkerAddress() throws Exception {
final MembershipManager membershipManager = getHealthyEtcdMemberMgr();
Assert.assertTrue(membershipManager instanceof EtcdMembershipManager);
// join without http server ports
WorkerIdentity workerIdentity = WorkerIdentityTestUtils.randomUuidBasedId();
WorkerNetAddress workerNetAddress = new WorkerNetAddress()
.setHost("worker1").setContainerHost("containerhostname1")
.setRpcPort(1000).setDataPort(1001).setWebPort(1011)
.setDomainSocketPath("/var/lib/domain.sock");
WorkerInfo wkr = new WorkerInfo()
.setIdentity(workerIdentity)
.setAddress(workerNetAddress);
membershipManager.join(wkr);
Optional<WorkerInfo> curWorkerInfo = membershipManager.getLiveMembers()
.getWorkerById(workerIdentity);
Assert.assertTrue(curWorkerInfo.isPresent());
membershipManager.stopHeartBeat(wkr);
CommonUtils.waitFor("wkr is not alive.", () -> {
try {
return membershipManager.getFailedMembers().getWorkerById(workerIdentity).isPresent();
} catch (IOException e) {
// IGNORE
return false;
}
}, WaitForOptions.defaults().setTimeoutMs(5000));

// set the http server port and rejoin
workerNetAddress.setHttpServerPort(1021);
membershipManager.join(wkr);
// check if the worker is rejoined and information updated
WorkerClusterView allMembers = membershipManager.getAllMembers();
Assert.assertEquals(1, allMembers.size());
curWorkerInfo = membershipManager.getLiveMembers().getWorkerById(workerIdentity);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you also assert the getallmembers only have one worker -> meaning that we are not creating two entries

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point! I added that assertion.

Assert.assertTrue(curWorkerInfo.isPresent());
Assert.assertEquals(wkr.getAddress(), curWorkerInfo.get().getAddress());
Assert.assertEquals(wkr.getAddress().getHttpServerPort(),
curWorkerInfo.get().getAddress().getHttpServerPort());
}
}
Loading