Skip to content

Commit

Permalink
add DELETE API for /config/{subject}
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoyali97 committed Feb 23, 2021
1 parent c91079b commit b2fecf6
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ public class RestService implements Configurable {
private static final TypeReference<String> DELETE_MODE_RESPONSE_TYPE =
new TypeReference<String>() {
};
private static final TypeReference<Config> DELETE_SUBJECT_CONFIG_RESPONSE_TYPE =
new TypeReference<Config>() {
};
private static final TypeReference<ServerClusterId> GET_CLUSTER_ID_RESPONSE_TYPE =
new TypeReference<ServerClusterId>() {
};
Expand Down Expand Up @@ -600,21 +603,44 @@ public ConfigUpdateRequest updateConfig(Map<String, String> requestProperties,

public Config getConfig(String subject)
throws IOException, RestClientException {
return getConfig(DEFAULT_REQUEST_PROPERTIES, subject);
return getConfig(DEFAULT_REQUEST_PROPERTIES, subject, false);
}

public Config getConfig(Map<String, String> requestProperties,
String subject)
throws IOException, RestClientException {
return getConfig(requestProperties, subject, false);
}

public Config getConfig(Map<String, String> requestProperties,
String subject,
boolean defaultToGlobal)
throws IOException, RestClientException {
String path = subject != null
? UriBuilder.fromPath("/config/{subject}").build(subject).toString()
: "/config";
? UriBuilder.fromPath("/config/{subject}")
.queryParam("defaultToGlobal", defaultToGlobal).build(subject).toString()
: "/config";

Config config =
httpRequest(path, "GET", null, requestProperties, GET_CONFIG_RESPONSE_TYPE);
return config;
}

public Config deleteSubjectConfig(String subject)
throws IOException, RestClientException {
return deleteSubjectConfig(DEFAULT_REQUEST_PROPERTIES, subject);
}

public Config deleteSubjectConfig(Map<String, String> requestProperties, String subject)
throws IOException, RestClientException {
UriBuilder builder = UriBuilder.fromPath("/config/{subject}");
String path = builder.build(subject).toString();

Config response = httpRequest(path, "DELETE", null, requestProperties,
DELETE_SUBJECT_CONFIG_RESPONSE_TYPE);
return response;
}

public ModeUpdateRequest setMode(String mode)
throws IOException, RestClientException {
return setMode(mode, null);
Expand Down
67 changes: 67 additions & 0 deletions core/generated/swagger-ui/schema-registry-api-spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,47 @@ paths:
500:
description: "Error code 50001 -- Error in the backend data store\nError\
\ code 50003 -- Error while forwarding the request to the primary"
delete:
summary: "Deletes the specified subject-level compatibility level config and\
\ revert to the global default."
description: ""
operationId: "deleteSubjectConfig"
consumes:
- "application/vnd.schemaregistry.v1+json"
- "application/vnd.schemaregistry+json"
- "application/json"
- "application/octet-stream"
produces:
- "application/vnd.schemaregistry.v1+json"
- "application/vnd.schemaregistry+json; qs=0.9"
- "application/json; qs=0.5"
parameters:
- name: "subject"
in: "path"
description: "the name of the subject"
required: true
type: "string"
responses:
200:
description: "successful operation"
schema:
type: "string"
enum:
- "NONE"
- "BACKWARD"
- "BACKWARD_TRANSITIVE"
- "FORWARD"
- "FORWARD_TRANSITIVE"
- "FULL"
- "FULL_TRANSITIVE"
404:
description: "Error code 40401 -- Subject not found"
500:
description: "Error code 50001 -- Error in the backend datastore"
/mode:
get:
summary: "Get global mode."
description: ""
operationId: "getTopLevelMode"
consumes:
- "application/vnd.schemaregistry.v1+json"
Expand All @@ -250,7 +289,11 @@ paths:
description: "successful operation"
schema:
$ref: "#/definitions/ModeGetResponse"
500:
description: "Error code 50001 -- Error in the backend data store"
put:
summary: "Update global mode."
description: ""
operationId: "updateTopLevelMode"
consumes:
- "application/vnd.schemaregistry.v1+json"
Expand All @@ -273,8 +316,17 @@ paths:
description: "successful operation"
schema:
$ref: "#/definitions/ModeUpdateRequest"
422:
description: "Error code 42204 -- Invalid mode\nError code 42205 -- Operation\
\ not permitted"
500:
description: "Error code 50001 -- Error in the backend data store\nError\
\ code 50003 -- Error while forwarding the request to the primary\nError\
\ code 50004 -- Unknown leader"
/mode/{subject}:
get:
summary: "Get mode for a subject."
description: ""
operationId: "getMode"
consumes:
- "application/vnd.schemaregistry.v1+json"
Expand All @@ -288,14 +340,21 @@ paths:
parameters:
- name: "subject"
in: "path"
description: "Name of the Subject"
required: true
type: "string"
responses:
200:
description: "successful operation"
schema:
$ref: "#/definitions/ModeGetResponse"
404:
description: "Subject not found"
500:
description: "Error code 50001 -- Error in the backend data store"
put:
summary: "Update mode for the specified subject."
description: ""
operationId: "updateMode"
consumes:
- "application/vnd.schemaregistry.v1+json"
Expand All @@ -309,6 +368,7 @@ paths:
parameters:
- name: "subject"
in: "path"
description: "Name of the Subject"
required: true
type: "string"
- in: "body"
Expand All @@ -322,6 +382,13 @@ paths:
description: "successful operation"
schema:
$ref: "#/definitions/ModeUpdateRequest"
422:
description: "Error code 42204 -- Invalid mode\nError code 42205 -- Operation\
\ not permitted"
500:
description: "Error code 50001 -- Error in the backend data store\nError\
\ code 50003 -- Error while forwarding the request to the primary\nError\
\ code 50004 -- Unknown leader"
/schemas:
get:
summary: "Get the schemas."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@

import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;

Expand Down Expand Up @@ -178,4 +181,42 @@ public Config getTopLevelConfig() {
}
return config;
}

@DELETE
@Path("/{subject}")
@ApiOperation(value = "Deletes the specified subject-level compatibility level config and "
+ "revert to the global default.", response = CompatibilityLevel.class)
@ApiResponses(value = {
@ApiResponse(code = 404, message = "Error code 40401 -- Subject not found"),
@ApiResponse(code = 500, message = "Error code 50001 -- Error in the backend datastore")
})
public void deleteSubjectConfig(
final @Suspended AsyncResponse asyncResponse,
@Context HttpHeaders headers,
@ApiParam(value = "the name of the subject", required = true)
@PathParam("subject") String subject) {
log.info("Deleting compatibility setting for subject {}", subject);
Config deletedConfig;
try {
CompatibilityLevel currentCompatibility = schemaRegistry.getCompatibilityLevel(subject);
if (currentCompatibility == null) {
throw Errors.subjectNotFoundException(subject);
}

Map<String, String> headerProperties = requestHeaderBuilder.buildRequestHeaders(
headers, schemaRegistry.config().whitelistHeaders());
schemaRegistry.deleteSubjectCompatibilityConfigOrForward(subject, headerProperties);
deletedConfig = new Config(currentCompatibility.name);
} catch (OperationNotPermittedException e) {
throw Errors.operationNotPermittedException(e.getMessage());
} catch (SchemaRegistryStoreException e) {
throw Errors.storeException("Failed to update compatibility level", e);
} catch (UnknownLeaderException e) {
throw Errors.unknownLeaderException("Failed to update compatibility level", e);
} catch (SchemaRegistryRequestForwardingException e) {
throw Errors.requestForwardingFailedException("Error while forwarding update config request"
+ " to the leader", e);
}
asyncResponse.resume(deletedConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Config;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.entities.SubjectVersion;
Expand Down Expand Up @@ -879,6 +880,26 @@ private List<Integer> forwardDeleteSubjectRequestToLeader(
}
}

private Config forwardDeleteSubjectCompatibilityConfigToLeader(
Map<String, String> requestProperties,
String subject
) throws SchemaRegistryRequestForwardingException {
UrlList baseUrl = leaderRestService.getBaseUrls();

log.debug(String.format("Forwarding delete subject compatibility config request %s to %s",
subject, baseUrl));
try {
return leaderRestService.deleteSubjectConfig(requestProperties, subject);
} catch (IOException e) {
throw new SchemaRegistryRequestForwardingException(
String.format(
"Unexpected error while forwarding delete subject compatibility config"
+ "request %s to %s", subject, baseUrl), e);
} catch (RestClientException e) {
throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
}
}

private void forwardSetModeRequestToLeader(
String subject, Mode mode,
Map<String, String> headerProperties)
Expand Down Expand Up @@ -1283,6 +1304,42 @@ public void updateConfigOrForward(String subject, CompatibilityLevel newCompatib
}
}

public void deleteSubjectCompatibilityConfig(String subject)
throws SchemaRegistryStoreException, OperationNotPermittedException {
if (isReadOnlyMode(subject)) {
throw new OperationNotPermittedException("Subject " + subject + " is in read-only mode");
}
try {
kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, kafkaStoreTimeoutMs);
deleteSubjectCompatibility(subject);
} catch (StoreException e) {
throw new SchemaRegistryStoreException("Failed to delete subject config value from store",
e);
}
}

public void deleteSubjectCompatibilityConfigOrForward(String subject,
Map<String, String> headerProperties)
throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException,
OperationNotPermittedException, UnknownLeaderException {
kafkaStore.lockFor(subject).lock();
try {
if (isLeader()) {
deleteSubjectCompatibilityConfig(subject);
} else {
// forward delete subject config request to the leader
if (leaderIdentity != null) {
forwardDeleteSubjectCompatibilityConfigToLeader(headerProperties, subject);
} else {
throw new UnknownLeaderException("Delete config request failed since leader is "
+ "unknown");
}
}
} finally {
kafkaStore.lockFor(subject).unlock();
}
}

private String kafkaClusterId(SchemaRegistryConfig config) throws SchemaRegistryException {
Properties adminClientProps = new Properties();
KafkaStore.addSchemaRegistryConfigsToClientProperties(config, adminClientProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,14 @@ public void testSubjectConfigChange() throws Exception {
FORWARD.name,
restApp.restClient.getConfig(subject).getCompatibilityLevel());

// delete subject compatibility
restApp.restClient.deleteSubjectConfig(subject);

assertEquals("Compatibility level for this subject should be reverted to none",
NONE.name,
restApp.restClient
.getConfig(RestService.DEFAULT_REQUEST_PROPERTIES, subject, true)
.getCompatibilityLevel());
}

@Test
Expand Down

0 comments on commit b2fecf6

Please sign in to comment.