Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FL-3933] Pipe #3996

Merged
merged 27 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
bf7fefc
feat: FuriThread stdin
portasynthinca3 Oct 31, 2024
a9d3264
Merge branch 'dev' into portasynthinca3/3927-thread-stdin
portasynthinca3 Oct 31, 2024
4c59243
ci: fix f18
portasynthinca3 Oct 31, 2024
9f2e934
Merge branch 'dev' into portasynthinca3/3927-thread-stdin
portasynthinca3 Nov 5, 2024
f1eb60b
feat: stdio callback context
portasynthinca3 Nov 7, 2024
da187fe
Merge branch 'dev' into portasynthinca3/3927-thread-stdin
portasynthinca3 Nov 7, 2024
a4a7a41
feat: FuriPipe
portasynthinca3 Nov 7, 2024
004e3a6
POTENTIALLY EXPLOSIVE pipe welding
portasynthinca3 Nov 8, 2024
920a3e9
fix: non-explosive welding
portasynthinca3 Nov 11, 2024
f792fde
Revert welding
portasynthinca3 Nov 12, 2024
1c9dbb3
docs: furi_pipe
portasynthinca3 Nov 12, 2024
bd28387
feat: pipe event loop integration
portasynthinca3 Nov 12, 2024
acf307d
update f18 sdk
portasynthinca3 Nov 12, 2024
2bcc3c4
f18
portasynthinca3 Nov 12, 2024
af7ff81
docs: make doxygen happy
portasynthinca3 Nov 12, 2024
857ad33
fix: event loop not triggering when pipe attached to stdio
portasynthinca3 Nov 13, 2024
bc9223a
fix: partial stdout in pipe
portasynthinca3 Nov 13, 2024
dac9c00
allow simultaneous in and out subscription in event loop
portasynthinca3 Nov 13, 2024
84bf18f
Merge branch 'dev' into portasynthinca3/3933-pipe
portasynthinca3 Dec 19, 2024
da5c620
Merge remote-tracking branch 'origin/dev' into portasynthinca3/3933-pipe
portasynthinca3 Dec 20, 2024
5e3f074
refactor: move pipe out of furi and decouple from event loop
portasynthinca3 Dec 20, 2024
5c00891
chore: api versioning
portasynthinca3 Dec 20, 2024
00b7a85
Merge branch 'dev' into portasynthinca3/3933-pipe
skotopes Dec 23, 2024
6d809c2
Bump api versions
skotopes Dec 23, 2024
8b74023
refactor: rename pipe_set_pipe_broken_callback
portasynthinca3 Dec 23, 2024
19e0e77
Merge branch 'portasynthinca3/3933-pipe' of https://github.com/flippe…
portasynthinca3 Dec 23, 2024
5deee10
Toolbox: add missing pragma once
skotopes Dec 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions applications/debug/unit_tests/application.fam
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,11 @@ App(
entry_point="get_api",
requires=["unit_tests"],
)

App(
appid="test_pipe",
sources=["tests/common/*.c", "tests/pipe/*.c"],
apptype=FlipperAppType.PLUGIN,
entry_point="get_api",
requires=["unit_tests"],
)
153 changes: 153 additions & 0 deletions applications/debug/unit_tests/tests/pipe/pipe_test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#include "../test.h" // IWYU pragma: keep

#include <furi.h>
#include <lib/toolbox/pipe.h>

#define PIPE_SIZE 128U
#define PIPE_TRG_LEVEL 1U

MU_TEST(pipe_test_trivial) {
PipeSideBundle bundle = pipe_alloc(PIPE_SIZE, PIPE_TRG_LEVEL);
PipeSide* alice = bundle.alices_side;
PipeSide* bob = bundle.bobs_side;

mu_assert_int_eq(PipeRoleAlice, pipe_role(alice));
mu_assert_int_eq(PipeRoleBob, pipe_role(bob));
mu_assert_int_eq(PipeStateOpen, pipe_state(alice));
mu_assert_int_eq(PipeStateOpen, pipe_state(bob));

mu_assert_int_eq(PIPE_SIZE, pipe_spaces_available(alice));
mu_assert_int_eq(PIPE_SIZE, pipe_spaces_available(bob));
mu_assert_int_eq(0, pipe_bytes_available(alice));
mu_assert_int_eq(0, pipe_bytes_available(bob));

for(uint8_t i = 0;; ++i) {
mu_assert_int_eq(PIPE_SIZE - i, pipe_spaces_available(alice));
mu_assert_int_eq(i, pipe_bytes_available(bob));

if(pipe_send(alice, &i, sizeof(uint8_t), 0) != sizeof(uint8_t)) {
break;
}

mu_assert_int_eq(PIPE_SIZE - i, pipe_spaces_available(bob));
mu_assert_int_eq(i, pipe_bytes_available(alice));

if(pipe_send(bob, &i, sizeof(uint8_t), 0) != sizeof(uint8_t)) {
break;
}
}

pipe_free(alice);
mu_assert_int_eq(PipeStateBroken, pipe_state(bob));

for(uint8_t i = 0;; ++i) {
mu_assert_int_eq(PIPE_SIZE - i, pipe_bytes_available(bob));

uint8_t value;
if(pipe_receive(bob, &value, sizeof(uint8_t), 0) != sizeof(uint8_t)) {
break;
}

mu_assert_int_eq(i, value);
}

pipe_free(bob);
}

