Skip to content

Commit

Permalink
Merge pull request apache#130 from seznam/pete/113/list-storage
Browse files Browse the repository at this point in the history
apache#113 Spilling list storage
  • Loading branch information
xitep authored May 18, 2017
2 parents bb11dd7 + d4040ed commit 0d4edf0
Show file tree
Hide file tree
Showing 13 changed files with 841 additions and 213 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/**
* Copyright 2016-2017 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.executor.storage;

import cz.seznam.euphoria.core.client.operator.state.ListStorage;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;

/**
* A list storage implementation keeping at most a given number of elements
* in memory; once that limit is exceeded, the buffered elements are spilled
* to the local file system. Intended for use in batch executors.
*
* @param <T> the type of elements stored in this list storage
*/
public class FsSpillingListStorage<T> implements ListStorage<T> {

  /**
   * A factory for spill files.
   */
  @FunctionalInterface
  public interface SpillFileFactory {
    /**
     * Invoked to request a unique path to a new spill file.
     *
     * @return the path to a new spill file
     */
    File newSpillFile();
  }

  /**
   * The default spill file factory to create new files in the current
   * working directory.
   */
  public static class DefaultSpillFileFactory implements SpillFileFactory {
    static final SpillFileFactory INSTANCE = new DefaultSpillFileFactory();
    private DefaultSpillFileFactory() {}

    /**
     * @return the shared instance of this factory
     */
    public static SpillFileFactory getInstance() {
      return INSTANCE;
    }

    /**
     * Creates a new, empty temporary file under {@code "./"}, named after this
     * class and carrying the {@code ".dat"} suffix.
     *
     * @return the newly created file
     *
     * @throws IllegalStateException if the file cannot be created; the
     *          original failure is attached as the cause
     */
    @Override
    public File newSpillFile() {
      try {
        Path workDir = FileSystems.getFileSystem(new URI("file:///")).getPath("./");
        return Files.createTempFile(workDir, getClass().getName(), ".dat").toFile();
      } catch (Exception e) {
        // ~ deliberately broad: any failure to provide a spill file is
        // reported to the caller the same way
        throw new IllegalStateException(e);
      }
    }
  }

  private final SerializerFactory serializerFactory;
  private final SpillFileFactory spillFileFactory;
  private final int maxElemsInMemory;

  // ~ new elements are appended to this list and eventually this list
  // will be spilled to disk. therefore, this list contains the lastly
  // added elements to this list-storage
  private final List<T> elems = new ArrayList<>();

  // ~ disk related state; all of these are either set together (after the
  // first spill) or are all null (never spilled, or cleared)
  private File storageFile;
  private SerializerFactory.Serializer serializerInstance;
  private SerializerFactory.Serializer.OutputStream serializerStream;
  // ~ true if elements were written to `serializerStream` since its last flush
  private boolean needsFlush;

  /**
   * Creates an empty storage. This class performs no synchronization on its
   * internal state.
   *
   * @param serializerFactory the factory providing the serializer used to
   *         write elements to / read elements from the spill file
   * @param spillFileFactory the factory asked for a spill file the first time
   *         elements need to be spilled
   * @param maxElemsInMemory the number of elements the in-memory buffer may
   *         hold; adding one more element spills the whole buffer to disk
   *
   * @throws NullPointerException if any of the factories is {@code null}
   */
  public FsSpillingListStorage(SerializerFactory serializerFactory,
                               SpillFileFactory spillFileFactory,
                               int maxElemsInMemory) {
    this.serializerFactory = Objects.requireNonNull(serializerFactory);
    this.spillFileFactory = Objects.requireNonNull(spillFileFactory);
    this.maxElemsInMemory = maxElemsInMemory;
  }

  /**
   * Appends the given element; spills the in-memory buffer to disk as soon as
   * it holds more than {@code maxElemsInMemory} elements.
   *
   * @param element the element to append
   */
  @Override
  public void add(T element) {
    elems.add(element);
    if (elems.size() > maxElemsInMemory) {
      spillElems();
    }
  }

  /**
   * Writes all buffered elements to the spill file (creating it on first use)
   * and empties the in-memory buffer.
   */
  private void spillElems() {
    if (serializerStream == null) {
      initDiskStorage();
    }
    for (T elem : elems) {
      serializerStream.writeObject(elem);
    }
    elems.clear();
    needsFlush = true;
  }

