Skip to content

Commit

Permalink
Get rid of FTP and write back to minio instead
Browse files Browse the repository at this point in the history
  • Loading branch information
jasha64 committed Aug 8, 2024
1 parent e7f4e4a commit 9a8504e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void shutdown()
log.info("rpc server is trying to shutdown");
FileAppender appender = LoggerContext.getContext().getConfiguration().getAppender("log");
String logFilename = appender.getFileName();
Utils.append(logFilename, logFilename);
Utils.upload(logFilename, logFilename);
this.rpcServer.shutdown().awaitTermination(5, TimeUnit.SECONDS);
log.info("rpc server close successfully");
} catch (InterruptedException e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.pixelsdb.pixels.common.utils.Constants;
import io.pixelsdb.pixels.worker.vhive.StreamWorkerCommon;
import one.profiler.AsyncProfiler;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import io.pixelsdb.pixels.common.physical.Storage;

import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
Expand All @@ -33,77 +35,33 @@

public class Utils
{
private static final Storage minio = StreamWorkerCommon.getStorage(Storage.Scheme.minio);
private static final AsyncProfiler PROFILER = AsyncProfiler.getInstance();
private static final String EVENT = System.getenv("PROFILING_EVENT");
private static final String FTP_HOST = System.getenv("FTP_HOST");
private static final String FTP_PORT = System.getenv("FTP_PORT");
private static final String FTP_USERNAME = System.getenv("FTP_USERNAME");
private static final String FTP_PASSWORD = System.getenv("FTP_PASSWORD");
private static final String FTP_WORKDIR = System.getenv("FTP_WORKDIR");
private static final String LOG_WORKDIR = System.getenv("LOG_WORKDIR");

private static void createDirectoryTree(FTPClient client, String dirTree) throws IOException
{
if (dirTree.startsWith(client.printWorkingDirectory()))
{
dirTree = dirTree.substring(client.printWorkingDirectory().length());
}
//tokenize the string and attempt to change into each directory level. If you cannot, then start creating.
String[] directories = dirTree.split("/");
for (String dir : directories)
{
if (!dir.isEmpty())
{
if (!client.changeWorkingDirectory(dir))
{
if (!client.makeDirectory(dir))
{
throw new IOException("Unable to create remote directory '" + dir + "'. error='" + client.getReplyString() + "'");
}
if (!client.changeWorkingDirectory(dir))
{
throw new IOException("Unable to change into newly created remote directory '" + dir + "'. error='" + client.getReplyString() + "'");
}
}
}
}
}

public static void append(String src, String dest) throws IOException
public static void upload(String src, String dest) throws IOException
{
// append the log file to FTP server
FTPClient ftpClient = new FTPClient();
ftpClient.connect(FTP_HOST, Integer.parseInt(FTP_PORT));
ftpClient.login(FTP_USERNAME, FTP_PASSWORD);
ftpClient.enterLocalPassiveMode();
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
// write to the log file
dest = String.format("%s/%s", LOG_WORKDIR, dest);
// String dir = dest.substring(0, dest.lastIndexOf("/"));
// minio.mkdirs(dir);

dest = String.format("%s/%s", FTP_WORKDIR, dest);
String dir = dest.substring(0, dest.lastIndexOf("/"));
createDirectoryTree(ftpClient, dir);
try (FileInputStream iStream = new FileInputStream(src);
DataOutputStream oStream = minio.create(dest, false, Constants.S3_BUFFER_SIZE)) {

FileInputStream inputStream = new FileInputStream(src);
ftpClient.appendFile(dest, inputStream);
inputStream.close();
ftpClient.logout();
}
byte[] buffer = new byte[4096];
int bytesRead;

public static void upload(String src, String dest) throws IOException
{
// store the JFR profiling file to FTP server
FTPClient ftpClient = new FTPClient();
ftpClient.connect(FTP_HOST, Integer.parseInt(FTP_PORT));
ftpClient.login(FTP_USERNAME, FTP_PASSWORD);
ftpClient.enterLocalPassiveMode();
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
while ((bytesRead = iStream.read(buffer)) != -1) {
oStream.write(buffer, 0, bytesRead);
}

dest = String.format("%s/%s", FTP_WORKDIR, dest);
String dir = dest.substring(0, dest.lastIndexOf("/"));
createDirectoryTree(ftpClient, dir);
oStream.flush();

FileInputStream inputStream = new FileInputStream(src);
ftpClient.storeFile(dest, inputStream);
inputStream.close();
ftpClient.logout();
} catch (IOException e) {
e.printStackTrace();
}
}

public static void dump(String filename, Object... contents) throws IOException
Expand Down

0 comments on commit 9a8504e

Please sign in to comment.