diff --git a/aeron-archive/src/main/java/io/aeron/archive/client/ControlResponsePoller.java b/aeron-archive/src/main/java/io/aeron/archive/client/ControlResponsePoller.java index 6af225bf76..cfd8c2e40e 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/client/ControlResponsePoller.java +++ b/aeron-archive/src/main/java/io/aeron/archive/client/ControlResponsePoller.java @@ -28,7 +28,7 @@ */ public class ControlResponsePoller implements ControlledFragmentHandler { - private static final int FRAGMENT_LIMIT = 10; + private static final int FRAGMENT_LIMIT = 1; private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder(); private final ControlResponseDecoder controlResponseDecoder = new ControlResponseDecoder(); diff --git a/aeron-archive/src/test/java/io/aeron/archive/ArchiveTest.java b/aeron-archive/src/test/java/io/aeron/archive/ArchiveTest.java index 20da0cd426..c8da689716 100644 --- a/aeron-archive/src/test/java/io/aeron/archive/ArchiveTest.java +++ b/aeron-archive/src/test/java/io/aeron/archive/ArchiveTest.java @@ -15,8 +15,15 @@ */ package io.aeron.archive; +import io.aeron.archive.client.AeronArchive; +import io.aeron.driver.MediaDriver; import org.junit.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; @@ -33,4 +40,36 @@ public void shouldGenerateRecordingName() assertThat(actual, is(expected)); } + + @Test + public void shouldAllowMultipleConnectionsInParallel() throws InterruptedException + { + final int numberOfArchiveClients = 5; + final CountDownLatch latch = new CountDownLatch(numberOfArchiveClients); + final ExecutorService executorService = Executors.newFixedThreadPool(numberOfArchiveClients); + + final MediaDriver.Context driverCtx = new MediaDriver.Context(); + final Archive.Context archiveCtx = new Archive.Context(); + + try (ArchivingMediaDriver driver = ArchivingMediaDriver.launch(driverCtx, archiveCtx)) + { + for (int i = 0; i < numberOfArchiveClients; i++) + { + executorService.execute(() -> + { + AeronArchive.connect(); + latch.countDown(); + }); + } + + latch.await(10, TimeUnit.SECONDS); + + assertThat(latch.getCount(), is(0L)); + } + finally + { + archiveCtx.deleteArchiveDirectory(); + driverCtx.deleteAeronDirectory(); + } + } }