Skip to content

Commit

Permalink
one worker write to one port of down stream worker
Browse files Browse the repository at this point in the history
  • Loading branch information
huasiy committed Oct 28, 2024
1 parent 326949d commit cf79158
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
public class Worker<WI extends WorkerInfo>
{
private final long workerId;
private int workerPortIndex;
private final Lease lease;
private final WI workerInfo;
private boolean terminated;

public Worker(long workerId, Lease lease, WI workerInfo)
public Worker(long workerId, Lease lease, int workerPortIndex, WI workerInfo)
{
this.workerId = workerId;
this.workerPortIndex = workerPortIndex;
this.lease = requireNonNull(lease, "lease is null");
this.workerInfo = requireNonNull(workerInfo, "worker info is null");
this.terminated = false;
Expand All @@ -48,6 +50,10 @@ public long getWorkerId()
return workerId;
}

public void setWorkerPortIndex(int index) { this.workerPortIndex = index; }

public int getWorkerPortIndex() { return workerPortIndex; }

public WI getWorkerInfo()
{
return workerInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public void addWorker(Worker<CFWorkerInfo> worker)
if (downStreamWorkerNum > fixedWorkerNum)
{
// one-to-multiple stream
// TODO: find a query to test
List<Integer> workerIndexs = new ArrayList<>();
int num = downStreamWorkerNum / fixedWorkerNum;
if (downStreamWorkerNum > fixedWorkerNum*num)
Expand All @@ -129,10 +130,11 @@ public void addWorker(Worker<CFWorkerInfo> worker)
} else
{
// multiple-to-one stream
if (workerIndexAssigner < downStreamWorkerNum)
{
worker.getWorkerInfo().setPassSchema(true);
}
// if (workerIndexAssigner < downStreamWorkerNum)
// {
// worker.getWorkerInfo().setPassSchema(true);
// }
worker.setWorkerPortIndex(this.workerIndexAssigner / this.downStreamWorkerNum);
List<Integer> workerIndexes = new ArrayList<>();
workerIndexes.add(this.workerIndexAssigner % this.downStreamWorkerNum);
this.workerIndexAssigner++;
Expand All @@ -141,7 +143,7 @@ public void addWorker(Worker<CFWorkerInfo> worker)
} else
{
// assume one-to-one stream
worker.getWorkerInfo().setPassSchema(true);
worker.setWorkerPortIndex(0);
List<Integer> workerIndexs = new ArrayList<>(this.workerIndexAssigner);
this.workerIndexAssigner++;
this.workerIdToWorkerIndex.put(worker.getWorkerId(), workerIndexs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@ public Worker<CFWorkerInfo> registerWorker(CFWorkerInfo workerInfo) throws Worke
{
throw new WorkerCoordinateException("failed to register worker, error code=" + response.getErrorCode());
}
workerInfo.setPassSchema(response.getPassSchema());
return new Worker<>(response.getWorkerId(),
new Lease(response.getLeasePeriodMs(), response.getLeaseStartTimeMs()), workerInfo);
new Lease(response.getLeasePeriodMs(), response.getLeaseStartTimeMs()), response.getWorkerPortIndex(), workerInfo);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void registerWorker(TurboProto.RegisterWorkerRequest request,
CFWorkerInfo workerInfo = new CFWorkerInfo(request.getWorkerInfo());
Lease lease = new Lease(WorkerLeasePeriodMs, System.currentTimeMillis());
long workerId = CFWorkerManager.Instance().createWorkerId();
Worker<CFWorkerInfo> worker = new Worker<>(workerId, lease, workerInfo);
Worker<CFWorkerInfo> worker = new Worker<>(workerId, lease, 0, workerInfo);
CFWorkerManager.Instance().registerCFWorker(worker);
log.debug("register worker, local address: " + workerInfo.getIp() + ", transId: " + workerInfo.getTransId()
+ ", stageId: " + workerInfo.getStageId() + ", workerId: " + workerId);
Expand All @@ -67,8 +67,8 @@ public void registerWorker(TurboProto.RegisterWorkerRequest request,
requireNonNull(stageCoordinator, "stage coordinator is not found");
stageCoordinator.addWorker(worker);
TurboProto.RegisterWorkerResponse response = TurboProto.RegisterWorkerResponse.newBuilder()
.setErrorCode(SUCCESS).setWorkerId(workerId).setLeasePeriodMs(lease.getPeriodMs())
.setLeaseStartTimeMs(lease.getStartTimeMs()).setPassSchema(worker.getWorkerInfo().getPassSchema()).build();
.setErrorCode(SUCCESS).setWorkerId(workerId).setWorkerPortIndex(worker.getWorkerPortIndex()).setLeasePeriodMs(lease.getPeriodMs())
.setLeaseStartTimeMs(lease.getStartTimeMs()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
Expand Down
8 changes: 4 additions & 4 deletions proto/turbo.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ message RegisterWorkerRequest {

message RegisterWorkerResponse {
int32 errorCode = 1;
int64 workerId = 2; // the unique id assigned by the coordinator for this worker
int64 leaseStartTimeMs = 3; // the time since the epoch in milliseconds of the lease
int64 leasePeriodMs = 4; // the valid period in milliseconds of the lease
bool passSchema = 5; // if this worker send schema to down stream worker
int32 workerPortIndex = 2; // the index of port this worker will write to down stream worker
int64 workerId = 3; // the unique id assigned by the coordinator for this worker
int64 leaseStartTimeMs = 4; // the time since the epoch in milliseconds of the lease
int64 leasePeriodMs = 5; // the valid period in milliseconds of the lease
}

message GetDownstreamWorkersRequest {
Expand Down

0 comments on commit cf79158

Please sign in to comment.