Skip to content

Commit

Permalink
feat: multiple dbs support (#1102)
Browse files Browse the repository at this point in the history
This PR consists of cherry picks from the https://github.com/googleapis/java-datastore/tree/multi-db branch. They include:

#928 
#940 
#942 

This also enables parameterized testing for ITDatastoreTest.
  • Loading branch information
kolea2 authored Jun 8, 2023
1 parent c226997 commit 7887f32
Show file tree
Hide file tree
Showing 39 changed files with 908 additions and 273 deletions.
5 changes: 5 additions & 0 deletions .kokoro/nightly/integration.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ env_vars: {
value: "java-docs-samples-testing"
}

env_vars: {
key: "DATASTORE_PROJECT_ID"
value: "java-docs-samples-testing"
}

env_vars: {
key: "ENABLE_FLAKYBOT"
value: "true"
Expand Down
5 changes: 5 additions & 0 deletions .kokoro/nightly/java11-integration.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ env_vars: {
value: "gcloud-devel"
}

env_vars: {
key: "DATASTORE_PROJECT_ID"
value: "gcloud-devel"
}

env_vars: {
key: "ENABLE_FLAKYBOT"
value: "true"
Expand Down
5 changes: 5 additions & 0 deletions .kokoro/presubmit/graalvm-native-17.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ env_vars: {
env_vars: {
key: "SECRET_MANAGER_KEYS"
value: "java-it-service-account"
}

env_vars: {
key: "DATASTORE_PROJECT_ID"
value: "gcloud-devel"
}
5 changes: 5 additions & 0 deletions .kokoro/presubmit/graalvm-native.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,8 @@ env_vars: {
key: "SECRET_MANAGER_KEYS"
value: "java-it-service-account"
}

env_vars: {
key: "DATASTORE_PROJECT_ID"
value: "gcloud-devel"
}
5 changes: 5 additions & 0 deletions .kokoro/presubmit/integration.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ env_vars: {
value: "gcloud-devel"
}

env_vars: {
key: "DATASTORE_PROJECT_ID"
value: "gcloud-devel"
}

env_vars: {
key: "GOOGLE_APPLICATION_CREDENTIALS"
value: "secret_manager/java-it-service-account"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ private DatastoreException invalidResponseException(String method, IOException e
}

public AllocateIdsResponse allocateIds(AllocateIdsRequest request) throws DatastoreException {
try (InputStream is = remoteRpc.call("allocateIds", request)) {
try (InputStream is =
remoteRpc.call("allocateIds", request, request.getProjectId(), request.getDatabaseId())) {
return AllocateIdsResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("allocateIds", exception);
Expand All @@ -76,47 +77,54 @@ public AllocateIdsResponse allocateIds(AllocateIdsRequest request) throws Datast

public BeginTransactionResponse beginTransaction(BeginTransactionRequest request)
throws DatastoreException {
try (InputStream is = remoteRpc.call("beginTransaction", request)) {
try (InputStream is =
remoteRpc.call(
"beginTransaction", request, request.getProjectId(), request.getDatabaseId())) {
return BeginTransactionResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("beginTransaction", exception);
}
}

public CommitResponse commit(CommitRequest request) throws DatastoreException {
try (InputStream is = remoteRpc.call("commit", request)) {
try (InputStream is =
remoteRpc.call("commit", request, request.getProjectId(), request.getDatabaseId())) {
return CommitResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("commit", exception);
}
}

public LookupResponse lookup(LookupRequest request) throws DatastoreException {
try (InputStream is = remoteRpc.call("lookup", request)) {
try (InputStream is =
remoteRpc.call("lookup", request, request.getProjectId(), request.getDatabaseId())) {
return LookupResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("lookup", exception);
}
}

public ReserveIdsResponse reserveIds(ReserveIdsRequest request) throws DatastoreException {
try (InputStream is = remoteRpc.call("reserveIds", request)) {
try (InputStream is =
remoteRpc.call("reserveIds", request, request.getProjectId(), request.getDatabaseId())) {
return ReserveIdsResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("reserveIds", exception);
}
}

public RollbackResponse rollback(RollbackRequest request) throws DatastoreException {
try (InputStream is = remoteRpc.call("rollback", request)) {
try (InputStream is =
remoteRpc.call("rollback", request, request.getProjectId(), request.getDatabaseId())) {
return RollbackResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("rollback", exception);
}
}

public RunQueryResponse runQuery(RunQueryRequest request) throws DatastoreException {
try (InputStream is = remoteRpc.call("runQuery", request)) {
try (InputStream is =
remoteRpc.call("runQuery", request, request.getProjectId(), request.getDatabaseId())) {
return RunQueryResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("runQuery", exception);
Expand All @@ -125,7 +133,9 @@ public RunQueryResponse runQuery(RunQueryRequest request) throws DatastoreExcept

public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryRequest request)
throws DatastoreException {
try (InputStream is = remoteRpc.call("runAggregationQuery", request)) {
try (InputStream is =
remoteRpc.call(
"runAggregationQuery", request, request.getProjectId(), request.getDatabaseId())) {
return RunAggregationQueryResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("runAggregationQuery", exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.core.BetaApi;
import java.util.Arrays;
import java.util.List;

Expand All @@ -40,6 +41,7 @@
*/
public class DatastoreOptions {
private final String projectId;
private final String databaseId;
private final String projectEndpoint;
private final String host;
private final String localHost;
Expand All @@ -56,6 +58,7 @@ public class DatastoreOptions {
b.projectId != null || b.projectEndpoint != null,
"Either project ID or project endpoint must be provided.");
this.projectId = b.projectId;
this.databaseId = b.databaseId;
this.projectEndpoint = b.projectEndpoint;
this.host = b.host;
this.localHost = b.localHost;
Expand All @@ -72,6 +75,7 @@ public static class Builder {
"Can set at most one of project endpoint, host, and local host.";

private String projectId;
private String databaseId;
private String projectEndpoint;
private String host;
private String localHost;
Expand All @@ -83,6 +87,7 @@ public Builder() {}

public Builder(DatastoreOptions options) {
this.projectId = options.projectId;
this.databaseId = options.databaseId;
this.projectEndpoint = options.projectEndpoint;
this.host = options.host;
this.localHost = options.localHost;
Expand All @@ -102,6 +107,13 @@ public Builder projectId(String projectId) {
return this;
}

/** Sets the database ID used to access Cloud Datastore. */
@BetaApi
public Builder databaseId(String databaseId) {
this.databaseId = databaseId;
return this;
}

/**
* Sets the host used to access Cloud Datastore. To connect to the Cloud Datastore Emulator, use
* {@link #localHost} instead.
Expand Down Expand Up @@ -176,6 +188,10 @@ public String getProjectId() {
return projectId;
}

public String getDatabaseId() {
return databaseId;
}

public String getProjectEndpoint() {
return projectEndpoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ private List<Key> getScatterKeys(
do {
RunQueryRequest.Builder scatterRequest =
RunQueryRequest.newBuilder().setPartitionId(partition).setQuery(scatterPointQuery);
scatterRequest.setProjectId(partition.getProjectId());
scatterRequest.setDatabaseId(partition.getDatabaseId());
if (readTime != null) {
scatterRequest.setReadOptions(ReadOptions.newBuilder().setReadTime(readTime).build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.api.client.http.protobuf.ProtoHttpContent;
import com.google.api.client.util.IOUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.protobuf.MessageLite;
import com.google.rpc.Code;
import com.google.rpc.Status;
Expand All @@ -46,6 +47,8 @@ class RemoteRpc {
@VisibleForTesting static final String API_FORMAT_VERSION_HEADER = "X-Goog-Api-Format-Version";
private static final String API_FORMAT_VERSION = "2";

@VisibleForTesting static final String X_GOOG_REQUEST_PARAMS_HEADER = "x-goog-request-params";

private final HttpRequestFactory client;
private final HttpRequestInitializer initializer;
private final String url;
Expand Down Expand Up @@ -74,7 +77,9 @@ class RemoteRpc {
*
* @throws DatastoreException if the RPC fails.
*/
public InputStream call(String methodName, MessageLite request) throws DatastoreException {
public InputStream call(
String methodName, MessageLite request, String projectId, String databaseId)
throws DatastoreException {
logger.fine("remote datastore call " + methodName);

long startTime = System.currentTimeMillis();
Expand All @@ -84,7 +89,7 @@ public InputStream call(String methodName, MessageLite request) throws Datastore
rpcCount.incrementAndGet();
ProtoHttpContent payload = new ProtoHttpContent(request);
HttpRequest httpRequest = client.buildPostRequest(resolveURL(methodName), payload);
setHeaders(request, httpRequest);
setHeaders(request, httpRequest, projectId, databaseId);
// Don't throw an HTTPResponseException on error. It converts the response to a String and
// throws away the original, whereas we need the raw bytes to parse it as a proto.
httpRequest.setThrowExceptionOnExecuteError(false);
Expand Down Expand Up @@ -123,8 +128,16 @@ public InputStream call(String methodName, MessageLite request) throws Datastore
}

@VisibleForTesting
void setHeaders(MessageLite request, HttpRequest httpRequest) {
void setHeaders(
MessageLite request, HttpRequest httpRequest, String projectId, String databaseId) {
httpRequest.getHeaders().put(API_FORMAT_VERSION_HEADER, API_FORMAT_VERSION);
StringBuilder builder = new StringBuilder("project_id=");
builder.append(projectId);
if (!Strings.isNullOrEmpty(databaseId)) {
builder.append("&database_id=");
builder.append(databaseId);
}
httpRequest.getHeaders().put(X_GOOG_REQUEST_PARAMS_HEADER, builder.toString());
if (enableE2EChecksum && request != null) {
String checksum = EndToEndChecksumHandler.computeChecksum(request.toByteArray());
if (checksum != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,18 @@ public void create_LocalHost() {
.isEqualTo("http://localhost:8080/v1/projects/project-id");
}

@Test
public void setDatabaseId() {
DatastoreOptions options =
new DatastoreOptions.Builder()
.projectId(PROJECT_ID)
.databaseId("test-db")
.localHost("localhost:8080")
.build();
assertThat(options.getProjectId()).isEqualTo(PROJECT_ID);
assertThat(options.getDatabaseId()).isEqualTo("test-db");
}

@Test
public void create_LocalHostIp() {
Datastore datastore =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,60 @@ public void getSplits() throws Exception {
RunQueryRequest expectedSplitQueryRequest =
RunQueryRequest.newBuilder()
.setPartitionId(PARTITION)
.setProjectId(PROJECT_ID)
.setQuery(
splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(2 * 32).build()))
.build();

assertArrayEquals(expectedSplitQueryRequest.toByteArray(), mockClient.getLastBody());
}

@Test
public void getSplitsWithDatabaseId() throws Exception {
Datastore datastore = factory.create(options.build());
MockDatastoreFactory mockClient = (MockDatastoreFactory) factory;

PartitionId partition =
PartitionId.newBuilder().setProjectId(PROJECT_ID).setDatabaseId("test-database").build();

RunQueryResponse splitQueryResponse =
RunQueryResponse.newBuilder()
.setQuery(splitQuery)
.setBatch(
QueryResultBatch.newBuilder()
.setEntityResultType(ResultType.KEY_ONLY)
.setMoreResults(MoreResultsType.NO_MORE_RESULTS)
.addEntityResults(makeKeyOnlyEntity(splitKey0))
.addEntityResults(makeKeyOnlyEntity(splitKey1))
.addEntityResults(makeKeyOnlyEntity(splitKey2))
.addEntityResults(makeKeyOnlyEntity(splitKey3))
.build())
.build();

mockClient.setNextResponse(splitQueryResponse);

List<Query> splitQueries = QuerySplitterImpl.INSTANCE.getSplits(query, partition, 3, datastore);

assertThat(splitQueries)
.containsExactly(
query
.toBuilder()
.setFilter(makeFilterWithKeyRange(propertyFilter, null, splitKey1))
.build(),
query
.toBuilder()
.setFilter(makeFilterWithKeyRange(propertyFilter, splitKey1, splitKey3))
.build(),
query
.toBuilder()
.setFilter(makeFilterWithKeyRange(propertyFilter, splitKey3, null))
.build());

RunQueryRequest expectedSplitQueryRequest =
RunQueryRequest.newBuilder()
.setPartitionId(partition)
.setProjectId(PROJECT_ID)
.setDatabaseId("test-database")
.setQuery(
splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(2 * 32).build()))
.build();
Expand Down Expand Up @@ -235,6 +289,7 @@ public void notEnoughSplits() throws Exception {
RunQueryRequest expectedSplitQueryRequest =
RunQueryRequest.newBuilder()
.setPartitionId(PARTITION)
.setProjectId(PROJECT_ID)
.setQuery(
splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(99 * 32).build()))
.build();
Expand Down Expand Up @@ -286,6 +341,7 @@ public void getSplits_withReadTime() throws Exception {
RunQueryRequest expectedSplitQueryRequest =
RunQueryRequest.newBuilder()
.setPartitionId(PARTITION)
.setProjectId(PROJECT_ID)
.setQuery(
splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(2 * 32).build()))
.setReadOptions(ReadOptions.newBuilder().setReadTime(readTime))
Expand Down
Loading

0 comments on commit 7887f32

Please sign in to comment.