Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async cache tooling #104

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions tools/scripts/asynccache/AsyncCache.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Options for caching a block asynchronously
// next available id: 6
message AsyncCacheRequest {
optional int64 block_id = 1;
// TODO(calvin): source host and port should be replace with WorkerNetAddress
optional string source_host = 2;
optional int32 source_port = 3;
optional OpenUfsBlockOptions open_ufs_block_options = 4;
optional int64 length = 5;
}

// Options to open a UFS block.
// next available id: 7
message OpenUfsBlockOptions {
optional string ufs_path = 1;
// The offset of the block in within the file.
optional int64 offset_in_file = 2;
// The block size.
optional int64 block_size = 3;
optional int32 maxUfsReadConcurrency = 4;
optional int64 mountId = 5;
// If set, do not try to cache the block locally when reading the data from the UFS.
optional bool no_cache = 6;
// The client does not need to set this. This is set by the worker.
optional string user = 7;
}

message LocalBlockOpenResponse {
optional string path = 1;
}

160 changes: 160 additions & 0 deletions tools/scripts/asynccache/alluxiosc.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#define _GNU_SOURCE_

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <dlfcn.h>
#include <string.h>
#include <limits.h>
#include <mntent.h>
#include <sys/types.h>
#include <sys/stat.h>

#define MNT_MAX 100
char *seed = ".tmp.ava.alluxiosc.tmp"; // keep same with server
char *cache_seed = ".tmp.cache.tmp.ava.alluxiosc.tmp"; // keep same with server

typedef struct ava_struct {
char *mnt_slots[MNT_MAX];
int mnt_idx;
int debug;
int query;
int cache;
} ava_struct;

ava_struct ava;

FILE* (*my_fopen)(const char *filename, const char* mode);
int (*my_open)(const char *filename, int flags);
int (*my_open64)(const char *filename, int flags);

int (*my__xstat)(int ver, const char *path, struct stat *buf);
int (*my__xstat64)(int ver, const char *path, struct stat64 *buf);

void __attribute__ ((constructor)) init(void){
FILE *file= setmntent("/proc/mounts", "r");
struct mntent *ent;

ava.debug = (getenv("alluxiosc_debug") != NULL);
ava.query = (getenv("alluxiosc_query") != NULL);
ava.cache = (getenv("alluxiosc_cache") != NULL);
if (ava.cache) seed = cache_seed;
ava.mnt_idx = 0;

my_fopen = dlsym(RTLD_NEXT, "fopen");
my_open = dlsym(RTLD_NEXT, "open");
my_open64 = dlsym(RTLD_NEXT, "open64");
my__xstat = dlsym(RTLD_NEXT, "__xstat");
my__xstat64 = dlsym(RTLD_NEXT, "__xstat64");

if (file== NULL) {
perror("setmntent");
return;
}

while (NULL != (ent = getmntent(file))) {
if (0 == strcmp("alluxio-fuse", ent->mnt_fsname)) {
ava.mnt_slots[ava.mnt_idx++] = strdup(ent->mnt_dir);
}
}
endmntent(file);
}

int is_alluxio_file(const char *fullpath)
{
for (int i = 0; i < ava.mnt_idx; i++) {
char *path = strstr(fullpath, ava.mnt_slots[i]);
if (path == fullpath) {
return 1;
}
}
return 0;
}

int get_sc(const char *filename, char *sc, int size, int *af)
{
char buf[PATH_MAX + 1], *path = NULL;
int rc = 0, i = 0;
FILE *f = NULL;

*af = 0;
if (NULL == realpath(filename, buf)) return 0;
if (!is_alluxio_file(buf)) return 0;
*af = 1;

strcat(buf, seed);

if ((f = my_fopen(buf, "r")) && fgets(sc, size, f)) {
i = strlen(sc);
if (sc[i - 1] == '\n') sc[i - 1] = '\0';
rc = ('/' == sc[0]);
}
if (f) fclose(f);
return rc;
}

int __xstat(int ver, const char *path, struct stat *buf)
{
char full[PATH_MAX + 1];
if (ava.query) {
char sc[PATH_MAX + 1];
int af, rc = get_sc(path, sc, sizeof(sc), &af);
printf("query=%s\n", sc);
return my__xstat(ver, path, buf);
}

if (NULL == realpath(path, full)) return my__xstat(ver, path, buf); // error out
if (!is_alluxio_file(full)) return my__xstat(ver, path, buf);

if (ava.debug) fprintf(stderr, "--- async cache %s\n", full);
strcat(full, seed);
return my__xstat(ver, full, buf);
}

int __xstat64(int ver, const char *path, struct stat64 *buf)
{
char full[PATH_MAX + 1];
if (ava.query) {
char sc[PATH_MAX + 1];
int af, rc = get_sc(path, sc, sizeof(sc), &af);
printf("query=%s\n", sc);
return my__xstat64(ver, path, buf);
}

if (NULL == realpath(path, full)) return my__xstat64(ver, path, buf); // error out
if (!is_alluxio_file(full)) return my__xstat64(ver, path, buf);

if (ava.debug) fprintf(stderr, "--- async cache64 %s\n", full);
strcat(full, seed);
return my__xstat64(ver, full, buf);
}

