Skip to content

Commit

Permalink
Ion and Philipp's table retries (#10)
Browse files Browse the repository at this point in the history
* Ion and Philipp's table retries

* Refactor the retry struct:
- Rename it from retry_struct to retry_info
- Retry information contains the failure callback, not the retry callback
- All functions take in retry information as an arg instead of its expanded fields

* Rename cb -> callback

* Remove prints

* Fix compiler warnings

* Change some CHECKs to greatest ASSERTs

* Key outstanding callbacks hash table with timer ID instead of callback data pointer

* Use the new retry API for table commands

* Memory cleanup in plasma unit tests

* fix Robert's comments

* add valgrind for common
  • Loading branch information
istoica authored and pcmoritz committed Oct 29, 2016
1 parent 84c581c commit ee3718c
Show file tree
Hide file tree
Showing 23 changed files with 1,914 additions and 240 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ matrix:
- sudo apt-get update -qq
- sudo apt-get install -qq valgrind
script:
- cd src/common
- make valgrind
- cd ../..

- cd src/plasma
- make valgrind
- cd ../..
Expand Down
16 changes: 12 additions & 4 deletions src/common/Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
CC = gcc
CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty -Ithirdparty/ae -Wno-typedef-redefinition -Werror
CFLAGS = -g -Wall -Wno-typedef-redefinition --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -fPIC -I. -Ithirdparty -Ithirdparty/ae
BUILD = build

all: hiredis $(BUILD)/libcommon.a

$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o thirdparty/ae/ae.o
$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o state/table.o state/object_table.o state/task_log.o thirdparty/ae/ae.o
ar rcs $@ $^

$(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a
Expand All @@ -13,6 +13,12 @@ $(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a
$(BUILD)/db_tests: hiredis test/db_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ test/db_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)

$(BUILD)/object_table_tests: hiredis test/object_table_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ test/object_table_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)

$(BUILD)/task_log_tests: hiredis test/task_log_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ test/task_log_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)

$(BUILD)/io_tests: test/io_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ $^ $(CFLAGS)

Expand All @@ -32,15 +38,17 @@ redis:
hiredis:
git submodule update --init --recursive -- "thirdparty/hiredis" ; cd thirdparty/hiredis ; make

test: hiredis redis $(BUILD)/common_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE
test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_log_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE
./thirdparty/redis-3.2.3/src/redis-server &
sleep 1s ; ./build/common_tests ; ./build/db_tests ; ./build/io_tests ; ./build/task_tests ; ./build/redis_tests
sleep 1s ; ./build/common_tests ; ./build/db_tests ; ./build/task_log_tests ; ./build/object_table_tests ; ./build/io_tests ; ./build/task_tests ; ./build/redis_tests

valgrind: test
valgrind --leak-check=full --error-exitcode=1 ./build/common_tests
valgrind --leak-check=full --error-exitcode=1 ./build/db_tests
valgrind --leak-check=full --error-exitcode=1 ./build/io_tests
valgrind --leak-check=full --error-exitcode=1 ./build/task_tests
valgrind --leak-check=full --error-exitcode=1 ./build/redis_tests
valgrind --leak-check=full --error-exitcode=1 ./build/task_log_tests
valgrind --leak-check=full --error-exitcode=1 ./build/object_table_tests

FORCE:
5 changes: 5 additions & 0 deletions src/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <inttypes.h>

#ifndef RAY_COMMON_DEBUG
#define LOG_DEBUG(M, ...)
Expand Down Expand Up @@ -36,6 +37,10 @@
} \
} while (0);

/** This macro indicates that this pointer owns the data it is pointing to
* and is responsible for freeing it. */
#define OWNER

#define UNIQUE_ID_SIZE 20

