Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logic for hadnling responses in publish service #3608

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 44 additions & 33 deletions src/service/publish-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,46 +63,57 @@ class PublishService extends OperationService {
);
}

// Minimum replication reached, mark in the operational DB
// 1. Check minimum replication reached
if (completedNumber === minAckResponses) {
this.logger.debug(
`Minimum replication ${minAckResponses} reached for operationId: ${operationId}, dataset root: ${datasetRoot}`,
);

await this.repositoryModuleManager.updateMinAcksReached(operationId, true);
}

// All requests sent, minimum replication reached, mark as completed
if (leftoverNodes.length === 0 && completedNumber >= minAckResponses) {
await this.markOperationAsCompleted(
operationId,
blockchain,
null,
this.completedStatuses,
);
this.logResponsesSummary(completedNumber, failedNumber);
}

// All requests sent, minimum replication not reached, mark as failed
if (leftoverNodes.length === 0 && completedNumber < minAckResponses) {
this.markOperationAsFailed(
operationId,
blockchain,
'Not replicated to enough nodes!',
this.errorType,
);
this.logResponsesSummary(completedNumber, failedNumber);
}

// Not all requests sent, still possible to reach minimum replication,
// schedule requests for leftover nodes
const potentialCompletedNumber = completedNumber + leftoverNodes.length;
if (
leftoverNodes.length > 0 &&
potentialCompletedNumber >= minAckResponses &&
(totalResponses - 1) % batchSize === 0
) {
await this.scheduleOperationForLeftoverNodes(command.data, leftoverNodes);
// 2. Check if all responses have been received
if (totalResponses === numberOfFoundNodes) {
// 2.1 If minimum replication is reached, mark the operation as completed
if (completedNumber >= minAckResponses) {
await this.markOperationAsCompleted(
operationId,
blockchain,
null,
this.completedStatuses,
);
this.logResponsesSummary(completedNumber, failedNumber);
}
// 2.2 Otherwise, mark as failed
else {
await this.markOperationAsFailed(
operationId,
blockchain,
'Not replicated to enough nodes!',
this.errorType,
);
this.logResponsesSummary(completedNumber, failedNumber);
}
} else {
// 3. Not all responses have arrived yet.
const potentialCompletedNumber = completedNumber + leftoverNodes.length;
const canStillReachMinReplication = potentialCompletedNumber >= minAckResponses;
const canScheduleBatch = (totalResponses - 1) % batchSize === 0;

// 3.1 Check if minimum replication can still be achieve by scheduling leftover nodes
// (and it's at the end of a batch)
if (leftoverNodes.length > 0 && canStillReachMinReplication && canScheduleBatch) {
await this.scheduleOperationForLeftoverNodes(command.data, leftoverNodes);
}
// 3.2 If minimum replication cannot be reached and it's end of a batch, mark as failed
else if (!canStillReachMinReplication && canScheduleBatch) {
await this.markOperationAsFailed(
operationId,
blockchain,
'Not replicated to enough nodes!',
this.errorType,
);
this.logResponsesSummary(completedNumber, failedNumber);
}
}
}

Expand Down
Loading