Skip to content

Commit

Permalink
add DELETE API for /mode/{subject}
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoyali97 committed Feb 23, 2021
1 parent 8a0cb6d commit dbb6d95
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public class RestService implements Configurable {
private static final TypeReference<? extends List<Integer>> DELETE_SUBJECT_RESPONSE_TYPE =
new TypeReference<List<Integer>>() {
};
private static final TypeReference<String> DELETE_MODE_RESPONSE_TYPE =
private static final TypeReference<String> DELETE_SUBJECT_MODE_RESPONSE_TYPE =
new TypeReference<String>() {
};
private static final TypeReference<Config> DELETE_SUBJECT_CONFIG_RESPONSE_TYPE =
Expand Down Expand Up @@ -692,6 +692,21 @@ public ModeGetResponse getMode(String subject, boolean defaultToGlobal)
return mode;
}

public String deleteSubjectMode(String subject)
throws IOException, RestClientException {
return deleteSubjectMode(DEFAULT_REQUEST_PROPERTIES, subject);
}

public String deleteSubjectMode(Map<String, String> requestProperties, String subject)
throws IOException, RestClientException {
UriBuilder builder = UriBuilder.fromPath("/mode/{subject}");
String path = builder.build(subject).toString();

String response = httpRequest(path, "DELETE", null, requestProperties,
DELETE_SUBJECT_MODE_RESPONSE_TYPE);
return response;
}

public List<Schema> getSchemas(
String subjectPrefix,
boolean lookupDeletedSchema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ public void deleteSubjectConfig(
} catch (OperationNotPermittedException e) {
throw Errors.operationNotPermittedException(e.getMessage());
} catch (SchemaRegistryStoreException e) {
throw Errors.storeException("Failed to update compatibility level", e);
throw Errors.storeException("Failed to delete compatibility level", e);
} catch (UnknownLeaderException e) {
throw Errors.unknownLeaderException("Failed to update compatibility level", e);
throw Errors.unknownLeaderException("Failed to delete compatibility level", e);
} catch (SchemaRegistryRequestForwardingException e) {
throw Errors.requestForwardingFailedException("Error while forwarding update config request"
throw Errors.requestForwardingFailedException("Error while forwarding delete config request"
+ " to the leader", e);
}
asyncResponse.resume(deletedConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@

import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import java.util.Locale;
Expand Down Expand Up @@ -151,4 +154,41 @@ public ModeUpdateRequest updateTopLevelMode(
public ModeGetResponse getTopLevelMode() {
return getMode(null, false);
}

@DELETE
@Path("/{subject}")
@ApiOperation(value = "Deletes the specified subject-level mode and revert to "
+ "the global default.", response = Mode.class)
@ApiResponses(value = {
@ApiResponse(code = 404, message = "Error code 40401 -- Subject not found"),
@ApiResponse(code = 500, message = "Error code 50001 -- Error in the backend datastore")
})
public void deleteSubjectMode(
final @Suspended AsyncResponse asyncResponse,
@Context HttpHeaders headers,
@ApiParam(value = "the name of the subject", required = true)
@PathParam("subject") String subject) {
log.info("Deleting mode for subject {}", subject);
Mode deletedMode;
try {
deletedMode = schemaRegistry.getMode(subject);
if (deletedMode == null) {
throw Errors.subjectNotFoundException(subject);
}

Map<String, String> headerProperties = requestHeaderBuilder.buildRequestHeaders(
headers, schemaRegistry.config().whitelistHeaders());
schemaRegistry.deleteSubjectModeOrForward(subject, headerProperties);
} catch (OperationNotPermittedException e) {
throw Errors.operationNotPermittedException(e.getMessage());
} catch (SchemaRegistryStoreException e) {
throw Errors.storeException("Failed to delete mode", e);
} catch (UnknownLeaderException e) {
throw Errors.unknownLeaderException("Failed to delete mode", e);
} catch (SchemaRegistryRequestForwardingException e) {
throw Errors.requestForwardingFailedException("Error while forwarding delete mode request"
+ " to the leader", e);
}
asyncResponse.resume(deletedMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Config;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.entities.SubjectVersion;
Expand Down Expand Up @@ -880,7 +879,7 @@ private List<Integer> forwardDeleteSubjectRequestToLeader(
}
}

private Config forwardDeleteSubjectCompatibilityConfigToLeader(
private void forwardDeleteSubjectCompatibilityConfigToLeader(
Map<String, String> requestProperties,
String subject
) throws SchemaRegistryRequestForwardingException {
Expand All @@ -889,7 +888,7 @@ private Config forwardDeleteSubjectCompatibilityConfigToLeader(
log.debug(String.format("Forwarding delete subject compatibility config request %s to %s",
subject, baseUrl));
try {
return leaderRestService.deleteSubjectConfig(requestProperties, subject);
leaderRestService.deleteSubjectConfig(requestProperties, subject);
} catch (IOException e) {
throw new SchemaRegistryRequestForwardingException(
String.format(
Expand Down Expand Up @@ -922,6 +921,26 @@ private void forwardSetModeRequestToLeader(
}
}

private void forwardDeleteSubjectModeRequestToLeader(
String subject,
Map<String, String> headerProperties)
throws SchemaRegistryRequestForwardingException {
UrlList baseUrl = leaderRestService.getBaseUrls();

log.debug(String.format("Forwarding delete subject mode request %s to %s",
subject, baseUrl));
try {
leaderRestService.deleteSubjectMode(headerProperties, subject);
} catch (IOException e) {
throw new SchemaRegistryRequestForwardingException(
String.format(
"Unexpected error while forwarding delete subject mode"
+ "request %s to %s", subject, baseUrl), e);
} catch (RestClientException e) {
throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
}
}

private ParsedSchema canonicalizeSchema(Schema schema, boolean isNew)
throws InvalidSchemaException {
if (schema == null
Expand Down Expand Up @@ -1500,6 +1519,41 @@ public void setModeOrForward(String subject, Mode mode, Map<String, String> head
}
}

public void deleteSubjectMode(String subject)
throws SchemaRegistryStoreException, OperationNotPermittedException {
if (!allowModeChanges) {
throw new OperationNotPermittedException("Mode changes are not allowed");
}
try {
kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, kafkaStoreTimeoutMs);
deleteMode(subject);
} catch (StoreException e) {
throw new SchemaRegistryStoreException("Failed to delete subject config value from store",
e);
}
}

public void deleteSubjectModeOrForward(String subject, Map<String, String> headerProperties)
throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException,
OperationNotPermittedException, UnknownLeaderException {
kafkaStore.lockFor(subject).lock();
try {
if (isLeader()) {
deleteSubjectMode(subject);
} else {
// forward delete subject config request to the leader
if (leaderIdentity != null) {
forwardDeleteSubjectModeRequestToLeader(subject, headerProperties);
} else {
throw new UnknownLeaderException("Delete config request failed since leader is "
+ "unknown");
}
}
} finally {
kafkaStore.lockFor(subject).unlock();
}
}

KafkaStore<SchemaRegistryKey, SchemaRegistryValue> getKafkaStore() {
return this.kafkaStore;
}
Expand Down

0 comments on commit dbb6d95

Please sign in to comment.