  /**
   * Provides the stored elements in insertion order: first the elements
   * spilled to disk (if any), then the ones still buffered in memory.
   *
   * <p>Every call to {@link Iterable#iterator()} on the returned object opens
   * the spill file anew, hence the iterable can be traversed repeatedly.
   * The file handle of an iterator is released once the spilled elements
   * were read to their end; an iterator abandoned earlier keeps it open.
   *
   * @return an iterable over the stored elements
   *
   * @throws IllegalStateException (upon requesting an iterator) if the
   *          spill file cannot be opened
   */
  @Override
  public Iterable<T> get() {
    return () -> {
      if (serializerStream == null && elems.isEmpty()) {
        return Collections.emptyIterator();
      }

      final SerializerFactory.Serializer.InputStream spilled = openSpilledElems();
      final Iterator<T> inMemory = elems.iterator();
      return new Iterator<T>() {
        // ~ false once there is nothing (more) to read from disk; guards
        // against touching the input again after it has been closed
        private boolean spilledOpen = spilled != null;

        @Override
        public boolean hasNext() {
          return hasSpilled() || inMemory.hasNext();
        }

        @SuppressWarnings("unchecked")
        @Override
        public T next() {
          if (hasSpilled()) {
            // ~ only objects of type T are ever written to the spill file
            return (T) spilled.readObject();
          }
          if (inMemory.hasNext()) {
            return inMemory.next();
          }
          throw new NoSuchElementException();
        }

        /**
         * Tells whether another spilled element is available; closes the
         * input exactly once when its end is reached.
         */
        private boolean hasSpilled() {
          if (!spilledOpen) {
            return false;
          }
          if (!spilled.eof()) {
            return true;
          }
          spilledOpen = false;
          spilled.close();
          return false;
        }
      };
    };
  }

  /**
   * Opens the spill file for reading, making sure all spilled elements were
   * flushed to it beforehand.
   *
   * @return an input over the spilled elements or {@code null} if nothing
   *          has been spilled
   */
  private SerializerFactory.Serializer.InputStream openSpilledElems() {
    if (serializerStream == null) {
      return null;
    }
    if (needsFlush) {
      serializerStream.flush();
      needsFlush = false;
    }
    final FileInputStream in;
    try {
      in = new FileInputStream(storageFile);
    } catch (FileNotFoundException e) {
      throw new IllegalStateException(
          "Failed to open spilling storage: "
              + storageFile, e);
    }
    try {
      return serializerInstance.newInputStream(in);
    } catch (RuntimeException e) {
      // ~ do not leak the file handle if the serializer rejects the input
      closeSuppressed(in, e);
      throw e;
    }
  }

  /**
   * Removes all elements from this storage, i.e. empties the in-memory buffer
   * and - if elements were spilled - closes and deletes the spill file.
   * Afterwards the storage is empty and can be used again; a later spill
   * requests a new spill file.
   *
   * @throws IllegalStateException if the spill file cannot be deleted
   */
  @Override
  public void clear() {
    elems.clear();
    if (serializerStream == null) {
      return;
    }

    final SerializerFactory.Serializer.OutputStream stream = serializerStream;
    final File file = storageFile;
    // ~ forget the disk storage up-front such that this instance never
    // refers to a closed stream or a deleted file
    serializerStream = null;
    serializerInstance = null;
    storageFile = null;
    needsFlush = false;

    final boolean deleted;
    try {
      stream.close();
    } finally {
      // ~ attempt to remove the file even if closing the stream failed
      deleted = file.delete();
    }
    if (!deleted) {
      throw new IllegalStateException(
          "Failed to clean up storage file: " + file);
    }
  }

  /**
   * Requests a new spill file and opens it for writing. The disk related
   * fields are assigned only after everything succeeded.
   *
   * @throws IllegalStateException if the spill file cannot be opened
   */
  private void initDiskStorage() {
    assert storageFile == null;
    assert serializerInstance == null;
    assert serializerStream == null;

    final File file = spillFileFactory.newSpillFile();
    final SerializerFactory.Serializer serializer = serializerFactory.newSerializer();
    final FileOutputStream out;
    try {
      out = new FileOutputStream(file);
    } catch (FileNotFoundException e) {
      throw new IllegalStateException(e);
    }
    final SerializerFactory.Serializer.OutputStream stream;
    try {
      stream = serializer.newOutputStream(out);
    } catch (RuntimeException e) {
      // ~ do not leak the file handle if the serializer fails to set up
      closeSuppressed(out, e);
      throw e;
    }

    storageFile = file;
    serializerInstance = serializer;
    serializerStream = stream;
  }

