diff --git a/engine-tests/src/test/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProviderTest.java b/engine-tests/src/test/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProviderTest.java index d56a25328aa..655266989f0 100644 --- a/engine-tests/src/test/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProviderTest.java +++ b/engine-tests/src/test/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProviderTest.java @@ -3,6 +3,7 @@ package org.terasology.engine.world.chunks.localChunkProvider; import com.google.common.collect.Maps; +import org.joml.Vector3f; import org.joml.Vector3i; import org.joml.Vector3ic; import org.junit.jupiter.api.AfterEach; @@ -13,11 +14,11 @@ import org.terasology.engine.entitySystem.entity.EntityManager; import org.terasology.engine.entitySystem.entity.EntityRef; import org.terasology.engine.entitySystem.event.Event; +import org.terasology.engine.logic.location.LocationComponent; import org.terasology.engine.world.BlockEntityRegistry; import org.terasology.engine.world.block.BeforeDeactivateBlocks; import org.terasology.engine.world.block.Block; import org.terasology.engine.world.block.BlockManager; -import org.terasology.engine.world.block.BlockRegion; import org.terasology.engine.world.block.OnActivatedBlocks; import org.terasology.engine.world.block.OnAddedBlocks; import org.terasology.engine.world.chunks.Chunk; @@ -36,17 +37,16 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; class LocalChunkProviderTest { - private static final int WAIT_CHUNK_IS_READY_IN_SECONDS = 30; + private static final int WAIT_CHUNK_IS_READY_IN_SECONDS = 5; private LocalChunkProvider chunkProvider; private EntityManager entityManager; @@ -58,6 +58,8 @@ class LocalChunkProviderTest { private Block blockAtBlockManager; private TestStorageManager storageManager; private TestWorldGenerator generator; + private RelevanceSystem relevanceSystem; + private EntityRef playerEntity; @BeforeEach public void setUp() { @@ -81,7 +83,8 @@ public void setUp() { chunkCache); chunkProvider.setBlockEntityRegistry(blockEntityRegistry); chunkProvider.setWorldEntity(worldEntity); - chunkProvider.setRelevanceSystem(new RelevanceSystem(chunkProvider)); // workaround. initialize loading pipeline + relevanceSystem = new RelevanceSystem(chunkProvider); + chunkProvider.setRelevanceSystem(relevanceSystem); // workaround. initialize loading pipeline } @AfterEach @@ -89,28 +92,24 @@ public void tearDown() { chunkProvider.shutdown(); } - private Future requestCreatingOrLoadingArea(Vector3ic chunkPosition, int radius) { - Future chunkFuture = chunkProvider.createOrLoadChunk(chunkPosition); - BlockRegion extentsRegion = new BlockRegion( - chunkPosition.x() - radius, chunkPosition.y() - radius, chunkPosition.z() - radius, - chunkPosition.x() + radius, chunkPosition.y() + radius, chunkPosition.z() + radius); - - extentsRegion.iterator().forEachRemaining(pos -> { - if (!pos.equals(chunkPosition)) { // remove center. we takes future for it already. - chunkProvider.createOrLoadChunk(pos); - } - }); - return chunkFuture; + private void requestCreatingOrLoadingArea(Vector3ic chunkPosition, int radius) { + playerEntity = mock(EntityRef.class); + when(playerEntity.exists()).thenReturn(true); + when(playerEntity.getComponent(LocationComponent.class)).thenReturn(new LocationComponent(new Vector3f(chunkPosition))); + Vector3i distance = new Vector3i(radius * 2, radius * 2, radius * 2); + relevanceSystem.addRelevanceEntity(playerEntity, distance, null); } - private Future requestCreatingOrLoadingArea(Vector3ic chunkPosition) { - return requestCreatingOrLoadingArea(chunkPosition, 1); + private void requestCreatingOrLoadingArea(Vector3ic chunkPosition) { + requestCreatingOrLoadingArea(chunkPosition, 1); } @Test void testGenerateSingleChunk() throws InterruptedException, ExecutionException, TimeoutException { Vector3i chunkPosition = new Vector3i(0, 0, 0); - requestCreatingOrLoadingArea(chunkPosition).get(WAIT_CHUNK_IS_READY_IN_SECONDS, TimeUnit.SECONDS); + requestCreatingOrLoadingArea(chunkPosition); + chunkProvider.notifyRelevanceChanged(); + chunkProvider.waitUntilGenerated(WAIT_CHUNK_IS_READY_IN_SECONDS * 1000); chunkProvider.update(); final ArgumentCaptor eventArgumentCaptor = ArgumentCaptor.forClass(Event.class); @@ -139,7 +138,9 @@ void testGenerateSingleChunkWithBlockLifeCycle() throws InterruptedException, Ex Vector3i chunkPosition = new Vector3i(0, 0, 0); blockAtBlockManager.setLifecycleEventsRequired(true); blockAtBlockManager.setEntity(mock(EntityRef.class)); - requestCreatingOrLoadingArea(chunkPosition).get(WAIT_CHUNK_IS_READY_IN_SECONDS, TimeUnit.SECONDS); + requestCreatingOrLoadingArea(chunkPosition); + chunkProvider.notifyRelevanceChanged(); + chunkProvider.waitUntilGenerated(WAIT_CHUNK_IS_READY_IN_SECONDS * 1000); chunkProvider.update(); final ArgumentCaptor worldEventCaptor = ArgumentCaptor.forClass(Event.class); @@ -181,7 +182,9 @@ void testLoadSingleChunk() throws InterruptedException, ExecutionException, Time generator.createChunk(chunk, null); storageManager.add(chunk); - requestCreatingOrLoadingArea(chunkPosition).get(WAIT_CHUNK_IS_READY_IN_SECONDS, TimeUnit.SECONDS); + requestCreatingOrLoadingArea(chunkPosition); + chunkProvider.notifyRelevanceChanged(); + chunkProvider.waitUntilGenerated(WAIT_CHUNK_IS_READY_IN_SECONDS * 1000); chunkProvider.update(); Assertions.assertTrue(((TestChunkStore) storageManager.loadChunkStore(chunkPosition)).isEntityRestored(), @@ -206,7 +209,9 @@ void testLoadSingleChunkWithBlockLifecycle() throws InterruptedException, Execut blockAtBlockManager.setLifecycleEventsRequired(true); blockAtBlockManager.setEntity(mock(EntityRef.class)); - requestCreatingOrLoadingArea(chunkPosition).get(WAIT_CHUNK_IS_READY_IN_SECONDS, TimeUnit.SECONDS); + requestCreatingOrLoadingArea(chunkPosition); + chunkProvider.notifyRelevanceChanged(); + chunkProvider.waitUntilGenerated(WAIT_CHUNK_IS_READY_IN_SECONDS * 1000); chunkProvider.update(); Assertions.assertTrue(((TestChunkStore) storageManager.loadChunkStore(chunkPosition)).isEntityRestored(), @@ -249,9 +254,13 @@ void testUnloadChunkAndDeactivationBlock() throws InterruptedException, TimeoutE blockAtBlockManager.setLifecycleEventsRequired(true); blockAtBlockManager.setEntity(mock(EntityRef.class)); - requestCreatingOrLoadingArea(chunkPosition).get(WAIT_CHUNK_IS_READY_IN_SECONDS, TimeUnit.SECONDS); + requestCreatingOrLoadingArea(chunkPosition); + chunkProvider.notifyRelevanceChanged(); + chunkProvider.waitUntilGenerated(WAIT_CHUNK_IS_READY_IN_SECONDS * 1000); + relevanceSystem.removeRelevanceEntity(playerEntity); + chunkProvider.notifyRelevanceChanged(); - //Wait BeforeDeactivateBlocks event + // Wait for BeforeDeactivateBlocks event Assertions.assertTimeoutPreemptively(Duration.of(WAIT_CHUNK_IS_READY_IN_SECONDS, ChronoUnit.SECONDS), () -> { ArgumentCaptor blockEventCaptor = ArgumentCaptor.forClass(Event.class); diff --git a/engine-tests/src/test/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipelineTest.java b/engine-tests/src/test/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipelineTest.java index a4f11bfe019..afe581d4427 100644 --- a/engine-tests/src/test/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipelineTest.java +++ b/engine-tests/src/test/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipelineTest.java @@ -3,6 +3,7 @@ package org.terasology.engine.world.chunks.pipeline; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.joml.Vector3i; @@ -12,7 +13,6 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; -import org.terasology.gestalt.assets.management.AssetManager; import org.terasology.engine.TerasologyTestingEnvironment; import org.terasology.engine.registry.CoreRegistry; import org.terasology.engine.world.block.BlockManager; @@ -22,22 +22,21 @@ import org.terasology.engine.world.chunks.blockdata.ExtraBlockDataManager; import org.terasology.engine.world.chunks.internal.ChunkImpl; import org.terasology.engine.world.chunks.pipeline.stages.ChunkTaskProvider; +import org.terasology.engine.world.chunks.remoteChunkProvider.ReceivedInitialChunkProvider; +import org.terasology.gestalt.assets.management.AssetManager; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; @TestInstance(TestInstance.Lifecycle.PER_METHOD) @Tag("TteTest") @@ -48,25 +47,36 @@ class ChunkProcessingPipelineTest extends TerasologyTestingEnvironment { private final ExtraBlockDataManager extraDataManager = new ExtraBlockDataManager(); private ChunkProcessingPipeline pipeline; + void waitUntilDone() throws InterruptedException { + pipeline.waitUntilDone(5000); + } + @Test - void simpleProcessingSuccess() throws ExecutionException, InterruptedException, TimeoutException { - pipeline = new ChunkProcessingPipeline((p) -> null, (o1, o2) -> 0); + void simpleProcessingSuccess() throws InterruptedException { + ReceivedInitialChunkProvider initialProvider = new ReceivedInitialChunkProvider((o1, o2) -> 0); + pipeline = new ChunkProcessingPipeline((p) -> null, initialProvider); Vector3i chunkPos = new Vector3i(0, 0, 0); Chunk chunk = createChunkAt(chunkPos); pipeline.addStage(ChunkTaskProvider.create("dummy task", (c) -> c)); + List result = new ArrayList<>(); + pipeline.addStage(ChunkTaskProvider.create("Chunk ready", (Consumer) result::add)); + + initialProvider.submit(chunk); + pipeline.start(); - Future chunkFuture = pipeline.invokeGeneratorTask(new Vector3i(0, 0, 0), () -> chunk); - Chunk chunkAfterProcessing = chunkFuture.get(1, TimeUnit.SECONDS); + waitUntilDone(); + Chunk chunkAfterProcessing = result.get(0); - Assertions.assertEquals(chunkAfterProcessing.getPosition(new Vector3i()), chunk.getPosition(new Vector3i()), + Assertions.assertEquals(chunkAfterProcessing.getPosition(), chunk.getPosition(), "Chunk after processing must have equals position, probably pipeline lost you chunk"); } @Test void simpleStopProcessingSuccess() { - pipeline = new ChunkProcessingPipeline((p) -> null, (o1, o2) -> 0); + ReceivedInitialChunkProvider initialProvider = new ReceivedInitialChunkProvider((o1, o2) -> 0); + pipeline = new ChunkProcessingPipeline((p) -> null, initialProvider); Vector3i position = new Vector3i(0, 0, 0); Chunk chunk = createChunkAt(position); @@ -80,20 +90,17 @@ void simpleStopProcessingSuccess() { return c; })); - Future chunkFuture = pipeline.invokeGeneratorTask(position, () -> chunk); + initialProvider.submit(chunk); pipeline.stopProcessingAt(position); - Assertions.assertThrows( - CancellationException.class, - () -> chunkFuture.get(1, TimeUnit.SECONDS), - "chunkFuture must be cancelled, when processing stopped" - ); + + Assertions.assertFalse(pipeline.isPositionProcessing(position)); } /** * Imagine that we have task, which requires neighbors with same Z level. neighbors chunk already in chunk cache. */ @Test - void multiRequirementsChunksExistsSuccess() throws ExecutionException, InterruptedException, TimeoutException { + void multiRequirementsChunksExistsSuccess() throws InterruptedException { Vector3i positionToGenerate = new Vector3i(0, 0, 0); Map chunkCache = getNearChunkPositions(positionToGenerate) @@ -105,7 +112,8 @@ void multiRequirementsChunksExistsSuccess() throws ExecutionException, Interrupt Function.identity() )); - pipeline = new ChunkProcessingPipeline(chunkCache::get, (o1, o2) -> 0); + ReceivedInitialChunkProvider initialProvider = new ReceivedInitialChunkProvider((o1, o2) -> 0); + pipeline = new ChunkProcessingPipeline(chunkCache::get, initialProvider); pipeline.addStage(ChunkTaskProvider.createMulti( "flat merging task", (chunks) -> chunks.stream() @@ -113,12 +121,17 @@ void multiRequirementsChunksExistsSuccess() throws ExecutionException, Interrupt .findFirst() // return central chunk. .get(), this::getNearChunkPositions)); + List result = new ArrayList<>(); + pipeline.addStage(ChunkTaskProvider.create("Chunk ready", (Consumer) result::add)); Chunk chunk = createChunkAt(positionToGenerate); - Future chunkFuture = pipeline.invokeGeneratorTask(new Vector3i(0, 0, 0), () -> chunk); - Chunk chunkAfterProcessing = chunkFuture.get(1, TimeUnit.SECONDS); + initialProvider.submit(chunk); + pipeline.start(); - Assertions.assertEquals(chunkAfterProcessing.getPosition(new Vector3i()), chunk.getPosition(new Vector3i()), + waitUntilDone(); + Chunk chunkAfterProcessing = result.get(0); + + Assertions.assertEquals(chunkAfterProcessing.getPosition(), chunk.getPosition(), "Chunk after processing must have equals position, probably pipeline lost you chunk"); } @@ -126,8 +139,7 @@ void multiRequirementsChunksExistsSuccess() throws ExecutionException, Interrupt * Imagine that we have task, which requires neighbors with same Z level. neighbor will generated. */ @Test - void multiRequirementsChunksWillGeneratedSuccess() throws ExecutionException, InterruptedException, - TimeoutException { + void multiRequirementsChunksWillGeneratedSuccess() throws InterruptedException { Vector3i positionToGenerate = new Vector3i(0, 0, 0); Map chunkToGenerate = getNearChunkPositions(positionToGenerate) @@ -139,43 +151,47 @@ void multiRequirementsChunksWillGeneratedSuccess() throws ExecutionException, In Function.identity() )); - pipeline = new ChunkProcessingPipeline((p) -> null, (o1, o2) -> 0); + ReceivedInitialChunkProvider initialProvider = new ReceivedInitialChunkProvider((o1, o2) -> 0); + pipeline = new ChunkProcessingPipeline((p) -> null, initialProvider); pipeline.addStage(ChunkTaskProvider.createMulti( "flat merging task", (chunks) -> chunks.stream() .filter((c) -> c.getPosition().equals(positionToGenerate)).findFirst() // return central chunk. .get(), this::getNearChunkPositions)); + List result = new ArrayList<>(); + pipeline.addStage(ChunkTaskProvider.create("Chunk ready", (Consumer) result::add)); Chunk chunk = createChunkAt(positionToGenerate); - Future chunkFuture = pipeline.invokeGeneratorTask(new Vector3i(0, 0, 0), () -> chunk); + initialProvider.submit(chunk); + pipeline.start(); - Thread.sleep(1_000); // sleep 1 second. and check future. - Assertions.assertFalse(chunkFuture.isDone(), "Chunk must be not generated, because ChunkTask have not exists " + - "neighbors in requirements"); + waitUntilDone(); + Assertions.assertTrue(result.isEmpty(), "Chunk must be not generated, because the ChunkTask's requirements don't exist"); - chunkToGenerate.forEach((position, neighborChunk) -> pipeline.invokeGeneratorTask(position, - () -> neighborChunk)); + chunkToGenerate.forEach((position, neighborChunk) -> initialProvider.submit(neighborChunk)); + pipeline.notifyUpdate(); - Chunk chunkAfterProcessing = chunkFuture.get(1, TimeUnit.SECONDS); + waitUntilDone(); + Assertions.assertEquals(1, result.size(), "Only one chunk has its requirements fulfilled"); + Chunk chunkAfterProcessing = result.get(0); - Assertions.assertEquals(chunkAfterProcessing.getPosition(new Vector3i()), chunk.getPosition(new Vector3i()), + Assertions.assertEquals(chunkAfterProcessing.getPosition(), chunk.getPosition(), "Chunk after processing must have equals position, probably pipeline lost you chunk"); } @Test void emulateEntityMoving() throws InterruptedException { final AtomicReference position = new AtomicReference<>(); - Map> futures = Maps.newHashMap(); Map chunkCache = Maps.newConcurrentMap(); - pipeline = new ChunkProcessingPipeline(chunkCache::get, (o1, o2) -> { + ReceivedInitialChunkProvider initialProvider = new ReceivedInitialChunkProvider((o1, o2) -> { if (position.get() != null) { Vector3ic entityPos = position.get(); - return (int) (entityPos.distance(((PositionFuture) o1).getPosition()) - - entityPos.distance(((PositionFuture) o2).getPosition())); + return (int) (entityPos.distance(o1.getPosition()) - entityPos.distance(o2.getPosition())); } return 0; }); + pipeline = new ChunkProcessingPipeline(chunkCache::get, initialProvider); pipeline.addStage(ChunkTaskProvider.createMulti( "flat merging task", (chunks) -> chunks.stream() @@ -189,9 +205,9 @@ void emulateEntityMoving() throws InterruptedException { this::getNearChunkPositions)); pipeline.addStage(ChunkTaskProvider.create("finish chunk", (c) -> { c.markReady(); - chunkCache.put(c.getPosition(new Vector3i()), c); + chunkCache.put(c.getPosition(), c); })); - + pipeline.start(); Set relativeRegion = Collections.emptySet(); for (int i = 0; i < 10; i++) { @@ -199,9 +215,7 @@ void emulateEntityMoving() throws InterruptedException { Set newRegion = getNearChunkPositions(position.get(), 10); Sets.difference(newRegion, relativeRegion).forEach(// load new chunks. (pos) -> { - Future future = pipeline.invokeGeneratorTask(new Vector3i(pos), - () -> createChunkAt(pos)); - futures.put(pos, future); + initialProvider.submit(createChunkAt(pos)); } ); @@ -213,6 +227,10 @@ void emulateEntityMoving() throws InterruptedException { } } ); + + pipeline.notifyUpdate(); + Thread.sleep(100); + relativeRegion = newRegion; Assertions.assertTrue(Sets.difference(chunkCache.keySet(), relativeRegion).isEmpty(), "We must haven't " + @@ -220,16 +238,8 @@ void emulateEntityMoving() throws InterruptedException { Assertions.assertTrue(Sets.difference(Sets.newHashSet(pipeline.getProcessingPosition()), relativeRegion).isEmpty(), "We must haven't chunks in processing not related to relativeRegion"); - Stream> relativeFutures = relativeRegion.stream().map(futures::get); - Assertions.assertTrue( - relativeFutures.noneMatch(Future::isCancelled), - "Relative futures must be not cancelled"); - - Stream> nonRelativeFutures = - Sets.difference(futures.keySet(), relativeRegion).stream().map(futures::get); - Assertions.assertTrue( - nonRelativeFutures.allMatch((f) -> f.isCancelled() || f.isDone()), - "Non relative futures must be cancelled or done"); + Assertions.assertTrue(relativeRegion.containsAll(Lists.newArrayList(pipeline.getProcessingPosition())), + "No non-relative chunks should be processing"); Thread.sleep(new Random().nextInt(500)); //think time } diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkInitialProvider.java b/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkInitialProvider.java new file mode 100644 index 00000000000..0dbd48809ea --- /dev/null +++ b/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkInitialProvider.java @@ -0,0 +1,79 @@ +// Copyright 2021 The Terasology Foundation +// SPDX-License-Identifier: Apache-2.0 + +package org.terasology.engine.world.chunks.localChunkProvider; + +import org.joml.Vector3ic; +import org.terasology.engine.world.block.BlockRegion; +import org.terasology.engine.world.chunks.Chunk; +import org.terasology.engine.world.chunks.internal.ChunkRelevanceRegion; +import org.terasology.engine.world.chunks.pipeline.InitialChunkProvider; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +public class LocalChunkInitialProvider implements InitialChunkProvider { + private final LocalChunkProvider chunkProvider; + private final RelevanceSystem relevanceSystem; + + private final List chunksInRange; + private BlockRegion[] lastRegions; + + public LocalChunkInitialProvider(LocalChunkProvider chunkProvider, RelevanceSystem relevanceSystem) { + this.chunkProvider = chunkProvider; + this.relevanceSystem = relevanceSystem; + chunksInRange = new ArrayList<>(); + updateList(); + } + + private void updateList() { + relevanceSystem.neededChunks() + .filter(pos -> !chunksInRange.contains(pos)) + .forEach(chunksInRange::add); + chunksInRange.removeIf(x -> !relevanceSystem.isChunkInRegions(x) || chunkProvider.isChunkReady(x)); + chunksInRange.sort(relevanceSystem.createChunkPosComparator().reversed()); + } + + private boolean checkForUpdate() { + Collection regions = relevanceSystem.getRegions(); + if (lastRegions == null || regions.size() != lastRegions.length) { + lastRegions = regions.stream().map(ChunkRelevanceRegion::getCurrentRegion).toArray(BlockRegion[]::new); + return true; + } + int i = 0; + boolean anyChanged = false; + for (ChunkRelevanceRegion region : regions) { + if (!lastRegions[i].equals(region.getCurrentRegion())) { + lastRegions[i].set(region.getCurrentRegion()); + anyChanged = true; + } + i++; + } + return anyChanged; + } + + @Override + public Optional next(Set currentlyGenerating) { + while (true) { + Vector3ic pos; + synchronized (this) { + if (checkForUpdate()) { + updateList(); + } + if (chunksInRange.isEmpty()) { + break; + } + pos = chunksInRange.remove(chunksInRange.size() - 1); + } + if (currentlyGenerating.contains(pos)) { + continue; + } + + return Optional.of(chunkProvider.getInitialChunk(pos)); + } + return Optional.empty(); + } +} diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProvider.java b/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProvider.java index 7b31d064343..e2dd4c3e9d0 100644 --- a/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProvider.java +++ b/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/LocalChunkProvider.java @@ -41,7 +41,6 @@ import org.terasology.engine.world.chunks.event.OnChunkLoaded; import org.terasology.engine.world.chunks.event.PurgeWorldEvent; import org.terasology.engine.world.chunks.internal.ChunkImpl; -import org.terasology.engine.world.chunks.internal.ChunkRelevanceRegion; import org.terasology.engine.world.chunks.pipeline.ChunkProcessingPipeline; import org.terasology.engine.world.chunks.pipeline.stages.ChunkTaskProvider; import org.terasology.engine.world.generation.impl.EntityBufferImpl; @@ -57,7 +56,6 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -118,24 +116,29 @@ public LocalChunkProvider(StorageManager storageManager, EntityManager entityMan ChunkMonitor.fireChunkProviderInitialized(this); } + /** + * Wait until the pipeline is done generating and processing chunks, for use in tests. + */ + protected void waitUntilGenerated(long timeoutMillis) throws InterruptedException { + loadingPipeline.waitUntilDone(timeoutMillis); + } - protected Future createOrLoadChunk(Vector3ic chunkPos) { - Vector3i pos = new Vector3i(chunkPos); - return loadingPipeline.invokeGeneratorTask( - pos, - () -> { - ChunkStore chunkStore = storageManager.loadChunkStore(pos); - Chunk chunk; - EntityBufferImpl buffer = new EntityBufferImpl(); - if (chunkStore == null) { - chunk = new ChunkImpl(pos, blockManager, extraDataManager); - generator.createChunk(chunk, buffer); - generateQueuedEntities.put(chunk.getPosition(new Vector3i()), buffer.getAll()); - } else { - chunk = chunkStore.getChunk(); - } - return chunk; - }); + /** + * Load a chunk from disk or generate it, so that it's ready for processing in the chunk processing pipeline. + * This method is called from the chunk processing worker threads. + */ + protected Chunk getInitialChunk(Vector3ic pos) { + ChunkStore chunkStore = storageManager.loadChunkStore(pos); + Chunk chunk; + EntityBufferImpl buffer = new EntityBufferImpl(); + if (chunkStore == null) { + chunk = new ChunkImpl(pos, blockManager, extraDataManager); + generator.createChunk(chunk, buffer); + generateQueuedEntities.put(chunk.getPosition(), buffer.getAll()); + } else { + chunk = chunkStore.getChunk(); + } + return chunk; } public void setBlockEntityRegistry(BlockEntityRegistry value) { @@ -229,6 +232,13 @@ private void generateQueuedEntities(EntityStore store) { } } + /** + * Notify this provider's pipeline that the relevance regions changed, so new chunks might be available. + */ + protected void notifyRelevanceChanged() { + loadingPipeline.notifyUpdate(); + } + @Override public void update() { deactivateBlocks(); @@ -381,7 +391,6 @@ public boolean reloadChunk(Vector3ic coords) { if (unloadChunkInternal(coords)) { chunkCache.remove(coords); - createOrLoadChunk(coords); return true; } @@ -401,31 +410,9 @@ public void purgeWorld() { chunkCache.clear(); storageManager.deleteWorld(); worldEntity.send(new PurgeWorldEvent()); - - loadingPipeline = new ChunkProcessingPipeline(this::getChunk, relevanceSystem.createChunkTaskComporator()); - loadingPipeline.addStage( - ChunkTaskProvider.create("Chunk generate internal lightning", - (Consumer) InternalLightProcessor::generateInternalLighting)) - .addStage(ChunkTaskProvider.create("Chunk deflate", Chunk::deflate)) - .addStage(ChunkTaskProvider.createMulti("Light merging", - chunks -> { - Chunk[] localChunks = chunks.toArray(new Chunk[0]); - return new LightMerger().merge(localChunks); - }, - pos -> StreamSupport.stream(new BlockRegion(pos).expand(1, 1, 1).spliterator(), false) - .map(Vector3i::new) - .collect(Collectors.toSet()) - )) - .addStage(ChunkTaskProvider.create("Chunk ready", readyChunks::add)); + setRelevanceSystem(relevanceSystem); unloadRequestTaskMaster = TaskMaster.createFIFOTaskMaster("Chunk-Unloader", 8); ChunkMonitor.fireChunkProviderInitialized(this); - - for (ChunkRelevanceRegion chunkRelevanceRegion : relevanceSystem.getRegions()) { - for (Vector3ic pos : chunkRelevanceRegion.getCurrentRegion()) { - createOrLoadChunk(pos); - } - chunkRelevanceRegion.setUpToDate(); - } } @Override @@ -440,7 +427,7 @@ private boolean isChunkReady(Chunk chunk) { // TODO: move loadingPipeline initialization into constructor. public void setRelevanceSystem(RelevanceSystem relevanceSystem) { this.relevanceSystem = relevanceSystem; - loadingPipeline = new ChunkProcessingPipeline(this::getChunk, relevanceSystem.createChunkTaskComporator()); + loadingPipeline = new ChunkProcessingPipeline(this::getChunk, new LocalChunkInitialProvider(this, relevanceSystem)); loadingPipeline.addStage( ChunkTaskProvider.create("Chunk generate internal lightning", (Consumer) InternalLightProcessor::generateInternalLighting)) @@ -455,5 +442,6 @@ public void setRelevanceSystem(RelevanceSystem relevanceSystem) { .collect(Collectors.toCollection(Sets::newLinkedHashSet)) )) .addStage(ChunkTaskProvider.create("Chunk ready", readyChunks::add)); + loadingPipeline.start(); } } diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/RelevanceSystem.java b/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/RelevanceSystem.java index 6108c2c188a..c8e567a9832 100644 --- a/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/RelevanceSystem.java +++ b/engine/src/main/java/org/terasology/engine/world/chunks/localChunkProvider/RelevanceSystem.java @@ -30,6 +30,7 @@ import java.util.concurrent.Future; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Stream; import java.util.stream.StreamSupport; /** @@ -118,6 +119,12 @@ public void removeRelevanceEntity(EntityRef entity) { } } + public Stream neededChunks() { + return regions.values() + .stream() + .flatMap(x -> StreamSupport.stream(x.getNeededChunks().spliterator(), false)); + } + /** * Synchronize region center to entity's position and create/load chunks in that region. */ @@ -130,10 +137,9 @@ private void updateRelevance() { Chunk chunk = chunkProvider.getChunk(pos); if (chunk != null) { chunkRelevanceRegion.checkIfChunkIsRelevant(chunk); - } else { - chunkProvider.createOrLoadChunk(pos); } } + chunkProvider.notifyRelevanceChanged(); chunkRelevanceRegion.setUpToDate(); } } @@ -174,14 +180,11 @@ public void addRelevanceEntity(EntityRef entity, Vector3ic distance, ChunkRegion } StreamSupport.stream(region.getCurrentRegion().spliterator(), false) - .sorted(new PositionRelevanceComparator()) //<-- this is n^2 cost. not sure why this needs to be sorted like this. .forEach( pos -> { Chunk chunk = chunkProvider.getChunk(pos); if (chunk != null) { region.checkIfChunkIsRelevant(chunk); - } else { - chunkProvider.createOrLoadChunk(pos); } } ); @@ -203,12 +206,12 @@ public boolean isChunkInRegions(Vector3ic pos) { } /** - * Create comporator for ChunkTasks, which compare by distance from region centers + * Create comparator for chunk positions, which compare by distance from region centers * - * @return Comporator. + * @return Comparator. */ - public Comparator> createChunkTaskComporator() { - return new ChunkTaskRelevanceComparator(); + public Comparator createChunkPosComparator() { + return new PositionRelevanceComparator(); } /** @@ -270,22 +273,6 @@ private int regionsDistanceScore(Vector3ic chunk) { } } - /** - * Compare ChunkTasks by distance from region's centers. - */ - private class ChunkTaskRelevanceComparator implements Comparator> { - - @Override - public int compare(Future o1, Future o2) { - return score((PositionFuture) o1) - score((PositionFuture) o2); - } - - private int score(PositionFuture task) { - return RelevanceSystem.this.regionsDistanceScore(task.getPosition()); - } - } - - /** * Compare ChunkTasks by distance from region's centers. */ diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingInfo.java b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingInfo.java index c7267da1e86..74bc768ff89 100644 --- a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingInfo.java +++ b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingInfo.java @@ -3,38 +3,34 @@ package org.terasology.engine.world.chunks.pipeline; -import com.google.common.util.concurrent.SettableFuture; import org.joml.Vector3ic; +import org.terasology.engine.world.chunks.Chunk; import org.terasology.engine.world.chunks.pipeline.stages.ChunkTask; import org.terasology.engine.world.chunks.pipeline.stages.ChunkTaskProvider; -import org.terasology.engine.world.chunks.Chunk; import java.util.List; -import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantLock; public final class ChunkProcessingInfo { + /** + * Used by ChunkProcessingPipeline worker threads to avoid working on the same chunk at the same time. + */ + protected final ReentrantLock lock = new ReentrantLock(); private final Vector3ic position; - private final SettableFuture externalFuture; private Chunk chunk; private ChunkTaskProvider chunkTaskProvider; - private Future currentFuture; private org.terasology.engine.world.chunks.pipeline.stages.ChunkTask chunkTask; - public ChunkProcessingInfo(Vector3ic position, SettableFuture externalFuture) { + public ChunkProcessingInfo(Vector3ic position) { this.position = position; - this.externalFuture = externalFuture; } public Vector3ic getPosition() { return position; } - public SettableFuture getExternalFuture() { - return externalFuture; - } - public Chunk getChunk() { return chunk; } @@ -51,14 +47,6 @@ public void setChunkTaskProvider(ChunkTaskProvider chunkTaskProvider) { this.chunkTaskProvider = chunkTaskProvider; } - public Future getCurrentFuture() { - return currentFuture; - } - - public void setCurrentFuture(Future currentFuture) { - this.currentFuture = currentFuture; - } - public org.terasology.engine.world.chunks.pipeline.stages.ChunkTask getChunkTask() { return chunkTask; } @@ -83,10 +71,6 @@ void nextStage(List stages) { chunkTaskProvider = stages.get(nextStageIndex); } - void endProcessing() { - externalFuture.set(chunk); - } - ChunkTask makeChunkTask() { if (chunkTask == null) { chunkTask = chunkTaskProvider.createChunkTask(position); @@ -95,7 +79,6 @@ ChunkTask makeChunkTask() { } void resetTaskState() { - currentFuture = null; chunkTask = null; } } diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipeline.java b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipeline.java index bba932da477..bd7552cd496 100644 --- a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipeline.java +++ b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/ChunkProcessingPipeline.java @@ -6,36 +6,22 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.util.concurrent.SettableFuture; -import org.joml.Vector3i; import org.joml.Vector3ic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.terasology.engine.monitoring.ThreadActivity; import org.terasology.engine.monitoring.ThreadMonitor; -import org.terasology.engine.utilities.ReflectionUtil; import org.terasology.engine.world.chunks.Chunk; import org.terasology.engine.world.chunks.pipeline.stages.ChunkTask; import org.terasology.engine.world.chunks.pipeline.stages.ChunkTaskProvider; -import java.util.Comparator; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; /** * Manages execution of chunk processing. @@ -44,120 +30,185 @@ */ public class ChunkProcessingPipeline { - private static final int NUM_TASK_THREADS = 4; + private static final int NUM_TASK_THREADS = 2; + private static final int NUM_CHUNKS_AT_ONCE = 8; private static final Logger logger = LoggerFactory.getLogger(ChunkProcessingPipeline.class); + /** + * This object can be `wait`ed on, and will notify all waiting threads when the provider is done generating chunks. + * It's mostly meant for testing. + */ + private final Object doneGeneratingSignal = new Object(); + private final AtomicInteger numAliveThreads = new AtomicInteger(0); + /** + * Another signal object, used to tell worker threads when new chunks become available. + */ + private final Object newChunksSignal = new Object(); + private final InitialChunkProvider initialChunkProvider; + private final AtomicBoolean shouldStop = new AtomicBoolean(false); + private final List stages = Lists.newArrayList(); - private final Thread reactor; - private final CompletionService chunkProcessor; - private final ThreadPoolExecutor executor; private final Function chunkProvider; private final Map chunkProcessingInfoMap = Maps.newConcurrentMap(); - private int threadIndex; /** * Create ChunkProcessingPipeline. */ - public ChunkProcessingPipeline(Function chunkProvider, Comparator> comparable) { + public ChunkProcessingPipeline(Function chunkProvider, InitialChunkProvider initialChunkProvider) { this.chunkProvider = chunkProvider; + this.initialChunkProvider = initialChunkProvider; + } - executor = new ThreadPoolExecutor( - NUM_TASK_THREADS, - NUM_TASK_THREADS, 0L, - TimeUnit.MILLISECONDS, - new PriorityBlockingQueue(800, unwrappingComporator(comparable)), - this::threadFactory, - this::rejectQueueHandler) { - @Override - protected RunnableFuture newTaskFor(Callable callable) { - RunnableFuture newTaskFor = super.newTaskFor(callable); - return new PositionFuture<>(newTaskFor, ((PositionalCallable) callable).getPosition()); - } - }; - chunkProcessor = new ExecutorCompletionService<>(executor, - new PriorityBlockingQueue<>(800, comparable)); - reactor = new Thread(this::chunkTaskHandler); - reactor.setDaemon(true); - reactor.setName("Chunk-Processing-Reactor"); - reactor.start(); + public void start() { + for (int i = 0; i < NUM_TASK_THREADS; i++) { + Thread worker = new Thread(this::runPoolThread); + worker.setDaemon(true); + worker.setName("Chunk-Processing-" + i); + worker.start(); + } } /** - * BlackMagic method: {@link ExecutorCompletionService} wraps task with QueueingFuture (private access) - * there takes wrapped task for comparing in {@link ThreadPoolExecutor} + * Notify the pipeline that new chunks are available, so if any worker threads were suspended, they should be resumed. */ - private Comparator unwrappingComporator(Comparator> comparable) { - return (o1, o2) -> { - Object unwrapped1 = ReflectionUtil.readField(o1, "task"); - Object unwrapped2 = ReflectionUtil.readField(o2, "task"); - return comparable.compare((Future) unwrapped1, (Future) unwrapped2); - }; + public void notifyUpdate() { + synchronized (newChunksSignal) { + newChunksSignal.notifyAll(); + } + } + + private void stopPoolThreads() { + shouldStop.set(true); + notifyUpdate(); } /** - * Reactor thread. Handles all ChunkTask dependency logic and running. + * Wait until this pipeline is done generating and processing chunks, or timeoutMillis milliseconds, whichever is earlier. */ - private void chunkTaskHandler() { + public void waitUntilDone(long timeoutMillis) throws InterruptedException { + synchronized (doneGeneratingSignal) { + doneGeneratingSignal.wait(timeoutMillis); + } + } + + private void runPoolThread() { + Preconditions.checkState(!stages.isEmpty(), "ChunkProcessingPipeline must have at least one stage"); + numAliveThreads.incrementAndGet(); + try { - while (!executor.isTerminated()) { - PositionFuture future = (PositionFuture) chunkProcessor.take(); - ChunkProcessingInfo chunkProcessingInfo = chunkProcessingInfoMap.get(future.getPosition()); - if (chunkProcessingInfo == null) { - continue; // chunk processing was cancelled. + while (!shouldStop.get()) { + // Prioritize advancing generation of chunks that were already started + if (processChunkTasks()) { + continue; + } + + // But if there aren't any chunks that can advance to the next stage right now, start some new chunks + for (int i = 0; i < NUM_CHUNKS_AT_ONCE; i++) { + Optional chunk = initialChunkProvider.next(chunkProcessingInfoMap.keySet()); + if (!chunk.isPresent()) { + // If i > 0, then this thread started some chunks earlier, so it's not done until they're processed + if (i > 0) { + continue; + } + if (shouldStop.get()) { + break; + } + // Don't signal that generation is done until all threads are done + if (numAliveThreads.decrementAndGet() == 0) { + synchronized (doneGeneratingSignal) { + doneGeneratingSignal.notifyAll(); + } + } + synchronized (newChunksSignal) { + newChunksSignal.wait(); + } + numAliveThreads.incrementAndGet(); + continue; + } + Vector3ic position = chunk.get().getPosition(); + + ChunkProcessingInfo chunkProcessingInfo = new ChunkProcessingInfo(position); + chunkProcessingInfo.setChunk(chunk.get()); + chunkProcessingInfo.nextStage(stages); + chunkProcessingInfo.makeChunkTask(); + chunkProcessingInfoMap.put(position, chunkProcessingInfo); } - onStageDone(future, chunkProcessingInfo); } } catch (InterruptedException e) { - if (!executor.isTerminated()) { - logger.error("Reactor thread was interrupted", e); - } - reactor.interrupt(); + logger.warn("Chunk processing thread interrupted"); + } finally { + numAliveThreads.decrementAndGet(); } } - private void onStageDone(PositionFuture future, ChunkProcessingInfo chunkProcessingInfo) throws InterruptedException { - try { - chunkProcessingInfo.resetTaskState(); - chunkProcessingInfo.setChunk(future.get()); + /** + * @return whether any progress was made. + */ + private boolean processChunkTasks() { + boolean anyChanged = false; + for (ChunkProcessingInfo chunkProcessingInfo : chunkProcessingInfoMap.values()) { + ChunkTask chunkTask = chunkProcessingInfo.getChunkTask(); + if (chunkTask != null) { + + List providedChunks = new ArrayList<>(); + boolean satisfied = true; + for (Vector3ic pos : chunkTask.getRequirements()) { + Chunk chunk = getChunkBy(chunkProcessingInfo.getChunkTaskProvider(), pos); + // If we don't have all the requirements generated yet, skip it + if (chunk == null) { + satisfied = false; + break; + } + providedChunks.add(chunk); + } - //Move by stage. - if (chunkProcessingInfo.hasNextStage(stages)) { - chunkProcessingInfo.nextStage(stages); - chunkProcessingInfo.makeChunkTask(); + // If another thread is running the task, just skip it + if (satisfied && chunkProcessingInfo.lock.tryLock()) { + try { + try (ThreadActivity ignored = ThreadMonitor.startThreadActivity(chunkTask.getName())) { + chunkProcessingInfo.setChunk(chunkTask.apply(providedChunks)); + } + chunkProcessingInfo.resetTaskState(); + + if (chunkProcessingInfo.hasNextStage(stages)) { + chunkProcessingInfo.nextStage(stages); + chunkProcessingInfo.makeChunkTask(); + anyChanged = true; + } else { + cleanup(chunkProcessingInfo); + } + } catch (Exception e) { + String stageName = + chunkProcessingInfo.getChunkTaskProvider() == null + ? "Generation or Loading" + : chunkProcessingInfo.getChunkTaskProvider().getName(); + logger.error( + String.format("ChunkTask at position %s and stage [%s] catch error: ", + chunkProcessingInfo.getPosition(), stageName), + e); + } finally { + chunkProcessingInfo.lock.unlock(); + } + } } else { - // haven't next stage - chunkProcessingInfo.endProcessing(); - cleanup(chunkProcessingInfo); - } - processChunkTasks(); + // It doesn't have a task, so either it needs to be advanced a stage, or it's done and needs to be removed + if (chunkProcessingInfo.lock.tryLock()) { + try { + if (chunkProcessingInfo.hasNextStage(stages)) { + chunkProcessingInfo.nextStage(stages); + chunkProcessingInfo.makeChunkTask(); + anyChanged = true; + } else { + cleanup(chunkProcessingInfo); + } + } finally { + chunkProcessingInfo.lock.unlock(); + } + } - } catch (ExecutionException e) { - String stageName = - chunkProcessingInfo.getChunkTaskProvider() == null - ? "Generation or Loading" - : chunkProcessingInfo.getChunkTaskProvider().getName(); - logger.error( - String.format("ChunkTask at position %s and stage [%s] catch error: ", - chunkProcessingInfo.getPosition(), stageName), - e); - chunkProcessingInfo.getExternalFuture().setException(e); - } catch (CancellationException ignored) { + } } - } - - private void processChunkTasks() { - chunkProcessingInfoMap.values().stream() - .filter(chunkProcessingInfo -> chunkProcessingInfo.getChunkTask() != null && chunkProcessingInfo.getCurrentFuture() == null) - .forEach(chunkProcessingInfo -> { - org.terasology.engine.world.chunks.pipeline.stages.ChunkTask chunkTask = chunkProcessingInfo.getChunkTask(); - Set providedChunks = chunkTask.getRequirements().stream() - .map(pos -> getChunkBy(chunkProcessingInfo.getChunkTaskProvider(), pos)) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - if (providedChunks.size() == chunkTask.getRequirements().size()) { - chunkProcessingInfo.setCurrentFuture(runTask(chunkTask, providedChunks)); - } - }); + return anyChanged; } private Chunk getChunkBy(ChunkTaskProvider requiredStage, Vector3ic position) { @@ -175,25 +226,6 @@ private Chunk getChunkBy(ChunkTaskProvider requiredStage, Vector3ic position) { return chunk; } - private Future runTask(ChunkTask task, Set chunks) { - return chunkProcessor.submit(new PositionalCallable(() -> { - try (ThreadActivity ignored = ThreadMonitor.startThreadActivity(task.getName())) { - return task.apply(chunks); - } - }, task.getPosition())); - } - - private Thread threadFactory(Runnable runnable) { - Thread thread = new Thread(runnable); - thread.setDaemon(true); - thread.setName("Chunk-Processing-" + threadIndex++); - return thread; - } - - private void rejectQueueHandler(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) { - logger.error("Cannot run {} because queue is full", runnable); - } - /** * Add stage to pipeline. * @@ -205,51 +237,15 @@ public ChunkProcessingPipeline addStage(ChunkTaskProvider stage) { return this; } - /** - * Run generator task and then run pipeline processing with it. - *

- * Additionally add technical stages for cleaning pipeline after chunk processing and handles errors in stages. - * - * @param generatorTask ChunkTask which provides new chunk to pipeline - * @return Future of chunk processing. - */ - public Future invokeGeneratorTask(Vector3i position, Supplier generatorTask) { - Preconditions.checkState(!stages.isEmpty(), "ChunkProcessingPipeline must to have at least one stage"); - ChunkProcessingInfo chunkProcessingInfo = chunkProcessingInfoMap.get(position); - if (chunkProcessingInfo != null) { - return chunkProcessingInfo.getExternalFuture(); - } else { - SettableFuture exitFuture = SettableFuture.create(); - chunkProcessingInfo = new ChunkProcessingInfo(position, exitFuture); - chunkProcessingInfoMap.put(position, chunkProcessingInfo); - chunkProcessingInfo.setCurrentFuture(chunkProcessor.submit(new PositionalCallable(generatorTask::get, - position))); - return exitFuture; - } - } - - /** - * Send chunk to processing pipeline. If chunk not processing yet then pipeline will be setted. If chunk processed - * then chunk will be processing in next stage; - * - * @param chunk chunk to process. - */ - public Future invokePipeline(Chunk chunk) { - return invokeGeneratorTask(chunk.getPosition(new Vector3i()), () -> chunk); - } - public void shutdown() { - executor.shutdown(); + stopPoolThreads(); chunkProcessingInfoMap.keySet().forEach(this::stopProcessingAt); chunkProcessingInfoMap.clear(); - executor.getQueue().clear(); - reactor.interrupt(); } public void restart() { - chunkProcessingInfoMap.clear(); - executor.getQueue().clear(); - chunkProcessingInfoMap.keySet().forEach(this::stopProcessingAt); + shutdown(); + start(); } /** @@ -263,13 +259,6 @@ public void stopProcessingAt(Vector3ic pos) { return; } - removed.getExternalFuture().cancel(true); - - Future currentFuture = removed.getCurrentFuture(); - if (currentFuture != null) { - currentFuture.cancel(true); - } - Chunk chunk = removed.getChunk(); if (chunk != null) { chunk.dispose(); @@ -277,12 +266,14 @@ public void stopProcessingAt(Vector3ic pos) { } /** - * Cleanuping Chunk processing after done. + * Cleanup Chunk processing after done. * * @param chunkProcessingInfo chunk to cleanup */ private void cleanup(ChunkProcessingInfo chunkProcessingInfo) { - chunkProcessingInfoMap.remove(chunkProcessingInfo.getPosition(), chunkProcessingInfo); + synchronized (chunkProcessingInfoMap) { + chunkProcessingInfoMap.remove(chunkProcessingInfo.getPosition(), chunkProcessingInfo); + } } /** @@ -303,26 +294,4 @@ public boolean isPositionProcessing(Vector3ic pos) { public Iterable getProcessingPosition() { return chunkProcessingInfoMap.keySet(); } - - /** - * Dummy callable for passthru position for {@link java.util.concurrent.ThreadPoolExecutor}#newTaskFor - */ - private static final class PositionalCallable implements Callable { - private final Callable callable; - private final Vector3ic position; - - private PositionalCallable(Callable callable, Vector3ic position) { - this.callable = callable; - this.position = position; - } - - public Vector3ic getPosition() { - return position; - } - - @Override - public Chunk call() throws Exception { - return callable.call(); - } - } } diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/InitialChunkProvider.java b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/InitialChunkProvider.java new file mode 100644 index 00000000000..be1aea9c55b --- /dev/null +++ b/engine/src/main/java/org/terasology/engine/world/chunks/pipeline/InitialChunkProvider.java @@ -0,0 +1,22 @@ +// Copyright 2021 The Terasology Foundation +// SPDX-License-Identifier: Apache-2.0 + +package org.terasology.engine.world.chunks.pipeline; + +import org.joml.Vector3ic; +import org.terasology.engine.world.chunks.Chunk; + +import java.util.Optional; +import java.util.Set; + +/** + * An InitialChunkProvider provides input chunks to the ChunkProcessingPipeline. + * All methods should be able to be called by multiple threads simultaneously, taking care of any necessary synchronization themselves. + */ +public interface InitialChunkProvider { + /** + * @param currentlyGenerating the set of chunks which are currently being processed. + * This lets the InitialChunkProvider discard those chunks before trying to generate them at all. + */ + Optional next(Set currentlyGenerating); +} diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/remoteChunkProvider/ReceivedInitialChunkProvider.java b/engine/src/main/java/org/terasology/engine/world/chunks/remoteChunkProvider/ReceivedInitialChunkProvider.java new file mode 100644 index 00000000000..0ef79b7175b --- /dev/null +++ b/engine/src/main/java/org/terasology/engine/world/chunks/remoteChunkProvider/ReceivedInitialChunkProvider.java @@ -0,0 +1,35 @@ +// Copyright 2021 The Terasology Foundation +// SPDX-License-Identifier: Apache-2.0 + +package org.terasology.engine.world.chunks.remoteChunkProvider; + +import org.joml.Vector3ic; +import org.terasology.engine.world.chunks.Chunk; +import org.terasology.engine.world.chunks.pipeline.InitialChunkProvider; + +import java.util.Comparator; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; + +/** + * Passes chunks received from the server through into the `ChunkProcessingPipeline`. + */ +public class ReceivedInitialChunkProvider implements InitialChunkProvider { + private final BlockingQueue chunkQueue; + + public ReceivedInitialChunkProvider(Comparator comparator) { + chunkQueue = new PriorityBlockingQueue<>(800, comparator); + } + + public void submit(Chunk chunk) { + chunkQueue.add(chunk); + } + + @Override + public Optional next(Set currentlyGenerating) { + // The queue handles synchronization + return Optional.ofNullable(chunkQueue.poll()); + } +} diff --git a/engine/src/main/java/org/terasology/engine/world/chunks/remoteChunkProvider/RemoteChunkProvider.java b/engine/src/main/java/org/terasology/engine/world/chunks/remoteChunkProvider/RemoteChunkProvider.java index 71eb4e376ce..c2bfdad0888 100644 --- a/engine/src/main/java/org/terasology/engine/world/chunks/remoteChunkProvider/RemoteChunkProvider.java +++ b/engine/src/main/java/org/terasology/engine/world/chunks/remoteChunkProvider/RemoteChunkProvider.java @@ -14,28 +14,26 @@ import org.terasology.engine.entitySystem.entity.EntityRef; import org.terasology.engine.logic.players.LocalPlayer; import org.terasology.engine.monitoring.chunk.ChunkMonitor; -import org.terasology.engine.world.block.BlockRegionc; -import org.terasology.engine.world.internal.ChunkViewCore; -import org.terasology.engine.world.internal.ChunkViewCoreImpl; -import org.terasology.engine.world.propagation.light.InternalLightProcessor; -import org.terasology.engine.world.propagation.light.LightMerger; import org.terasology.engine.world.block.BlockManager; import org.terasology.engine.world.block.BlockRegion; +import org.terasology.engine.world.block.BlockRegionc; import org.terasology.engine.world.chunks.Chunk; import org.terasology.engine.world.chunks.ChunkProvider; import org.terasology.engine.world.chunks.Chunks; import org.terasology.engine.world.chunks.event.BeforeChunkUnload; import org.terasology.engine.world.chunks.event.OnChunkLoaded; import org.terasology.engine.world.chunks.pipeline.ChunkProcessingPipeline; -import org.terasology.engine.world.chunks.pipeline.PositionFuture; import org.terasology.engine.world.chunks.pipeline.stages.ChunkTaskProvider; +import org.terasology.engine.world.internal.ChunkViewCore; +import org.terasology.engine.world.internal.ChunkViewCoreImpl; +import org.terasology.engine.world.propagation.light.InternalLightProcessor; +import org.terasology.engine.world.propagation.light.LightMerger; import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -61,11 +59,12 @@ public class RemoteChunkProvider implements ChunkProvider { private final ChunkProcessingPipeline loadingPipeline; private EntityRef worldEntity = EntityRef.NULL; private ChunkReadyListener listener; + private final ReceivedInitialChunkProvider initialProvider; public RemoteChunkProvider(BlockManager blockManager, LocalPlayer localPlayer) { this.blockManager = blockManager; - loadingPipeline = new ChunkProcessingPipeline(this::getChunk, - new LocalPlayerRelativeChunkComparator(localPlayer)); + initialProvider = new ReceivedInitialChunkProvider(new LocalPlayerRelativeChunkComparator(localPlayer)); + loadingPipeline = new ChunkProcessingPipeline(this::getChunk, initialProvider); loadingPipeline.addStage( ChunkTaskProvider.create("Chunk generate internal lightning", @@ -81,6 +80,7 @@ public RemoteChunkProvider(BlockManager blockManager, LocalPlayer localPlayer) { .collect(Collectors.toSet()) )) .addStage(ChunkTaskProvider.create("", readyChunks::add)); + loadingPipeline.start(); ChunkMonitor.fireChunkProviderInitialized(this); } @@ -89,9 +89,9 @@ public void subscribe(ChunkReadyListener chunkReadyListener) { this.listener = chunkReadyListener; } - public void receiveChunk(final Chunk chunk) { - loadingPipeline.invokePipeline(chunk); + initialProvider.submit(chunk); + loadingPipeline.notifyUpdate(); } public void invalidateChunks(Vector3ic pos) { @@ -199,7 +199,7 @@ public void setWorldEntity(EntityRef entity) { this.worldEntity = entity; } - private final class LocalPlayerRelativeChunkComparator implements Comparator> { + private final class LocalPlayerRelativeChunkComparator implements Comparator { private final LocalPlayer localPlayer; private LocalPlayerRelativeChunkComparator(LocalPlayer localPlayer) { @@ -207,12 +207,12 @@ private LocalPlayerRelativeChunkComparator(LocalPlayer localPlayer) { } @Override - public int compare(Future o1, Future o2) { - return score((PositionFuture) o1) - score((PositionFuture) o2); + public int compare(Chunk o1, Chunk o2) { + return score(o1.getPosition()) - score(o2.getPosition()); } - private int score(PositionFuture task) { - return (int) Chunks.toChunkPos(localPlayer.getPosition(new Vector3f()), new Vector3i()).distance(task.getPosition()); + private int score(Vector3ic pos) { + return (int) Chunks.toChunkPos(localPlayer.getPosition(new Vector3f()), new Vector3i()).distance(pos); } } }