Skip to content

Commit

Permalink
Addressed comments
Browse files Browse the repository at this point in the history
Signed-off-by: Manasvini B S <manasvis@amazon.com>
  • Loading branch information
manasvinibs committed Aug 20, 2024
1 parent 167dce0 commit 013b9cd
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,16 @@
import com.google.common.base.Strings;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.Setter;
import lombok.SneakyThrows;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -115,7 +111,8 @@ public class DefaultCursor implements Cursor {
*/
private static final NamedXContentRegistry xContentRegistry =
new NamedXContentRegistry(
new SearchModule(Settings.builder().build(), new ArrayList<>()).getNamedXContents());
new SearchModule(Settings.builder().build(), Collections.emptyList())
.getNamedXContents());

@Override
public CursorType getType() {
Expand All @@ -124,11 +121,7 @@ public CursorType getType() {

@Override
public String generateCursorId() {
boolean isCursorValid =
LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)
? Strings.isNullOrEmpty(pitId)
: Strings.isNullOrEmpty(scrollId);
if (rowsLeft <= 0 || isCursorValid) {
if (rowsLeft <= 0 || isCursorValid()) {
return null;
}
JSONObject json = new JSONObject();
Expand Down Expand Up @@ -156,14 +149,18 @@ public String generateCursorId() {
return String.format("%s:%s", type.getId(), encodeCursor(json, searchSourceBuilder));
}

@SneakyThrows
private boolean isCursorValid() {
return LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)
? Strings.isNullOrEmpty(pitId)
: Strings.isNullOrEmpty(scrollId);
}

public static DefaultCursor from(String cursorId) {
/**
* It is assumed that cursorId here is the second part of the original cursor passed by the
* client after removing first part which identifies cursor type
*/
String[] parts = cursorId.split(":::");
JSONObject json = decodeCursor(parts[0]);
JSONObject json = decodeCursor(cursorId);
DefaultCursor cursor = new DefaultCursor();
cursor.setFetchSize(json.getInt(FETCH_SIZE));
cursor.setRowsLeft(json.getLong(ROWS_LEFT));
Expand All @@ -183,14 +180,20 @@ public static DefaultCursor from(String cursorId) {
});
cursor.setSortFields(sortFieldValue);

byte[] bytes = Base64.getDecoder().decode(parts[1]);
// Retrieve the SearchSourceBuilder from the JSON field
String searchSourceBuilderBase64 = json.getString("searchSourceBuilder");
byte[] bytes = Base64.getDecoder().decode(searchSourceBuilderBase64);
ByteArrayInputStream streamInput = new ByteArrayInputStream(bytes);
XContentParser parser =
XContentType.JSON
.xContent()
.createParser(xContentRegistry, IGNORE_DEPRECATIONS, streamInput);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.fromXContent(parser);
cursor.searchSourceBuilder = sourceBuilder;
try {
XContentParser parser =
XContentType.JSON
.xContent()
.createParser(xContentRegistry, IGNORE_DEPRECATIONS, streamInput);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.fromXContent(parser);
cursor.searchSourceBuilder = sourceBuilder;
} catch (IOException ex) {
throw new RuntimeException("Failed to get searchSourceBuilder from cursor Id", ex);
}
} else {
cursor.setScrollId(json.getString(SCROLL_ID));
}
Expand Down Expand Up @@ -220,18 +223,20 @@ private JSONObject schemaEntry(String name, String alias, String type) {
return entry;
}

@SneakyThrows
private static String encodeCursor(JSONObject cursorJson, SearchSourceBuilder sourceBuilder) {
String jsonBase64 = Base64.getEncoder().encodeToString(cursorJson.toString().getBytes());

ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
XContentBuilder builder = XContentFactory.jsonBuilder(outputStream);
sourceBuilder.toXContent(builder, null);
builder.close();

String searchRequestBase64 = Base64.getEncoder().encodeToString(outputStream.toByteArray());

return jsonBase64 + ":::" + searchRequestBase64;
try {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
XContentBuilder builder = XContentFactory.jsonBuilder(outputStream);
sourceBuilder.toXContent(builder, null);
builder.close();

String searchRequestBase64 = Base64.getEncoder().encodeToString(outputStream.toByteArray());
cursorJson.put("searchSourceBuilder", searchRequestBase64);

return Base64.getEncoder().encodeToString(cursorJson.toString().getBytes());
} catch (IOException ex) {
throw new RuntimeException("Failed to encode cursor", ex);
}
}

private static JSONObject decodeCursor(String cursorId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,6 @@ public void onFailure(Exception e) {
LOG.error("Error occurred while creating PIT", e);
}
});
while (createStatus == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.error("Error occurred while creating PIT", e);
}
}
return createStatus;
}

Expand Down Expand Up @@ -113,14 +106,6 @@ public void onFailure(Exception e) {
LOG.error("Error occurred while deleting PIT", e);
}
});

while (deleteStatus == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.error("Error occurred while deleting PIT", e);
}
}
return deleteStatus;
}
}

0 comments on commit 013b9cd

Please sign in to comment.