Skip to content

Commit

Permalink
Add query/reply attachment (#403)
Browse files Browse the repository at this point in the history
* fix: add and call function to drop encoded attachments

* feat: add n arg for z_queryable example

* feat: add attachment to queries

* feat: add query attachment to examples

* feat: add attachment to query replies

* feat: add attachment to query reply examples

* fix: compile error when attachment deactivated

* test: add attachment support in modularity test

* build: add a ci stage for no attachment case

* feat: drop sample attachment with z_sample_drop

* fix: init dst in z_bytes_copy

* fix: allocate attachment z_bytes to avoid going out of scope

* chore: auto format python script

* fix: remove unused function

* feat: add query attachment accessor

* fix: free attachment when query is dropped

* fix: drop sample attachment automatically

* fix: set attachment_drop as a private function

* fix: replace private function _z_bytes_wrap

* fix: compilation error when attachment is off
  • Loading branch information
jean-roland authored Apr 29, 2024
1 parent 4893219 commit 2a767bc
Show file tree
Hide file tree
Showing 22 changed files with 318 additions and 117 deletions.
37 changes: 36 additions & 1 deletion .github/workflows/build-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ jobs:
run: |
sudo apt install -y ninja-build
CMAKE_GENERATOR=Ninja make
python3 ./build/tests/modularity.py --pub $Z_FEATURE_PUBLICATION --sub $Z_FEATURE_SUBSCRIPTION --queryable $Z_FEATURE_QUERYABLE --query $Z_FEATURE_QUERY
python3 ./build/tests/modularity.py --pub $Z_FEATURE_PUBLICATION --sub $Z_FEATURE_SUBSCRIPTION --queryable $Z_FEATURE_QUERYABLE --query $Z_FEATURE_QUERY --attachment 1
timeout-minutes: 5
env:
Z_FEATURE_PUBLICATION: ${{ matrix.feature_publication }}
Expand Down Expand Up @@ -202,3 +202,38 @@ jobs:
- name: Kill Zenoh router
if: always()
run: kill ${{ steps.run-zenoh.outputs.zenohd-pid }}

no_attachment_test:
needs: zenoh_build
name: Build and test without attachment on ubuntu-latest
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Download Zenoh artifacts
uses: actions/download-artifact@v4
with:
name: ${{ needs.zenoh_build.outputs.artifact-name }}

- name: Unzip Zenoh artifacts
run: unzip ${{ needs.zenoh_build.outputs.artifact-name }} -d zenoh-standalone

- id: run-zenoh
name: Run Zenoh router
run: |
RUST_LOG=debug ./zenoh-standalone/zenohd &
echo "zenohd-pid=$!" >> $GITHUB_OUTPUT
- name: Build project and run test
run: |
sudo apt install -y ninja-build
CMAKE_GENERATOR=Ninja make
python3 ./build/tests/modularity.py --pub 1 --sub 1 --queryable 1 --query 1 --attachment 0
timeout-minutes: 5
env:
Z_FEATURE_ATTACHMENT: 0

- name: Kill Zenoh router
if: always()
run: kill ${{ steps.run-zenoh.outputs.zenohd-pid }}
23 changes: 23 additions & 0 deletions examples/unix/c11/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,26 @@ void reply_dropper(void *ctx) {
z_condvar_free(&cond);
}

#if Z_FEATURE_ATTACHMENT == 1
int8_t attachment_handler(z_bytes_t key, z_bytes_t att_value, void *ctx) {
(void)ctx;
printf(">>> %.*s: %.*s\n", (int)key.len, key.start, (int)att_value.len, att_value.start);
return 0;
}
#endif

void reply_handler(z_owned_reply_t *reply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(reply)) {
z_sample_t sample = z_reply_ok(reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
#if Z_FEATURE_ATTACHMENT == 1
if (z_attachment_check(&sample.attachment)) {
printf("Attachement found\n");
z_attachment_iterate(sample.attachment, attachment_handler, NULL);
}
#endif
z_drop(z_move(keystr));
} else {
printf(">> Received an error\n");
Expand Down Expand Up @@ -116,6 +130,11 @@ int main(int argc, char **argv) {
if (value != NULL) {
opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value));
}
#if Z_FEATURE_ATTACHMENT == 1
z_owned_bytes_map_t map = z_bytes_map_new();
z_bytes_map_insert_by_alias(&map, z_bytes_from_str("hi"), z_bytes_from_str("there"));
opts.attachment = z_bytes_map_as_attachment(&map);
#endif
z_owned_closure_reply_t callback = z_closure(reply_handler, reply_dropper);
if (z_get(z_loan(s), ke, "", z_move(callback), &opts) < 0) {
printf("Unable to send query.\n");
Expand All @@ -130,6 +149,10 @@ int main(int argc, char **argv) {

z_close(z_move(s));

#if Z_FEATURE_ATTACHMENT == 1
z_bytes_map_drop(&map);
#endif

return 0;
}
#else
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c11/z_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ int main(int argc, char **argv) {
options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL);
#if Z_FEATURE_ATTACHMENT == 1
z_owned_bytes_map_t map = z_bytes_map_new();
z_bytes_map_insert_by_alias(&map, _z_bytes_wrap((uint8_t *)"hi", 2), _z_bytes_wrap((uint8_t *)"there", 5));
z_bytes_map_insert_by_alias(&map, z_bytes_from_str("hi"), z_bytes_from_str("there"));
options.attachment = z_bytes_map_as_attachment(&map);
#endif
if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, strlen(value), &options) < 0) {
Expand Down
44 changes: 41 additions & 3 deletions examples/unix/c11/z_queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,60 @@
#if Z_FEATURE_QUERYABLE == 1
const char *keyexpr = "demo/example/zenoh-pico-queryable";
const char *value = "Queryable from Pico!";
static int msg_nb = 0;

#if Z_FEATURE_ATTACHMENT == 1
int8_t attachment_handler(z_bytes_t key, z_bytes_t att_value, void *ctx) {
(void)ctx;
printf(">>> %.*s: %.*s\n", (int)key.len, key.start, (int)att_value.len, att_value.start);
return 0;
}
#endif

void query_handler(const z_query_t *query, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query));
z_bytes_t pred = z_query_parameters(query);
z_value_t payload_value = z_query_value(query);
printf(" >> [Queryable handler] Received Query '%s?%.*s'\n", z_loan(keystr), (int)pred.len, pred.start);
printf(">> [Queryable handler] Received Query '%s?%.*s'\n", z_loan(keystr), (int)pred.len, pred.start);
if (payload_value.payload.len > 0) {
printf(" with value '%.*s'\n", (int)payload_value.payload.len, payload_value.payload.start);
}
#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t attachment = z_query_attachment(query);
if (z_attachment_check(&attachment)) {
printf("Attachement found\n");
z_attachment_iterate(attachment, attachment_handler, NULL);
}
#endif

z_query_reply_options_t options = z_query_reply_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL);

