Skip to content

Commit

Permalink
Create collection index creation ordering (#833)
Browse files Browse the repository at this point in the history
  • Loading branch information
maheshrajamani authored Jan 25, 2024
1 parent 4200171 commit e7d0827
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 22 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/continuous-integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ jobs:
needs: [ build ]
runs-on: ubuntu-latest

# max run time 20 minutes
timeout-minutes: 20
# max run time 40 minutes
timeout-minutes: 40

strategy:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,24 +121,69 @@ public Uni<AsyncResultSet> executeWrite(SimpleStatement statement) {
}

/**
* Execute schema change query with bound statement.
* Execute schema change query with bound statement for create ddl statements.
*
* @param boundStatement - Bound statement with query and parameters. The table name used in the
* query must have keyspace prefixed.
* @return AsyncResultSet
*/
public Uni<AsyncResultSet> executeSchemaChange(SimpleStatement boundStatement) {
public Uni<AsyncResultSet> executeCreateSchemaChange(SimpleStatement boundStatement) {
return executeSchemaChange(boundStatement, "create");
}

/**
* Execute schema change query with bound statement for drop ddl statements.
*
* @param boundStatement - Bound statement with query and parameters. The table name used in the
* query must have keyspace prefixed.
* @return AsyncResultSet
*/
public Uni<AsyncResultSet> executeDropSchemaChange(SimpleStatement boundStatement) {
return executeSchemaChange(boundStatement, "drop");
}

/**
* Execute schema change query with bound statement for drop ddl statements.
*
* @param boundStatement - Bound statement with query and parameters. The table name used in the
* query must have keyspace prefixed.
* @return AsyncResultSet
*/
public Uni<AsyncResultSet> executeTruncateSchemaChange(SimpleStatement boundStatement) {
return executeSchemaChange(boundStatement, "truncate");
}

private Uni<AsyncResultSet> executeSchemaChange(SimpleStatement boundStatement, String profile) {
return Uni.createFrom()
.completionStage(
cqlSessionCache
.getSession()
.executeAsync(
boundStatement
.setExecutionProfileName("ddl")
.setExecutionProfileName(profile)
.setIdempotent(true)
.setSerialConsistencyLevel(
operationsConfig.queriesConfig().consistency().schemaChanges())))
.onFailure(DriverTimeoutException.class)
.recoverWithUni(
throwable -> {
logger.error("Timeout executing schema change query : {}", boundStatement.getQuery());
SimpleStatement duplicate = SimpleStatement.newInstance(boundStatement.getQuery());
return Uni.createFrom()
.completionStage(
cqlSessionCache
.getSession()
.executeAsync(
duplicate
.setExecutionProfileName(profile)
.setIdempotent(true)
.setSerialConsistencyLevel(
operationsConfig
.queriesConfig()
.consistency()
.schemaChanges())));
})
.onFailure(DriverTimeoutException.class)
.retry()
.atMost(2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
Expand Down Expand Up @@ -127,7 +128,7 @@ public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {

private Uni<Supplier<CommandResult>> executeCollectionCreation(QueryExecutor queryExecutor) {
final Uni<AsyncResultSet> execute =
queryExecutor.executeSchemaChange(getCreateTable(commandContext.namespace(), name));
queryExecutor.executeCreateSchemaChange(getCreateTable(commandContext.namespace(), name));
final Uni<Boolean> indexResult =
execute
.onItem()
Expand All @@ -136,20 +137,21 @@ private Uni<Supplier<CommandResult>> executeCollectionCreation(QueryExecutor que
if (res.wasApplied()) {
final List<SimpleStatement> indexStatements =
getIndexStatements(commandContext.namespace(), name);
List<Uni<AsyncResultSet>> indexes = new ArrayList<>(10);
indexStatements.stream()
.forEach(index -> indexes.add(queryExecutor.executeSchemaChange(index)));
return Uni.combine()
.all()
.unis(indexes)
.combinedWith(
return Multi.createFrom()
.items(indexStatements.stream())
.onItem()
.transformToUni(queryExecutor::executeCreateSchemaChange)
.concatenate()
.collect()
.asList()
.onItem()
.transform(
results -> {
final Optional<?> first =
final Optional<AsyncResultSet> first =
results.stream()
.filter(
indexRes -> !(((AsyncResultSet) indexRes).wasApplied()))
.filter(indexRes -> !(indexRes.wasApplied()))
.findFirst();
return first.isPresent() ? false : true;
return !first.isPresent();
});
} else {
return Uni.createFrom().item(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
SimpleStatement.newInstance(String.format(CREATE_KEYSPACE_CQL, name, replicationMap));
// execute
return queryExecutor
.executeSchemaChange(createKeyspace)
.executeCreateSchemaChange(createKeyspace)

// if we have a result always respond positively
.map(any -> new SchemaChangeResult(any.wasApplied()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
SimpleStatement query = SimpleStatement.newInstance(cql);
// execute
return queryExecutor
.executeSchemaChange(query)
.executeDropSchemaChange(query)

// if we have a result always respond positively
.map(any -> new SchemaChangeResult(any.wasApplied()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
SimpleStatement.newInstance(DROP_KEYSPACE_CQL.formatted(name));
// execute
return queryExecutor
.executeSchemaChange(deleteStatement)
.executeDropSchemaChange(deleteStatement)

// if we have a result always respond positively
.map(any -> new SchemaChangeResult(any.wasApplied()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
SimpleStatement query = SimpleStatement.newInstance(cql);
// execute
return queryExecutor
.executeSchemaChange(query)
.executeTruncateSchemaChange(query)

// if we have a result always respond positively
.map(any -> new DeleteOperationPage(null, false, false));
Expand Down
10 changes: 9 additions & 1 deletion src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,18 @@ datastax-java-driver {
basic.request.timeout = 30 seconds
}

ddl {
create {
basic.request.timeout = 10 seconds
}

drop {
basic.request.timeout = 15 seconds
}

truncate {
basic.request.timeout = 15 seconds
}

count {
basic.request.timeout = 60 seconds
}
Expand Down

0 comments on commit e7d0827

Please sign in to comment.