Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert beat types changes 2x #3560

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,6 @@ public void testBulkWithGlobalDefaults() throws Exception {
}
}

// Todo: This test is added to verify type support in bulk action. This should be removed once all clients
// avoid sending this param.
// https://github.com/opensearch-project/OpenSearch/issues/3484
public void testBulkWithTypes() throws Exception {
String bulkAction = copyToStringFromClasspath("/org/opensearch/action/bulk/bulk-with-deprecated-types.json");
{
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, XContentType.JSON);
assertThat(bulkRequest.numberOfActions(), equalTo(5));
}
}

private void createSamplePipeline(String pipelineId) throws IOException, ExecutionException, InterruptedException {
XContentBuilder pipeline = jsonBuilder().startObject()
.startArray("processors")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,7 @@ public BulkRequest add(
String routing = valueOrDefault(defaultRouting, globalRouting);
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias);
// https://github.com/opensearch-project/OpenSearch/issues/3484
// Undo error on types which breaks compatibility with some external clients
new BulkRequestParser(false).parse(
new BulkRequestParser().parse(
data,
defaultIndex,
routing,
Expand All @@ -298,7 +296,7 @@ public BulkRequest add(
requireAlias,
allowExplicitIndex,
xContentType,
(indexRequest, type) -> internalAdd(indexRequest),
this::internalAdd,
this::internalAdd,
this::add
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -67,7 +66,6 @@
public final class BulkRequestParser {

private static final ParseField INDEX = new ParseField("_index");
private static final ParseField TYPE = new ParseField("_type");
private static final ParseField ID = new ParseField("_id");
private static final ParseField ROUTING = new ParseField("routing");
private static final ParseField OP_TYPE = new ParseField("op_type");
Expand All @@ -80,17 +78,6 @@ public final class BulkRequestParser {
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);

// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
private final boolean errorOnType;

/**
* Create a new parser.
* @param errorOnType whether to allow _type information in the index line; used by BulkMonitoring
*/
public BulkRequestParser(boolean errorOnType) {
this.errorOnType = errorOnType;
}

private static int findNextMarker(byte marker, int from, BytesReference data) {
final int res = data.indexOf(marker, from);
if (res != -1) {
Expand Down Expand Up @@ -136,7 +123,7 @@ public void parse(
@Nullable Boolean defaultRequireAlias,
boolean allowExplicitIndex,
XContentType xContentType,
BiConsumer<IndexRequest, String> indexRequestConsumer,
Consumer<IndexRequest> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer
) throws IOException {
Expand Down Expand Up @@ -192,7 +179,6 @@ public void parse(
String action = parser.currentName();

String index = defaultIndex;
String type = null;
String id = null;
String routing = defaultRouting;
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
Expand All @@ -205,7 +191,7 @@ public void parse(
String pipeline = defaultPipeline;
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;

// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// at this stage, next token can either be END_OBJECT (and use default index with auto generated id)
// or START_OBJECT which will have another set of parameters
token = parser.nextToken();

Expand All @@ -220,13 +206,6 @@ public void parse(
throw new IllegalArgumentException("explicit index in bulk is not allowed");
}
index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
if (errorOnType) {
throw new IllegalArgumentException(
"Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
);
}
type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
id = parser.text();
} else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -322,8 +301,7 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias),
type
.setRequireAlias(requireAlias)
);
} else {
indexRequestConsumer.accept(
Expand All @@ -336,8 +314,7 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias),
type
.setRequireAlias(requireAlias)
);
}
} else if ("create".equals(action)) {
Expand All @@ -351,8 +328,7 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias),
type
.setRequireAlias(requireAlias)
);
} else if ("update".equals(action)) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,7 @@ public RestBulkAction(Settings settings) {
@Override
public List<Route> routes() {
return unmodifiableList(
asList(
new Route(POST, "/_bulk"),
new Route(PUT, "/_bulk"),
new Route(POST, "/{index}/_bulk"),
new Route(PUT, "/{index}/_bulk"),
// Deprecated typed endpoints.
new Route(POST, "/{index}/{type}/_bulk"),
new Route(PUT, "/{index}/{type}/_bulk")
)
asList(new Route(POST, "/_bulk"), new Route(PUT, "/_bulk"), new Route(POST, "/{index}/_bulk"), new Route(PUT, "/{index}/_bulk"))
);
}

Expand All @@ -95,9 +87,6 @@ public String getName() {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
BulkRequest bulkRequest = Requests.bulkRequest();
String defaultIndex = request.param("index");
if (request.hasParam("type")) {
request.param("type");
}
String defaultRouting = request.param("routing");
FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
String defaultPipeline = request.param("pipeline");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,7 @@ public class RestDeleteAction extends BaseRestHandler {

@Override
public List<Route> routes() {
return unmodifiableList(
asList(
new Route(DELETE, "/{index}/_doc/{id}"),
// Deprecated typed endpoint.
new Route(DELETE, "/{index}/{type}/{id}")
)
);
return unmodifiableList(asList(new Route(DELETE, "/{index}/_doc/{id}")));
}

@Override
Expand All @@ -73,9 +67,6 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
if (request.hasParam("type")) {
request.param("type");
}
DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("id"));
deleteRequest.routing(request.param("routing"));
deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,7 @@ public class RestIndexAction extends BaseRestHandler {

@Override
public List<Route> routes() {
return unmodifiableList(
asList(
new Route(POST, "/{index}/_doc/{id}"),
new Route(PUT, "/{index}/_doc/{id}"),
new Route(POST, "/{index}/{type}/{id}"),
new Route(PUT, "/{index}/{type}/{id}")
)
);
return unmodifiableList(asList(new Route(POST, "/{index}/_doc/{id}"), new Route(PUT, "/{index}/_doc/{id}")));
}

@Override
Expand All @@ -92,14 +85,7 @@ public String getName() {

@Override
public List<Route> routes() {
return unmodifiableList(
asList(
new Route(POST, "/{index}/_create/{id}"),
new Route(PUT, "/{index}/_create/{id}"),
new Route(POST, "/{index}/{type}/{id}/_create"),
new Route(PUT, "/{index}/{type}/{id}/_create")
)
);
return unmodifiableList(asList(new Route(POST, "/{index}/_create/{id}"), new Route(PUT, "/{index}/_create/{id}")));
}

@Override
Expand Down Expand Up @@ -136,7 +122,7 @@ public String getName() {

@Override
public List<Route> routes() {
return unmodifiableList(asList(new Route(POST, "/{index}/_doc"), new Route(POST, "/{index}/{type}")));
return unmodifiableList(asList(new Route(POST, "/{index}/_doc")));
}

@Override
Expand All @@ -153,9 +139,6 @@ public RestChannelConsumer prepareRequest(RestRequest request, final NodeClient
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
IndexRequest indexRequest = new IndexRequest(request.param("index"));
if (request.hasParam("type")) {
request.param("type");
}
indexRequest.id(request.param("id"));
indexRequest.routing(request.param("routing"));
indexRequest.setPipeline(request.param("pipeline"));
Expand Down
Loading