Skip to content

Commit

Permalink
Minor updates
Browse files Browse the repository at this point in the history
  • Loading branch information
houjun committed Nov 2, 2024
1 parent 0590a96 commit 69576fd
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 34 deletions.
60 changes: 36 additions & 24 deletions src/api/pdc_client_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -7338,6 +7338,14 @@ PDC_add_kvtag(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont)

FUNC_ENTER(NULL);

if (use_shm_meta_query_g) {
// Invalidate the cached metadata snapshot
if (deserializedBulki_g) {
BULKI_free(deserializedBulki_g, 1);
deserializedBulki_g = NULL;
}
}

if (is_cont == 0) {
obj_prop = PDC_obj_get_info(obj_id);
meta_id = obj_prop->obj_info_pub->meta_id;
Expand All @@ -7351,7 +7359,6 @@ PDC_add_kvtag(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont)
in.hash_value = PDC_get_hash_by_name(cont_prop->cont_info_pub->name);
}

// TODO: delete this line after debugging.
// printf("==CLIENT[%d]: PDC_add_kvtag::in.obj_id = %llu \n ", pdc_client_mpi_rank_g, in.obj_id);

server_id = PDC_get_server_by_obj_id(meta_id, pdc_server_num_g);
Expand All @@ -7366,7 +7373,6 @@ PDC_add_kvtag(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont)
&metadata_add_kvtag_handle);

// Fill input structure

