Skip to content

Commit

Permalink
reformat ScanResponseObserver.java
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Jun 9, 2024
1 parent dc04c45 commit 250b83f
Showing 1 changed file with 72 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,49 +106,7 @@ private boolean sendCondition() {

private boolean sendTaskCondition() {
return sendCondition() && (sendTask == null || sendTask.isDone());
} Runnable rr = new Runnable() {
@Override
public void run() {
try {
if (readCondition()) {
synchronized (iter) {
while (readCondition()) {
Request r = scanReq.getScanRequest();
ScanType t = r.getScanType();
boolean isVertex = t.equals(ScanType.SCAN_VERTEX);
ArrayList<T> data = new ArrayList<>(BATCH_SIZE);
int count = 0;
while (iter.hasNext() && leftCount > -1) {
count++;
leftCount--;
T next = (T) iter.next();
data.add(next);
if (count >= BATCH_SIZE) {
offer(data, isVertex);
// data.clear();
break;
}
}
if (!(iter.hasNext() && leftCount > -1)) {
if (data.size() > 0 &&
data.size() < BATCH_SIZE) {
offer(data, isVertex);
}
readOver.set(true);
data = null;
//log.warn("scan complete , count: {},time: {}",
// sum, System.currentTimeMillis());
return;
}
}
}
}
} catch (Exception e) {
log.warn("read data with error: ", e);
sender.onError(e);
}
}
};
}

private void offer(Iterable<T> data, boolean isVertex) {
ScanResponse.Builder builder = ScanResponse.newBuilder();
Expand Down Expand Up @@ -203,34 +161,7 @@ public void onNext(ScanPartitionRequest scanReq) {
cltSeqNo.getAndIncrement();
startSend();
}
} Runnable sr = () -> {
while (sendCondition()) {
ScanResponse response;
try {
if (readOver.get()) {
if ((response = packages.poll()) == null) {
sender.onCompleted();
} else {
sender.onNext(response);
nextSeqNo.incrementAndGet();
}
} else {
response = packages.poll(10,
TimeUnit.MILLISECONDS);
if (response != null) {
sender.onNext(response);
nextSeqNo.incrementAndGet();
startRead();
} else {
break;
}
}

} catch (InterruptedException e) {
break;
}
}
};
}

@Override
public void onError(Throwable t) {
Expand Down Expand Up @@ -259,8 +190,76 @@ private void close() {
}
}

Runnable rr = new Runnable() {
@Override
public void run() {
try {
if (readCondition()) {
synchronized (iter) {
while (readCondition()) {
Request r = scanReq.getScanRequest();
ScanType t = r.getScanType();
boolean isVertex = t.equals(ScanType.SCAN_VERTEX);
ArrayList<T> data = new ArrayList<>(BATCH_SIZE);
int count = 0;
while (iter.hasNext() && leftCount > -1) {
count++;
leftCount--;
T next = (T) iter.next();
data.add(next);
if (count >= BATCH_SIZE) {
offer(data, isVertex);
// data.clear();
break;
}
}
if (!(iter.hasNext() && leftCount > -1)) {
if (data.size() > 0 &&
data.size() < BATCH_SIZE) {
offer(data, isVertex);
}
readOver.set(true);
data = null;
//log.warn("scan complete , count: {},time: {}",
// sum, System.currentTimeMillis());
return;
}
}
}
}
} catch (Exception e) {
log.warn("read data with error: ", e);
sender.onError(e);
}
}
};

Runnable sr = () -> {
while (sendCondition()) {
ScanResponse response;
try {
if (readOver.get()) {
if ((response = packages.poll()) == null) {
sender.onCompleted();
} else {
sender.onNext(response);
nextSeqNo.incrementAndGet();
}
} else {
response = packages.poll(10,
TimeUnit.MILLISECONDS);
if (response != null) {
sender.onNext(response);
nextSeqNo.incrementAndGet();
startRead();
} else {
break;
}
}



} catch (InterruptedException e) {
break;
}
}
};
}

0 comments on commit 250b83f

Please sign in to comment.