diff --git a/tessera-core/src/main/java/com/quorum/tessera/sync/SyncPoller.java b/tessera-core/src/main/java/com/quorum/tessera/sync/SyncPoller.java index 51013fe5b0..d403230860 100644 --- a/tessera-core/src/main/java/com/quorum/tessera/sync/SyncPoller.java +++ b/tessera-core/src/main/java/com/quorum/tessera/sync/SyncPoller.java @@ -1,6 +1,12 @@ package com.quorum.tessera.sync; +import com.quorum.tessera.client.P2pClient; +import com.quorum.tessera.node.PartyInfoParser; +import com.quorum.tessera.node.PartyInfoService; +import com.quorum.tessera.node.model.PartyInfo; import com.quorum.tessera.sync.model.SyncableParty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Objects; import java.util.Optional; @@ -12,18 +18,32 @@ */ public class SyncPoller implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(SyncPoller.class); + private final ExecutorService executorService; private final ResendPartyStore resendPartyStore; private final TransactionRequester transactionRequester; + private final PartyInfoService partyInfoService; + + private final P2pClient p2pClient; + + private final PartyInfoParser partyInfoParser; + public SyncPoller(final ExecutorService executorService, final ResendPartyStore resendPartyStore, - final TransactionRequester transactionRequester) { + final TransactionRequester transactionRequester, + final PartyInfoService partyInfoService, + final PartyInfoParser partyInfoParser, + final P2pClient p2pClient) { this.executorService = Objects.requireNonNull(executorService); this.resendPartyStore = Objects.requireNonNull(resendPartyStore); this.transactionRequester = Objects.requireNonNull(transactionRequester); + this.partyInfoService = Objects.requireNonNull(partyInfoService); + this.partyInfoParser = Objects.requireNonNull(partyInfoParser); + this.p2pClient = Objects.requireNonNull(p2pClient); } /** @@ -42,7 +62,13 @@ public void run() { final String url = requestDetails.getParty().getUrl(); final Runnable action = () -> { - final boolean allSucceeded = this.transactionRequester.requestAllTransactionsFromNode(url); + + // perform a getPartyInfo in order to ensure that the target tessera has the current tessera as a recipient + boolean allSucceeded = updatePartyInfo(url); + + if (allSucceeded) { + allSucceeded = this.transactionRequester.requestAllTransactionsFromNode(url); + } if (!allSucceeded) { this.resendPartyStore.incrementFailedAttempt(requestDetails); @@ -57,4 +83,19 @@ public void run() { } + private boolean updatePartyInfo(String url) { + try { + final PartyInfo partyInfo = partyInfoService.getPartyInfo(); + + final byte[] encodedPartyInfo = partyInfoParser.to(partyInfo); + + // we deliberately discard the response as we do not want to fully duplicate the PartyInfoPoller + return null != p2pClient.getPartyInfo(url, encodedPartyInfo); + } catch (final Exception ex) { + LOGGER.warn("Server error {} when connecting to {}", ex.getMessage(), url); + LOGGER.debug(null, ex); + return false; + } + } + } diff --git a/tessera-core/src/main/resources/tessera-spring.xml b/tessera-core/src/main/resources/tessera-spring.xml index 9b9d4453ab..08e107f6ff 100644 --- a/tessera-core/src/main/resources/tessera-spring.xml +++ b/tessera-core/src/main/resources/tessera-spring.xml @@ -75,6 +75,11 @@ + + + + + diff --git a/tessera-core/src/test/java/com/quorum/tessera/sync/SyncPollerTest.java b/tessera-core/src/test/java/com/quorum/tessera/sync/SyncPollerTest.java index e8f081994f..ec0a87d04d 100644 --- a/tessera-core/src/test/java/com/quorum/tessera/sync/SyncPollerTest.java +++ b/tessera-core/src/test/java/com/quorum/tessera/sync/SyncPollerTest.java @@ -1,5 +1,8 @@ package com.quorum.tessera.sync; +import com.quorum.tessera.client.P2pClient; +import com.quorum.tessera.node.PartyInfoParser; +import com.quorum.tessera.node.PartyInfoService; import com.quorum.tessera.node.model.Party; import com.quorum.tessera.sync.model.SyncableParty; import org.junit.After; @@ -22,13 +25,23 @@ public class SyncPollerTest { private SyncPoller syncPoller; + private PartyInfoService partyInfoService; + + private PartyInfoParser partyInfoParser; + + private P2pClient p2pClient; + @Before public void init() { this.executorService = mock(ExecutorService.class); this.resendPartyStore = mock(ResendPartyStore.class); this.transactionRequester = mock(TransactionRequester.class); + this.partyInfoService = mock(PartyInfoService.class); + this.partyInfoParser = mock(PartyInfoParser.class); + this.p2pClient = mock(P2pClient.class); + doReturn(new byte[]{}).when(p2pClient).getPartyInfo(anyString(), any()); - this.syncPoller = new SyncPoller(executorService, resendPartyStore, transactionRequester); + this.syncPoller = new SyncPoller(executorService, resendPartyStore, transactionRequester, partyInfoService, partyInfoParser, p2pClient); } @After @@ -66,6 +79,9 @@ public void singlePartySubmitsSingleTask() { task.run(); verify(transactionRequester).requestAllTransactionsFromNode(targetUrl); + verify(partyInfoService, times(1)).getPartyInfo(); + verify(partyInfoParser, times(1)).to(any()); + verify(p2pClient, times(1)).getPartyInfo(eq(targetUrl), any()); } @@ -93,4 +109,52 @@ public void singlePartyTaskFailsAndNotifiesStore() { } + @Test + public void singlePartyTaskUpdatePartyInfoThrowsAndNotifiesStore() { + + final String targetUrl = "fakeurl.com"; + final SyncableParty syncableParty = new SyncableParty(new Party(targetUrl), 0); + + doThrow(new RuntimeException("Unable to connect")).when(p2pClient).getPartyInfo(anyString(), any()); + + doReturn(Optional.of(syncableParty), Optional.empty()).when(resendPartyStore).getNextParty(); + + syncPoller.run(); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(executorService).submit(captor.capture()); + verify(resendPartyStore, times(2)).getNextParty(); + + final Runnable task = captor.getValue(); + task.run(); + + verify(transactionRequester, times(0)).requestAllTransactionsFromNode(targetUrl); + verify(resendPartyStore).incrementFailedAttempt(syncableParty); + + } + + @Test + public void singlePartyTaskUpdatePartyInfoFailsAndNotifiesStore() { + + final String targetUrl = "fakeurl.com"; + final SyncableParty syncableParty = new SyncableParty(new Party(targetUrl), 0); + + doReturn(null).when(p2pClient).getPartyInfo(anyString(), any()); + + doReturn(Optional.of(syncableParty), Optional.empty()).when(resendPartyStore).getNextParty(); + + syncPoller.run(); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(executorService).submit(captor.capture()); + verify(resendPartyStore, times(2)).getNextParty(); + + final Runnable task = captor.getValue(); + task.run(); + + verify(transactionRequester, times(0)).requestAllTransactionsFromNode(targetUrl); + verify(resendPartyStore).incrementFailedAttempt(syncableParty); + + } + }