[SPARK-28120][SS] Rocksdb state storage implementation #24922

Closed · wants to merge 28 commits
112 changes: 112 additions & 0 deletions core/src/main/java/org/apache/spark/io/FileUtility.java
@@ -0,0 +1,112 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.io;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.IOUtils;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class FileUtility {

public static final String ENCODING = "utf-8";

/**
 * Extracts an input tar file into output files and directories.
 *
 * @param inputTarFileLoc the input location of the tar file
 * @param destDirLoc destination for the extracted files
 * @throws IllegalStateException if extraction fails
 */
public static void extractTarFile(String inputTarFileLoc, String destDirLoc)
throws IllegalStateException {
File inputFile = new File(inputTarFileLoc);
if (!inputTarFileLoc.endsWith(".tar")) {
throw new IllegalStateException(String.format(
"Input File[%s] should end with tar extension.", inputTarFileLoc));
}
File destDir = new File(destDirLoc);
if (destDir.exists() && !destDir.delete()) {
throw new IllegalStateException(String.format(
"Couldn't delete the existing destination directory[%s] ", destDirLoc));
} else if (!destDir.mkdir()) {
throw new IllegalStateException(String.format(
"Couldn't create directory %s ", destDirLoc));
}

try (InputStream is = new FileInputStream(inputFile);
TarArchiveInputStream debInputStream = new TarArchiveInputStream(is, ENCODING)) {
TarArchiveEntry entry;
while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != null) {
// Note: entry names are used as-is, so archives from untrusted sources
// could contain ".." components that escape destDirLoc; callers should
// only extract archives they created (e.g., via createTarFile below).
final File outputFile = new File(destDirLoc, entry.getName());
if (entry.isDirectory()) {
if (!outputFile.exists() && !outputFile.mkdirs()) {
throw new IllegalStateException(String.format(
"Couldn't create directory %s.", outputFile.getAbsolutePath()));
}
} else {
try (OutputStream outputFileStream = new FileOutputStream(outputFile)) {
IOUtils.copy(debInputStream, outputFileStream);
}
}
}
} catch (IOException e) {
throw new IllegalStateException(String.format(
"extractTarFile failed with exception %s.", e.getMessage()), e);
}
}

/**
 * Creates a tar file from the given source directory.
 *
 * @param source the source directory location
 * @param destFileLoc destination of the created tarball
 * @throws IllegalStateException if archive creation fails
 */
public static void createTarFile(String source, String destFileLoc)
throws IllegalStateException {
File f = new File(destFileLoc);
if (f.exists() && !f.delete()) {
throw new IllegalStateException(String.format(
"Couldn't delete the destination file location[%s]", destFileLoc));
}
File folder = new File(source);
if (!folder.exists()) {
throw new IllegalStateException(String.format(
"Source folder[%s] does not exist", source));
}

try (FileOutputStream fos = new FileOutputStream(destFileLoc);
TarArchiveOutputStream tarOs = new TarArchiveOutputStream(fos, ENCODING)) {
File[] files = folder.listFiles();
if (files == null) {
throw new IllegalStateException(String.format(
"Couldn't list files under source folder[%s]", source));
}
// Note: only regular files directly under the source folder are archived;
// nested directories are not handled by this utility.
for (File file : files) {
TarArchiveEntry tarEntry = new TarArchiveEntry(file.getName());
tarEntry.setSize(file.length());
tarOs.putArchiveEntry(tarEntry);
try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file))) {
IOUtils.copy(bis, tarOs);
tarOs.closeArchiveEntry();
}
}
tarOs.finish();
} catch (IOException e) {
throw new IllegalStateException(String.format(
"createTarFile failed with exception %s.", e.getMessage()), e);
}
}

}
77 changes: 77 additions & 0 deletions core/src/test/java/org/apache/spark/io/FileUtilitySuite.java
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.io;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.spark.util.Utils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;

/**
* Tests functionality of {@link FileUtility}
*/
public class FileUtilitySuite {

protected File sourceFolder;
protected File destTarLoc;
protected File destFolder;

@Before
public void setUp() throws IOException {
String tmpDir = System.getProperty("java.io.tmpdir");
sourceFolder = Utils.createTempDir(tmpDir, "FileUtilTest-src-" + RandomUtils.nextLong());
destFolder = Utils.createTempDir(tmpDir, "FileUtilTest-dest-" + RandomUtils.nextLong());
destTarLoc = File.createTempFile("dest-tar", ".tar");
}

@After
public void tearDown() {
destTarLoc.delete();
}

@Test
public void testCreationAndExtraction() throws IllegalStateException, IOException {
// Create a temp file in the source folder
Assert.assertEquals(0, sourceFolder.listFiles().length);
File inputFile = File.createTempFile("source-file", ".tmp", sourceFolder);
// Create a byte array of size 1 KB with random bytes
byte[] randomBytes = RandomUtils.nextBytes(1 * 1024);
FileUtils.writeByteArrayToFile(inputFile, randomBytes);

// Create the tarball
destTarLoc.delete();
Assert.assertFalse(destTarLoc.exists());
FileUtility.createTarFile(sourceFolder.toString(), destTarLoc.getAbsolutePath());
Assert.assertTrue(destTarLoc.exists());

// Extract the tarball
Assert.assertEquals(0, destFolder.listFiles().length);
FileUtility.extractTarFile(destTarLoc.getAbsolutePath(), destFolder.getAbsolutePath());

// Verify that the extraction was successful
Assert.assertTrue(destFolder.exists());
Assert.assertEquals(1, destFolder.listFiles().length);
Assert.assertArrayEquals(randomBytes, FileUtils.readFileToByteArray(destFolder.listFiles()[0]));
}

}
1 change: 1 addition & 0 deletions dev/deps/spark-deps-hadoop-2.7
@@ -175,6 +175,7 @@ parquet-jackson-1.10.1.jar
protobuf-java-2.5.0.jar
py4j-0.10.8.1.jar
pyrolite-4.30.jar
rocksdbjni-6.2.2.jar
scala-collection-compat_2.12-2.1.1.jar
scala-compiler-2.12.10.jar
scala-library-2.12.10.jar
1 change: 1 addition & 0 deletions dev/deps/spark-deps-hadoop-3.2
@@ -194,6 +194,7 @@ protobuf-java-2.5.0.jar
py4j-0.10.8.1.jar
pyrolite-4.30.jar
re2j-1.1.jar
rocksdbjni-6.2.2.jar
scala-collection-compat_2.12-2.1.1.jar
scala-compiler-2.12.10.jar
scala-library-2.12.10.jar
1 change: 1 addition & 0 deletions pom.xml
@@ -193,6 +193,7 @@
<jpam.version>1.1</jpam.version>
<selenium.version>2.52.0</selenium.version>
<htmlunit.version>2.22</htmlunit.version>
<rocksdb.version>6.2.2</rocksdb.version>
<!--
Managed up from older version from Avro; sync with jackson-module-paranamer dependency version
-->
5 changes: 5 additions & 0 deletions sql/core/pom.xml
@@ -147,6 +147,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
@skonto (Contributor) commented on Jun 20, 2019:
This dependency has all the files packed for all major OSs. Flink uses a custom build. Digging into this a bit more, I see some additions and modifications as described here. I understand this is Flink-specific, but how about the TTL thing mentioned there? https://issues.apache.org/jira/browse/FLINK-10471 looks interesting. Structured Streaming fetches all state here (in memory) and filters out the timed-out entries; is RocksDB performing well there? Shouldn't we have the same mechanism, or a similar one, so we don't fetch everything and instead delegate this to the state backend (which could run in the background, btw)?
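
For reference, the rocksdbjni artifact ships a TTL-enabled variant, org.rocksdb.TtlDB, which drops expired entries lazily during compaction. A minimal sketch, assuming rocksdbjni on the classpath; the path and TTL value below are illustrative only:

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TtlDB;

public class TtlDbSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
         // Entries older than the TTL (3600 s here, illustrative) become
         // eligible for deletion during compaction; expiry is lazy, so a
         // read shortly after the deadline may still return the value.
         TtlDB db = TtlDB.open(options, "/tmp/ttl-db-sketch", 3600, false)) {
      db.put("key".getBytes(), "value".getBytes());
    }
  }
}

Because expiry is best-effort, Spark would still need its own bookkeeping for exact timeout semantics, but it would avoid scanning the full state on every trigger.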

@itsvikramagr (Author) replied:

I will take a look at the Flink build and see if I can pick only the relevant packages in the rocksdb dependency.

IMO, abstracting out how the state backend should filter out timed-out state can be treated as a separate problem, so that we don't end up increasing the scope of this PR. Once the abstraction is added, we can file a separate JIRA to implement it for the RocksDB state backend.
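
One possible shape for such an abstraction, where every name below is hypothetical and not part of this PR or of Spark's current StateStore API:

import java.util.function.Predicate;

// Hypothetical sketch: the backend evaluates the timeout predicate
// itself (possibly in a background thread) instead of the caller
// loading every entry into memory and filtering.
public interface ExpiringStateStore {
  /** Removes all entries whose serialized key satisfies the predicate. */
  void removeByCondition(Predicate<byte[]> isTimedOut);
}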

@skonto (Contributor) replied:

OK, makes sense.

@gatorsmile (Member) commented:
RocksDB might not be the best backend. Instead of adding the extra dependency, I think we should just do this as a separate third-party package. The community can always build their own backend based on their needs. Doing so is simple.

Can you submit it to https://spark-packages.org/?

cc @marmbrus @tdas @zsxwing
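
For context, Structured Streaming already loads its state backend reflectively from the spark.sql.streaming.stateStore.providerClass setting, which is what makes the separate-package route workable. A sketch, where the provider class name is hypothetical:

import org.apache.spark.sql.SparkSession;

public class ProviderConfigSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("rocksdb-state-sketch")
        // Hypothetical provider class published as a third-party package;
        // Spark instantiates it reflectively for each stateful operator.
        .config("spark.sql.streaming.stateStore.providerClass",
            "com.example.state.RocksDbStateStoreProvider")
        .getOrCreate();
    spark.stop();
  }
}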

@itsvikramagr (Author) replied:
@gatorsmile - what are the alternatives if RocksDB is not the best backend? Other streaming technologies such as Flink and Kafka Streams use RocksDB as their primary storage engine.

With integration in the Spark codebase, we can change the code in any way later, but if we take the separate-jar route, the kinds of extensions we can make are limited by the current contract. For example, @skonto mentioned one way in which we could abstract the state storage implementation to get the best out of RocksDB. How can we support such improvements if we take the Spark package route?

The current implementation, based on an in-memory hashmap, is not scalable beyond a point. How shall we go about solving that?

<version>${rocksdb.version}</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>