Cache segment metadata on the Overlord to speed up segment allocation and other task actions #17653

Open · wants to merge 11 commits into master
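This PR's diff, shown below, is largely mechanical fallout from introducing an Overlord-side segment metadata cache. The recurring changes: `IndexerSQLMetadataStorageCoordinator` gains a leading `SqlSegmentsMetadataTransactionFactory` constructor argument (tests wire it with a `TestDruidLeaderSelector` and a `NoopSegmentsMetadataCache`); `DruidOverlord` takes a `SegmentsMetadataCache` and starts it with the leader lifecycle; `retrieveSegmentForId` becomes datasource-scoped; `PendingSegmentRecord` is built through a `create(...)` factory instead of its constructor; and the SQL retry override moves from the coordinator's `getSqlMetadataMaxRetry()` to the factory's `getMaxRetries()`.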
@@ -46,13 +46,16 @@
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -105,6 +108,13 @@ public void setUp()
derbyConnector
);
indexerMetadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+new SqlSegmentsMetadataTransactionFactory(
+    objectMapper,
+    derbyConnectorRule.metadataTablesConfigSupplier().get(),
+    derbyConnector,
+    new TestDruidLeaderSelector(),
+    new NoopSegmentsMetadataCache()
+),
objectMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
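The same wiring pattern repeats in every test file below, so here it is once as a minimal sketch. It assumes only the constructor shapes visible in this diff; `tablesConfig` and `connector` stand in for each test's own fixtures.

```java
// Sketch of the recurring test wiring in this PR: the transaction factory now
// fronts the metadata store. Tests pass an always-leader selector and a no-op
// cache, so every metadata read still goes straight to SQL.
final IndexerSQLMetadataStorageCoordinator coordinator = new IndexerSQLMetadataStorageCoordinator(
    new SqlSegmentsMetadataTransactionFactory(
        objectMapper,                    // (de)serializes segment metadata
        tablesConfig,                    // metadata storage table names
        connector,                       // SQL connector (Derby in these tests)
        new TestDruidLeaderSelector(),   // treats this node as the leader
        new NoopSegmentsMetadataCache()  // caching disabled
    ),
    objectMapper,
    tablesConfig,
    connector,
    segmentSchemaManager,
    CentralizedDatasourceSchemaConfig.create()
);
```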
@@ -42,6 +42,8 @@
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerTestHelper;
@@ -56,6 +58,7 @@
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
@@ -112,6 +115,13 @@ public void setUp() throws Exception
);

metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+new SqlSegmentsMetadataTransactionFactory(
+    jsonMapper,
+    derbyConnectorRule.metadataTablesConfigSupplier().get(),
+    derbyConnector,
+    new TestDruidLeaderSelector(),
+    new NoopSegmentsMetadataCache()
+),
jsonMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
@@ -40,6 +40,7 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.metadata.segment.cache.SegmentsMetadataCache;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;

@@ -88,6 +89,7 @@ public DruidOverlord(
final OverlordDutyExecutor overlordDutyExecutor,
@IndexingService final DruidLeaderSelector overlordLeaderSelector,
final SegmentAllocationQueue segmentAllocationQueue,
+final SegmentsMetadataCache segmentsMetadataCache,
final CompactionScheduler compactionScheduler,
final ObjectMapper mapper,
final TaskContextEnricher taskContextEnricher
@@ -132,6 +134,7 @@ public void becomeLeader()

// First add "half leader" services: everything required for APIs except the supervisor manager.
// Then, become "half leader" so those APIs light up and supervisor initialization can proceed.
+leaderLifecycle.addManagedInstance(segmentsMetadataCache);
leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addHandler(
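Note the ordering in `becomeLeader()`: the cache is registered on the leader lifecycle ahead of the task runner and task queue, so by the time task actions can execute on a new leader, the cache has already started. For readers unfamiliar with Druid's `Lifecycle`, `addManagedInstance` discovers annotated start/stop methods; a hypothetical illustration (class name and method bodies are mine, not taken from this PR):

```java
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;

// Hypothetical lifecycle-managed instance. When the leader lifecycle starts,
// Druid's Lifecycle calls the @LifecycleStart method; when the Overlord loses
// leadership and the lifecycle stops, the @LifecycleStop method runs.
public class ExampleManagedCache
{
  @LifecycleStart
  public void start()
  {
    // a real segments-metadata cache would begin syncing from SQL here
  }

  @LifecycleStop
  public void stop()
  {
    // ...and drop its cached state here, on loss of leadership
  }
}
```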
@@ -1170,7 +1170,7 @@ public void testSegmentIdMustNotBeReused()
// Allocate another id and ensure that it doesn't exist in the druid_segments table
final SegmentIdWithShardSpec theId =
allocate(task1, DateTimes.nowUtc(), Granularities.NONE, Granularities.ALL, "seq", "3");
-Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(), true));
+Assert.assertNull(coordinator.retrieveSegmentForId(theId.getDataSource(), theId.asSegmentId().toString()));

lockbox.unlock(task1, Intervals.ETERNITY);
}
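The assertion above doubles as documentation of the signature change: `retrieveSegmentForId(segmentId, flag)` becomes `retrieveSegmentForId(dataSource, segmentId)`. Requiring the datasource up front is what lets a datasource-partitioned cache answer the lookup; the old boolean second argument (passed as `true` here, apparently to include unused segments) is gone.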
@@ -37,9 +37,12 @@
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaCache;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.joda.time.Period;
@@ -95,25 +98,33 @@ public void before()
);
final ObjectMapper objectMapper = new TestUtils().getTestObjectMapper();
segmentSchemaManager = new SegmentSchemaManager(metadataStorageTablesConfig, objectMapper, testDerbyConnector);
-metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+final SqlSegmentsMetadataTransactionFactory transactionFactory = new SqlSegmentsMetadataTransactionFactory(
objectMapper,
metadataStorageTablesConfig,
testDerbyConnector,
-segmentSchemaManager,
-CentralizedDatasourceSchemaConfig.create()
+new TestDruidLeaderSelector(),
+new NoopSegmentsMetadataCache()
)
{
@Override
-public int getSqlMetadataMaxRetry()
+public int getMaxRetries()
{
return 2;
}
};
+metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+    transactionFactory,
+    objectMapper,
+    metadataStorageTablesConfig,
+    testDerbyConnector,
+    segmentSchemaManager,
+    CentralizedDatasourceSchemaConfig.create()
+);
taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator);
segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance());
segmentsMetadataManager = new SqlSegmentsMetadataManager(
objectMapper,
-Suppliers.ofInstance(new SegmentsMetadataManagerConfig()),
+Suppliers.ofInstance(new SegmentsMetadataManagerConfig(null, null)),
Suppliers.ofInstance(metadataStorageTablesConfig),
testDerbyConnector,
segmentSchemaCache,
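Two more API shifts are visible in this hunk: the retry override moves from the coordinator (`getSqlMetadataMaxRetry()`) to the transaction factory (`getMaxRetries()`), consistent with retries now wrapping whole metadata transactions; and `SegmentsMetadataManagerConfig` apparently loses its no-arg constructor, with tests passing `(null, null)` to accept defaults for the two new parameters.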
@@ -70,6 +70,8 @@
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
import org.apache.druid.segment.DataSegmentsWithSchemas;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9Factory;
@@ -85,6 +87,7 @@
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -124,8 +127,6 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
private SegmentsMetadataManager segmentsMetadataManager;
private TaskLockbox lockbox;
private File baseDir;
-private SegmentSchemaManager segmentSchemaManager;
-private SegmentSchemaCache segmentSchemaCache;
private SupervisorManager supervisorManager;
private TestDataSegmentKiller dataSegmentKiller;
protected File reportsFile;
@@ -142,23 +143,30 @@ public void setUpIngestionTestBase() throws IOException
connector.createSegmentSchemasTable();
connector.createSegmentTable();
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
-segmentSchemaManager = new SegmentSchemaManager(
+SegmentSchemaManager segmentSchemaManager = new SegmentSchemaManager(
derbyConnectorRule.metadataTablesConfigSupplier().get(),
objectMapper,
derbyConnectorRule.getConnector()
);

storageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+new SqlSegmentsMetadataTransactionFactory(
+    objectMapper,
+    derbyConnectorRule.metadataTablesConfigSupplier().get(),
+    derbyConnectorRule.getConnector(),
+    new TestDruidLeaderSelector(),
+    new NoopSegmentsMetadataCache()
+),
objectMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnectorRule.getConnector(),
segmentSchemaManager,
CentralizedDatasourceSchemaConfig.create()
);
-segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance());
+SegmentSchemaCache segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance());
segmentsMetadataManager = new SqlSegmentsMetadataManager(
objectMapper,
-SegmentsMetadataManagerConfig::new,
+() -> new SegmentsMetadataManagerConfig(null, null),
derbyConnectorRule.metadataTablesConfigSupplier(),
derbyConnectorRule.getConnector(),
segmentSchemaCache,
@@ -33,8 +33,11 @@
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
@@ -80,6 +83,13 @@ public void setup()
lockbox = new TaskLockbox(
taskStorage,
new IndexerSQLMetadataStorageCoordinator(
+new SqlSegmentsMetadataTransactionFactory(
+    objectMapper,
+    derby.metadataTablesConfigSupplier().get(),
+    derbyConnector,
+    new TestDruidLeaderSelector(),
+    new NoopSegmentsMetadataCache()
+),
objectMapper,
derby.metadataTablesConfigSupplier().get(),
derbyConnector,
@@ -56,10 +56,13 @@
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedOverwritePartialShardSpec;
@@ -90,7 +93,6 @@ public class TaskLockboxTest
@Rule
public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();

-private ObjectMapper objectMapper;
private TaskStorage taskStorage;
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
private TaskLockbox lockbox;
@@ -104,7 +106,7 @@ public class TaskLockboxTest
@Before
public void setup()
{
-objectMapper = TestHelper.makeJsonMapper();
+final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
objectMapper.registerSubtypes(NumberedShardSpec.class, HashBasedNumberedShardSpec.class);
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createTaskTables();
@@ -129,6 +131,13 @@ public void setup()
EasyMock.replay(emitter);

metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+new SqlSegmentsMetadataTransactionFactory(
+    objectMapper,
+    tablesConfig,
+    derbyConnector,
+    new TestDruidLeaderSelector(),
+    new NoopSegmentsMetadataCache()
+),
objectMapper,
tablesConfig,
derbyConnector,
@@ -463,6 +472,13 @@ public void testSyncWithUnknownTaskTypesFromModuleNotLoaded()
);

IndexerMetadataStorageCoordinator loadedMetadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+new SqlSegmentsMetadataTransactionFactory(
+    loadedMapper,
+    derby.metadataTablesConfigSupplier().get(),
+    derbyConnector,
+    new TestDruidLeaderSelector(),
+    new NoopSegmentsMetadataCache()
+),
loadedMapper,
derby.metadataTablesConfigSupplier().get(),
derbyConnector,
@@ -47,9 +47,12 @@
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.joda.time.Duration;
import org.joda.time.Period;
@@ -103,6 +106,13 @@ public void setUp()
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
segmentSchemaManager = new SegmentSchemaManager(derbyConnectorRule.metadataTablesConfigSupplier().get(), jsonMapper, derbyConnectorRule.getConnector());
final IndexerSQLMetadataStorageCoordinator storageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+new SqlSegmentsMetadataTransactionFactory(
+    jsonMapper,
+    derbyConnectorRule.metadataTablesConfigSupplier().get(),
+    derbyConnectorRule.getConnector(),
+    new TestDruidLeaderSelector(),
+    new NoopSegmentsMetadataCache()
+),
jsonMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnectorRule.getConnector(),
@@ -74,6 +74,7 @@
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.metadata.segment.cache.SegmentsMetadataCache;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -258,6 +259,7 @@ public MockTaskRunner get()
EasyMock.createNiceMock(OverlordDutyExecutor.class),
new TestDruidLeaderSelector(),
EasyMock.createNiceMock(SegmentAllocationQueue.class),
+EasyMock.createNiceMock(SegmentsMetadataCache.class),
EasyMock.createNiceMock(CompactionScheduler.class),
new DefaultObjectMapper(),
new NoopTaskContextEnricher()
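`DruidOverlord` tests satisfy the new constructor dependency with a nice mock, so the leadership changes these tests drive exercise the lifecycle registration without a real cache behind it.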
@@ -655,7 +655,7 @@ public void testRegisterUpgradedPendingSegmentOnSupervisor()

replayAll();

-final PendingSegmentRecord pendingSegment = new PendingSegmentRecord(
+final PendingSegmentRecord pendingSegment = PendingSegmentRecord.create(
new SegmentIdWithShardSpec(
"DS",
Intervals.ETERNITY,
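This is the first of several call sites switching from `new PendingSegmentRecord(...)` to the static `PendingSegmentRecord.create(...)` factory, with the same leading arguments (the tail of this call is collapsed in the diff view).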
@@ -87,6 +87,8 @@
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.metadata.segment.SqlSegmentsMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.NoopSegmentsMetadataCache;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
@@ -120,6 +122,7 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CompressionUtils;
@@ -587,6 +590,13 @@ protected void makeToolboxFactory(TestUtils testUtils, ServiceEmitter emitter, b
);
segmentSchemaManager = new SegmentSchemaManager(derby.metadataTablesConfigSupplier().get(), objectMapper, derbyConnector);
metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+new SqlSegmentsMetadataTransactionFactory(
+    objectMapper,
+    derby.metadataTablesConfigSupplier().get(),
+    derbyConnector,
+    new TestDruidLeaderSelector(),
+    new NoopSegmentsMetadataCache()
+),
objectMapper,
derby.metadataTablesConfigSupplier().get(),
derbyConnector,
@@ -2025,7 +2025,7 @@ public void testRegisterNewVersionOfPendingSegment()
ImmutableSet.of()
);

-final PendingSegmentRecord pendingSegmentRecord0 = new PendingSegmentRecord(
+final PendingSegmentRecord pendingSegmentRecord0 = PendingSegmentRecord.create(
new SegmentIdWithShardSpec(
"DS",
Intervals.of("2024/2025"),
@@ -2037,7 +2037,7 @@
"someAppendedSegment0",
taskGroup0.getBaseSequenceName()
);
-final PendingSegmentRecord pendingSegmentRecord1 = new PendingSegmentRecord(
+final PendingSegmentRecord pendingSegmentRecord1 = PendingSegmentRecord.create(
new SegmentIdWithShardSpec(
"DS",
Intervals.of("2024/2025"),