Skip to content

Commit

Permalink
Plasma Utils which provide contain, remove method (apache#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
yeyuqiang authored Jan 10, 2021
1 parent 808d96d commit ce90af9
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 59 deletions.
39 changes: 35 additions & 4 deletions core/src/main/java/org/apache/spark/io/pmem/MyPlasmaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.spark.SparkEnv;
import org.apache.spark.internal.config.package$;

import java.nio.ByteBuffer;

/**
* Upstream Plasma Client Wrapper.
* To simplify the parameter passed to plasma client.
Expand All @@ -34,11 +36,39 @@ public MyPlasmaClient(String storeSocketName) {
/**
* Write to plasma store with an child object id
*/
public void write(String parentObjectId, int index, byte[] buffer) {
public void writeChildObject(String parentObjectId, int index, byte[] buffer) {
ChildObjectId objectId = new ChildObjectId(parentObjectId, index);
put(objectId.toBytes(), buffer, null);
}

public ByteBuffer getChildObject(String parentObjectId, int index) {
ChildObjectId childObjectId = new ChildObjectId(parentObjectId, index);
ByteBuffer buffer = getObjAsByteBuffer(childObjectId.toBytes(), 0, false);
return buffer;
}

/**
* record the total child number
*/
public void recordChildObjectNumber(String parentObjectId, int num) {
put(paddingParentObjectId(parentObjectId).getBytes(),
ByteBuffer.allocate(4).putInt(num).array(), null);
}

public int getChildObjectNumber(String parentObjectId) {
ByteBuffer buffer = getObjAsByteBuffer(paddingParentObjectId(parentObjectId).getBytes(),
0, false);
if (buffer == null) {
return -1;
}
return buffer.getInt();
}

String paddingParentObjectId(String parentObjectId) {
// Padding with - to prevent duplicate from child object id.
return StringUtils.rightPad(parentObjectId, 20, "-");
}

@Override
public void finalize() {
super.finalize();
Expand Down Expand Up @@ -84,8 +114,9 @@ public static MyPlasmaClient get() {
}

public static void close() {
client.finalize();
client = null;
if (client != null) {
client.finalize();
client = null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ public PlasmaInputStream(String parentObjectId) {
private boolean refill() {
if (!buffer.hasRemaining()) {
buffer.clear();
ChildObjectId childObjectId = new ChildObjectId(parentObjectId, currChildObjectNumber++);
ByteBuffer bufFromPlasma = client.getObjAsByteBuffer(childObjectId.toBytes(), 0, false);
ByteBuffer bufFromPlasma = client.getChildObject(parentObjectId, currChildObjectNumber++);
if (bufFromPlasma == null) {
return false;
}
Expand All @@ -74,11 +73,8 @@ private boolean refill() {
}

@Override
public synchronized int read() {
if (!refill()) {
return -1;
}
return buffer.get() & 0xFF;
public int read() {
throw new UnsupportedOperationException("The method is not implemented");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,7 @@ public PlasmaOutputStream(String parentObjectId) {

@Override
public void write(int b) {
buffer.put((byte) b);
if (!buffer.hasRemaining()) {
writeToPlasma();
buffer.clear();
}
throw new UnsupportedOperationException("The method is not implemented");
}

@Override
Expand Down Expand Up @@ -95,13 +91,14 @@ public void write(byte[] b, int off, int len) {
buffer.clear();
currChildObjectNumber++;
}
client.recordChildObjectNumber(parentObjectId, currChildObjectNumber);
}

private void writeToPlasma() {
if (buffer.hasRemaining()) {
client.write(parentObjectId, currChildObjectNumber, shrinkLastObjBuffer());
client.writeChildObject(parentObjectId, currChildObjectNumber, shrinkLastObjBuffer());
} else {
client.write(parentObjectId, currChildObjectNumber, buffer.array());
client.writeChildObject(parentObjectId, currChildObjectNumber, buffer.array());
}
}

Expand Down
39 changes: 39 additions & 0 deletions core/src/main/java/org/apache/spark/io/pmem/PlasmaUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.io.pmem;

/**
* Utility to operate object stored in plasma server
*/
public class PlasmaUtils {

private static MyPlasmaClient client = MyPlasmaClientHolder.get();

public static boolean contains(String objectId) {
int num = client.getChildObjectNumber(objectId);
return num > 0;
}

public static void remove(String objectId) {
int num = client.getChildObjectNumber(objectId);
for (int i = 0; i < num; i++) {
ChildObjectId childObjectId = new ChildObjectId(objectId, i);
client.release(childObjectId.toBytes());
client.delete(childObjectId.toBytes());
}
client.release(client.paddingParentObjectId(objectId).getBytes());
client.delete(client.paddingParentObjectId(objectId).getBytes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.*;

import java.io.*;
import java.nio.ByteBuffer;
Expand All @@ -13,15 +11,16 @@
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.Assume.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

/**
* Tests functionality of {@link PlasmaInputStream} and {@link PlasmaOutputStream}
Expand All @@ -30,15 +29,15 @@ public class PlasmaOutputInputStreamSuite {

private final static int DEFAULT_BUFFER_SIZE = 4096;

private final String plasmaStoreServer = "plasma-store-server";
private final String plasmaStoreSocket = "/tmp/PlasmaOutputInputStreamSuite_socket_file";
private final long memoryInBytes = 1000000000;
private final static String plasmaStoreServer = "plasma-store-server";
private final static String plasmaStoreSocket = "/tmp/PlasmaOutputInputStreamSuite_socket_file";
private final static long memoryInBytes = 1000000000;

private Process process;
private static Process process;
private final Random random = new Random();

@Before
public void setUp() {
@BeforeClass
public static void setUp() {
boolean isAvailable = isPlasmaJavaAvailable();
assumeTrue("Please make sure libplasma_java.so is installed" +
" under LD_LIBRARY_PATH or java.library.path", isAvailable);
Expand All @@ -63,24 +62,6 @@ public void testWithNullData() throws IOException {
pos.write(null);
}

@Test
public void testSingleWriteRead() {
String blockId = "block_id_" + random.nextInt(10000000);
byte[] bytesWrite = prepareByteBlockToWrite(1);
PlasmaOutputStream pos = new PlasmaOutputStream(blockId);
for (byte b : bytesWrite) {
pos.write(b);
}

byte[] bytesRead = new byte[bytesWrite.length];
PlasmaInputStream pis = new PlasmaInputStream(blockId);
for (int i = 0; i < bytesRead.length; i++) {
bytesRead[i] = (byte) pis.read();
}

assertArrayEquals(bytesWrite, bytesRead);
}

@Test
public void testBufferWriteRead() throws IOException {
String blockId = "block_id_" + random.nextInt(10000000);
Expand All @@ -92,6 +73,9 @@ public void testBufferWriteRead() throws IOException {
PlasmaInputStream pis = new PlasmaInputStream(blockId);
pis.read(bytesRead);
assertArrayEquals(bytesWrite, bytesRead);

PlasmaUtils.remove(blockId);
assertFalse(PlasmaUtils.contains(blockId));
}

@Test
Expand All @@ -108,8 +92,10 @@ public void testPartialBlockWriteRead() throws IOException {
while ((len = pis.read(buffer)) != -1) {
bytesRead.put(buffer, 0, len);
}

assertArrayEquals(bytesWrite, bytesRead.array());

PlasmaUtils.remove(blockId);
assertFalse(PlasmaUtils.contains(blockId));
}

@Test
Expand All @@ -125,12 +111,46 @@ public void testMultiBlocksWriteRead() throws IOException {
while (pis.read(buffer) != -1) {
bytesRead.put(buffer);
}

assertArrayEquals(bytesWrite, bytesRead.array());

PlasmaUtils.remove(blockId);
assertFalse(PlasmaUtils.contains(blockId));
}

@Test
public void testMultiThreadReadWrite() throws InterruptedException {
int processNum = Runtime.getRuntime().availableProcessors();
ExecutorService threadPool = Executors.newFixedThreadPool(processNum);
for (int i = 0; i < 10 * processNum; i++) {
threadPool.submit(() -> {
try {
String blockId = "block_id_" + random.nextInt(10000000);
byte[] bytesWrite = prepareByteBlockToWrite(5.7);
PlasmaOutputStream pos = new PlasmaOutputStream(blockId);
pos.write(bytesWrite);

ByteBuffer bytesRead = ByteBuffer.allocate(bytesWrite.length);
PlasmaInputStream pis = new PlasmaInputStream(blockId);
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
int len;
while ((len = pis.read(buffer)) != -1) {
bytesRead.put(buffer, 0, len);
}
assertArrayEquals(bytesWrite, bytesRead.array());

PlasmaUtils.remove(blockId);
assertFalse(PlasmaUtils.contains(blockId));
} catch (IOException ex) {
ex.printStackTrace();
}
});
}
threadPool.shutdown();
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
}

@After
public void tearDown() {
@AfterClass
public static void tearDown() {
try {
MyPlasmaClientHolder.close();
stopPlasmaStore();
Expand All @@ -140,7 +160,7 @@ public void tearDown() {
}
}

public boolean isPlasmaJavaAvailable() {
public static boolean isPlasmaJavaAvailable() {
boolean available = true;
try {
System.loadLibrary("plasma_java");
Expand All @@ -150,22 +170,22 @@ public boolean isPlasmaJavaAvailable() {
return available;
}

private boolean isPlasmaStoreExist() {
private static boolean isPlasmaStoreExist() {
return Stream.of(System.getenv("PATH").split(Pattern.quote(File.pathSeparator)))
.map(Paths::get)
.anyMatch(path -> Files.exists(path.resolve(plasmaStoreServer)));
}

private Process startProcess(String command) throws IOException {
private static Process startProcess(String command) throws IOException {
List<String> cmdList = Arrays.stream(command.split(" ")).collect(Collectors.toList());
ProcessBuilder processBuilder = new ProcessBuilder(cmdList).inheritIO();
Process process = processBuilder.start();
return process;
}

private boolean startPlasmaStore() throws IOException, InterruptedException {
private static boolean startPlasmaStore() throws IOException, InterruptedException {
String command = plasmaStoreServer + " -s " + plasmaStoreSocket + " -m " + memoryInBytes;
this.process = startProcess(command);
process = startProcess(command);
int ticktock = 60;
if (process != null) {
while(!process.isAlive()) {
Expand All @@ -180,7 +200,7 @@ private boolean startPlasmaStore() throws IOException, InterruptedException {
return false;
}

private void stopPlasmaStore() throws InterruptedException {
private static void stopPlasmaStore() throws InterruptedException {
if (process != null && process.isAlive()) {
process.destroyForcibly();
int ticktock = 60;
Expand All @@ -194,7 +214,7 @@ private void stopPlasmaStore() throws InterruptedException {
}
}

private void deletePlasmaSocketFile() {
private static void deletePlasmaSocketFile() {
File socketFile = new File(plasmaStoreSocket);
if (socketFile != null && socketFile.exists()) {
socketFile.delete();
Expand All @@ -207,7 +227,7 @@ private byte[] prepareByteBlockToWrite(double numOfBlock) {
return bytesToWrite;
}

private void mockSparkEnv() {
private static void mockSparkEnv() {
SparkConf conf = new SparkConf();
conf.set("spark.io.plasma.server.socket", plasmaStoreSocket);
SparkEnv mockEnv = mock(SparkEnv.class);
Expand Down

0 comments on commit ce90af9

Please sign in to comment.