typedef enum {
TestFlagDataArrived = 1 << 0,
TestFlagSpaceFreed = 1 << 1,
TestFlagBecameBroken = 1 << 2,
} TestFlag;

typedef struct {
TestFlag flag;
FuriEventLoop* event_loop;
} AncillaryThreadContext;

static void on_data_arrived(PipeSide* pipe, void* context) {
AncillaryThreadContext* ctx = context;
ctx->flag |= TestFlagDataArrived;
uint8_t buffer[PIPE_SIZE];
size_t size = pipe_receive(pipe, buffer, sizeof(buffer), 0);
pipe_send(pipe, buffer, size, 0);
}

static void on_space_freed(PipeSide* pipe, void* context) {
AncillaryThreadContext* ctx = context;
ctx->flag |= TestFlagSpaceFreed;
const char* message = "Hi!";
pipe_send(pipe, message, strlen(message), 0);
}

static void on_became_broken(PipeSide* pipe, void* context) {
UNUSED(pipe);
AncillaryThreadContext* ctx = context;
ctx->flag |= TestFlagBecameBroken;
furi_event_loop_stop(ctx->event_loop);
}

static int32_t ancillary_thread(void* context) {
PipeSide* pipe = context;
AncillaryThreadContext thread_ctx = {
.flag = 0,
.event_loop = furi_event_loop_alloc(),
};

pipe_attach_to_event_loop(pipe, thread_ctx.event_loop);
pipe_set_callback_context(pipe, &thread_ctx);
pipe_set_data_arrived_callback(pipe, on_data_arrived, 0);
pipe_set_space_freed_callback(pipe, on_space_freed, FuriEventLoopEventFlagEdge);
pipe_set_broken_callback(pipe, on_became_broken, 0);

furi_event_loop_run(thread_ctx.event_loop);

pipe_detach_from_event_loop(pipe);
pipe_free(pipe);
furi_event_loop_free(thread_ctx.event_loop);
return thread_ctx.flag;
}

MU_TEST(pipe_test_event_loop) {
PipeSideBundle bundle = pipe_alloc(PIPE_SIZE, PIPE_TRG_LEVEL);
PipeSide* alice = bundle.alices_side;
PipeSide* bob = bundle.bobs_side;

FuriThread* thread = furi_thread_alloc_ex("PipeTestAnc", 2048, ancillary_thread, bob);
furi_thread_start(thread);

const char* message = "Hello!";
pipe_send(alice, message, strlen(message), FuriWaitForever);

char buffer_1[16];
size_t size = pipe_receive(alice, buffer_1, sizeof(buffer_1), FuriWaitForever);
buffer_1[size] = 0;

char buffer_2[16];
const char* expected_reply = "Hi!";
size = pipe_receive(alice, buffer_2, sizeof(buffer_2), FuriWaitForever);
buffer_2[size] = 0;

pipe_free(alice);
furi_thread_join(thread);

mu_assert_string_eq(message, buffer_1);
mu_assert_string_eq(expected_reply, buffer_2);
mu_assert_int_eq(
TestFlagDataArrived | TestFlagSpaceFreed | TestFlagBecameBroken,
furi_thread_get_return_code(thread));

furi_thread_free(thread);
}

MU_TEST_SUITE(test_pipe) {
MU_RUN_TEST(pipe_test_trivial);
MU_RUN_TEST(pipe_test_event_loop);
}

int run_minunit_test_pipe(void) {
MU_RUN_SUITE(test_pipe);
return MU_EXIT_CODE;
}

TEST_API_DEFINE(run_minunit_test_pipe)
5 changes: 5 additions & 0 deletions furi/core/stream_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ bool furi_stream_set_trigger_level(FuriStreamBuffer* stream_buffer, size_t trigg
pdTRUE;
}

size_t furi_stream_get_trigger_level(FuriStreamBuffer* stream_buffer) {
furi_check(stream_buffer);
return ((StaticStreamBuffer_t*)stream_buffer)->xTriggerLevelBytes;
}

size_t furi_stream_buffer_send(
FuriStreamBuffer* stream_buffer,
const void* data,
Expand Down
11 changes: 11 additions & 0 deletions furi/core/stream_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ void furi_stream_buffer_free(FuriStreamBuffer* stream_buffer);
*/
bool furi_stream_set_trigger_level(FuriStreamBuffer* stream_buffer, size_t trigger_level);

/**
* @brief Get trigger level for stream buffer.
* A stream buffer's trigger level is the number of bytes that must be in the
* stream buffer before a task that is blocked on the stream buffer to
* wait for data is moved out of the blocked state.
*
* @param stream_buffer The stream buffer instance
* @return The trigger level for the stream buffer
*/
size_t furi_stream_get_trigger_level(FuriStreamBuffer* stream_buffer);

/**
* @brief Sends bytes to a stream buffer. The bytes are copied into the stream buffer.
* Wakes up task waiting for data to become available if called from ISR.
Expand Down
1 change: 1 addition & 0 deletions lib/toolbox/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ env.Append(
File("stream/string_stream.h"),
File("stream/buffered_file_stream.h"),
File("strint.h"),
File("pipe.h"),
File("protocols/protocol_dict.h"),
File("pretty_format.h"),
File("hex.h"),
Expand Down
Loading
Loading