From b637538d9ee0841306574234638edb61694b473a Mon Sep 17 00:00:00 2001 From: Artyom Sayadyan Date: Fri, 12 Jan 2024 12:51:24 +0300 Subject: [PATCH 1/2] NODE-2652 Lazy GRPC API response --- .../api/grpc/TransactionsApiGrpcImpl.scala | 9 ++++++++- .../api/common/CommonBlocksApi.scala | 16 +++++++++------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/grpc-server/src/main/scala/com/wavesplatform/api/grpc/TransactionsApiGrpcImpl.scala b/grpc-server/src/main/scala/com/wavesplatform/api/grpc/TransactionsApiGrpcImpl.scala index 7848170398..2a1090e26f 100644 --- a/grpc-server/src/main/scala/com/wavesplatform/api/grpc/TransactionsApiGrpcImpl.scala +++ b/grpc-server/src/main/scala/com/wavesplatform/api/grpc/TransactionsApiGrpcImpl.scala @@ -51,7 +51,10 @@ class TransactionsApiGrpcImpl(blockchain: Blockchain, commonApi: CommonTransacti // By ids case None => - Observable.fromIterable(transactionIds.flatMap(commonApi.transactionById)) + for { + id <- Observable.fromIterable(transactionIds) + tx <- Observable.fromIterable(commonApi.transactionById(id)) + } yield tx } val transactionIdSet = transactionIds.toSet @@ -124,6 +127,10 @@ class TransactionsApiGrpcImpl(blockchain: Blockchain, commonApi: CommonTransacti result <- commonApi.broadcastTransaction(vtx) _ <- result.resultE.toFuture // Check for success } yield tx).wrapErrors + override def getTransactionSnapshots( + request: TransactionSnapshotsRequest, + responseObserver: StreamObserver[TransactionSnapshotResponse] + ): Unit = ??? } private object TransactionsApiGrpcImpl { diff --git a/node/src/main/scala/com/wavesplatform/api/common/CommonBlocksApi.scala b/node/src/main/scala/com/wavesplatform/api/common/CommonBlocksApi.scala index a3f48b86d2..fe92f87162 100644 --- a/node/src/main/scala/com/wavesplatform/api/common/CommonBlocksApi.scala +++ b/node/src/main/scala/com/wavesplatform/api/common/CommonBlocksApi.scala @@ -44,12 +44,11 @@ object CommonBlocksApi { .flatMap(Observable.fromIterable(_)) def blocksRange(fromHeight: Int, toHeight: Int, generatorAddress: Address): Observable[(BlockMeta, Seq[(TxMeta, Transaction)])] = - Observable.fromIterable( - (fixHeight(fromHeight) to fixHeight(toHeight)) - .flatMap(h => metaAt(h)) - .collect { case m if m.header.generator.toAddress == generatorAddress => m.height } - .flatMap(h => blockInfoAt(h)) - ) + for { + height <- Observable.fromIterable(fixHeight(fromHeight) to fixHeight(toHeight)) + meta <- Observable.fromIterable(metaAt(height)) if meta.header.generator.toAddress == generatorAddress + block <- Observable.fromIterable(blockInfoAt(meta.height)) + } yield block def blockDelay(blockId: BlockId, blockNum: Int): Option[Long] = blockchain @@ -75,7 +74,10 @@ object CommonBlocksApi { def meta(id: ByteStr): Option[BlockMeta] = blockchain.heightOf(id).flatMap(metaAt) def metaRange(fromHeight: Int, toHeight: Int): Observable[BlockMeta] = - Observable.fromIterable((fixHeight(fromHeight) to fixHeight(toHeight)).flatMap(h => metaAt(h))) + for { + height <- Observable.fromIterable(fixHeight(fromHeight) to fixHeight(toHeight)) + meta <- Observable.fromIterable(metaAt(height)) + } yield meta def block(blockId: BlockId): Option[(BlockMeta, Seq[(TxMeta, Transaction)])] = blockchain.heightOf(blockId).flatMap(h => blockInfoAt(h)) } From 4bf52d8c221f4d1207af61f0c43a6c6ecbf0ba1a Mon Sep 17 00:00:00 2001 From: Artyom Sayadyan Date: Fri, 12 Jan 2024 12:52:43 +0300 Subject: [PATCH 2/2] Removed illegal code --- .../com/wavesplatform/api/grpc/TransactionsApiGrpcImpl.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/grpc-server/src/main/scala/com/wavesplatform/api/grpc/TransactionsApiGrpcImpl.scala b/grpc-server/src/main/scala/com/wavesplatform/api/grpc/TransactionsApiGrpcImpl.scala index 2a1090e26f..33698cae04 100644 --- a/grpc-server/src/main/scala/com/wavesplatform/api/grpc/TransactionsApiGrpcImpl.scala +++ b/grpc-server/src/main/scala/com/wavesplatform/api/grpc/TransactionsApiGrpcImpl.scala @@ -127,10 +127,6 @@ class TransactionsApiGrpcImpl(blockchain: Blockchain, commonApi: CommonTransacti result <- commonApi.broadcastTransaction(vtx) _ <- result.resultE.toFuture // Check for success } yield tx).wrapErrors - override def getTransactionSnapshots( - request: TransactionSnapshotsRequest, - responseObserver: StreamObserver[TransactionSnapshotResponse] - ): Unit = ??? } private object TransactionsApiGrpcImpl {