Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix two bugs #49

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,070 changes: 1,070 additions & 0 deletions .tags

Large diffs are not rendered by default.

1,070 changes: 1,070 additions & 0 deletions .tags_sorted_by_file

Large diffs are not rendered by default.

31 changes: 27 additions & 4 deletions cetcd.c
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ static int cetcd_reap_watchers(cetcd_client *cli, CURLM *mcurl) {
curl_multi_remove_handle(mcurl, curl);
cetcd_watcher_reset(watcher);

if (watcher->index) {
if (index) {
// if (watcher->index) {
watcher->index = index + 1;
url = cetcd_watcher_build_url(cli, watcher);
curl_easy_setopt(watcher->curl, CURLOPT_URL, url);
Expand All @@ -418,6 +419,26 @@ int cetcd_multi_watch(cetcd_client *cli, cetcd_array *watchers) {
cetcd_watcher *watcher;
CURLM *mcurl;


// typedef enum {
// CURLM_CALL_MULTI_PERFORM = -1, /* please call curl_multi_perform() or
// curl_multi_socket*() soon */
// CURLM_OK,
// CURLM_BAD_HANDLE, /* the passed-in handle is not a valid CURLM handle */
// CURLM_BAD_EASY_HANDLE, /* an easy handle was not good/valid */
// CURLM_OUT_OF_MEMORY, /* if you ever get this, you're in deep sh*t */
// CURLM_INTERNAL_ERROR, this is a libcurl bug
// CURLM_BAD_SOCKET, /* the passed in socket argument did not match */
// CURLM_UNKNOWN_OPTION, /* curl_multi_setopt() with unsupported option */
// CURLM_ADDED_ALREADY, /* an easy handle already added to a multi handle was
// attempted to get added - again */
// CURLM_RECURSIVE_API_CALL, /* an api function was called from inside a
// callback */
// CURLM_LAST
// }
CURLMcode ret;


struct timeval tv;

mcurl = curl_multi_init();
Expand All @@ -430,7 +451,8 @@ int cetcd_multi_watch(cetcd_client *cli, cetcd_array *watchers) {
backoff = 100; /*100ms*/
backoff_max = 1000; /*1 sec*/
for(;;) {
curl_multi_perform(mcurl, &left);
ret = curl_multi_perform(mcurl, &left);
// printf("\t\t ****** 1111 ret : %d.\n", ret);
if (left) {
FD_ZERO(&r);
FD_ZERO(&w);
Expand All @@ -448,7 +470,8 @@ int cetcd_multi_watch(cetcd_client *cli, cetcd_array *watchers) {
/*TODO handle errors*/
select(maxfd+1, &r, &w, &e, &tv);

curl_multi_perform(mcurl, &left);
ret = curl_multi_perform(mcurl, &left);
// printf("\t\t +++++ 2222 ret : %d.\n", ret);
}
added = cetcd_reap_watchers(cli, mcurl);
if (added == 0 && left == 0) {
Expand Down Expand Up @@ -489,7 +512,7 @@ cetcd_watch_id cetcd_multi_watch_async(cetcd_client *cli, cetcd_array *watchers)
args = calloc(2, sizeof(void *));
args[0] = cli;
args[1] = watchers;
pthread_create(&thread, NULL, (void *(*)(void *))cetcd_multi_watch_wrapper, args);
pthread_create(thread, NULL, (void *(*)(void *))cetcd_multi_watch_wrapper, args);
return thread;
}
int cetcd_multi_watch_async_stop(cetcd_client *cli, cetcd_watch_id wid) {
Expand Down
Binary file added examples/a.out
Binary file not shown.
201 changes: 201 additions & 0 deletions examples/cetcd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
#ifndef CETCD_CETCD_H
#define CETCD_CETCD_H

#ifdef __cplusplus
extern "C" {
#endif

#include <curl/curl.h>
#include <pthread.h>
#include <stdint.h>
#include "sds/sds.h"
#include "cetcd_array.h"
#define CETCD_VERSION release-v0.0.5
typedef sds cetcd_string;
typedef pthread_t cetcd_watch_id;

enum HTTP_METHOD {
ETCD_HTTP_GET,
ETCD_HTTP_POST,
ETCD_HTTP_PUT,
ETCD_HTTP_DELETE,
ETCD_HTTP_HEAD,
ETCD_HTTP_OPTION
};
enum ETCD_EVENT_ACTION {
ETCD_SET,
ETCD_GET,
ETCD_UPDATE,
ETCD_CREATE,
ETCD_DELETE,
ETCD_EXPIRE,
ETCD_CAS,
ETCD_CAD,
ETCD_ACTION_MAX
};
/* etcd error codes range is [100, 500]
* We use 1000+ as cetcd error codes;
* */
#define error_response_parsed_failed 1000
#define error_send_request_failed 1001
#define error_cluster_failed 1002
typedef struct cetcd_error_t {
int ecode;
char *message;
char *cause;
uint64_t index;
}cetcd_error;
typedef struct cetcd_client_t {
CURL *curl;
cetcd_error *err;
cetcd_array watchers; /*curl watch handlers*/
cetcd_array *addresses;/*cluster addresses*/
const char *keys_space;
const char *stat_space;
const char *member_space;
int picked;
struct {
int verbose;
uint64_t ttl;
uint64_t connect_timeout;
uint64_t read_timeout;
uint64_t write_timeout;
char *user;
char *password;
} settings;

} cetcd_client;

typedef struct cetcd_response_node_t {
cetcd_array *nodes; //struct etcd_response_node_t
char *key;
char *value;
int dir; /* 1 for true, and 0 for false */
uint64_t expiration;
int64_t ttl;
uint64_t modified_index;
uint64_t created_index;

} cetcd_response_node;

typedef struct cetcd_reponse_t {
cetcd_error *err;
int action;
struct cetcd_response_node_t *node;
struct cetcd_response_node_t *prev_node;
uint64_t etcd_index;
uint64_t raft_index;
uint64_t raft_term;
} cetcd_response;

struct cetcd_response_parser_t ;

typedef int (*cetcd_watcher_callback) (void *userdata, cetcd_response *resp);
typedef struct cetcd_watcher_t {
cetcd_client *cli;
struct cetcd_response_parser_t *parser;
int attempts;
int array_index; /*the index in array cli->wachers*/

CURL *curl;
int once;
int recursive;
uint64_t index;
char * key;
void *userdata;
cetcd_watcher_callback callback;
} cetcd_watcher;

/*cetcd_client_create allocate the cetcd_client and return the pointer*/
cetcd_client* cetcd_client_create(cetcd_array *addresses);
/*cetcd_client_init initialize a cetcd_client*/
void cetcd_client_init(cetcd_client *cli, cetcd_array *addresses);
/*cetcd_client_destroy destroy the resource a client used*/
void cetcd_client_destroy(cetcd_client *cli);
/*cetcd_client_release free the cetcd_client object*/
void cetcd_client_release(cetcd_client *cli);
/*cetcd_addresses_release free the array of an etcd cluster addresses*/
void cetcd_addresses_release(cetcd_array *addrs);
/*cetcd_client_sync_cluster sync the members of an etcd cluster, this may be used
* when the members of etcd changed
* */
void cetcd_client_sync_cluster(cetcd_client *cli);

/*cetcd_setup_user set the auth username and password*/
void cetcd_setup_user(cetcd_client *cli, const char *user, const char *password);

/*cetcd_setup_tls setup the tls cert and key*/
void cetcd_setup_tls(cetcd_client *cli, const char *CA,
const char *cert, const char *key);

/*cetcd_get get the value of a key*/
cetcd_response *cetcd_get(cetcd_client *cli, const char *key);
/*cetcd_lsdir list the nodes under a directory*/
cetcd_response *cetcd_lsdir(cetcd_client *cli, const char *key, int sort, int recursive);

/*cetcd_set set the value of a key*/
cetcd_response *cetcd_set(cetcd_client *cli, const char *key,
const char *value, uint64_t ttl);

/*cetcd_mkdir create a directroy, it will fail if the key has exist*/
cetcd_response *cetcd_mkdir(cetcd_client *cli, const char *key, uint64_t ttl);

/*cetcd_mkdir create a directory whether it exist or not*/
cetcd_response *cetcd_setdir(cetcd_client *cli, const char *key, uint64_t ttl);

/*cetcd_updatedir update the ttl of a directory*/
cetcd_response *cetcd_updatedir(cetcd_client *cli, const char *key, uint64_t ttl);

/*cetcd_update update the value or ttl of a key, only refresh the ttl if refresh is set*/
cetcd_response *cetcd_update(cetcd_client *cli, const char *key,
const char *value, uint64_t ttl, int refresh);
/*cetcd_create create a node with value*/
cetcd_response *cetcd_create(cetcd_client *cli, const char *key,
const char *value, uint64_t ttl);
/*cetcd_create_in_order create in order keys*/
cetcd_response *cetcd_create_in_order(cetcd_client *cli, const char *key,
const char *value, uint64_t ttl);
/*cetcd_delete delete a key*/
cetcd_response *cetcd_delete(cetcd_client *cli, const char *key);
/*cetcd_rmdir delete a directory*/
cetcd_response *cetcd_rmdir(cetcd_client *cli, const char *key, int recursive);

/*cetcd_watch watch the changes of a key*/
cetcd_response *cetcd_watch(cetcd_client *cli, const char *key, uint64_t index);
/*cetcd_watch_recursive watch a key and all its sub keys*/
cetcd_response *cetcd_watch_recursive(cetcd_client *cli, const char *key, uint64_t index);

cetcd_response *cetcd_cmp_and_swap(cetcd_client *cli, const char *key, const char *value,
const char *prev, uint64_t ttl);
cetcd_response *cetcd_cmp_and_swap_by_index(cetcd_client *cli, const char *key, const char *value,
uint64_t prev, uint64_t ttl);
cetcd_response *cetcd_cmp_and_delete(cetcd_client *cli, const char *key, const char *prev);
cetcd_response *cetcd_cmp_and_delete_by_index(cetcd_client *cli, const char *key, uint64_t prev);

/*cetcd_watcher_create create a watcher object*/
cetcd_watcher *cetcd_watcher_create(cetcd_client *cli, const char *key, uint64_t index,
int recursive, int once, cetcd_watcher_callback callback, void *userdata);
/*cetcd_add_watcher add a watcher to the array*/
int cetcd_add_watcher(cetcd_array *watchers, cetcd_watcher *watcher);
/*cetcd_del_watcher delete a watcher from the array*/
int cetcd_del_watcher(cetcd_array *watchers, cetcd_watcher *watcher);

/*cetcd_multi_watch setup all watchers and wait*/
int cetcd_multi_watch(cetcd_client *cli, cetcd_array *watchers);
/*cetcd_multi_watch setup all watchers in a seperate thread and return the watch id*/
cetcd_watch_id cetcd_multi_watch_async(cetcd_client *cli, cetcd_array *watchers);
/*cetcd_multi_watch stop the watching thread with the watch id*/
int cetcd_multi_watch_async_stop(cetcd_client *cli, cetcd_watch_id wid);
/*cetcd_stop_watcher stop a watcher which has been setup*/
int cetcd_stop_watcher(cetcd_client *cli, cetcd_watcher *watcher);


void cetcd_response_print(cetcd_response *resp);
void cetcd_response_release(cetcd_response *resp);
void cetcd_error_release(cetcd_error *err);

#ifdef __cplusplus
} // extern "C"
#endif

#endif
28 changes: 28 additions & 0 deletions examples/cetcd_array.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#ifndef CETCD_ARRAY_H
#define CETCD_ARRAY_H
#include <stdlib.h>

typedef struct cetcd_array_t {
void **elem;
size_t count;
size_t cap;
} cetcd_array;

size_t cetcd_array_size(cetcd_array *ca);
size_t cetcd_array_cap(cetcd_array *ca);

cetcd_array *cetcd_array_create(size_t cap);
void cetcd_array_release(cetcd_array *ca);

int cetcd_array_init(cetcd_array *ca, size_t cap);
int cetcd_array_destroy(cetcd_array *ca);
int cetcd_array_append(cetcd_array *ca, void *p);

void *cetcd_array_get(cetcd_array *ca, size_t index);
int cetcd_array_set(cetcd_array *ca, size_t index, void *p);
void *cetcd_array_top(cetcd_array *ca);
void *cetcd_array_pop(cetcd_array *ca);

cetcd_array *cetcd_array_shuffle(cetcd_array *cards);

#endif
4 changes: 2 additions & 2 deletions examples/cetcd_get.c
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#include "../cetcd.h"
#include "cetcd.h"

int main(int argc, char *argv[]) {
cetcd_client cli;
cetcd_response *resp;
cetcd_array addrs;

cetcd_array_init(&addrs, 3);
cetcd_array_append(&addrs, "http://127.0.0.1:2379");
cetcd_array_append(&addrs, "http://172.253.33.14:2379");

cetcd_client_init(&cli, &addrs);

Expand Down
Loading