Skip to content

Commit

Permalink
Thinclient Store Model (#43707)
Browse files Browse the repository at this point in the history
* start

* push progress

* Update ThinClientStoreModel.java

* Skeleton for ThinClientStoreModel and RNTBD serialization

* Adding serialization skeleton in ThinClientStoreModel

* test

* fix

* todo

* pr comments

---------

Co-authored-by: Fabian Meiswinkel <fabianm@microsoft.com>
Co-authored-by: Abhijeet Mohanty <abhmohanty@microsoft.com>
  • Loading branch information
3 people authored Jan 20, 2025
1 parent 0afce24 commit f37a2a6
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public void validateApiType() throws Exception {
ResourceType.Document);

try {
storeModel.performRequest(dsr, HttpMethod.POST).block();
storeModel.performRequest(dsr).block();
fail("Request should fail");
} catch (Exception e) {
//no-op
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.azure.cosmos.implementation;

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.implementation.circuitBreaker.GlobalPartitionEndpointManagerForCircuitBreaker;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.implementation.http.Http2ConnectionConfig;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpClientConfig;
import com.azure.cosmos.implementation.http.HttpHeaders;
import com.azure.cosmos.implementation.http.HttpRequest;
import com.azure.cosmos.implementation.http.ReactorNettyClient;
import io.netty.channel.ConnectTimeoutException;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;

import java.net.URI;

import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;

public class ThinClientStoreModelTest {
@Test(groups = "unit")
public void testThinClientStoreModel() throws Exception {
DiagnosticsClientContext clientContext = Mockito.mock(DiagnosticsClientContext.class);
Mockito.doReturn(new DiagnosticsClientContext.DiagnosticsClientConfig()).when(clientContext).getConfig();
Mockito
.doReturn(ImplementationBridgeHelpers
.CosmosDiagnosticsHelper
.getCosmosDiagnosticsAccessor()
.create(clientContext, 1d))
.when(clientContext).createDiagnostics();

String sdkGlobalSessionToken = "1#100#1=20#2=5#3=30";
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
Mockito.doReturn(sdkGlobalSessionToken).when(sessionContainer).resolveGlobalSessionToken(any());

GlobalEndpointManager globalEndpointManager = Mockito.mock(GlobalEndpointManager.class);

Mockito.doReturn(new URI("https://localhost"))
.when(globalEndpointManager).resolveServiceEndpoint(any());

// mocking with HTTP/1.1 client, just using this test as basic store model validation. e2e request flow
// with HTTP/2 will be tested in future PR once the wiring is all connected
HttpClient httpClient = Mockito.mock(HttpClient.class);
Mockito.when(httpClient.send(any(), any())).thenReturn(Mono.error(new ConnectTimeoutException()));

ThinClientStoreModel storeModel = new ThinClientStoreModel(
clientContext,
sessionContainer,
ConsistencyLevel.SESSION,
new UserAgentContainer(),
globalEndpointManager,
httpClient);

RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName(
clientContext,
OperationType.Read,
"/fakeResourceFullName",
ResourceType.Document);

try {
storeModel.performRequest(dsr).block();
} catch (Exception e) {
//no-op
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ public static class HttpHeaders {

// Priority Level for throttling
public static final String PRIORITY_LEVEL = "x-ms-cosmos-priority-level";

// Thinclient headers
public static final String THINCLIENT_PROXY_OPERATION_TYPE = "x-ms-thinclient-proxy-operation-type";
public static final String THINCLIENT_PROXY_RESOURCE_TYPE = "x-ms-thinclient-proxy-resource-type";
}

public static class A_IMHeaderValues {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.cosmos.implementation.directconnectivity.WFConstants;
import com.azure.cosmos.implementation.faultinjection.FaultInjectionRequestContext;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.http.HttpTransportSerializer;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import com.azure.cosmos.implementation.routing.Range;
Expand All @@ -29,6 +30,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
* This is core Transport/Connection agnostic request to the Azure Cosmos DB database service.
Expand Down Expand Up @@ -89,6 +93,8 @@ public class RxDocumentServiceRequest implements Cloneable {

private volatile boolean hasFeedRangeFilteringBeenApplied = false;

private final AtomicReference<HttpTransportSerializer> httpTransportSerializer = new AtomicReference<>(null);

public boolean isReadOnlyRequest() {
return this.operationType.isReadOnlyOperation();
}
Expand Down Expand Up @@ -1233,4 +1239,28 @@ public String getEffectivePartitionKey() {
public void setEffectivePartitionKey(String effectivePartitionKey) {
this.effectivePartitionKey = effectivePartitionKey;
}

public void setThinclientHeaders(String operationType, String resourceType) {
this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_OPERATION_TYPE, operationType);
this.headers.put(HttpConstants.HttpHeaders.THINCLIENT_PROXY_RESOURCE_TYPE, resourceType);
}

public RxDocumentServiceRequest setHttpTransportSerializer(HttpTransportSerializer transportSerializer) {
this.httpTransportSerializer.set(transportSerializer);

return this;
}

public HttpTransportSerializer getEffectiveHttpTransportSerializer(
HttpTransportSerializer defaultTransportSerializer) {

checkNotNull(defaultTransportSerializer, "Argument 'defaultTransportSerializer' must not be null.");

HttpTransportSerializer snapshot = this.httpTransportSerializer.get();
if (snapshot != null) {
return snapshot;
}

return defaultTransportSerializer;
}
}
Loading

0 comments on commit f37a2a6

Please sign in to comment.