/* Cleanup method for running tests with the greatest library.
Expand Down
9 changes: 4 additions & 5 deletions src/common/event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,14 @@ void event_loop_remove_file(event_loop *loop, int fd) {
}

int64_t event_loop_add_timer(event_loop *loop,
int64_t milliseconds,
int64_t timeout,
event_loop_timer_handler handler,
void *context) {
return aeCreateTimeEvent(loop, milliseconds, handler, context, NULL);
return aeCreateTimeEvent(loop, timeout, handler, context, NULL);
}

void event_loop_remove_timer(event_loop *loop, timer_id timer_id) {
int err = aeDeleteTimeEvent(loop, timer_id);
CHECK(err == AE_OK); /* timer id found? */
int event_loop_remove_timer(event_loop *loop, int64_t id) {
return aeDeleteTimeEvent(loop, id);
}

void event_loop_run(event_loop *loop) {
Expand Down
29 changes: 23 additions & 6 deletions src/common/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include <stdint.h>
#include "ae/ae.h"

/* Unique timer ID that will be generated when the timer is added to the
* event loop. Will not be reused later on in another call
* to event_loop_add_timer. */
typedef long long timer_id;

typedef aeEventLoop event_loop;
Expand Down Expand Up @@ -57,16 +60,30 @@ void event_loop_add_file(event_loop *loop,
/* Remove a registered file event handler from the event loop. */
void event_loop_remove_file(event_loop *loop, int fd);

/* Register a handler that will be called after a time slice of
* "milliseconds" milliseconds. Can specify a context that will be passed
* as an argument to the handler. Return the id of the time event. */
/** Register a handler that will be called after a time slice of
* "timeout" milliseconds.
*
* @param loop The event loop.
* @param timeout The timeout in milliseconds.
* @param handler The handler for the timeout.
* @param context User context that can be passed in and will be passed in
* as an argument for the timer handler.
* @return The ID of the timer.
*/
int64_t event_loop_add_timer(event_loop *loop,
int64_t milliseconds,
int64_t timeout,
event_loop_timer_handler handler,
void *context);

/* Remove a registered time event handler from the event loop. */
void event_loop_remove_timer(event_loop *loop, timer_id timer_id);
/**
* Remove a registered time event handler from the event loop. Can be called
* multiple times on the same timer.
*
* @param loop The event loop.
* @param timer_id The ID of the timer to be removed.
* @return Returns 0 if the removal was successful.
*/
int event_loop_remove_timer(event_loop *loop, int64_t timer_id);

/* Run the event loop. */
void event_loop_run(event_loop *loop);
Expand Down
2 changes: 1 addition & 1 deletion src/common/state/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include "event_loop.h"

typedef struct db_handle_impl db_handle;
typedef struct db_handle db_handle;

/* Connect to the global system store at address and port. Returns
* a handle to the database, which must be freed with db_disconnect
Expand Down
38 changes: 38 additions & 0 deletions src/common/state/object_table.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include "object_table.h"
#include "redis.h"

void object_table_lookup(db_handle *db_handle,
object_id object_id,
retry_info *retry,
object_table_lookup_done_callback done_callback,
void *user_context) {
init_table_callback(db_handle, object_id, NULL, retry, done_callback,
redis_object_table_lookup, user_context);
}

void object_table_add(db_handle *db_handle,
object_id object_id,
retry_info *retry,
object_table_done_callback done_callback,
void *user_context) {
init_table_callback(db_handle, object_id, NULL, retry, done_callback,
redis_object_table_add, user_context);
}

void object_table_subscribe(
db_handle *db_handle,
object_id object_id,
object_table_object_available_callback object_available_callback,
void *subscribe_context,
retry_info *retry,
object_table_done_callback done_callback,
void *user_context) {
object_table_subscribe_data *sub_data =
malloc(sizeof(object_table_subscribe_data));
utarray_push_back(db_handle->callback_freelist, &sub_data);
sub_data->object_available_callback = object_available_callback;
sub_data->subscribe_context = subscribe_context;

init_table_callback(db_handle, object_id, sub_data, retry, done_callback,
redis_object_table_subscribe, user_context);
}
131 changes: 116 additions & 15 deletions src/common/state/object_table.h
Original file line number Diff line number Diff line change
@@ -1,25 +1,126 @@
#ifndef OBJECT_TABLE_H
#define OBJECT_TABLE_H

#include "common.h"
#include "table.h"
#include "db.h"

/* The callback that is called when the result of a lookup
* in the object table comes back. The callback should free
* the manager_vector array, but NOT the strings they are pointing to. */
typedef void (*lookup_callback)(object_id object_id,
int manager_count,
const char *manager_vector[],
void *context);
/*
* ==== Lookup call and callback ====
*/

/* Register a new object with the directory. */
/* TODO(pcm): Retry, print for each attempt. */
void object_table_add(db_handle *db, object_id object_id);
/* Callback called when the lookup completes. The callback should free
* the manager_vector array, but NOT the strings they are pointing to.
*/
typedef void (*object_table_lookup_done_callback)(
object_id object_id,
int manager_count,
OWNER const char *manager_vector[],
void *user_context);

/* Remove object from the directory. */
void object_table_remove(db_handle *db,
/**
* Return the list of nodes storing object_id in their plasma stores.
*
* @param db_handle Handle to object_table database.
* @param object_id ID of the object being looked up.
* @param retry Information about retrying the request to the database.
* @param done_callback Function to be called when database returns result.
* @param user_context Context passed by the caller.
* @return Void.
*/
void object_table_lookup(db_handle *db_handle,
object_id object_id,
const char *manager);
retry_info *retry,
object_table_lookup_done_callback done_callback,
void *user_context);

/*
* ==== Add object call and callback ====
*/

/* Callback called when the object add/remove operation completes. */
typedef void (*object_table_done_callback)(object_id object_id,
void *user_context);

/* Look up entry from the directory */
void object_table_lookup(db_handle *db,
/**
* Add the plasma manager that created the db_handle to the
* list of plasma managers that have the object_id.
*
* @param db_handle Handle to db.
* @param object_id Object unique identifier.
* @param retry Information about retrying the request to the database.
* @param done_callback Callback to be called when lookup completes.
* @param user_context User context to be passed in the callbacks.
* @return Void.
*/
void object_table_add(db_handle *db_handle,
object_id object_id,
retry_info *retry,
object_table_done_callback done_callback,
void *user_context);

/*
* ==== Remove object call and callback ====
*/

/**
* Object remove function.
*
* @param db_handle Handle to db.
* @param object_id Object unique identifier.
* @param retry Information about retrying the request to the database.
* @param done_callback Callback to be called when lookup completes.
* @param user_context User context to be passed in the callbacks.
* @return Void.
*/
/*
void object_table_remove(db_handle *db,
object_id object_id,
lookup_callback callback,
void *context);
retry_info *retry,
object_table_done_callback done_callback,
void *user_context);
*/

/*
* ==== Subscribe to be announced when new object available ====
*/

/* Callback called when object object_id is available. */
typedef void (*object_table_object_available_callback)(object_id object_id,
void *user_context);

/**
* Subcribing to new object available function.
*
* @param db_handle Handle to db.
* @param object_id Object unique identifier.
* @param object_available_callback callback to be called when new object
* becomes
* available.
* @param subscribe_context caller context which will be passed back in the
* object_available_callback.
* @param retry Information about retrying the request to the database.
* @param done_callback Callback to be called when subscription is installed.
* @param user_context User context to be passed in the callbacks.
* @return Void.
*/

void object_table_subscribe(
db_handle *db,
object_id object_id,
object_table_object_available_callback object_available_callback,
void *subscribe_context,
retry_info *retry,
object_table_done_callback done_callback,
void *user_context);

/* Data that is needed to register new object available callbacks with the state
* database. */
typedef struct {
object_table_object_available_callback object_available_callback;
void *subscribe_context;
} object_table_subscribe_data;

#endif /* OBJECT_TABLE_H */
Loading

0 comments on commit ee3718c

Please sign in to comment.