diff --git a/src/service/publish-service.js b/src/service/publish-service.js index 3c8dfe1f6..a8c2fc269 100644 --- a/src/service/publish-service.js +++ b/src/service/publish-service.js @@ -63,46 +63,57 @@ class PublishService extends OperationService { ); } - // Minimum replication reached, mark in the operational DB + // 1. Check minimum replication reached if (completedNumber === minAckResponses) { this.logger.debug( `Minimum replication ${minAckResponses} reached for operationId: ${operationId}, dataset root: ${datasetRoot}`, ); - await this.repositoryModuleManager.updateMinAcksReached(operationId, true); } - // All requests sent, minimum replication reached, mark as completed - if (leftoverNodes.length === 0 && completedNumber >= minAckResponses) { - await this.markOperationAsCompleted( - operationId, - blockchain, - null, - this.completedStatuses, - ); - this.logResponsesSummary(completedNumber, failedNumber); - } - - // All requests sent, minimum replication not reached, mark as failed - if (leftoverNodes.length === 0 && completedNumber < minAckResponses) { - this.markOperationAsFailed( - operationId, - blockchain, - 'Not replicated to enough nodes!', - this.errorType, - ); - this.logResponsesSummary(completedNumber, failedNumber); - } - - // Not all requests sent, still possible to reach minimum replication, - // schedule requests for leftover nodes - const potentialCompletedNumber = completedNumber + leftoverNodes.length; - if ( - leftoverNodes.length > 0 && - potentialCompletedNumber >= minAckResponses && - (totalResponses - 1) % batchSize === 0 - ) { - await this.scheduleOperationForLeftoverNodes(command.data, leftoverNodes); + // 2. Check if all responses have been received + if (totalResponses === numberOfFoundNodes) { + // 2.1 If minimum replication is reached, mark the operation as completed + if (completedNumber >= minAckResponses) { + await this.markOperationAsCompleted( + operationId, + blockchain, + null, + this.completedStatuses, + ); + this.logResponsesSummary(completedNumber, failedNumber); + } + // 2.2 Otherwise, mark as failed + else { + await this.markOperationAsFailed( + operationId, + blockchain, + 'Not replicated to enough nodes!', + this.errorType, + ); + this.logResponsesSummary(completedNumber, failedNumber); + } + } else { + // 3. Not all responses have arrived yet. + const potentialCompletedNumber = completedNumber + leftoverNodes.length; + const canStillReachMinReplication = potentialCompletedNumber >= minAckResponses; + const canScheduleBatch = (totalResponses - 1) % batchSize === 0; + + // 3.1 Check if minimum replication can still be achieve by scheduling leftover nodes + // (and it's at the end of a batch) + if (leftoverNodes.length > 0 && canStillReachMinReplication && canScheduleBatch) { + await this.scheduleOperationForLeftoverNodes(command.data, leftoverNodes); + } + // 3.2 If minimum replication cannot be reached and it's end of a batch, mark as failed + else if (!canStillReachMinReplication && canScheduleBatch) { + await this.markOperationAsFailed( + operationId, + blockchain, + 'Not replicated to enough nodes!', + this.errorType, + ); + this.logResponsesSummary(completedNumber, failedNumber); + } } }