Skip to content

Commit

Permalink
Implement basic getbyte, getc IOStream functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
noteflakes committed Aug 6, 2023
1 parent 55a64af commit 31e69df
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 31 deletions.
43 changes: 28 additions & 15 deletions ext/polyphony/buffers.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,19 @@ inline int bm_prep_buffer(buffer_descriptor **desc, enum buffer_type type, size_
return -1;
}

int bm_buffer_from_string(buffer_descriptor **desc, VALUE str)
{
(*desc) = malloc(sizeof(buffer_descriptor));
(*desc)->type = BT_STRING;
(*desc)->str = str;
(*desc)->ptr = RSTRING_PTR(str);
(*desc)->len = RSTRING_LEN(str);
(*desc)->capacity = rb_str_capacity(str);
(*desc)->prev = NULL;
(*desc)->next = NULL;
return 0;
}

int bm_dispose_managed(buffer_descriptor *desc) {
int power = normalized_power_of_two(desc->capacity);
int idx = FREE_LIST_IDX(power);
Expand Down Expand Up @@ -177,23 +190,23 @@ int bm_mark(void)
void Init_BufferManager(void)
{
memset(&bm, 0, sizeof(bm));
bm_trace();
// bm_trace();

bm_populate(0);
bm_populate(3);
bm_populate(6);
bm_trace();
// bm_populate(0);
// bm_populate(3);
// bm_populate(6);
// bm_trace();

buffer_descriptor *desc;
int ret = bm_prep_buffer(&desc, BT_MANAGED, 30000);
if (!ret)
printf("Got buffer: capacity: %d\n", desc->capacity);
else
rb_raise(rb_eRuntimeError, "Failed to get buffer");
bm_trace();
// buffer_descriptor *desc;
// int ret = bm_prep_buffer(&desc, BT_MANAGED, 30000);
// if (!ret)
// printf("Got buffer: capacity: %d\n", desc->capacity);
// else
// rb_raise(rb_eRuntimeError, "Failed to get buffer");
// bm_trace();

printf("Disposing buffer...\n");
bm_dispose(desc);
// printf("Disposing buffer...\n");
// bm_dispose(desc);

bm_trace();
// bm_trace();
}
5 changes: 3 additions & 2 deletions ext/polyphony/buffers.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ typedef struct buffer_descriptor {
};
};
char * ptr;
int len;
int capacity;
unsigned int len;
unsigned int capacity;
int eof;

struct buffer_descriptor *prev;
Expand All @@ -40,6 +40,7 @@ typedef struct {
} buffer_manager;

int bm_prep_buffer(buffer_descriptor **desc, enum buffer_type type, size_t len);
int bm_buffer_from_string(buffer_descriptor **desc, VALUE str);
int bm_dispose(buffer_descriptor *desc);
int bm_mark(void);

Expand Down
1 change: 0 additions & 1 deletion ext/polyphony/event.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "polyphony.h"
#include "ring_buffer.h"

