Skip to content

Commit

Permalink
Strongly type the SessionRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
shs96c committed Apr 13, 2021
1 parent 6657964 commit f052b7b
Show file tree
Hide file tree
Showing 26 changed files with 555 additions and 591 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public Stream<Capabilities> stream() {
}
}

public ImmutableSet<Dialect> getDownstreamDialects() {
public Set<Dialect> getDownstreamDialects() {
return dialects.isEmpty() ? ImmutableSet.of(DEFAULT_DIALECT) : dialects;
}

Expand Down
11 changes: 11 additions & 0 deletions java/client/src/org/openqa/selenium/remote/http/Contents.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.openqa.selenium.internal.Require;
import org.openqa.selenium.json.Json;
import org.openqa.selenium.json.JsonInput;
import org.openqa.selenium.json.JsonOutput;

import java.io.ByteArrayInputStream;
Expand All @@ -33,6 +34,7 @@
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.function.Supplier;

Expand Down Expand Up @@ -123,6 +125,15 @@ public static Supplier<InputStream> asJson(Object obj) {
return utf8String(builder);
}

public static <T> T fromJson(HttpMessage<?> message, Type typeOfT) {
try (Reader reader = reader(message);
JsonInput input = JSON.newInput(reader)) {
return input.read(typeOfT);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static Supplier<InputStream> memoize(Supplier<InputStream> delegate) {
return new MemoizedSupplier(delegate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ java_library(
"//java/server/src/org/openqa/selenium/grid/node/remote",
"//java/server/src/org/openqa/selenium/grid/security",
"//java/server/src/org/openqa/selenium/grid/sessionmap",
"//java/server/src/org/openqa/selenium/grid/sessionqueue",
# Default implementation of the session map. Loaded reflectively
"//java/server/src/org/openqa/selenium/grid/sessionmap/remote",
"//java/server/src/org/openqa/selenium/status",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
import org.openqa.selenium.grid.security.RequiresSecretFilter;
import org.openqa.selenium.grid.security.Secret;
import org.openqa.selenium.grid.sessionmap.SessionMap;
import org.openqa.selenium.grid.sessionqueue.SessionRequest;
import org.openqa.selenium.internal.Either;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.json.Json;
import org.openqa.selenium.remote.NewSessionPayload;
import org.openqa.selenium.remote.SessionId;
import org.openqa.selenium.remote.http.HttpClient;
import org.openqa.selenium.remote.http.HttpRequest;
Expand All @@ -53,13 +53,10 @@
import org.openqa.selenium.remote.tracing.Tracer;
import org.openqa.selenium.status.HasReadyState;

import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
Expand All @@ -73,11 +70,9 @@
import static org.openqa.selenium.remote.RemoteTags.CAPABILITIES_EVENT;
import static org.openqa.selenium.remote.RemoteTags.SESSION_ID;
import static org.openqa.selenium.remote.RemoteTags.SESSION_ID_EVENT;
import static org.openqa.selenium.remote.http.Contents.reader;
import static org.openqa.selenium.remote.http.Route.delete;
import static org.openqa.selenium.remote.http.Route.get;
import static org.openqa.selenium.remote.http.Route.post;
import static org.openqa.selenium.remote.tracing.HttpTracing.newSpanAsChildOf;
import static org.openqa.selenium.remote.tracing.Tags.EXCEPTION;

/**
Expand Down Expand Up @@ -160,24 +155,21 @@ protected Distributor(
.with(new SpanDecorator(tracer, req -> "distributor.status")));
}

public Either<SessionNotCreatedException, CreateSessionResponse> newSession(HttpRequest request)
public Either<SessionNotCreatedException, CreateSessionResponse> newSession(SessionRequest request)
throws SessionNotCreatedException {
Require.nonNull("Requests to process", request);

Span span = newSpanAsChildOf(tracer, request, "distributor.create_session_response");
Span span = tracer.getCurrentContext().createSpan("distributor.create_session_response");
Map<String, EventAttributeValue> attributeMap = new HashMap<>();
try (
Reader reader = reader(request);
NewSessionPayload payload = NewSessionPayload.create(reader)) {
Objects.requireNonNull(payload, "Requests to process must be set.");

try {
attributeMap.put(AttributeKey.LOGGER_CLASS.getKey(),
EventAttribute.setValue(getClass().getName()));

Iterator<Capabilities> iterator = payload.stream().iterator();
attributeMap.put("request.payload", EventAttribute.setValue(payload.toString()));
Iterator<Capabilities> iterator = request.getDesiredCapabilities().iterator();
attributeMap.put("request.payload", EventAttribute.setValue(request.getDesiredCapabilities().toString()));
String sessionReceivedMessage = "Session request received by the distributor";
span.addEvent(sessionReceivedMessage, attributeMap);
LOG.info(String.format("%s: \n %s", sessionReceivedMessage, payload));
LOG.info(String.format("%s: \n %s", sessionReceivedMessage, request.getDesiredCapabilities()));

if (!iterator.hasNext()) {
SessionNotCreatedException exception =
Expand All @@ -192,7 +184,7 @@ public Either<SessionNotCreatedException, CreateSessionResponse> newSession(Http

Either<SessionNotCreatedException, CreateSessionResponse> selected;
CreateSessionRequest firstRequest = new CreateSessionRequest(
payload.getDownstreamDialects(),
request.getDownstreamDialects(),
iterator.next(),
ImmutableMap.of("span", span));

Expand All @@ -208,7 +200,7 @@ public Either<SessionNotCreatedException, CreateSessionResponse> newSession(Http
if (!hostsWithCaps) {
String errorMessage = String.format(
"No Node supports the required capabilities: %s",
payload.stream().map(Capabilities::toString)
request.getDesiredCapabilities().stream().map(Capabilities::toString)
.collect(Collectors.joining(", ")));
SessionNotCreatedException exception = new SessionNotCreatedException(errorMessage);
span.setAttribute(AttributeKey.ERROR.getKey(), true);
Expand All @@ -229,7 +221,7 @@ public Either<SessionNotCreatedException, CreateSessionResponse> newSession(Http
String errorMessage =
String.format(
"Unable to find provider for session: %s",
payload.stream().map(Capabilities::toString)
request.getDesiredCapabilities().stream().map(Capabilities::toString)
.collect(Collectors.joining(", ")));
SessionNotCreatedException exception = new RetrySessionRequestException(errorMessage);
selected = Either.left(exception);
Expand Down Expand Up @@ -270,14 +262,13 @@ public Either<SessionNotCreatedException, CreateSessionResponse> newSession(Http
span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap);

return Either.left(e);
} catch (IOException e) {
} catch (UncheckedIOException e) {
span.setAttribute(AttributeKey.ERROR.getKey(), true);
span.setStatus(Status.UNKNOWN);

EXCEPTION.accept(attributeMap, e);
attributeMap.put(AttributeKey.EXCEPTION_MESSAGE.getKey(),
EventAttribute.setValue("Unknown error in LocalDistributor while creating session: " +
e.getMessage()));
EventAttribute.setValue("Unknown error in LocalDistributor while creating session: " + e.getMessage()));
span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap);

return Either.left(new SessionNotCreatedException(e.getMessage(), e));
Expand All @@ -296,8 +287,9 @@ public Either<SessionNotCreatedException, CreateSessionResponse> newSession(Http

protected abstract Set<NodeStatus> getAvailableNodes();

protected abstract Either<SessionNotCreatedException, CreateSessionResponse> reserve(SlotId slot,
CreateSessionRequest request);
protected abstract Either<SessionNotCreatedException, CreateSessionResponse> reserve(
SlotId slot,
CreateSessionRequest request);

@Override
public boolean test(HttpRequest httpRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.openqa.selenium.grid.distributor.local;

import com.google.common.collect.ImmutableSet;

import org.openqa.selenium.Beta;
import org.openqa.selenium.Capabilities;
import org.openqa.selenium.ImmutableCapabilities;
Expand Down Expand Up @@ -59,11 +58,11 @@
import org.openqa.selenium.grid.sessionmap.SessionMap;
import org.openqa.selenium.grid.sessionmap.config.SessionMapOptions;
import org.openqa.selenium.grid.sessionqueue.NewSessionQueuer;
import org.openqa.selenium.grid.sessionqueue.SessionRequest;
import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueuerOptions;
import org.openqa.selenium.internal.Either;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.remote.http.HttpClient;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.tracing.AttributeKey;
import org.openqa.selenium.remote.tracing.EventAttribute;
import org.openqa.selenium.remote.tracing.EventAttributeValue;
Expand Down Expand Up @@ -94,7 +93,6 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static org.openqa.selenium.grid.data.Availability.DOWN;
import static org.openqa.selenium.grid.data.Availability.DRAINING;
import static org.openqa.selenium.remote.tracing.HttpTracing.newSpanAsChildOf;

public class LocalDistributor extends Distributor {

Expand Down Expand Up @@ -395,10 +393,10 @@ public void run() {
if (hasCapacity) {
RequestId reqId = requestIds.poll();
if (reqId != null) {
Optional<HttpRequest> optionalHttpRequest = sessionRequests.remove(reqId);
Optional<SessionRequest> maybeRequest = sessionRequests.remove(reqId);
// Check if polling the queue did not return null
if (optionalHttpRequest.isPresent()) {
handleNewSessionRequest(optionalHttpRequest.get(), reqId);
if (maybeRequest.isPresent()) {
handleNewSessionRequest(maybeRequest.get(), reqId);
} else {
fireSessionRejectedEvent(
"Unable to poll request from the new session request queue.",
Expand All @@ -412,8 +410,8 @@ public void run() {
}
}

private void handleNewSessionRequest(HttpRequest sessionRequest, RequestId reqId) {
try (Span span = newSpanAsChildOf(tracer, sessionRequest, "distributor.poll_queue")) {
private void handleNewSessionRequest(SessionRequest sessionRequest, RequestId reqId) {
try (Span span = tracer.getCurrentContext().createSpan("distributor.poll_queue")) {
Map<String, EventAttributeValue> attributeMap = new HashMap<>();
attributeMap.put(
AttributeKey.LOGGER_CLASS.getKey(),
Expand All @@ -439,7 +437,7 @@ private void handleNewSessionRequest(HttpRequest sessionRequest, RequestId reqId
SessionNotCreatedException exception = response.left();

if (exception instanceof RetrySessionRequestException) {
boolean retried = sessionRequests.retryAddToQueue(sessionRequest, reqId);
boolean retried = sessionRequests.retryAddToQueue(sessionRequest);

attributeMap.put("request.retry_add", EventAttribute.setValue(retried));
span.addEvent("Retry adding to front of queue. No slot available.", attributeMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.openqa.selenium.grid.data.RequestId;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.remote.http.Contents;
import org.openqa.selenium.remote.http.HttpHandler;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.HttpResponse;
Expand All @@ -50,7 +51,9 @@ public HttpResponse execute(HttpRequest req) {
HTTP_REQUEST.accept(span, req);
span.setAttribute(AttributeKey.REQUEST_ID.getKey(), id.toString());

boolean value = newSessionQueuer.retryAddToQueue(req, id);
SessionRequest sessionRequest = Contents.fromJson(req, SessionRequest.class);

boolean value = newSessionQueuer.retryAddToQueue(sessionRequest);

span.setAttribute("request.retry", value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE;

import org.openqa.selenium.internal.Require;
import org.openqa.selenium.remote.http.Contents;
import org.openqa.selenium.remote.http.HttpHandler;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.HttpResponse;
Expand All @@ -43,7 +44,7 @@ public HttpResponse execute(HttpRequest req) {
try (Span span = newSpanAsChildOf(tracer, req, "sessionqueuer.add")) {
HTTP_REQUEST.accept(span, req);

HttpResponse response = newSessionQueuer.addToQueue(req);
HttpResponse response = newSessionQueuer.addToQueue(Contents.fromJson(req, SessionRequest.class));

HTTP_RESPONSE.accept(span, response);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

package org.openqa.selenium.grid.sessionqueue;

import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.util.Collections.singletonMap;
import static org.openqa.selenium.remote.http.Contents.asJson;
import static org.openqa.selenium.remote.http.Contents.bytes;

import org.openqa.selenium.events.EventBus;
import org.openqa.selenium.grid.data.NewSessionErrorResponse;
import org.openqa.selenium.grid.data.NewSessionRejectedEvent;
Expand All @@ -30,13 +25,10 @@
import org.openqa.selenium.grid.data.NewSessionResponseEvent;
import org.openqa.selenium.grid.data.RequestId;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.HttpResponse;
import org.openqa.selenium.remote.tracing.Tracer;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
Expand All @@ -45,6 +37,11 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.util.Collections.singletonMap;
import static org.openqa.selenium.remote.http.Contents.asJson;
import static org.openqa.selenium.remote.http.Contents.bytes;

public class GetNewSessionResponse {

private static final Logger LOG = Logger.getLogger(GetNewSessionResponse.class.getName());
Expand Down Expand Up @@ -107,16 +104,15 @@ private void setErrorResponse(NewSessionErrorResponse sessionResponse) {
}
}

public HttpResponse add(HttpRequest request) {
public HttpResponse add(SessionRequest request) {
Require.nonNull("New Session request", request);

CountDownLatch latch = new CountDownLatch(1);
UUID uuid = UUID.randomUUID();
RequestId requestId = new RequestId(uuid);
RequestId requestId = request.getRequestId();
NewSessionRequest requestIdentifier = new NewSessionRequest(requestId, latch);
knownRequests.put(requestId, requestIdentifier);

if (!sessionRequests.offerLast(request, requestId)) {
if (!sessionRequests.offerLast(request)) {
return internalErrorResponse(
"Session request could not be created. Error while adding to the session queue.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ protected NewSessionQueue(Tracer tracer, Duration retryInterval, Duration reques
this.requestTimeout = Require.nonNull("Session request timeout", requestTimeout);
}

public abstract boolean offerLast(HttpRequest request, RequestId requestId);
public abstract boolean offerLast(SessionRequest request);

public abstract boolean offerFirst(HttpRequest request, RequestId requestId);
public abstract boolean offerFirst(SessionRequest request);

public abstract Optional<HttpRequest> remove(RequestId requestId);
public abstract Optional<SessionRequest> remove(RequestId requestId);

public abstract int clear();

Expand All @@ -57,13 +57,11 @@ protected NewSessionQueue(Tracer tracer, Duration retryInterval, Duration reques
public void addRequestHeaders(HttpRequest request, RequestId reqId) {
long timestamp = Instant.now().getEpochSecond();
request.addHeader(SESSIONREQUEST_TIMESTAMP_HEADER, Long.toString(timestamp));

request.addHeader(SESSIONREQUEST_ID_HEADER, reqId.toString());
}

public boolean hasRequestTimedOut(HttpRequest request) {
String enqueTimestampStr = request.getHeader(SESSIONREQUEST_TIMESTAMP_HEADER);
Instant enque = Instant.ofEpochSecond(Long.parseLong(enqueTimestampStr));
public boolean hasRequestTimedOut(SessionRequest request) {
Instant enque = request.getEnqueued();
Instant deque = Instant.now();
Duration duration = Duration.between(enque, deque);

Expand Down
Loading

0 comments on commit f052b7b

Please sign in to comment.