FILE* fopen(const char* filename, const char* mode){
char sc[PATH_MAX + 1];
memset(sc, 0, sizeof(sc));
int af, rc = get_sc(filename, sc, sizeof(sc), &af);
if (ava.debug && af) fprintf(stderr, "--- fopen filename=%s, rc=%d, sc=%s\n", filename, rc, sc);
FILE *f = rc ? my_fopen(sc, mode) : NULL;
return f ? f : my_fopen(filename, mode);
}

int open(const char *filename, int flags)
{
char sc[PATH_MAX + 1];
memset(sc, 0, sizeof(sc));
int af, rc = get_sc(filename, sc, sizeof(sc), &af);
if (ava.debug && af) fprintf(stderr, "--- open filename=%s, rc=%d, sc=%s\n", filename, rc, sc);
int fd = rc ? my_open(sc, flags) : 0;
return fd ? fd : my_open(filename, flags);
}

int open64(const char *filename, int flags)
{
char sc[PATH_MAX + 1];
memset(sc, 0, sizeof(sc));
int af, rc = get_sc(filename, sc, sizeof(sc), &af);
if (ava.debug && af) fprintf(stderr, "--- open64 filename=%s, rc=%d, sc=%s\n", filename, rc, sc);
int fd = rc ? my_open64(sc, flags) : 0;
return fd ? fd : my_open64(filename, flags);
}

81 changes: 81 additions & 0 deletions tools/scripts/asynccache/ava_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#! /usr/bin/python3
"""
apt-get update
apt-get install -y python3-pip
pip3 install google-apputils
pip3 install protobuf
protoc --proto_path=. --python_out=. AsyncCache.proto
"""

from AsyncCache_pb2 import AsyncCacheRequest, LocalBlockOpenResponse
import sys
import socket
import struct
import time

if len(sys.argv) != 4:
print("Usage:", sys.argv[0], "ip port file_list")
sys.exit(-1)

ip = sys.argv[1]
port = int(sys.argv[2])
flist = sys.argv[3]

address = (ip, port)
clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
clientsocket.connect(address)

TH_HI = 100
TH_LOW = 50
ID_PURGE_BLOCK = 0
ID_QUERY_BLOCK = -1


def async_cache(id, length):
data = AsyncCacheRequest()
data.length = length
data.block_id = id
s = data.SerializeToString()
# (long)length - (int)id - (int)packet_length - packet
clientsocket.sendall(struct.pack('>QII', 16 + len(s), 112, len(s)) + s)


def evict(id):
async_cache(id, ID_PURGE_BLOCK)


def trim(idlist, low):
rsp = LocalBlockOpenResponse()
while len(idlist) > low:
not_done = []
for id in idlist:
async_cache(id, ID_QUERY_BLOCK)
result = clientsocket.recv(16)
if result == b'':
raise RuntimeError("socket connection broken")
alen, retid, plen = struct.unpack('>QII', result)
if retid != 106:
raise RuntimeError("not expect id")
result = clientsocket.recv(plen)
rsp.ParseFromString(result)
if rsp.path[0] != '/':
print("done ", id)
else:
not_done.append(id)
evict(id)
idlist = not_done
time.sleep(2)
return idlist


idlist = []
with open(flist, "r") as lines:
for line in lines:
id = int(line.split()[0])
if len(idlist) >= TH_HI:
idlist = trim(idlist, TH_LOW)
print("evicting ", id)
evict(id)
idlist.append(id)

trim(idlist, 0)
66 changes: 66 additions & 0 deletions tools/scripts/asynccache/ava_cache.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/bin/bash

if [ $# -lt 2 ]
then
echo usage "$0 <alluxiosc.so path> <list file>"
exit -1
fi

so=$1
list=$2
if [ ! -e $so ]
then
echo "so file $so does not exist"
exit -1
fi
if [ ! -e $list ]
then
echo "list file $list does not exist"
exit -1
fi

MAX=50

cache=()
trim() {
left=()
for f in ${cache[@]}
do
if [ ! -e "$f" ]
then
echo "remove non existing $f ..."
continue
fi
ok=`alluxiosc_query=1 LD_PRELOAD=$so ls "$f" | grep 'query=/'`
if [ -z "$ok" ]
then
echo "trying to cache again $f ..."
left+=("$f")
alluxiosc_cache=1 LD_PRELOAD=$so ls "$f" > /dev/null &
else
echo "done cache $f ..."
fi
done
cache=(${left[@]})
}

while read line
do
while [ ${#cache[@]} -gt $MAX ]
do
sleep 2
trim
done

f=`echo $line | awk '{print $1}'`
cache+=("$f")
echo "trying to cache $f ..."
alluxiosc_cache=1 LD_PRELOAD=$so ls "$f" > /dev/null &
done < $list

while [ ${#cache[@]} -gt 0 ]
do
sleep 1
trim
done

2 changes: 2 additions & 0 deletions tools/scripts/asynccache/mk_alluxiosc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
rm alluxiosc.so
gcc -D_GNU_SOURCE -fPIC -shared -O2 alluxiosc.c -o alluxiosc.so -ldl