typedef struct event {
VALUE waiting_fiber;
Expand Down
162 changes: 149 additions & 13 deletions ext/polyphony/io_stream.c
Original file line number Diff line number Diff line change
@@ -1,19 +1,155 @@
#include "polyphony.h"
#include "buffers.h"
#include "io_stream.h"

void Init_IOStream(void) {
// rb_define_method(rb_cThread, "setup_fiber_scheduling", Thread_setup_fiber_scheduling, 0);
// rb_define_method(rb_cThread, "schedule_and_wakeup", Thread_fiber_schedule_and_wakeup, 2);
// rb_define_method(rb_cThread, "switch_fiber", Thread_switch_fiber, 0);
// rb_define_method(rb_cThread, "fiber_unschedule", Thread_fiber_unschedule, 1);
typedef struct io_stream {
VALUE io;

buffer_descriptor *head;
buffer_descriptor *tail;

buffer_descriptor *cursor_desc;
unsigned int cursor_pos;
} IOStream_t;

VALUE cIOStream = Qnil;

static void IOStream_mark(void *ptr)
{
IOStream_t *io_stream = ptr;
rb_gc_mark(io_stream->io);
buffer_descriptor *desc = io_stream->head;
while (desc) {
if (desc->type == BT_STRING) rb_gc_mark(desc->str);
desc = desc->next;
}
}

static void IOStream_free(void *ptr)
{
xfree(ptr);
}

static size_t IOStream_size(const void *ptr)
{
return sizeof(IOStream_t);
}

static const rb_data_type_t IOStream_type = {
"IOStream",
{IOStream_mark, IOStream_free, IOStream_size,},
0, 0, 0
};

static VALUE IOStream_allocate(VALUE klass)
{
IOStream_t *io_stream;

io_stream = ALLOC(IOStream_t);
return TypedData_Wrap_Struct(klass, &IOStream_type, io_stream);
}

static VALUE IOStream_initialize(VALUE self, VALUE io)
{
IOStream_t *io_stream = RTYPEDDATA_DATA(self);

io_stream->io = io;
io_stream->head = NULL;
io_stream->tail = NULL;
io_stream->cursor_desc = NULL;
io_stream->cursor_pos = 0;

return self;
}

VALUE IOStream_left(VALUE self)
{
IOStream_t *io_stream = RTYPEDDATA_DATA(self);

int left = 0;
buffer_descriptor *desc = io_stream->cursor_desc;
while (desc) {
left += desc->len;
desc = desc->next;
}

// rb_define_singleton_method(rb_cThread, "backend", Thread_class_backend, 0);
return INT2FIX(left);
}

inline void io_stream_push_desc(IOStream_t *io_stream, buffer_descriptor *desc)
{
if (io_stream->tail) {
io_stream->tail->next = desc;
desc->prev = io_stream->tail;
io_stream->tail = desc;
}
else {
io_stream->head = desc;
io_stream->tail = desc;
io_stream->cursor_desc = desc;
io_stream->cursor_pos = 0;
desc->prev = NULL;
desc->next = NULL;
}
}

VALUE IOStream_push_string(VALUE self, VALUE str)
{
IOStream_t *io_stream = RTYPEDDATA_DATA(self);
buffer_descriptor *desc;

// do not add empty string
if (RSTRING_LEN(str) == 0) return self;

if (bm_buffer_from_string(&desc, str))
rb_raise(rb_eRuntimeError, "Failed to create buffer from given string");

io_stream_push_desc(io_stream, desc);
return self;
}

VALUE IOStream_getbyte(VALUE self)
{
IOStream_t *io_stream = RTYPEDDATA_DATA(self);

if (!io_stream->cursor_desc)
return Qnil;

// rb_define_method(rb_cThread, "debug!", Thread_debug, 0);
int byte = io_stream->cursor_desc->ptr[io_stream->cursor_pos];
io_stream->cursor_pos++;
if (io_stream->cursor_pos == io_stream->cursor_desc->len) {
io_stream->cursor_desc = io_stream->cursor_desc->next;
io_stream->cursor_pos = 0;
}

return INT2FIX(byte);
}

VALUE IOStream_getc(VALUE self)
{
IOStream_t *io_stream = RTYPEDDATA_DATA(self);

if (!io_stream->cursor_desc)
return Qnil;

// TODO: add support for multi-byte chars
VALUE chr = rb_str_new(io_stream->cursor_desc->ptr + io_stream->cursor_pos, 1);
io_stream->cursor_pos++;
if (io_stream->cursor_pos == io_stream->cursor_desc->len) {
io_stream->cursor_desc = io_stream->cursor_desc->next;
io_stream->cursor_pos = 0;
}

return chr;
}

void Init_IOStream(void) {
cIOStream = rb_define_class_under(mPolyphony, "IOStream", rb_cObject);
rb_define_alloc_func(cIOStream, IOStream_allocate);

// ID_deactivate_all_watchers_post_fork = rb_intern("deactivate_all_watchers_post_fork");
// ID_ivar_backend = rb_intern("@backend");
// ID_ivar_join_wait_queue = rb_intern("@join_wait_queue");
// ID_ivar_main_fiber = rb_intern("@main_fiber");
// ID_ivar_terminated = rb_intern("@terminated");
// ID_stop = rb_intern("stop");
rb_define_method(cIOStream, "initialize", IOStream_initialize, 1);
rb_define_method(cIOStream, "left", IOStream_left, 0);
rb_define_method(cIOStream, "<<", IOStream_push_string, 1);
rb_define_method(cIOStream, "getbyte", IOStream_getbyte, 0);
rb_define_method(cIOStream, "getc", IOStream_getc, 0);
}
1 change: 1 addition & 0 deletions ext/polyphony/polyphony.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ extern VALUE mPolyphony;
extern VALUE cPipe;
extern VALUE cQueue;
extern VALUE cEvent;
extern VALUE cIOStream;
extern VALUE cTimeoutException;

extern ID ID_call;
Expand Down
42 changes: 42 additions & 0 deletions test/test_io_stream.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

require_relative 'helper'

class IOStreamTest < MiniTest::Test
def test_left
s = Polyphony::IOStream.new(nil)

assert_equal 0, s.left

s << 'abc'
s << 'def'

assert_equal 6, s.left
end

def test_getbyte
s = Polyphony::IOStream.new(nil)

assert_nil s.getbyte

s << 'abc'

assert_equal 97, s.getbyte
assert_equal 98, s.getbyte
assert_equal 99, s.getbyte
assert_nil s.getbyte
end

def test_getc
s = Polyphony::IOStream.new(nil)

assert_nil s.getc

s << 'abc'

assert_equal 'a', s.getc
assert_equal 'b', s.getc
assert_equal 'c', s.getc
assert_nil s.getc
end
end

0 comments on commit 31e69df

Please sign in to comment.