Skip to content

Commit

Permalink
Add TableStaleSegmentResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
vrajat committed Nov 28, 2024
1 parent 9a2ae1b commit ebb92fc
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.segment.local.data.manager.StaleSegmentsResponse;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
Expand Down Expand Up @@ -898,7 +897,7 @@ public String getTableReloadMetadata(
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Gets a list of segments that are stale from servers hosting the table",
notes = "Gets a list of segments that are stale from servers hosting the table")
public Map<String, List<StaleSegmentsResponse>> getStaleSegments(
public Map<String, TableStaleSegmentResponse> getStaleSegments(
@ApiParam(value = "Table name with type", required = true, example = "myTable_REALTIME")
@PathParam("tableNameWithType") String tableNameWithType, @Context HttpHeaders headers) {
tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.apache.pinot.controller.api.resources;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import org.apache.pinot.segment.local.data.manager.StaleSegment;


public class TableStaleSegmentResponse {
private final List<StaleSegment> _staleSegmentList;
private final boolean _isValidResponse;
private final String _errorMessage;

@JsonCreator
public TableStaleSegmentResponse(@JsonProperty("staleSegmentList") List<StaleSegment> staleSegmentList,
@JsonProperty("validResponse") boolean isValidResponse,
@JsonProperty("errorMessage") String errorMessage) {
_staleSegmentList = staleSegmentList;
_isValidResponse = isValidResponse;
_errorMessage = errorMessage;
}

public TableStaleSegmentResponse(List<StaleSegment> staleSegmentList) {
_staleSegmentList = staleSegmentList;
_isValidResponse = true;
_errorMessage = null;
}

public TableStaleSegmentResponse(String errorMessage) {
_staleSegmentList = null;
_isValidResponse = false;
_errorMessage = errorMessage;
}

@JsonProperty
public List<StaleSegment> getStaleSegmentList() {
return _staleSegmentList;
}

@JsonProperty
public boolean isValidResponse() {
return _isValidResponse;
}

@JsonProperty
public String getErrorMessage() {
return _errorMessage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.segment.local.data.manager.StaleSegmentsResponse;
import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse;
import org.apache.pinot.segment.local.data.manager.StaleSegment;
import org.apache.pinot.spi.utils.JsonUtils;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
Expand Down Expand Up @@ -398,27 +399,27 @@ public ValidDocIdsBitmapResponse getValidDocIdsBitmapFromServer(String tableName
return response;
}

public Map<String, List<StaleSegmentsResponse>> getStaleSegmentsFromServer(
public Map<String, TableStaleSegmentResponse> getStaleSegmentsFromServer(
String tableNameWithType, Set<String> serverInstances, BiMap<String, String> endpoints, int timeoutMs) {
LOGGER.debug("Getting list of segments for refresh from servers for table {}.", tableNameWithType);
List<String> serverURLs = new ArrayList<>();
for (String serverInstance : serverInstances) {
serverURLs.add(generateNeedRefreshSegmentsServerURL(tableNameWithType, endpoints.get(serverInstance)));
serverURLs.add(generateStaleSegmentsServerURL(tableNameWithType, endpoints.get(serverInstance)));
}
BiMap<String, String> endpointsToServers = endpoints.inverse();
CompletionServiceHelper completionServiceHelper =
new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers);
CompletionServiceHelper.CompletionServiceResponse serviceResponse =
completionServiceHelper.doMultiGetRequest(serverURLs, tableNameWithType, false, timeoutMs);
Map<String, List<StaleSegmentsResponse>> serverResponses = new HashMap<>();
Map<String, TableStaleSegmentResponse> serverResponses = new HashMap<>();

for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
try {
serverResponses.put(streamResponse.getKey(),
JsonUtils.stringToObject(streamResponse.getValue(),
new TypeReference<List<StaleSegmentsResponse>>() { }));
List<StaleSegment> staleSegments = JsonUtils.stringToObject(streamResponse.getValue(),
new TypeReference<List<StaleSegment>>() { });
serverResponses.put(streamResponse.getKey(), new TableStaleSegmentResponse(staleSegments));
} catch (Exception e) {
serverResponses.put(streamResponse.getKey(), null);
serverResponses.put(streamResponse.getKey(), new TableStaleSegmentResponse(e.getMessage()));
LOGGER.error("Unable to parse server {} response for needRefresh for table {} due to an error: ",
streamResponse.getKey(), tableNameWithType, e);
}
Expand Down Expand Up @@ -500,8 +501,8 @@ private String generateColumnsParam(List<String> columns) {
return paramsStr;
}

private String generateNeedRefreshSegmentsServerURL(String tableNameWithType, String endpoint) {
private String generateStaleSegmentsServerURL(String tableNameWithType, String endpoint) {
tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
return String.format("%s/tables/%s/segments/needRefresh", endpoint, tableNameWithType);
return String.format("%s/tables/%s/segments/isStale", endpoint, tableNameWithType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.segment.local.data.manager.StaleSegmentsResponse;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
Expand Down Expand Up @@ -201,7 +201,7 @@ public JsonNode getAggregateValidDocIdsMetadata(String tableNameWithType, List<S
return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo);
}

public Map<String, List<StaleSegmentsResponse>> getStaleSegments(String tableNameWithType,
public Map<String, TableStaleSegmentResponse> getStaleSegments(String tableNameWithType,
int timeoutMs)
throws InvalidConfigException, IOException {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
Expand Down
Loading

0 comments on commit ebb92fc

Please sign in to comment.