Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call getPartyInfo before requestAllTransactionsFromNode in order to e… #459

Merged
merged 2 commits into from Sep 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package com.quorum.tessera.sync;

import com.quorum.tessera.client.P2pClient;
import com.quorum.tessera.node.PartyInfoParser;
import com.quorum.tessera.node.PartyInfoService;
import com.quorum.tessera.node.model.PartyInfo;
import com.quorum.tessera.sync.model.SyncableParty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.Optional;
Expand All @@ -12,18 +18,32 @@
*/
public class SyncPoller implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(SyncPoller.class);

private final ExecutorService executorService;

private final ResendPartyStore resendPartyStore;

private final TransactionRequester transactionRequester;

private final PartyInfoService partyInfoService;

private final P2pClient p2pClient;

private final PartyInfoParser partyInfoParser;

public SyncPoller(final ExecutorService executorService,
final ResendPartyStore resendPartyStore,
final TransactionRequester transactionRequester) {
final TransactionRequester transactionRequester,
final PartyInfoService partyInfoService,
final PartyInfoParser partyInfoParser,
final P2pClient p2pClient) {
this.executorService = Objects.requireNonNull(executorService);
this.resendPartyStore = Objects.requireNonNull(resendPartyStore);
this.transactionRequester = Objects.requireNonNull(transactionRequester);
this.partyInfoService = Objects.requireNonNull(partyInfoService);
this.partyInfoParser = Objects.requireNonNull(partyInfoParser);
this.p2pClient = Objects.requireNonNull(p2pClient);
}

/**
Expand All @@ -42,7 +62,13 @@ public void run() {
final String url = requestDetails.getParty().getUrl();

final Runnable action = () -> {
final boolean allSucceeded = this.transactionRequester.requestAllTransactionsFromNode(url);

// perform a getPartyInfo in order to ensure that the target tessera has the current tessera as a recipient
boolean allSucceeded = updatePartyInfo(url);

if (allSucceeded) {
allSucceeded = this.transactionRequester.requestAllTransactionsFromNode(url);
}

if (!allSucceeded) {
this.resendPartyStore.incrementFailedAttempt(requestDetails);
Expand All @@ -57,4 +83,19 @@ public void run() {

}

private boolean updatePartyInfo(String url) {
try {
final PartyInfo partyInfo = partyInfoService.getPartyInfo();

final byte[] encodedPartyInfo = partyInfoParser.to(partyInfo);

// we deliberately discard the response as we do not want to fully duplicate the PartyInfoPoller
return null != p2pClient.getPartyInfo(url, encodedPartyInfo);
} catch (final Exception ex) {
LOGGER.warn("Server error {} when connecting to {}", ex.getMessage(), url);
LOGGER.debug(null, ex);
return false;
}
}

}
5 changes: 5 additions & 0 deletions tessera-core/src/main/resources/tessera-spring.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@
</constructor-arg>
<constructor-arg ref="resendPartyStore" />
<constructor-arg ref="transactionRequester" />
<constructor-arg ref="partyInfoService"/>
<constructor-arg>
<bean class="com.quorum.tessera.node.PartyInfoParser" factory-method="create"/>
</constructor-arg>
<constructor-arg ref="p2pClient"/>
</bean>

<bean name="syncExecutor" class="com.quorum.tessera.threading.TesseraScheduledExecutor">
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.quorum.tessera.sync;

import com.quorum.tessera.client.P2pClient;
import com.quorum.tessera.node.PartyInfoParser;
import com.quorum.tessera.node.PartyInfoService;
import com.quorum.tessera.node.model.Party;
import com.quorum.tessera.sync.model.SyncableParty;
import org.junit.After;
Expand All @@ -22,13 +25,23 @@ public class SyncPollerTest {

private SyncPoller syncPoller;

private PartyInfoService partyInfoService;

private PartyInfoParser partyInfoParser;

private P2pClient p2pClient;

@Before
public void init() {
this.executorService = mock(ExecutorService.class);
this.resendPartyStore = mock(ResendPartyStore.class);
this.transactionRequester = mock(TransactionRequester.class);
this.partyInfoService = mock(PartyInfoService.class);
this.partyInfoParser = mock(PartyInfoParser.class);
this.p2pClient = mock(P2pClient.class);
doReturn(new byte[]{}).when(p2pClient).getPartyInfo(anyString(), any());

this.syncPoller = new SyncPoller(executorService, resendPartyStore, transactionRequester);
this.syncPoller = new SyncPoller(executorService, resendPartyStore, transactionRequester, partyInfoService, partyInfoParser, p2pClient);
}

@After
Expand Down Expand Up @@ -66,6 +79,9 @@ public void singlePartySubmitsSingleTask() {
task.run();

verify(transactionRequester).requestAllTransactionsFromNode(targetUrl);
verify(partyInfoService, times(1)).getPartyInfo();
verify(partyInfoParser, times(1)).to(any());
verify(p2pClient, times(1)).getPartyInfo(eq(targetUrl), any());

}

Expand Down Expand Up @@ -93,4 +109,52 @@ public void singlePartyTaskFailsAndNotifiesStore() {

}

@Test
public void singlePartyTaskUpdatePartyInfoThrowsAndNotifiesStore() {

final String targetUrl = "fakeurl.com";
final SyncableParty syncableParty = new SyncableParty(new Party(targetUrl), 0);

doThrow(new RuntimeException("Unable to connect")).when(p2pClient).getPartyInfo(anyString(), any());

doReturn(Optional.of(syncableParty), Optional.empty()).when(resendPartyStore).getNextParty();

syncPoller.run();

final ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(executorService).submit(captor.capture());
verify(resendPartyStore, times(2)).getNextParty();

final Runnable task = captor.getValue();
task.run();

verify(transactionRequester, times(0)).requestAllTransactionsFromNode(targetUrl);
verify(resendPartyStore).incrementFailedAttempt(syncableParty);

}

@Test
public void singlePartyTaskUpdatePartyInfoFailsAndNotifiesStore() {

final String targetUrl = "fakeurl.com";
final SyncableParty syncableParty = new SyncableParty(new Party(targetUrl), 0);

doReturn(null).when(p2pClient).getPartyInfo(anyString(), any());

doReturn(Optional.of(syncableParty), Optional.empty()).when(resendPartyStore).getNextParty();

syncPoller.run();

final ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(executorService).submit(captor.capture());
verify(resendPartyStore, times(2)).getNextParty();

final Runnable task = captor.getValue();
task.run();

verify(transactionRequester, times(0)).requestAllTransactionsFromNode(targetUrl);
verify(resendPartyStore).incrementFailedAttempt(syncableParty);

}

}