if (kvtag != NULL && kvtag != NULL && kvtag->size != 0) {
in.kvtag.name = kvtag->name;
in.kvtag.value = kvtag->value;
Expand Down Expand Up @@ -7750,6 +7756,13 @@ PDCtag_delete(pdcid_t obj_id, char *tag_name, int is_cont)
struct _pdc_client_lookup_args lookup_args;

FUNC_ENTER(NULL);
if (use_shm_meta_query_g) {
// Invalidate the cached metadata snapshot
if (deserializedBulki_g) {
BULKI_free(deserializedBulki_g, 1);
deserializedBulki_g = NULL;
}
}

if (is_cont) {
cont_prop = PDC_cont_get_info(obj_id);
Expand Down Expand Up @@ -9413,8 +9426,7 @@ PDC_Client_query_kvtag_mpi(const pdc_kvtag_t *kvtag, int *n_res, uint64_t **pdc_

MPI_Scatter(shm_sizes, 1, MPI_UINT64_T, &shm_size, 1, MPI_UINT64_T, 0, PDC_SAME_NODE_COMM_g);

printf("==PDC_CLIENT[%d]: recv server %d shm size %llu\n", pdc_client_mpi_rank_g, server_rank,
shm_size);
/* printf("==PDC_CLIENT[%d]: recv server %d shm size %llu\n", pdc_client_mpi_rank_g, server_rank, shm_size); */

// Open shared memory and map to data buf
snprintf(shm_name, 64, "meta_shm.%d.%d", server_rank, pdc_client_same_node_rank_g);
Expand All @@ -9441,30 +9453,30 @@ PDC_Client_query_kvtag_mpi(const pdc_kvtag_t *kvtag, int *n_res, uint64_t **pdc_
BULKI_KV_Pair * bulki_kv;
// Iterate and get query result
while (NULL != (bulki_kv = BULKI_KV_Pair_iterator_next(bulki_iter))) {
printf("key: [%s]\n", (char *)bulki_kv->key.data);
if (strcmp("_pdc_id", (char *)bulki_kv->key.data) == 0) {
pdc_id = *((uint64_t *)bulki_kv->value.data);
printf("value: %llu\n", pdc_id);
/* printf("key: [%s]\n", (char*)bulki_kv->key.data); */
if (strcmp("_pdc_id", (char*)bulki_kv->key.data) == 0) {
pdc_id = *((uint64_t*)bulki_kv->value.data);
/* printf("value: %llu\n", pdc_id); */
}
else {
query_tag.name = (char *)bulki_kv->key.data;
query_tag.value = (void *)bulki_kv->value.data;
/* it->bulki->data->values[it->current_idx]; */
query_tag.type = bulki_iter->bulki->data->values[bulki_iter->current_idx - 1].pdc_type;
query_tag.size = bulki_iter->bulki->data->values[bulki_iter->current_idx - 1].size;
/* printf("value: %d\n", *((int*)bulki_kv->value.data)); */
if (PDC_is_matching_kvtag(kvtag, &query_tag) == TRUE) {
if (iter >= alloc_size) {
alloc_size *= 2;
*pdc_ids = (void *)realloc(*pdc_ids, alloc_size * sizeof(uint64_t));
query_tag.type = bulki_iter->bulki->data->values[bulki_iter->current_idx-1].pdc_type;
if (query_tag.type == kvtag->type) {
query_tag.name = (char*)bulki_kv->key.data;
query_tag.value = (void*)bulki_kv->value.data;
query_tag.size = bulki_iter->bulki->data->values[bulki_iter->current_idx-1].size;
/* printf("value: %d\n", *((int*)bulki_kv->value.data)); */
if (PDC_is_matching_kvtag(kvtag, &query_tag) == TRUE) {
if (iter >= alloc_size) {
alloc_size *= 2;
*pdc_ids = (void *)realloc(*pdc_ids, alloc_size * sizeof(uint64_t));
}
(*pdc_ids)[iter++] = pdc_id;
/* printf("Found match %s:%d\n", query_tag.name, *(int*)query_tag.value); */
}
(*pdc_ids)[iter++] = pdc_id;
printf("Found match %s:%d\n", query_tag.name, *(int *)query_tag.value);
}
}
}
} // End if same type
} // End else
} // End while
*n_res = iter;
/* BULKI_free(deserializedBulki_g, 1); */
}
else {
ret_value = PDC_Client_query_kvtag_col(kvtag, n_res, pdc_ids, &query_sent);
Expand Down
22 changes: 15 additions & 7 deletions src/commons/serde/bulki/bulki.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "bulki.h"
#include "bulki_vle_util.h"

static int print_debug_g = 0;

size_t
get_BULKI_Entity_size(BULKI_Entity *bulk_entity)
{
Expand Down Expand Up @@ -575,30 +577,36 @@ BULKI_Entity_free(BULKI_Entity *bulk_entity, int free_struct)
for (size_t i = 0; i < bulk_entity->count; i++) {
BULKI_free(&bulki_array[i], 0);
}
printf("Freeing bulki_array 1\n");
if (print_debug_g)
printf("Freeing bulki_array 1\n");
bulki_array = NULL;
}
else if (bulk_entity->pdc_type == PDC_BULKI_ENT && bulk_entity->data != NULL) {
BULKI_Entity *bulki_entity_array = (BULKI_Entity *)bulk_entity->data;
for (size_t i = 0; i < bulk_entity->count; i++) {
BULKI_Entity_free(&bulki_entity_array[i], 0);
}
printf("Freeing bulki_array 2\n");
if (print_debug_g)
printf("Freeing bulki_array 2\n");
bulki_entity_array = NULL;
}
}
else if (bulk_entity->pdc_class == PDC_CLS_ITEM) {
if (bulk_entity->pdc_type == PDC_BULKI && bulk_entity->data != NULL) {
BULKI_free((BULKI *)bulk_entity->data, 0);
bulk_entity->data = NULL;
printf("Freeing bulki_item 1\n");
if (print_debug_g)
printf("Freeing bulki_item 1\n");
}
}
printf("Freeing bulk_entity\n");
if (print_debug_g)
printf("Freeing bulk_entity\n");
if (bulk_entity->data != NULL) {
printf("bulki_entity->class: %d, bulki_entity->class: %d, bulki_entity->data: %p, bulki_entity: "
"%p\n",
bulk_entity->pdc_class, bulk_entity->pdc_type, bulk_entity->data, bulk_entity);
if (print_debug_g) {
printf("bulki_entity->class: %d, bulki_entity->class: %d, bulki_entity->data: %p, "
"bulki_entity: %p\n",
bulk_entity->pdc_class, bulk_entity->pdc_type, bulk_entity->data, bulk_entity);
}
free(bulk_entity->data);
bulk_entity->data = NULL;
}
Expand Down
3 changes: 2 additions & 1 deletion src/server/pdc_server_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -1842,7 +1842,8 @@ PDC_Server_create_shm(char *shm_name, uint64_t size)
int shm_fd = -1;
void *buf;

printf("==PDC_SERVER[%d]: create shm [%s], size %llu!\n", pdc_server_rank_g, shm_name, size);
/* printf("==PDC_SERVER[%d]: create shm [%s], size %llu!\n", */
/* pdc_server_rank_g, shm_name, size); */

remove(shm_name);

Expand Down
4 changes: 2 additions & 2 deletions src/tests/kvtag_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ main(int argc, char *argv[])
}
}
else {
println("Rank %d: [%s] [%d], len %d\n", my_rank, kvtag.name, v, kvtag.size);
/* println("Rank %d: [%s] [%d], len %d\n", my_rank, kvtag.name, v, kvtag.size); */
if (PDCobj_put_tag(obj_ids[i], kvtag.name, kvtag.value, kvtag.type, kvtag.size) < 0) {
printf("fail to add a kvtag to o%d\n", i + my_obj_s);
}
Expand Down Expand Up @@ -220,7 +220,7 @@ main(int argc, char *argv[])
total_time = MPI_Wtime() - stime;

if (my_rank == 0)
println("Total time to query %d tags: %.5f", nres, total_time);
println("Total time to query %d tags: %.5f s", nres, total_time);

// close a container
if (PDCcont_close(cont) < 0)
Expand Down

0 comments on commit 69576fd

Please sign in to comment.