diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index e01b780f0d5..807c5835d6f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -86,7 +86,6 @@ import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.Authorize; import org.apache.pinot.core.auth.TargetType; -import org.apache.pinot.segment.local.data.manager.StaleSegmentsResponse; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.DateTimeFieldSpec; @@ -898,7 +897,7 @@ public String getTableReloadMetadata( @Produces(MediaType.APPLICATION_JSON) @ApiOperation(value = "Gets a list of segments that are stale from servers hosting the table", notes = "Gets a list of segments that are stale from servers hosting the table") - public Map> getStaleSegments( + public Map getStaleSegments( @ApiParam(value = "Table name with type", required = true, example = "myTable_REALTIME") @PathParam("tableNameWithType") String tableNameWithType, @Context HttpHeaders headers) { tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableStaleSegmentResponse.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableStaleSegmentResponse.java new file mode 100644 index 00000000000..3fb9d9bc1a5 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableStaleSegmentResponse.java @@ -0,0 +1,49 @@ +package org.apache.pinot.controller.api.resources; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import org.apache.pinot.segment.local.data.manager.StaleSegment; + + +public class TableStaleSegmentResponse { + private final List _staleSegmentList; + private final boolean _isValidResponse; + private final String _errorMessage; + + @JsonCreator + public TableStaleSegmentResponse(@JsonProperty("staleSegmentList") List staleSegmentList, + @JsonProperty("validResponse") boolean isValidResponse, + @JsonProperty("errorMessage") String errorMessage) { + _staleSegmentList = staleSegmentList; + _isValidResponse = isValidResponse; + _errorMessage = errorMessage; + } + + public TableStaleSegmentResponse(List staleSegmentList) { + _staleSegmentList = staleSegmentList; + _isValidResponse = true; + _errorMessage = null; + } + + public TableStaleSegmentResponse(String errorMessage) { + _staleSegmentList = null; + _isValidResponse = false; + _errorMessage = errorMessage; + } + + @JsonProperty + public List getStaleSegmentList() { + return _staleSegmentList; + } + + @JsonProperty + public boolean isValidResponse() { + return _isValidResponse; + } + + @JsonProperty + public String getErrorMessage() { + return _errorMessage; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java index 050f1d80b9d..8dde7f08fee 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java @@ -47,7 +47,8 @@ import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse; import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; import org.apache.pinot.common.utils.RoaringBitmapUtils; -import org.apache.pinot.segment.local.data.manager.StaleSegmentsResponse; +import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse; +import org.apache.pinot.segment.local.data.manager.StaleSegment; import org.apache.pinot.spi.utils.JsonUtils; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientProperties; @@ -398,27 +399,27 @@ public ValidDocIdsBitmapResponse getValidDocIdsBitmapFromServer(String tableName return response; } - public Map> getStaleSegmentsFromServer( + public Map getStaleSegmentsFromServer( String tableNameWithType, Set serverInstances, BiMap endpoints, int timeoutMs) { LOGGER.debug("Getting list of segments for refresh from servers for table {}.", tableNameWithType); List serverURLs = new ArrayList<>(); for (String serverInstance : serverInstances) { - serverURLs.add(generateNeedRefreshSegmentsServerURL(tableNameWithType, endpoints.get(serverInstance))); + serverURLs.add(generateStaleSegmentsServerURL(tableNameWithType, endpoints.get(serverInstance))); } BiMap endpointsToServers = endpoints.inverse(); CompletionServiceHelper completionServiceHelper = new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers); CompletionServiceHelper.CompletionServiceResponse serviceResponse = completionServiceHelper.doMultiGetRequest(serverURLs, tableNameWithType, false, timeoutMs); - Map> serverResponses = new HashMap<>(); + Map serverResponses = new HashMap<>(); for (Map.Entry streamResponse : serviceResponse._httpResponses.entrySet()) { try { - serverResponses.put(streamResponse.getKey(), - JsonUtils.stringToObject(streamResponse.getValue(), - new TypeReference>() { })); + List staleSegments = JsonUtils.stringToObject(streamResponse.getValue(), + new TypeReference>() { }); + serverResponses.put(streamResponse.getKey(), new TableStaleSegmentResponse(staleSegments)); } catch (Exception e) { - serverResponses.put(streamResponse.getKey(), null); + serverResponses.put(streamResponse.getKey(), new TableStaleSegmentResponse(e.getMessage())); LOGGER.error("Unable to parse server {} response for needRefresh for table {} due to an error: ", streamResponse.getKey(), tableNameWithType, e); } @@ -500,8 +501,8 @@ private String generateColumnsParam(List columns) { return paramsStr; } - private String generateNeedRefreshSegmentsServerURL(String tableNameWithType, String endpoint) { + private String generateStaleSegmentsServerURL(String tableNameWithType, String endpoint) { tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8); - return String.format("%s/tables/%s/segments/needRefresh", endpoint, tableNameWithType); + return String.format("%s/tables/%s/segments/isStale", endpoint, tableNameWithType); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java index 178aca1b825..48f53577a84 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java @@ -33,8 +33,8 @@ import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.restlet.resources.TableMetadataInfo; import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; +import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.segment.local.data.manager.StaleSegmentsResponse; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -201,7 +201,7 @@ public JsonNode getAggregateValidDocIdsMetadata(String tableNameWithType, List> getStaleSegments(String tableNameWithType, + public Map getStaleSegments(String tableNameWithType, int timeoutMs) throws InvalidConfigException, IOException { TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 11798d7034d..664f3e676fc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -61,7 +61,7 @@ import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; import org.apache.pinot.core.util.PeerServerSegmentFinder; -import org.apache.pinot.segment.local.data.manager.StaleSegmentsResponse; +import org.apache.pinot.segment.local.data.manager.StaleSegment; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; @@ -1065,12 +1065,12 @@ public boolean needReloadSegments() } @Override - public List getStaleSegments(TableConfig tableConfig, Schema schema) { - List staleSegments = new ArrayList<>(); + public List getStaleSegments(TableConfig tableConfig, Schema schema) { + List staleSegments = new ArrayList<>(); List segmentDataManagers = acquireAllSegments(); try { for (SegmentDataManager segmentDataManager : segmentDataManagers) { - StaleSegmentsResponse response = isSegmentStale(tableConfig, schema, segmentDataManager); + StaleSegment response = isSegmentStale(tableConfig, schema, segmentDataManager); if (response.isStale()) { staleSegments.add(response); } @@ -1084,7 +1084,7 @@ public List getStaleSegments(TableConfig tableConfig, Sch return staleSegments; } - protected StaleSegmentsResponse isSegmentStale(TableConfig tableConfig, Schema schema, + protected StaleSegment isSegmentStale(TableConfig tableConfig, Schema schema, SegmentDataManager segmentDataManager) { String tableNameWithType = tableConfig.getTableName(); Map indexConfigsMap = @@ -1100,7 +1100,7 @@ protected StaleSegmentsResponse isSegmentStale(TableConfig tableConfig, Schema s if (timeColumn != null) { if (segmentMetadata.getTimeColumn() == null || !segmentMetadata.getTimeColumn().equals(timeColumn)) { LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: time column", tableNameWithType, segmentName); - return new StaleSegmentsResponse(segmentName, true, "time column"); + return new StaleSegment(segmentName, true, "time column"); } } @@ -1123,7 +1123,7 @@ protected StaleSegmentsResponse isSegmentStale(TableConfig tableConfig, Schema s // Column is added if (!columnsInSegment.containsAll(schema.getPhysicalColumnNames())) { LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: column added", tableNameWithType, segmentName); - return new StaleSegmentsResponse(segmentName, true, "column added"); + return new StaleSegment(segmentName, true, "column added"); } // Get Index configuration for the Table Config @@ -1165,7 +1165,7 @@ protected StaleSegmentsResponse isSegmentStale(TableConfig tableConfig, Schema s // metadata. if (!StarTreeBuilderUtils.areStarTreeBuilderConfigListsEqual(builderConfigFromTableConfigs, builderConfigFromSegmentMetadata)) { - return new StaleSegmentsResponse(segmentName, true, "startree index"); + return new StaleSegment(segmentName, true, "startree index"); } for (String columnName : segmentPhysicalColumns) { @@ -1179,28 +1179,28 @@ protected StaleSegmentsResponse isSegmentStale(TableConfig tableConfig, Schema s if (fieldSpecInSchema == null) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: column deleted", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "column deleted: " + columnName); + return new StaleSegment(segmentName, true, "column deleted: " + columnName); } // Field type changed if (columnMetadata.getFieldType().compareTo(fieldSpecInSchema.getFieldType()) != 0) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: field type", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "field type changed: " + columnName); + return new StaleSegment(segmentName, true, "field type changed: " + columnName); } // Data type changed if (!columnMetadata.getDataType().equals(fieldSpecInSchema.getDataType())) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: data type", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "data type changed: " + columnName); + return new StaleSegment(segmentName, true, "data type changed: " + columnName); } // SV/MV changed if (columnMetadata.isSingleValue() != fieldSpecInSchema.isSingleValueField()) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: single / multi value", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "single / multi value changed: " + columnName); + return new StaleSegment(segmentName, true, "single / multi value changed: " + columnName); } // TODO: detect if an index changes from Dictionary to Variable Length Dictionary or vice versa. @@ -1222,7 +1222,7 @@ protected StaleSegmentsResponse isSegmentStale(TableConfig tableConfig, Schema s if (incompatible) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: dictionary encoding,", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "dictionary encoding changed: " + columnName); + return new StaleSegment(segmentName, true, "dictionary encoding changed: " + columnName); } else { LOGGER.debug("tableNameWithType: {}, segmentName: {}, no change as dictionary overrides applied to col: {}", tableNameWithType, segmentName, columnName); @@ -1233,37 +1233,37 @@ protected StaleSegmentsResponse isSegmentStale(TableConfig tableConfig, Schema s if (columnName.equals(sortedColumn) && !columnMetadata.isSorted()) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: sort column", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "sort column changed: " + columnName); + return new StaleSegment(segmentName, true, "sort column changed: " + columnName); } if (Objects.isNull(source.getBloomFilter()) == bloomFilters.contains(columnName)) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: bloom filter changed", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "bloom filter changed: " + columnName); + return new StaleSegment(segmentName, true, "bloom filter changed: " + columnName); } if (Objects.isNull(source.getJsonIndex()) == jsonIndex.contains(columnName)) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: json index changed", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "json index changed: " + columnName); + return new StaleSegment(segmentName, true, "json index changed: " + columnName); } if (Objects.isNull(source.getTextIndex()) == textIndexes.contains(columnName)) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: text index changed", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "text index changed: " + columnName); + return new StaleSegment(segmentName, true, "text index changed: " + columnName); } if (Objects.isNull(source.getFSTIndex()) == fstIndexes.contains(columnName)) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: fst index changed", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "fst index changed: " + columnName); + return new StaleSegment(segmentName, true, "fst index changed: " + columnName); } if (Objects.isNull(source.getH3Index()) == h3Indexes.contains(columnName)) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: h3 index changed", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "hst index changed: " + columnName); + return new StaleSegment(segmentName, true, "hst index changed: " + columnName); } // If a segment is sorted then it will automatically be given an inverted index and that overrides the @@ -1275,7 +1275,7 @@ protected StaleSegmentsResponse isSegmentStale(TableConfig tableConfig, Schema s LOGGER.debug( "tableNameWithType: {}, columnName: {}, segmentName: {}, change: inverted index added to sorted column", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "invert index added to sort column: " + columnName); + return new StaleSegment(segmentName, true, "invert index added to sort column: " + columnName); } } else { if ((Objects.isNull(source.getInvertedIndex())) == invertedIndex.contains(columnName)) { @@ -1283,7 +1283,7 @@ protected StaleSegmentsResponse isSegmentStale(TableConfig tableConfig, Schema s "tableNameWithType: {}, columnName: {}, segmentName: {}, change: inverted index changed on unsorted " + "column", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "inverted index changed on unsorted column: " + columnName); + return new StaleSegment(segmentName, true, "inverted index changed on unsorted column: " + columnName); } } @@ -1295,13 +1295,13 @@ protected StaleSegmentsResponse isSegmentStale(TableConfig tableConfig, Schema s LOGGER.debug( "tableNameWithType: {}, columnName: {}, segmentName: {}, change: null value vector index removed from " + "column and cannot be added back to this segment.", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "null value vector index removed from column: " + columnName); + return new StaleSegment(segmentName, true, "null value vector index removed from column: " + columnName); } if (Objects.isNull(source.getRangeIndex()) == rangeIndex.contains(columnName)) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: range index changed", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "range index changed: " + columnName); + return new StaleSegment(segmentName, true, "range index changed: " + columnName); } // Partition changed or segment not properly partitioned @@ -1310,28 +1310,28 @@ protected StaleSegmentsResponse isSegmentStale(TableConfig tableConfig, Schema s if (partitionFunction == null) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partition function", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "partition function added: " + columnName); + return new StaleSegment(segmentName, true, "partition function added: " + columnName); } if (!partitionFunction.getName().equalsIgnoreCase(partitionConfig.getFunctionName())) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partition function name", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "partition function name changed: " + columnName); + return new StaleSegment(segmentName, true, "partition function name changed: " + columnName); } if (partitionFunction.getNumPartitions() != partitionConfig.getNumPartitions()) { LOGGER.debug("tableNameWithType: {}, columnName: {},, segmentName: {}, change: num partitions", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "num partitions changed: " + columnName); + return new StaleSegment(segmentName, true, "num partitions changed: " + columnName); } Set partitions = columnMetadata.getPartitions(); if (partitions == null || partitions.size() != 1) { LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partitions", tableNameWithType, columnName, segmentName); - return new StaleSegmentsResponse(segmentName, true, "partitions changed: " + columnName); + return new StaleSegment(segmentName, true, "partitions changed: " + columnName); } } } - return new StaleSegmentsResponse(segmentName, false, null); + return new StaleSegment(segmentName, false, null); } private SegmentDirectory initSegmentDirectory(String segmentName, String segmentCrc, diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java index e81deb17b0b..9161f688451 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java @@ -25,7 +25,7 @@ import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; -import org.apache.pinot.segment.local.data.manager.StaleSegmentsResponse; +import org.apache.pinot.segment.local.data.manager.StaleSegment; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; @@ -189,7 +189,7 @@ void testAddTimeColumn() createImmutableSegmentDataManager(tableConfig, schema, "noChanges", List.of(row)); BaseTableDataManager tableDataManager = BaseTableDataManagerTest.createTableManager(); - StaleSegmentsResponse response = + StaleSegment response = tableDataManager.isSegmentStale(tableConfig, schema, segmentDataManager); assertFalse(response.isStale()); @@ -201,7 +201,7 @@ void testAddTimeColumn() @Test void testChangeTimeColumn() { - StaleSegmentsResponse response = BASE_TABLE_DATA_MANAGER.isSegmentStale( + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale( getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build(), SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); @@ -213,7 +213,7 @@ void testRemoveColumn() throws Exception { Schema schema = getSchema(); schema.removeField(TEXT_INDEX_COLUMN); - StaleSegmentsResponse response = + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "column deleted: textColumn"); @@ -226,7 +226,7 @@ void testFieldType() schema.removeField(TEXT_INDEX_COLUMN); schema.addField(new MetricFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, true)); - StaleSegmentsResponse response = + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "field type changed: textColumn"); @@ -239,7 +239,7 @@ void testChangeDataType() schema.removeField(TEXT_INDEX_COLUMN); schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.INT, true)); - StaleSegmentsResponse response = + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "data type changed: textColumn"); @@ -252,7 +252,7 @@ void testChangeToMV() schema.removeField(TEXT_INDEX_COLUMN); schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, false)); - StaleSegmentsResponse response = + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "single / multi value changed: textColumn"); @@ -265,7 +265,7 @@ void testChangeToSV() schema.removeField(TEXT_INDEX_COLUMN_MV); schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN_MV, FieldSpec.DataType.STRING, true)); - StaleSegmentsResponse response = + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "single / multi value changed: textColumnMV"); @@ -274,7 +274,7 @@ void testChangeToSV() @Test void testSortColumnMismatch() { // Check with a column that is not sorted - StaleSegmentsResponse response = + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build(), SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); @@ -317,7 +317,7 @@ void testFilter(String segmentName, TableConfig tableConfigWithFilter, String ex createImmutableSegmentDataManager(tableConfigWithFilter, SCHEMA, segmentName, generateRows()); // When TableConfig has a filter but segment does not have, needRefresh is true. - StaleSegmentsResponse response = + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfigWithFilter, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), expectedReason); @@ -340,7 +340,7 @@ void testPartition() createImmutableSegmentDataManager(partitionedTableConfig, SCHEMA, "partitionWithModulo", generateRows()); // when segment has no partition AND tableConfig has partitions then needRefresh = true - StaleSegmentsResponse response = + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfig, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "partition function added: partitionedColumn"); @@ -374,7 +374,7 @@ void testNullValueVector() createImmutableSegmentDataManager(withoutNullHandling, SCHEMA, "withoutNullHandling", generateRows()); // If null handling is removed from table config AND segment has NVV, then NVV can be removed. needRefresh = true - StaleSegmentsResponse response = + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(withoutNullHandling, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "null value vector index removed from column: NullValueColumn"); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java index a5b7c6cb035..c2fbd83cb94 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java @@ -27,9 +27,9 @@ import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; +import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; -import org.apache.pinot.segment.local.data.manager.StaleSegmentsResponse; import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -148,9 +148,9 @@ public void testAddRemoveSortedIndex() indexingConfig.setSortedColumn(Collections.singletonList("Carrier")); updateTableConfig(_tableConfig); - Map> needRefreshResponses = getStaleSegmentsResponse(); + Map needRefreshResponses = getStaleSegmentsResponse(); assertEquals(needRefreshResponses.size(), 1); - assertEquals(needRefreshResponses.values().iterator().next().size(), 12); + assertEquals(needRefreshResponses.values().iterator().next().getStaleSegmentList().size(), 12); } @Test(dependsOnMethods = "testAddRemoveSortedIndex") @@ -161,9 +161,9 @@ public void testAddRemoveRawIndex() indexingConfig.setNoDictionaryColumns(Collections.singletonList("ActualElapsedTime")); updateTableConfig(_tableConfig); - Map> needRefreshResponses = getStaleSegmentsResponse(); + Map needRefreshResponses = getStaleSegmentsResponse(); assertEquals(needRefreshResponses.size(), 1); - assertEquals(needRefreshResponses.values().iterator().next().size(), 12); + assertEquals(needRefreshResponses.values().iterator().next().getStaleSegmentList().size(), 12); } @Test(dependsOnMethods = "testAddRemoveSortedIndex") @@ -173,17 +173,17 @@ public void testH3IndexChange() _tableConfig.setFieldConfigList(Collections.singletonList(getH3FieldConfig())); updateTableConfig(_tableConfig); - Map> needRefreshResponses = getStaleSegmentsResponse(); + Map needRefreshResponses = getStaleSegmentsResponse(); assertEquals(needRefreshResponses.size(), 1); - assertEquals(needRefreshResponses.values().iterator().next().size(), 12); + assertEquals(needRefreshResponses.values().iterator().next().getStaleSegmentList().size(), 12); } - private Map> getStaleSegmentsResponse() + private Map getStaleSegmentsResponse() throws IOException { return JsonUtils.stringToObject(sendGetRequest( _controllerRequestURLBuilder.forStaleSegments( TableNameBuilder.OFFLINE.tableNameWithType(getTableName()))), - new TypeReference>>() { }); + new TypeReference>() { }); } @AfterClass diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegmentsResponse.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegment.java similarity index 79% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegmentsResponse.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegment.java index c60bf43d919..3e67093f1d7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegmentsResponse.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegment.java @@ -24,22 +24,21 @@ /** - * Encapsulates the response to get list of segments that need to be refreshed. The response also contains the reason - * why a segment has to be refreshed. + * Encapsulates information for a stale segment. It captures segment name, staleness and reason if it is stale. */ -public class StaleSegmentsResponse { +public class StaleSegment { private final String _segmentName; private final boolean _isStale; private final String _reason; @JsonCreator - public StaleSegmentsResponse(@JsonProperty("segmentName") String segmentName, @JsonProperty("reason") String reason) { + public StaleSegment(@JsonProperty("segmentName") String segmentName, @JsonProperty("reason") String reason) { _segmentName = segmentName; _isStale = true; _reason = reason; } - public StaleSegmentsResponse(String segmentName, boolean isStale, String reason) { + public StaleSegment(String segmentName, boolean isStale, String reason) { _segmentName = segmentName; _isStale = isStale; _reason = reason; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index 759a8e71d5b..cf7e6232697 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -328,7 +328,7 @@ default void onConsumingToOnline(String segmentNameStr) { * Return list of segment names that are stale along with reason. * @param tableConfig Table Config of the table * @param schema Schema of the table - * @return List of {@link StaleSegmentsResponse} with segment names and reason why it is stale + * @return List of {@link StaleSegment} with segment names and reason why it is stale */ - List getStaleSegments(TableConfig tableConfig, Schema schema); + List getStaleSegments(TableConfig tableConfig, Schema schema); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index b299cb1d116..9b19f1635b0 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -82,7 +82,7 @@ import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; import org.apache.pinot.core.data.manager.realtime.SegmentUploader; -import org.apache.pinot.segment.local.data.manager.StaleSegmentsResponse; +import org.apache.pinot.segment.local.data.manager.StaleSegment; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; @@ -992,7 +992,7 @@ public String checkSegmentsReload( @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal Server error", response = ErrorInfo.class) }) - public List getStaleSegments( + public List getStaleSegments( @ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName, @Context HttpHeaders headers) { tableName = DatabaseUtils.translateTableName(tableName, headers);