#if Z_FEATURE_ATTACHMENT == 1
// Add attachment
z_owned_bytes_map_t map = z_bytes_map_new();
z_bytes_map_insert_by_alias(&map, z_bytes_from_str("hello"), z_bytes_from_str("world"));
options.attachment = z_bytes_map_as_attachment(&map);
#endif

z_query_reply(query, z_keyexpr(keyexpr), (const unsigned char *)value, strlen(value), &options);
z_drop(z_move(keystr));
msg_nb++;

#if Z_FEATURE_ATTACHMENT == 1
z_bytes_map_drop(&map);
#endif
}

int main(int argc, char **argv) {
const char *mode = "client";
char *clocator = NULL;
char *llocator = NULL;
int n = 0;

int opt;
while ((opt = getopt(argc, argv, "k:e:m:v:l:")) != -1) {
while ((opt = getopt(argc, argv, "k:e:m:v:l:n:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
Expand All @@ -60,8 +91,12 @@ int main(int argc, char **argv) {
case 'v':
value = optarg;
break;
case 'n':
n = atoi(optarg);
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') {
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' ||
optopt == 'n') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
Expand Down Expand Up @@ -111,6 +146,9 @@ int main(int argc, char **argv) {

printf("Press CTRL-C to quit...\n");
while (1) {
if ((n != 0) && (msg_nb >= n)) {
break;
}
sleep(1);
}

Expand Down
18 changes: 16 additions & 2 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -519,13 +519,27 @@ z_bytes_t z_query_parameters(const z_query_t *query);
* Note: This API has been marked as unstable: it works as advertised, but we may change it in a future release.
*
* Parameters:
* query: Pointer to the query to get the value selector from.
* query: Pointer to the query to get the payload from.
*
* Returns:
* Returns the payload value wrapped as a :c:type:`z_value_t`, since payload value is a user-defined representation.
* Returns the payload wrapped as a :c:type:`z_value_t`, since payload value is a user-defined representation.
*/
z_value_t z_query_value(const z_query_t *query);

#if Z_FEATURE_ATTACHMENT == 1
/**
* Get a query's attachment value by aliasing it.
* Note: This API has been marked as unstable: it works as advertised, but we may change it in a future release.
*
* Parameters:
* query: Pointer to the query to get the attachment from.
*
* Returns:
* Returns the attachment wrapped as a :c:type:`z_attachment_t`, since attachment is a user-defined representation.
*/
z_attachment_t z_query_attachment(const z_query_t *query);
#endif

/**
* Get a query's key by aliasing it.
*
Expand Down
6 changes: 2 additions & 4 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,7 @@ typedef struct {
*/
typedef struct {
z_encoding_t encoding;
#if Z_FEATURE_ATTACHMENT == 1
// TODO:ATT z_attachment_t attachment;
#endif
z_attachment_t attachment;
} z_query_reply_options_t;

/**
Expand Down Expand Up @@ -337,7 +335,7 @@ typedef struct {
z_query_target_t target;
uint32_t timeout_ms;
#if Z_FEATURE_ATTACHMENT == 1
// TODO:ATT z_attachment_t attachment;
z_attachment_t attachment;
#endif
} z_get_options_t;

Expand Down
4 changes: 3 additions & 1 deletion include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,11 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle);
* query: The query to reply to. The caller keeps its ownership.
* key: The resource key of this reply. The caller keeps the ownership.
* payload: The value of this reply, the caller keeps ownership.
* kind: The type of operation.
* att: The optional attachment to the sample.
*/
int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload,
const z_sample_kind_t kind);
const z_sample_kind_t kind, z_attachment_t att);
#endif

#if Z_FEATURE_QUERY == 1
Expand Down
5 changes: 4 additions & 1 deletion include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ typedef struct _z_query_t {
_z_keyexpr_t _key;
uint32_t _request_id;
_z_session_t *_zn;
#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t attachment;
#endif
char *_parameters;
_Bool _anyke;
} _z_query_t;
Expand All @@ -52,7 +55,7 @@ typedef struct {

#if Z_FEATURE_QUERYABLE == 1
_z_query_t _z_query_create(const _z_value_t *value, const _z_keyexpr_t *key, const _z_bytes_t *parameters,
_z_session_t *zn, uint32_t request_id);
_z_session_t *zn, uint32_t request_id, z_attachment_t att);
void _z_queryable_clear(_z_queryable_t *qbl);
void _z_queryable_free(_z_queryable_t **qbl);
#endif
Expand Down
23 changes: 15 additions & 8 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ typedef struct {
uint64_t time;
} _z_timestamp_t;

#if Z_FEATURE_ATTACHMENT == 1
/**
* The body of a loop over an attachment's key-value pairs.
*
Expand Down Expand Up @@ -110,24 +109,32 @@ typedef struct z_attachment_t {
z_attachment_iter_driver_t iteration_driver;
} z_attachment_t;

z_attachment_t z_attachment_null(void);
_Bool z_attachment_check(const z_attachment_t *attachment);
int8_t z_attachment_iterate(z_attachment_t this_, z_attachment_iter_body_t body, void *ctx);
_z_bytes_t z_attachment_get(z_attachment_t this_, _z_bytes_t key);

typedef struct {
union {
z_attachment_t decoded;
_z_bytes_t encoded;
} body;
_Bool is_encoded;
} _z_owned_encoded_attachment_t;

z_attachment_t z_attachment_null(void);
z_attachment_t _z_encoded_as_attachment(const _z_owned_encoded_attachment_t *att);

#if Z_FEATURE_ATTACHMENT == 1

_Bool z_attachment_check(const z_attachment_t *attachment);
int8_t z_attachment_iterate(z_attachment_t this_, z_attachment_iter_body_t body, void *ctx);
_z_bytes_t z_attachment_get(z_attachment_t this_, _z_bytes_t key);

/**
* Estimate the length of an attachment once encoded.
*/
size_t _z_attachment_estimate_length(z_attachment_t att);
z_attachment_t _z_encoded_as_attachment(const _z_owned_encoded_attachment_t *att);
void _z_encoded_attachment_drop(_z_owned_encoded_attachment_t *att);

/**
* Drop an attachment that was decoded from a received message
*/
void _z_attachment_drop(z_attachment_t *att);
#endif

_z_timestamp_t _z_timestamp_duplicate(const _z_timestamp_t *tstamp);
Expand Down
2 changes: 0 additions & 2 deletions include/zenoh-pico/protocol/definitions/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ typedef struct {
_z_source_info_t _ext_info;
_z_value_t _ext_value;
z_consolidation_mode_t _consolidation;
#if Z_FEATURE_ATTACHMENT == 1
_z_owned_encoded_attachment_t _ext_attachment;
#endif
} _z_msg_query_t;
typedef struct {
_Bool info;
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/session/queryable.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ _z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, cons
_z_session_queryable_rc_list_t *_z_get_session_queryable_by_key(_z_session_t *zn, const _z_keyexpr_t key);

_z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_session_queryable_t *q);
int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid);
int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid,
z_attachment_t att);
void _z_unregister_session_queryable(_z_session_t *zn, _z_session_queryable_rc_t *q);
void _z_flush_session_queryable(_z_session_t *zn);
#endif
Expand Down
14 changes: 10 additions & 4 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ z_bytes_t z_query_parameters(const z_query_t *query) {

z_value_t z_query_value(const z_query_t *query) { return query->_val._rc.in->val._value; }

#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t z_query_attachment(const z_query_t *query) { return query->_val._rc.in->val.attachment; }
#endif

z_keyexpr_t z_query_keyexpr(const z_query_t *query) { return query->_val._rc.in->val._key; }

_Bool z_value_is_initialized(z_value_t *value) {
Expand Down Expand Up @@ -834,7 +838,7 @@ z_get_options_t z_get_options_default(void) {
.target = z_query_target_default(), .consolidation = z_query_consolidation_default(),
.value = {.encoding = z_encoding_default(), .payload = _z_bytes_empty()},
#if Z_FEATURE_ATTACHMENT == 1
// TODO:ATT.attachment = z_attachment_null()
.attachment = z_attachment_null(),
#endif
.timeout_ms = Z_GET_TIMEOUT_DEFAULT
};
Expand Down Expand Up @@ -864,6 +868,9 @@ int8_t z_get(z_session_t zs, z_keyexpr_t keyexpr, const char *parameters, z_owne
opt.consolidation = options->consolidation;
opt.target = options->target;
opt.value = options->value;
#if Z_FEATURE_ATTACHMENT == 1
opt.attachment = options->attachment;
#endif
}

if (opt.consolidation.mode == Z_CONSOLIDATION_MODE_AUTO) {
Expand All @@ -888,8 +895,7 @@ int8_t z_get(z_session_t zs, z_keyexpr_t keyexpr, const char *parameters, z_owne
__z_reply_handler, wrapped_ctx, callback->drop, ctx, opt.timeout_ms
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_null()
// TODO:ATT opt.attachment
opt.attachment
#endif
);
return ret;
Expand Down Expand Up @@ -969,7 +975,7 @@ int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const ui
.len = payload_len,
},
.encoding = {.id = opts.encoding.id, .schema = opts.encoding.schema}};
return _z_send_reply(&query->_val._rc.in->val, keyexpr, value, Z_SAMPLE_KIND_PUT);
return _z_send_reply(&query->_val._rc.in->val, keyexpr, value, Z_SAMPLE_KIND_PUT, opts.attachment);
return _Z_ERR_GENERIC;
}
#endif
Expand Down
Loading

0 comments on commit 2a767bc

Please sign in to comment.