[SPARK-28120][SS] Rocksdb state storage implementation #24922

Closed · wants to merge 28 commits
101 changes: 101 additions & 0 deletions core/src/main/java/org/apache/spark/io/FileUtility.java
@@ -0,0 +1,101 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.io;

import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.compress.archivers.ArchiveStreamFactory;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.IOUtils;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class FileUtility {

  /**
   * Untar an input file into an output directory.
   *
   * The output directory is created next to the input file, having the same
   * name as the input file, minus the '.tar' extension.
   *
   * @param inputFile the input .tar file
   * @throws IOException if the archive cannot be read or an entry cannot be written
   * @throws ArchiveException if the input is not a valid tar archive
   */
  public static void unTar(final File inputFile)
      throws IOException, ArchiveException {
    // String.split takes a regex, so split(".tar") would treat '.' as a
    // wildcard; strip the literal suffix instead.
    String path = inputFile.getAbsolutePath();
    String outputDir = path.endsWith(".tar")
        ? path.substring(0, path.length() - ".tar".length())
        : path;
    File outputTarDir = new File(outputDir);
    if (!outputTarDir.isDirectory() && !outputTarDir.mkdirs()) {
      throw new IOException("Couldn't create directory " + outputDir);
    }
    // try-with-resources guarantees the streams are closed even if an
    // entry fails to extract.
    try (InputStream is = new FileInputStream(inputFile);
         TarArchiveInputStream tarStream = (TarArchiveInputStream)
             new ArchiveStreamFactory().createArchiveInputStream("tar", is)) {
      TarArchiveEntry entry;
      while ((entry = tarStream.getNextTarEntry()) != null) {
        final File outputFile = new File(outputDir, entry.getName());
        if (entry.isDirectory()) {
          if (!outputFile.exists() && !outputFile.mkdirs()) {
            throw new IllegalStateException(String.format(
                "Couldn't create directory %s.", outputFile.getAbsolutePath()));
          }
        } else {
          try (OutputStream outputFileStream = new FileOutputStream(outputFile)) {
            IOUtils.copy(tarStream, outputFileStream);
          }
        }
      }
    }
  }

  /**
   * Archive the regular files directly under {@code source} into a tar file
   * at {@code destFileName}, replacing any existing file at that path.
   *
   * @throws IOException if the source is unreadable or the archive cannot be written
   */
  public static void createTarFile(String source, String destFileName)
      throws IOException {
    File destFile = new File(destFileName);
    if (destFile.exists() && !destFile.delete()) {
      throw new IOException("Couldn't delete existing file " + destFileName);
    }
    File[] files = new File(source).listFiles();
    if (files == null) {
      throw new IOException(source + " is not a readable directory");
    }
    try (FileOutputStream fos = new FileOutputStream(destFileName);
         TarArchiveOutputStream tarOs = new TarArchiveOutputStream(fos)) {
      for (File file : files) {
        if (!file.isFile()) {
          continue; // only flat regular files are archived, matching unTar's layout
        }
        TarArchiveEntry tarEntry = new TarArchiveEntry(file.getName());
        tarEntry.setSize(file.length());
        tarOs.putArchiveEntry(tarEntry);
        try (BufferedInputStream bis =
                 new BufferedInputStream(new FileInputStream(file))) {
          IOUtils.copy(bis, tarOs);
        }
        tarOs.closeArchiveEntry();
      }
      tarOs.finish();
    }
  }
}
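
A minimal usage sketch of the utility above, packing a flat state directory into a tar and restoring it. The paths and the demo class are illustrative, not part of the PR:

import java.io.File;
import org.apache.spark.io.FileUtility;

public class FileUtilityDemo {
  public static void main(String[] args) throws Exception {
    String stateDir = "/tmp/spark-state/0";    // flat directory of state files (hypothetical)
    String tarPath = "/tmp/spark-state/0.tar"; // archive written next to it
    FileUtility.createTarFile(stateDir, tarPath); // pack the directory's files
    FileUtility.unTar(new File(tarPath));         // restore into /tmp/spark-state/0
  }
}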
6 changes: 6 additions & 0 deletions sql/core/pom.xml
@@ -147,6 +147,12 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<!-- RocksDB dependency for Structured Streaming State Store -->
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
@skonto (Contributor) commented on Jun 20, 2019:

This dependency has all the files packed for all major OSs. Flink uses a custom build. Digging into this a bit more, I see some additions and modifications as described here. I understand this is Flink-specific, but the TTL feature mentioned there, https://issues.apache.org/jira/browse/FLINK-10471, looks interesting. Structured Streaming fetches all state here (into memory) and filters out the timed-out entries; is RocksDB performing well there? Shouldn't we have the same mechanism, or a similar one, so we don't fetch everything and instead delegate this to the state backend (which could run in the background, by the way)?

@itsvikramagr (Author) replied:

I will take a look at the Flink build and see if I can pick only the relevant packages in the rocksdb dependency.

IMO, abstracting out how the state backend should filter out timed-out state can be treated as a separate problem, so that we don't end up increasing the scope of this PR. Once the abstraction is added, we can file a separate JIRA to implement it for the rocksdb state backend.

@skonto (Contributor) replied:

OK, makes sense.
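
For context on the TTL idea discussed above: rocksdbjni ships a TtlDB handle whose entries become eligible for deletion during compaction once they are older than the configured TTL, so expiry is lazy rather than immediate. A minimal sketch, with a hypothetical path and TTL value:

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TtlDB;

public class TtlSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
         // 3600s TTL: entries older than an hour are dropped during compaction.
         TtlDB db = TtlDB.open(options, "/tmp/ttl-state", 3600, false)) {
      db.put("key".getBytes(), "value".getBytes());
      // Reads may still surface expired entries until a compaction runs.
      byte[] value = db.get("key".getBytes());
      System.out.println(value == null ? "absent/expired" : new String(value));
    }
  }
}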

@gatorsmile (Member) commented:

RocksDB might not be the best backend. Instead of adding the extra dependency, I think we should just do it as a separate third-party package. The community can always build their own backend based on their needs. Doing so is simple.

Can you submit it to https://spark-packages.org/?

cc @marmbrus @tdas @zsxwing

@itsvikramagr (Author) replied:

@gatorsmile - what are the alternatives if rocksdb is not the best backend? Other streaming technologies such as Flink and Kafka Streams use rocksdb as their primary storage engine.

With integration in the spark codebase, we can change the code in any way later, but if we take the separate-jar route, the kinds of extensions we can make are limited by the current contract. For example, @skonto mentioned one way we could abstract the state storage implementation to get the best out of rocksdb. How can we support such improvements if we take the spark-packages route?

The current implementation, based on an in-memory hashmap, is not scalable beyond a point. How shall we go about solving that?
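
For reference on the packaging question: Structured Streaming already selects its state backend through a provider-class config, which is the hook a separate package would use. A hedged sketch, where the provider class name is hypothetical:

import org.apache.spark.sql.SparkSession;

public class ProviderConfigSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("rocksdb-state-demo")
        .master("local[2]")
        // Any class implementing Spark's StateStoreProvider contract can be
        // plugged in here; the package/class below is illustrative only.
        .config("spark.sql.streaming.stateStore.providerClass",
            "com.example.state.RocksDbStateStoreProvider")
        .getOrCreate();
    spark.stop();
  }
}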

<version>6.0.1</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>