Skip to content

Commit

Permalink
removed workingDir, no extends runnable
Browse files Browse the repository at this point in the history
  • Loading branch information
jo-pol committed Nov 11, 2024
1 parent 38eeb1c commit 8867f45
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 66 deletions.
1 change: 0 additions & 1 deletion debug-init-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ echo "OK"

echo -n "Creating directories for test data and processing..."
mkdir $TEMPDIR/transfer-inbox
mkdir $TEMPDIR/easy-mirror-deposit-working-directory
mkdir $TEMPDIR/easy-ingest-flow-inbox
mkdir $TEMPDIR/easy-mirror-deposit-failed
mkdir $TEMPDIR/easy-mirror-store
Expand Down
1 change: 0 additions & 1 deletion src/main/assembly/dist/cfg/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ mirroringService:
- path: '/var/opt/dans.knaw.nl/tmp/transfer-inboxes/phys-techsciences'

pollingInterval: 1000
workDir: '/var/opt/dans.knaw.nl/tmp/easy-mirror-deposit-working-directory'
failedBox: '/var/opt/dans.knaw.nl/tmp/easy-mirror-deposit-failed'
easyMirrorStore: '/data/easy-mirror-store'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public String getName() {
@Override
public void run(final EasyMirrorDepositConfiguration configuration, final Environment environment) {
final ExecutorService taskExecutor = configuration.getTaskQueue().build(environment);
final MirroringService mirroringService = configuration.getMirroringService()
.build(taskExecutor);
final MirroringService mirroringService = configuration.getMirroringService().build();
environment.lifecycle().manage(mirroringService);
for (Inbox inbox : configuration.getMirroringService().getInboxes()) {
environment.healthChecks().register(String.format("Inbox-%s", inbox.getPath().toString()), new InboxHealth(inbox.getPath()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ public class MirroringServiceFactory {

private int pollingInterval;

@NotNull
@Valid
private Path workDir;

@NotNull
@Valid
private Path failedBox;
Expand All @@ -44,8 +40,8 @@ public class MirroringServiceFactory {
@Valid
private Path easyMirrorStore;

public MirroringService build(ExecutorService executorService) {
return new MirroringService(executorService, pollingInterval, inboxes, workDir,
public MirroringService build() {
return new MirroringService(pollingInterval, inboxes,
failedBox, easyMirrorStore);
}

Expand All @@ -65,14 +61,6 @@ public void setPollingInterval(int pollingInterval) {
this.pollingInterval = pollingInterval;
}

public Path getWorkDir() {
return workDir;
}

public void setWorkDir(Path workDir) {
this.workDir = workDir;
}

public Path getFailedBox() {
return failedBox;
}
Expand Down
9 changes: 3 additions & 6 deletions src/main/java/nl/knaw/dans/easy/mirror/core/MirrorTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,18 @@
import java.nio.file.Files;
import java.nio.file.Path;

public class MirrorTask implements Runnable {
public class MirrorTask {
private static final Logger log = LoggerFactory.getLogger(MirrorTask.class);

private final Path datasetVersionExportZip;
private final Path failedBox;
private final MirrorStore mirrorStore;

public MirrorTask(Path datasetVersionExportZip, Path failedBox, MirrorStore mirrorStore) {
this.datasetVersionExportZip = datasetVersionExportZip;
public MirrorTask(Path failedBox, MirrorStore mirrorStore) {
this.failedBox = failedBox;
this.mirrorStore = mirrorStore;
}

@Override
public void run() {
public void move(Path datasetVersionExportZip) {
log.info("Processing {}", datasetVersionExportZip.getFileName());

try {
Expand Down
46 changes: 10 additions & 36 deletions src/main/java/nl/knaw/dans/easy/mirror/core/MirroringService.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,13 @@
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;

public class MirroringService implements Managed {
private static final Logger log = LoggerFactory.getLogger(MirroringService.class);
private final ExecutorService executorService;
private final int pollingInterval;
private final List<Inbox> inboxes;
private final Path failedBox;
private final Path workDirectory;
private final MirrorStore mirrorStore;

private boolean initialized = false;
private boolean tasksCreatedInitialization = false;
private final MirrorTask mirrorTask;

private class EventHandler extends FileAlterationListenerAdaptor {
private final Inbox inbox;
Expand All @@ -55,32 +48,21 @@ public EventHandler(Inbox inbox) {
@Override
public void onStart(FileAlterationObserver observer) {
log.trace("onStart called");
if (!initialized) {
initialized = true;
processAllFromInbox(inbox);
}
processAllFromInbox(inbox);
}

@Override
public void onFileCreate(File file) {
log.trace("onFileCreate: {}", file);
if (tasksCreatedInitialization) {
tasksCreatedInitialization = false;
return; // file already added to queue by onStart
}
scheduleDatasetVersionExport(file.toPath());
executeMirrorTask(file.toPath());
}
}

public MirroringService(ExecutorService executorService, int pollingInterval, List<Inbox> inboxes,
Path workDirectory,
public MirroringService(int pollingInterval, List<Inbox> inboxes,
Path failedBox, Path mirrorStore) {
this.executorService = executorService;
this.pollingInterval = pollingInterval;
this.inboxes = inboxes;
this.workDirectory = workDirectory;
this.failedBox = failedBox;
this.mirrorStore = new MirrorStore(mirrorStore);
this.mirrorTask =new MirrorTask(failedBox, new MirrorStore(mirrorStore));
}

@Override
Expand Down Expand Up @@ -113,21 +95,17 @@ private void processAllFromInbox(Inbox inbox) {
DveFileFilter fileFilter = new DveFileFilter(inbox.getPath());
try (Stream<Path> files = Files.list(inbox.getPath())) {
files.filter(f -> fileFilter.accept(f.toFile()))
.forEach(dve -> {
scheduleDatasetVersionExport(dve);
tasksCreatedInitialization = true;
});
.forEach(this::executeMirrorTask);
}
}
catch (IOException e) {
throw new IllegalStateException("Could not read DVEs from inbox", e);
}
}

private void scheduleDatasetVersionExport(Path dve) {
log.info("Scheduling " + dve.getFileName());
private void executeMirrorTask(Path dve) {
log.info("Executing " + dve.getFileName());
try {
Path workingDve = Files.move(dve, workDirectory.resolve(dve.getFileName()));
Optional<Path> optXmlFile = getAssociatedXmlFile(dve);
if (optXmlFile.isPresent()) {
log.debug("Removing associated XML file {}", optXmlFile.get());
Expand All @@ -136,10 +114,10 @@ private void scheduleDatasetVersionExport(Path dve) {
else {
log.warn("Associated XML file was not found");
}
executorService.execute(new MirrorTask(workingDve, failedBox, mirrorStore));
mirrorTask.move(dve);
}
catch (IOException e) {
log.error("Could not move DVE to work directory", e);
log.error("Could not move DVE to mirror store", e);
}
}

Expand All @@ -152,8 +130,4 @@ private Optional<Path> getAssociatedXmlFile(Path path) {
return Optional.empty();
}
}

public void stop() {
executorService.shutdown();
}
}
10 changes: 5 additions & 5 deletions src/test/java/nl/knaw/dans/easy/mirror/core/MirrorTaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,31 +43,31 @@ public void setUp() throws Exception {
Files.createDirectories(mirrorStoreDir);
}

private MirrorTask createTask(Path dve) throws Exception {
private void executeTask(Path dve) throws Exception {
Path dveInInbox = inbox.resolve(dve.getFileName());
Files.copy(dveRootDir.resolve(dve), dveInInbox);
return new MirrorTask(dveInInbox, failedBox, mirrorStore);
new MirrorTask(failedBox, mirrorStore).move(dveInInbox);
}

@Test
public void dve_with_invalid_name_goes_to_failedBox() throws Exception {
Path dve = Paths.get("invalid-names/not-a-dve.zip");
createTask(dve).run();
executeTask(dve);
assertTrue(Files.exists(failedBox.resolve(dve.getFileName())));
assertFalse(mirrorStore.contains(dve));
}

@Test
public void dve_V1_1_goes_only_to_mirror_store() throws Exception {
Path dve = Paths.get("valid/doi-10-5072-fk2-xcfq1bv1.1.zip");
createTask(dve).run();
executeTask(dve);
assertTrue(mirrorStore.contains(dve));
}

@Test
public void dve_V1_goes_to_mirror_store_and_produces_deposit() throws Exception {
Path dve = Paths.get("valid/doi-10-5072-fk2-xcfq1bv1.0.zip");
createTask(dve).run();
executeTask(dve);
assertTrue(mirrorStore.contains(dve));
}
}
1 change: 0 additions & 1 deletion src/test/resources/debug-etc/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ taskQueue:
mirroringService:
inbox: 'data/transfer-inbox'
pollingInterval: 500
workDir: 'data/easy-mirror-deposit-working-directory'
failedBox: 'data/easy-mirror-deposit-failed'
easyMirrorStore: 'data/easy-mirror-store'

Expand Down

0 comments on commit 8867f45

Please sign in to comment.