Skip to content

Commit

Permalink
Log to console when the execution client is first confirmed as online. (
Browse files Browse the repository at this point in the history
#6136)

Previously we would only log if it was offline and came back online but it's reassuring for users to get an explicit log indicating that Teku connected to the execution client correctly.
  • Loading branch information
ajsutton authored Aug 28, 2022
1 parent bdb507b commit 8478843
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package tech.pegasys.teku.ethereum.executionclient.web3j;

import static tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil.getMessageOrSimpleName;
import static tech.pegasys.teku.infrastructure.logging.EventLogger.EVENT_LOG;

import java.io.IOException;
import java.time.Duration;
Expand All @@ -25,18 +24,24 @@
import org.web3j.protocol.exceptions.ClientConnectionException;
import tech.pegasys.teku.ethereum.executionclient.schema.Response;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.logging.EventLogger;
import tech.pegasys.teku.infrastructure.time.TimeProvider;

public abstract class Web3JClient {
private static final int ERROR_REPEAT_DELAY_MILLIS = 30 * 1000;
private static final int NO_ERROR_TIME = -1;
private final EventLogger eventLog;
private final TimeProvider timeProvider;
private Web3jService web3jService;
private Web3j eth1Web3j;
private final AtomicLong lastError = new AtomicLong(NO_ERROR_TIME);

// Default to the provider being offline at startup so we log when it is first available
// but uses a very old value to make sure we log if the first request fails
private final AtomicLong lastError = new AtomicLong(0);
private boolean initialized = false;

protected Web3JClient(TimeProvider timeProvider) {
protected Web3JClient(final EventLogger eventLog, final TimeProvider timeProvider) {
this.eventLog = eventLog;
this.timeProvider = timeProvider;
}

Expand Down Expand Up @@ -92,14 +97,14 @@ protected void handleError(final Throwable error, final boolean couldBeAuthError
if (errorTime == NO_ERROR_TIME
|| timeProvider.getTimeInMillis().longValue() - errorTime > ERROR_REPEAT_DELAY_MILLIS) {
if (lastError.compareAndSet(errorTime, timeProvider.getTimeInMillis().longValue())) {
EVENT_LOG.executionClientIsOffline(error, couldBeAuthError);
eventLog.executionClientIsOffline(error, couldBeAuthError);
}
}
}

protected void handleSuccess() {
if (lastError.getAndUpdate(x -> NO_ERROR_TIME) != NO_ERROR_TIME) {
EVENT_LOG.executionClientIsOnline();
eventLog.executionClientIsOnline();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.requireNonNull;
import static tech.pegasys.teku.infrastructure.logging.EventLogger.EVENT_LOG;

import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -69,12 +70,12 @@ public Web3JClient build() {
switch (endpoint.getScheme()) {
case "http":
case "https":
return new Web3jHttpClient(endpoint, timeProvider, timeout, jwtConfigOpt);
return new Web3jHttpClient(EVENT_LOG, endpoint, timeProvider, timeout, jwtConfigOpt);
case "ws":
case "wss":
return new Web3jWebsocketClient(endpoint, timeProvider, jwtConfigOpt);
return new Web3jWebsocketClient(EVENT_LOG, endpoint, timeProvider, jwtConfigOpt);
case "file":
return new Web3jIpcClient(endpoint, timeProvider, jwtConfigOpt);
return new Web3jIpcClient(EVENT_LOG, endpoint, timeProvider, jwtConfigOpt);
default:
throw new InvalidConfigurationException(prepareInvalidSchemeMessage(endpoint));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@
import org.web3j.protocol.http.HttpService;
import tech.pegasys.teku.ethereum.executionclient.OkHttpClientCreator;
import tech.pegasys.teku.ethereum.executionclient.auth.JwtConfig;
import tech.pegasys.teku.infrastructure.logging.EventLogger;
import tech.pegasys.teku.infrastructure.time.TimeProvider;

class Web3jHttpClient extends Web3JClient {
private static final Logger LOG = LogManager.getLogger();

Web3jHttpClient(
final EventLogger eventLog,
final URI endpoint,
final TimeProvider timeProvider,
final Duration timeout,
final Optional<JwtConfig> jwtConfig) {
super(timeProvider);
super(eventLog, timeProvider);
final OkHttpClient okHttpClient =
OkHttpClientCreator.create(timeout, LOG, jwtConfig, timeProvider);
final Web3jService httpService = new HttpService(endpoint.toString(), okHttpClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@
import org.web3j.protocol.ipc.WindowsIpcService;
import tech.pegasys.teku.ethereum.executionclient.auth.JwtConfig;
import tech.pegasys.teku.infrastructure.exceptions.InvalidConfigurationException;
import tech.pegasys.teku.infrastructure.logging.EventLogger;
import tech.pegasys.teku.infrastructure.time.TimeProvider;

class Web3jIpcClient extends Web3JClient {
private static final Logger LOG = LogManager.getLogger();

Web3jIpcClient(
final URI endpoint, final TimeProvider timeProvider, final Optional<JwtConfig> jwtConfig) {
super(timeProvider);
final EventLogger eventLog,
final URI endpoint,
final TimeProvider timeProvider,
final Optional<JwtConfig> jwtConfig) {
super(eventLog, timeProvider);
if (jwtConfig.isPresent()) {
LOG.warn("JWT configuration is ignored with IPC endpoint URI");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import tech.pegasys.teku.ethereum.executionclient.auth.JwtConfig;
import tech.pegasys.teku.ethereum.executionclient.schema.Response;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.logging.EventLogger;
import tech.pegasys.teku.infrastructure.time.TimeProvider;

class Web3jWebsocketClient extends Web3JClient {
Expand All @@ -33,8 +34,11 @@ class Web3jWebsocketClient extends Web3JClient {
private Optional<JwtAuthWebsocketHelper> jwtAuth = Optional.empty();

Web3jWebsocketClient(
final URI endpoint, final TimeProvider timeProvider, final Optional<JwtConfig> jwtConfig) {
super(timeProvider);
final EventLogger eventLog,
final URI endpoint,
final TimeProvider timeProvider,
final Optional<JwtConfig> jwtConfig) {
super(eventLog, timeProvider);
this.webSocketClient = new WebSocketClient(endpoint);
final WebSocketService webSocketService = new WebSocketService(webSocketClient, false);
initWeb3jService(webSocketService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

package tech.pegasys.teku.ethereum.executionclient.web3j;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.net.URI;
Expand All @@ -23,8 +25,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand All @@ -36,56 +37,115 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.SafeFutureAssert;
import tech.pegasys.teku.infrastructure.async.Waiter;
import tech.pegasys.teku.infrastructure.logging.EventLogger;
import tech.pegasys.teku.infrastructure.time.StubTimeProvider;
import tech.pegasys.teku.infrastructure.time.TimeProvider;

public class Web3JClientTest {
private static final TimeProvider TIME_PROVIDER = StubTimeProvider.withTimeInSeconds(1000);
private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
private static final Web3jService WEB3J_SERVICE = mock(Web3jService.class);
private static final Web3JClient WEB3J_CLIENT = new Web3JClientImpl(TIME_PROVIDER);
private static final URI ENDPOINT = URI.create("");
private static final Web3jHttpClient WEB3J_HTTP_CLIENT =
new Web3jHttpClient(ENDPOINT, TIME_PROVIDER, DEFAULT_TIMEOUT, Optional.empty());
private static final WebSocketService WEB_SOCKET_SERVICE = mock(WebSocketService.class);
private static final Web3jWebsocketClient WEB3J_WEBSOCKET_CLIENT =
new Web3jWebsocketClient(ENDPOINT, TIME_PROVIDER, Optional.empty());
private static final Web3jIpcClient WEB3J_IPC_CLIENT =
new Web3jIpcClient(URI.create("file:/a"), TIME_PROVIDER, Optional.empty());

static class Web3JClientImpl extends Web3JClient {
protected Web3JClientImpl(TimeProvider timeProvider) {
super(timeProvider);
initWeb3jService(WEB3J_SERVICE);
}
}

@BeforeAll
static void setup() {
WEB3J_HTTP_CLIENT.initWeb3jService(WEB3J_SERVICE);
WEB3J_WEBSOCKET_CLIENT.initWeb3jService(WEB_SOCKET_SERVICE);
}
private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);

@SuppressWarnings("unused")
static Stream<Arguments> getClientInstances() {
return Stream.of(WEB3J_CLIENT, WEB3J_HTTP_CLIENT, WEB3J_WEBSOCKET_CLIENT, WEB3J_IPC_CLIENT)
final TimeProvider timeProvider = StubTimeProvider.withTimeInSeconds(1000);
final Web3jService web3jService = mock(Web3jService.class);
final WebSocketService webSocketService = mock(WebSocketService.class);
return Stream.<Named<ClientFactory>>of(
Named.of(
"Web3JClient",
eventLog -> {
final Web3JClient web3jClient = new Web3JClient(eventLog, timeProvider) {};
web3jClient.initWeb3jService(web3jService);
return web3jClient;
}),
Named.of(
"Web3jHttpClient",
eventLog -> {
final Web3jHttpClient client =
new Web3jHttpClient(
eventLog, ENDPOINT, timeProvider, DEFAULT_TIMEOUT, Optional.empty()) {};
client.initWeb3jService(web3jService);
return client;
}),
Named.of(
"Web3jWebsocketClient",
eventLog -> {
final Web3jWebsocketClient client =
new Web3jWebsocketClient(eventLog, ENDPOINT, timeProvider, Optional.empty());
client.initWeb3jService(webSocketService);
return client;
}),
Named.of(
"Web3jIpcClient",
eventLog -> {
final Web3jIpcClient client =
new Web3jIpcClient(
eventLog, URI.create("file:/a"), timeProvider, Optional.empty());
client.initWeb3jService(web3jService);
return client;
}))
.map(Arguments::of);
}

@ParameterizedTest
@MethodSource("getClientInstances")
void shouldTimeoutIfResponseNotReceived(final Web3JClient client) throws Exception {
void shouldTimeoutIfResponseNotReceived(final ClientFactory clientFactory) throws Exception {
final Web3JClient client = clientFactory.create();
Request<Void, VoidResponse> request =
new Request<>("test", new ArrayList<>(), WEB3J_SERVICE, VoidResponse.class);
when(WEB3J_SERVICE.sendAsync(request, VoidResponse.class))
new Request<>("test", new ArrayList<>(), client.getWeb3jService(), VoidResponse.class);
when(client.getWeb3jService().sendAsync(request, VoidResponse.class))
.thenReturn(new CompletableFuture<>());

final Duration crazyShortTimeout = Duration.ofMillis(0);
final SafeFuture<Response<Void>> result = client.doRequest(request, crazyShortTimeout);
Waiter.waitFor(result);
SafeFutureAssert.assertThatSafeFuture(result).isCompleted();
final Response<Void> response = SafeFutureAssert.safeJoin(result);
Assertions.assertThat(response.getErrorMessage())
.isEqualTo(TimeoutException.class.getSimpleName());
assertThat(response.getErrorMessage()).isEqualTo(TimeoutException.class.getSimpleName());
}

@ParameterizedTest
@MethodSource("getClientInstances")
void shouldLogOnFirstSuccess(final ClientFactory clientFactory) throws Exception {
final EventLogger eventLog = mock(EventLogger.class);
final Web3JClient client = clientFactory.create(eventLog);
Request<Void, VoidResponse> request =
new Request<>("test", new ArrayList<>(), client.getWeb3jService(), VoidResponse.class);
when(client.getWeb3jService().sendAsync(request, VoidResponse.class))
.thenReturn(SafeFuture.completedFuture(new VoidResponse()));

final SafeFuture<Response<Void>> result = client.doRequest(request, DEFAULT_TIMEOUT);
Waiter.waitFor(result);

verify(eventLog).executionClientIsOnline();
}

@ParameterizedTest
@MethodSource("getClientInstances")
void shouldReportFailureAndRecovery(final ClientFactory clientFactory) throws Exception {
final EventLogger eventLog = mock(EventLogger.class);
final Web3JClient client = clientFactory.create(eventLog);
Request<Void, VoidResponse> request =
new Request<>("test", new ArrayList<>(), client.getWeb3jService(), VoidResponse.class);
final Throwable error = new TimeoutException();
when(client.getWeb3jService().sendAsync(request, VoidResponse.class))
.thenReturn(SafeFuture.failedFuture(error))
.thenReturn(SafeFuture.completedFuture(new VoidResponse()));

Waiter.waitFor(client.doRequest(request, DEFAULT_TIMEOUT));

verify(eventLog).executionClientIsOffline(error, false);

Waiter.waitFor(client.doRequest(request, DEFAULT_TIMEOUT));
verify(eventLog).executionClientIsOnline();
}

private interface ClientFactory {

default Web3JClient create() {
return create(mock(EventLogger.class));
}

Web3JClient create(EventLogger eventLog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.infrastructure.logging.EventLogger.EVENT_LOG;

import java.net.ConnectException;
import java.net.URI;
Expand All @@ -48,7 +49,8 @@ public class Web3jWebsocketClientTest {

@BeforeEach
public void setup() {
this.web3jWebsocketClient = new Web3jWebsocketClient(endpoint, timeProvider, Optional.empty());
this.web3jWebsocketClient =
new Web3jWebsocketClient(EVENT_LOG, endpoint, timeProvider, Optional.empty());
web3jWebsocketClient.initWeb3jService(webSocketService);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void executionClientIsOffline(final Throwable error, final boolean couldB
}

public void executionClientIsOnline() {
info("Execution Client is back online", Color.GREEN);
info("Execution Client is online", Color.GREEN);
}

public void builderIsOffline(final String errorMessage) {
Expand Down

0 comments on commit 8478843

Please sign in to comment.