Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/1.1.0 #785

Merged
merged 21 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
f8df508
Improve throughput performance (trigger local subscriber focus) (#730)
jean-roland Oct 11, 2024
4bd2ca2
Improve throughput performance #2 (refcount focus) (#737)
jean-roland Oct 14, 2024
3fea89c
Improve throughput performance #3 (rx buffer focus) (#749)
jean-roland Oct 21, 2024
b6a4599
Improve throughput performance #4 (rx defragmentation focus) (#754)
jean-roland Oct 22, 2024
ae30a60
Add manual batching mechanism (#758)
jean-roland Oct 24, 2024
c855c13
feat: switch multicast address to stack (#762)
jean-roland Oct 25, 2024
232c445
Add local query timeout (#763)
jean-roland Oct 25, 2024
5dd1fcd
Add peer tcp unicast (#764)
jean-roland Oct 28, 2024
7388049
Batching v2 (#765)
jean-roland Oct 28, 2024
de430cb
Improve sub frame decode performance (#777)
jean-roland Nov 8, 2024
0f1ce95
Improve query/reply perf (#781)
jean-roland Nov 13, 2024
1929633
Rebase 1.1 on main (#784)
jean-roland Nov 14, 2024
d427b0a
Merge liveliness in 1.1 (#786)
jean-roland Nov 18, 2024
eb1e7cf
Add corrections on 1.1 (#791)
jean-roland Nov 19, 2024
5071eda
fix: prevent code duplication during merge
jean-roland Nov 19, 2024
b6993fa
Merge branch 'main' into fix_merge_main
jean-roland Nov 19, 2024
11d2edf
Merge main in 1.1 (#793)
jean-roland Nov 20, 2024
c060a06
Merge pull request #796 from jean-roland/fix_merge_main
milyin Nov 20, 2024
ee4c5e8
Merge pull request #797 from eclipse-zenoh/dev/1.1.0-merge-main
milyin Nov 20, 2024
9e35c5f
Fix merge issue
sashacmc Nov 20, 2024
6eb0b42
Merge pull request #798 from ZettaScaleLabs/fix_merge_issue
milyin Nov 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ set(Z_FEATURE_MULTICAST_TRANSPORT 1 CACHE STRING "Toggle multicast transport")
set(Z_FEATURE_UNICAST_TRANSPORT 1 CACHE STRING "Toggle unicast transport")
set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport")
set(Z_FEATURE_TCP_NODELAY 1 CACHE STRING "Toggle TCP_NODELAY")
set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions")
set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check")
set(Z_FEATURE_BATCHING 1 CACHE STRING "Toggle batching")
set(Z_FEATURE_RX_CACHE 0 CACHE STRING "Toggle RX_CACHE")

# Add a warning message if someone tries to enable Z_FEATURE_LIVELINESS directly
if(Z_FEATURE_LIVELINESS AND NOT Z_FEATURE_UNSTABLE_API)
Expand Down
2 changes: 1 addition & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Z_FEATURE_RAWETH_TRANSPORT?=0
# Buffer sizes
FRAG_MAX_SIZE?=300000
BATCH_UNICAST_SIZE?=65535
BATCH_MULTICAST_SIZE?=8096
BATCH_MULTICAST_SIZE?=8192

# zenoh-pico/ directory
ROOT_DIR:=$(shell dirname $(realpath $(firstword $(MAKEFILE_LIST))))
Expand Down
9 changes: 4 additions & 5 deletions examples/unix/c11/z_get_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,11 @@ void reply_handler(z_loaned_reply_t *reply, void *ctx) {

// Check attachment
const z_loaned_bytes_t *attachment = z_sample_attachment(sample);
if (attachment == NULL) {
return;
}
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
size_t attachment_len;
ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len);
if (ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len) < 0) {
return;
}
kv_pair_t *kvp = (kv_pair_t *)malloc(sizeof(kv_pair_t) * attachment_len);
for (size_t i = 0; i < attachment_len; ++i) {
ze_deserializer_deserialize_string(&deserializer, &kvp[i].key);
Expand Down Expand Up @@ -178,7 +177,7 @@ int main(int argc, char **argv) {

ze_owned_serializer_t serializer;
ze_serializer_empty(&serializer);
ze_serializer_serialize_sequence_length(z_loan_mut(serializer), 2);
ze_serializer_serialize_sequence_length(z_loan_mut(serializer), 1);
for (size_t i = 0; i < 1; ++i) {
ze_serializer_serialize_string(z_loan_mut(serializer), z_loan(kvs[i].key));
ze_serializer_serialize_string(z_loan_mut(serializer), z_loan(kvs[i].value));
Expand Down
7 changes: 3 additions & 4 deletions examples/unix/c11/z_queryable_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@ void query_handler(z_loaned_query_t *query, void *ctx) {

// Check attachment
const z_loaned_bytes_t *attachment = z_query_attachment(query);
if (attachment != NULL) {
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
size_t attachment_len;
ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len);
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
size_t attachment_len;
if (ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len) == Z_OK) {
kv_pair_t *kvp = (kv_pair_t *)malloc(sizeof(kv_pair_t) * attachment_len);
for (size_t i = 0; i < attachment_len; ++i) {
ze_deserializer_deserialize_string(&deserializer, &kvp[i].key);
Expand Down
8 changes: 3 additions & 5 deletions examples/unix/c11/z_sub_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,10 @@ void data_handler(z_loaned_sample_t *sample, void *ctx) {
printf(" with timestamp: %" PRIu64 "\n", z_timestamp_ntp64_time(ts));
}
// Check attachment

const z_loaned_bytes_t *attachment = z_sample_attachment(sample);
if (attachment != NULL) {
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
size_t attachment_len;
ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len);
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
size_t attachment_len;
if (ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len) == Z_OK) {
kv_pair_t *kvp = (kv_pair_t *)malloc(sizeof(kv_pair_t) * attachment_len);
for (size_t i = 0; i < attachment_len; ++i) {
ze_deserializer_deserialize_string(&deserializer, &kvp[i].key);
Expand Down
39 changes: 39 additions & 0 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -2060,6 +2060,45 @@ z_result_t z_declare_background_subscriber(const z_loaned_session_t *zs, const z
const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subscriber);
#endif

#ifdef Z_FEATURE_UNSTABLE_API
#if Z_FEATURE_BATCHING == 1
/**
* Activate the batching mechanism, any message that would have been sent on the network by a subsequent api call (e.g
* z_put, z_get) will be instead stored until the batch is full, flushed with :c:func:`zp_batch_flush` or batching is
* stopped with :c:func:`zp_batch_stop`.
*
* Parameters:
* zs: Pointer to a :c:type:`z_loaned_session_t` that will start batching messages.
*
* Return:
* ``0`` if batching started, ``negative value`` otherwise.
*/
z_result_t zp_batch_start(const z_loaned_session_t *zs);

/**
* Send the currently batched messages on the network.
*
* Parameters:
* zs: Pointer to a :c:type:`z_loaned_session_t` that will send its batched messages.
*
* Return:
* ``0`` if batch successfully sent, ``negative value`` otherwise.
*/
z_result_t zp_batch_flush(const z_loaned_session_t *zs);

/**
* Deactivate the batching mechanism and send the currently batched on the network.
*
* Parameters:
* zs: Pointer to a :c:type:`z_loaned_session_t` that will stop batching messages.
*
* Return:
* ``0`` if batching stopped and batch successfully sent, ``negative value`` otherwise.
*/
z_result_t zp_batch_stop(const z_loaned_session_t *zs);
#endif
#endif

/************* Multi Thread Tasks helpers **************/
/**
* Builds a :c:type:`zp_task_read_options_t` with default value.
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ _Z_OWNED_TYPE_VALUE(_z_queryable_t, queryable)
/**
* Represents a Zenoh Query entity, received by Zenoh Queryable entities.
*/
_Z_OWNED_TYPE_RC(_z_query_rc_t, query)
_Z_OWNED_TYPE_VALUE(_z_query_t, query)

/**
* Represents the encoding of a payload, in a MIME-like format.
Expand Down
13 changes: 7 additions & 6 deletions include/zenoh-pico/collections/arc_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,30 @@
extern "C" {
#endif

_Z_REFCOUNT_DEFINE(_z_slice, _z_slice)
_Z_SIMPLE_REFCOUNT_DEFINE(_z_slice, _z_slice)

/*-------- ArcSlice --------*/
/**
* An atomically reference counted subslice.
*
* Members:
* _z_slice_rc_t len: Rc counted slice.
* _z_slice_simple_rc_t len: Rc counted slice.
* size_t start: Offset to the subslice start.
* size_t len: Length of the subslice.
*/

typedef struct {
_z_slice_rc_t slice;
_z_slice_simple_rc_t slice;
size_t start;
size_t len;
} _z_arc_slice_t;

_z_arc_slice_t _z_arc_slice_empty(void);
static inline _z_arc_slice_t _z_arc_slice_empty(void) { return (_z_arc_slice_t){0}; }
static inline size_t _z_arc_slice_len(const _z_arc_slice_t* s) { return s->len; }
static inline bool _z_arc_slice_is_empty(const _z_arc_slice_t* s) { return _z_arc_slice_len(s) == 0; }
_z_arc_slice_t _z_arc_slice_wrap(_z_slice_t s, size_t offset, size_t len);
_z_arc_slice_t _z_arc_slice_wrap_slice_rc(_z_slice_simple_rc_t* slice_rc, size_t offset, size_t len);
_z_arc_slice_t _z_arc_slice_get_subslice(const _z_arc_slice_t* s, size_t offset, size_t len);
size_t _z_arc_slice_len(const _z_arc_slice_t* s);
bool _z_arc_slice_is_empty(const _z_arc_slice_t* s);
const uint8_t* _z_arc_slice_data(const _z_arc_slice_t* s);
z_result_t _z_arc_slice_copy(_z_arc_slice_t* dst, const _z_arc_slice_t* src);
z_result_t _z_arc_slice_move(_z_arc_slice_t* dst, _z_arc_slice_t* src);
Expand Down
12 changes: 7 additions & 5 deletions include/zenoh-pico/collections/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ inline size_t _z_arc_slice_size(const _z_arc_slice_t *s) {
(void)s;
return sizeof(_z_arc_slice_t);
}
static inline void _z_arc_slice_elem_move(void *dst, void *src) {
_z_arc_slice_move((_z_arc_slice_t *)dst, (_z_arc_slice_t *)src);
}
_Z_ELEM_DEFINE(_z_arc_slice, _z_arc_slice_t, _z_arc_slice_size, _z_arc_slice_drop, _z_arc_slice_copy)
_Z_ELEM_DEFINE(_z_arc_slice, _z_arc_slice_t, _z_arc_slice_size, _z_arc_slice_drop, _z_arc_slice_copy, _z_arc_slice_move)
_Z_SVEC_DEFINE(_z_arc_slice, _z_arc_slice_t)

/*-------- Bytes --------*/
Expand All @@ -49,14 +46,19 @@ typedef struct {
_z_arc_slice_svec_t _slices;
} _z_bytes_t;

// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_bytes_t _z_bytes_null(void) { return (_z_bytes_t){0}; }
static inline void _z_bytes_alias_arc_slice(_z_bytes_t *dst, _z_arc_slice_t *s) {
dst->_slices = _z_arc_slice_svec_alias_element(s);
}
bool _z_bytes_check(const _z_bytes_t *bytes);
_z_bytes_t _z_bytes_null(void);
z_result_t _z_bytes_append_bytes(_z_bytes_t *dst, _z_bytes_t *src);
z_result_t _z_bytes_append_slice(_z_bytes_t *dst, _z_arc_slice_t *s);
z_result_t _z_bytes_copy(_z_bytes_t *dst, const _z_bytes_t *src);
_z_bytes_t _z_bytes_duplicate(const _z_bytes_t *src);
void _z_bytes_move(_z_bytes_t *dst, _z_bytes_t *src);
void _z_bytes_drop(_z_bytes_t *bytes);
void _z_bytes_aliased_drop(_z_bytes_t *bytes);
void _z_bytes_free(_z_bytes_t **bs);
size_t _z_bytes_num_slices(const _z_bytes_t *bs);
_z_arc_slice_t *_z_bytes_get_slice(const _z_bytes_t *bs, size_t i);
Expand Down
5 changes: 3 additions & 2 deletions include/zenoh-pico/collections/element.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ typedef void (*z_element_move_f)(void *dst, void *src);
typedef void *(*z_element_clone_f)(const void *e);
typedef bool (*z_element_eq_f)(const void *left, const void *right);

#define _Z_ELEM_DEFINE(name, type, elem_size_f, elem_clear_f, elem_copy_f) \
#define _Z_ELEM_DEFINE(name, type, elem_size_f, elem_clear_f, elem_copy_f, elem_move_f) \
typedef bool (*name##_eq_f)(const type *left, const type *right); \
static inline void name##_elem_clear(void *e) { elem_clear_f((type *)e); } \
static inline void name##_elem_free(void **e) { \
Expand All @@ -45,6 +45,7 @@ typedef bool (*z_element_eq_f)(const void *left, const void *right);
*e = NULL; \
} \
} \
static inline void name##_elem_move(void *dst, void *src) { elem_move_f((type *)dst, (type *)src); } \
static inline void name##_elem_copy(void *dst, const void *src) { elem_copy_f((type *)dst, (type *)src); } \
static inline void *name##_elem_clone(const void *src) { \
type *dst = (type *)z_malloc(elem_size_f((type *)src)); \
Expand Down Expand Up @@ -76,7 +77,7 @@ static inline void _z_noop_move(void *dst, void *src) {
_ZP_UNUSED(src);
}

_Z_ELEM_DEFINE(_z_noop, _z_noop_t, _z_noop_size, _z_noop_clear, _z_noop_copy)
_Z_ELEM_DEFINE(_z_noop, _z_noop_t, _z_noop_size, _z_noop_clear, _z_noop_copy, _z_noop_move)

#ifdef __cplusplus
}
Expand Down
Loading
Loading