  /**
   * Closes the given resource while another failure is being propagated;
   * a failure to close is attached to that primary failure as suppressed.
   */
  private static void closeSuppressed(AutoCloseable resource, Throwable primary) {
    try {
      resource.close();
    } catch (Exception suppressed) {
      primary.addSuppressed(suppressed);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright 2016-2017 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.executor.storage;

import java.io.Closeable;
import java.io.Serializable;

/**
 * A serializable factory of {@link Serializer} instances.
 */
public interface SerializerFactory extends Serializable {

  /**
   * Wraps plain java.io streams into object oriented output/input streams.
   */
  interface Serializer {

    /**
     * A sink of objects. Re-declares {@link Closeable#close()} without the
     * checked {@code IOException}.
     */
    interface OutputStream extends Closeable {

      /**
       * Writes the given object to this stream.
       *
       * @param o the object to write
       */
      void writeObject(Object o);

      /** Flushes this stream. */
      void flush();

      /** Closes this stream. */
      @Override
      void close();
    }

    /**
     * A source of objects. Re-declares {@link Closeable#close()} without the
     * checked {@code IOException}.
     */
    interface InputStream extends Closeable {

      /**
       * Reads the next object from this stream.
       *
       * @return the object read
       */
      Object readObject();

      /**
       * @return {@code true} if the end of this stream has been reached
       */
      boolean eof();

      /** Closes this stream. */
      @Override
      void close();
    }

    /**
     * Creates a new object output stream on top of the given stream.
     *
     * @param out the underlying stream to write to
     *
     * @return the object output stream
     */
    OutputStream newOutputStream(java.io.OutputStream out);

    /**
     * Creates a new object input stream on top of the given stream.
     *
     * @param in the underlying stream to read from
     *
     * @return the object input stream
     */
    InputStream newInputStream(java.io.InputStream in);
  }

  /**
   * @return a new serializer instance
   */
  Serializer newSerializer();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Copyright 2016-2017 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.executor.storage;

import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Lists;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Exercises {@link FsSpillingListStorage} with spill files placed into a
 * JUnit managed temporary folder.
 */
public class FsSpillingListStorageTest {

  @Rule
  public TemporaryFolder folder = new TemporaryFolder();

  /**
   * Hands out spill files from the temporary {@link #folder} and remembers
   * every file it served.
   */
  class TmpFolderSpillFileFactory implements FsSpillingListStorage.SpillFileFactory {
    final List<File> served = new ArrayList<>();

    @Override
    public File newSpillFile() {
      final File spillFile;
      try {
        spillFile = folder.newFile();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      served.add(spillFile);
      return spillFile;
    }
  }

  /** Creates a string storage keeping the given number of elements in memory. */
  private static FsSpillingListStorage<String> newStorage(
      FsSpillingListStorage.SpillFileFactory factory, int maxElemsInMemory) {
    return new FsSpillingListStorage<>(
        new JavaSerializationFactory(), factory, maxElemsInMemory);
  }

  @Test
  public void testAddMaxElemsDoesNotSpill() {
    final List<String> expected = Arrays.asList("foo", "bar", "quux");
    final TmpFolderSpillFileFactory factory = new TmpFolderSpillFileFactory();
    final FsSpillingListStorage<String> storage = newStorage(factory, 3);

    // ~ add exactly as many elements as fit into memory (one by one)
    for (String element : expected) {
      storage.add(element);
    }

    // ~ the very same iterable must deliver the elements on every traversal
    final Iterable<String> content = storage.get();
    assertEquals(expected, Lists.newArrayList(content));
    assertEquals(expected, Lists.newArrayList(content));
    // ~ no spill file was requested
    assertEquals(Collections.emptyList(), factory.served);
    // ~ clearing a storage which never spilled must not fail
    storage.clear();
  }

  @Test
  public void testAddOneExactSpill() {
    final List<String> expected = Arrays.asList("one", "two", "three", "four");
    final TmpFolderSpillFileFactory factory = new TmpFolderSpillFileFactory();
    final FsSpillingListStorage<String> storage = newStorage(factory, 3);
    storage.addAll(Arrays.asList("one", "two", "three", "four"));

    // ~ exactly one spill file was requested and it exists
    assertEquals(1, factory.served.size());
    final File spillFile = factory.served.get(0);
    assertTrue(spillFile.exists());

    // ~ the content is readable (repeatedly)
    final Iterable<String> content = storage.get();
    assertEquals(expected, Lists.newArrayList(content));
    assertEquals(expected, Lists.newArrayList(content));

    // ~ clearing the storage removes the spill file
    storage.clear();
    assertFalse(spillFile.exists());
  }

  @Test
  public void testMixedIteration() {
    final List<String> input =
        Arrays.asList("one", "two", "three", "four", "five", "six", "seven", "eight");
    final TmpFolderSpillFileFactory factory = new TmpFolderSpillFileFactory();
    final FsSpillingListStorage<String> storage = newStorage(factory, 5);
    storage.addAll(input);

    // ~ exactly one spill file was requested and it exists
    assertEquals(1, factory.served.size());
    final File spillFile = factory.served.get(0);
    assertTrue(spillFile.exists());

    // ~ two independent iterators obtained from the same iterable
    final Iterable<String> content = storage.get();
    final Iterator<String> first = content.iterator();
    final Iterator<String> second = content.iterator();

    // ~ let `first` run two elements ahead, then advance both in lock-step
    final int lead = 2;
    for (int i = 0; i < lead; i++) {
      assertEquals(input.get(i), first.next());
    }
    for (int i = 0; i < input.size(); i++) {
      final int ahead = i + lead;
      if (ahead < input.size()) {
        assertEquals(input.get(ahead), first.next());
      }
      assertEquals(input.get(i), second.next());
    }
    assertFalse(first.hasNext());
    assertFalse(second.hasNext());

    // ~ clearing the storage removes the spill file
    storage.clear();
    assertFalse(spillFile.exists());
  }
}
Loading

0 comments on commit 0d4edf0

Please sign in to comment.