This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

[SQL-DS-CACHE-188][POAE7-1253] improvement of fallback from plasma cache to simple cache #189

Merged: 1 commit, merged on Jul 30, 2021.
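What changed: plasma server detection no longer shells out to `ps -ef | grep plasma` and greps for `plasma-store-server`. Instead it loads the plasma JNI library and attempts a real client connection on the configured socket, treating a `PlasmaClientException` as "server not running". The socket path is also no longer hard-coded to `/tmp/plasmaStore`; it comes from a new conf, `spark.sql.oap.external.cache.socket.path`, which is why `plasmaServerDetect` now takes a `SparkEnv`. When no plasma store is reachable, the external cache falls back to `SimpleOapCache` and the memory manager to `OffHeapMemoryManager`.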
ParquetFileReader.java

```diff
@@ -597,18 +597,18 @@ public static ParquetFileReader open(InputFile file, ParquetReadOptions options)
   }
 
   private final InputFile file;
-  protected final SeekableInputStream f;
+  public final SeekableInputStream f;
   private final ParquetReadOptions options;
-  protected final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
+  public final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
   private final FileMetaData fileMetaData; // may be null
-  protected final List<BlockMetaData> blocks;
+  public final List<BlockMetaData> blocks;
 
   // not final. in some cases, this may be lazily loaded for backward-compat.
   private ParquetMetadata footer;
 
-  protected int currentBlock = 0;
-  protected ColumnChunkPageReadStore currentRowGroup = null;
-  protected DictionaryPageReader nextDictionaryReader = null;
+  public int currentBlock = 0;
+  public ColumnChunkPageReadStore currentRowGroup = null;
+  public DictionaryPageReader nextDictionaryReader = null;
 
   /**
    * @param configuration the Hadoop conf
@@ -823,7 +823,7 @@ public boolean skipNextRowGroup() {
     return advanceToNextBlock();
   }
 
-  protected boolean advanceToNextBlock() {
+  public boolean advanceToNextBlock() {
     if (currentBlock == blocks.size()) {
       return false;
     }
@@ -1036,7 +1036,7 @@ public BytesInput readAsBytesInput(int size) throws IOException {
   /**
    * deals with a now fixed bug where compressedLength was missing a few bytes.
    */
-  protected class WorkaroundChunk extends Chunk {
+  public class WorkaroundChunk extends Chunk {
 
     private final SeekableInputStream f;
 
@@ -1098,7 +1098,7 @@ public BytesInput readAsBytesInput(int size) throws IOException {
    */
   static class ChunkDescriptor {
 
-    protected final ColumnDescriptor col;
+    public final ColumnDescriptor col;
     private final ColumnChunkMetaData metadata;
     private final long fileOffset;
     private final int size;
```
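Why the visibility changes: these members were previously `protected`, so only subclasses inside the parquet-mr hierarchy could touch them. Making them `public` lets OAP's Scala cache layer, which lives outside `org.apache.parquet.hadoop`, drive row-group iteration directly. Below is a minimal sketch of such a caller, assuming the standard parquet-mr API; `RowGroupScanner` and `countRows` are hypothetical names, not part of this PR.

```scala
import org.apache.parquet.ParquetReadOptions
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.io.InputFile
import scala.collection.JavaConverters._

object RowGroupScanner {
  // Walk a Parquet file's row groups from outside org.apache.parquet.hadoop;
  // this compiles only because blocks and advanceToNextBlock() are now public.
  def countRows(file: InputFile, options: ParquetReadOptions): Long = {
    val reader = ParquetFileReader.open(file, options)
    try {
      val total = reader.blocks.asScala.map(_.getRowCount).sum // public field access
      while (reader.advanceToNextBlock()) {
        // a caching layer could read and stash column chunks here
      }
      total
    } finally {
      reader.close()
    }
  }
}
```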
MemoryManager.scala

```diff
@@ -135,7 +135,7 @@ private[sql] object MemoryManager extends Logging {
case "tmp" => new TmpDramMemoryManager(sparkEnv)
case "kmem" => new DaxKmemMemoryManager(sparkEnv)
case "plasma" =>
if (plasmaServerDetect()) {
if (plasmaServerDetect(sparkEnv)) {
new PlasmaMemoryManager(sparkEnv)
} else {
new OffHeapMemoryManager(sparkEnv)
@@ -164,7 +164,7 @@ private[sql] object MemoryManager extends Logging {
case "noevict" => new HybridMemoryManager(sparkEnv)
case "vmem" => new TmpDramMemoryManager(sparkEnv)
case "external" =>
if (plasmaServerDetect()) {
if (plasmaServerDetect(sparkEnv)) {
new PlasmaMemoryManager(sparkEnv)
} else {
new OffHeapMemoryManager(sparkEnv)
```
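The only change in both strategy matches is threading `sparkEnv` into `plasmaServerDetect` so the probe can read the configured socket path from `SparkConf`; the fallback to `OffHeapMemoryManager` when no plasma store is reachable is unchanged.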
OapCache.scala

```diff
@@ -217,15 +217,23 @@ private[filecache] class CacheGuardian(maxMemory: Long) extends Thread with Logging {
 }
 
 private[filecache] object OapCache extends Logging {
-  def plasmaServerDetect(): Boolean = {
-    val command = "ps -ef" #| "grep plasma"
-    val plasmaServerStatus = command.!!
-    if (plasmaServerStatus.indexOf("plasma-store-server") == -1) {
-      logWarning("External cache strategy requires plasma-store-server launched, " +
-        "failed to detect plasma-store-server, will fallback to simpleCache.")
-      return false
-    }
-    true
-  }
+  def plasmaServerDetect(sparkEnv: SparkEnv): Boolean = {
+    val socket = sparkEnv.conf.get(OapConf.OAP_EXTERNAL_CACHE_SOCKET_PATH)
+    try {
+      System.loadLibrary("plasma_java")
+    } catch {
+      case e: Exception => logError(s"load plasma jni lib failed " + e.getMessage)
+    }
+    var plasmaDetected: Boolean = true;
+    try {
+      val conn: plasma.PlasmaClient = new plasma.PlasmaClient(socket, "", 0)
+    } catch {
+      case e: PlasmaClientException =>
+        logWarning("External cache strategy requires plasma-store-server launched, " +
+          "failed to detect plasma-store-server, will fallback to simpleCache." + e.getMessage)
+        plasmaDetected = false;
+    }
+    plasmaDetected
+  }
   def cacheFallBackDetect(sparkEnv: SparkEnv,
     fallBackEnabled: Boolean = true,
@@ -299,7 +307,7 @@ private[filecache] object OapCache extends Logging {
 
     oapCacheOpt match {
       case "external" =>
-        if (plasmaServerDetect()) new ExternalCache(fiberType)
+        if (plasmaServerDetect(sparkEnv)) new ExternalCache(fiberType)
         else new SimpleOapCache()
       case "guava" =>
         if (cacheFallBackDetect(sparkEnv, fallBackEnabled.toBoolean, fallBackRes.toBoolean)) {
@@ -956,7 +964,8 @@ class MixCache(dataCacheMemory: Long,
 
 class ExternalCache(fiberType: FiberType) extends OapCache with Logging {
   private val conf = SparkEnv.get.conf
-  private val externalStoreCacheSocket: String = "/tmp/plasmaStore"
+  private val externalStoreCacheSocket: String =
+    conf.get(OapConf.OAP_EXTERNAL_CACHE_SOCKET_PATH)
   private var cacheInit: Boolean = false
   private var externalDBClient: ExternalDBClient = null
 
```
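The detection technique above, distilled: rather than grepping the process table (which breaks when the server runs in a container, under another user, or with a different process name), open a client on the configured socket and let the connection failure speak. Here is a standalone sketch of that idea, assuming the Arrow plasma Java bindings (`org.apache.arrow.plasma`) and the `plasma_java` JNI library are available; `PlasmaProbe` and `isPlasmaStoreUp` are hypothetical names, not part of this PR.

```scala
import org.apache.arrow.plasma.PlasmaClient
import org.apache.arrow.plasma.exceptions.PlasmaClientException

object PlasmaProbe {
  // Returns true iff a plasma-store-server is accepting connections on socketPath.
  def isPlasmaStoreUp(socketPath: String): Boolean = {
    System.loadLibrary("plasma_java") // JNI library backing the client
    try {
      // constructor args: store socket, manager socket (unused, ""), release delay
      new PlasmaClient(socketPath, "", 0)
      true
    } catch {
      case _: PlasmaClientException => false // nothing listening on the socket
    }
  }
}
```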
OapConf.scala

```diff
@@ -772,4 +772,10 @@ object OapConf {
     .stringConf
     .createWithDefault("RedisClient")
 
+  val OAP_EXTERNAL_CACHE_SOCKET_PATH =
+    SqlConfAdapter.buildConf("spark.sql.oap.external.cache.socket.path")
+      .internal()
+      .doc("The socket path of plasma cache")
+      .stringConf
+      .createWithDefault("/tmp/plasmaStore")
 }
```
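With this conf registered, the socket path can be overridden per deployment, for example via `--conf spark.sql.oap.external.cache.socket.path=/dev/shm/plasmaStore` on spark-submit, instead of being hard-coded to `/tmp/plasmaStore` (the `/dev/shm/plasmaStore` path is just an illustrative value).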
docs/User-Guide.md (2 additions, 0 deletions)

````diff
@@ -295,6 +295,8 @@ spark.oap.cache.strategy external
 spark.sql.oap.dcpmm.free.wait.threshold 50000000000
 # according to your executor core number
 spark.executor.sql.oap.cache.external.client.pool.size 10
+# The socket path of plasma server, default is /tmp/plasmaStore
+spark.sql.oap.external.cache.socket.path /tmp/plasmaStore
 ```
 Start Plasma service manually
 
````
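When starting the plasma service manually, the socket the server listens on must match the new conf value. Assuming the usual Arrow plasma CLI flags (`-m` for shared memory bytes, `-s` for the socket path), the launch command would be along the lines of `plasma-store-server -m 15000000000 -s /tmp/plasmaStore`.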