diff --git a/options.go b/options.go index c37d3b1fd..f419ef693 100644 --- a/options.go +++ b/options.go @@ -97,8 +97,8 @@ var ( } } - // WithUptreamOption provides upstream zipper options for Zipper. - WithUptreamOption = func(opts ...ClientOption) ZipperOption { + // WithUpstreamOption provides upstream zipper options for Zipper. + WithUpstreamOption = func(opts ...ClientOption) ZipperOption { return func(o *zipperOptions) { o.clientOption = opts } diff --git a/sfn_test.go b/sfn_test.go index 507d8fd8a..abff21113 100644 --- a/sfn_test.go +++ b/sfn_test.go @@ -2,25 +2,47 @@ package yomo import ( "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/yomorun/yomo/core" + "github.com/yomorun/yomo/core/ylog" + "github.com/yomorun/yomo/serverless" ) -func TestSfnConnectToServer(t *testing.T) { +func TestStreamFunction(t *testing.T) { + t.Parallel() + sfn := NewStreamFunction( - "sfn-ai-stream-response", + "sfn-async-log-events", "localhost:9000", WithSfnCredential("token:"), + WithSfnLogger(ylog.Default()), + WithSfnQuicConfig(core.DefalutQuicConfig), + WithSfnTLSConfig(nil), ) - sfn.SetObserveDataTags(0x33) - defer sfn.Close() + sfn.SetObserveDataTags(0x21) + + time.AfterFunc(time.Second, func() { + sfn.Close() + }) + + // set error handler + sfn.SetErrorHandler(func(err error) {}) // set handler - sfn.SetHandler(nil) + sfn.SetHandler(func(ctx serverless.Context) { + t.Logf("unittest sfn receive <- (%d)", len(ctx.Data())) + assert.Equal(t, uint32(0x21), ctx.Tag()) + assert.Equal(t, []byte("test"), ctx.Data()) + ctx.Write(0x22, []byte("backflow")) + }) // connect to server err := sfn.Connect() assert.Nil(t, err) + + sfn.Wait() } func TestSfnInit(t *testing.T) { diff --git a/source_test.go b/source_test.go index 0f928929b..98d90d20c 100644 --- a/source_test.go +++ b/source_test.go @@ -2,19 +2,51 @@ package yomo import ( "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/yomorun/yomo/core" + "github.com/yomorun/yomo/core/frame" + "github.com/yomorun/yomo/core/ylog" ) -func TestSourceSendDataToServer(t *testing.T) { - source := NewSource("test-source", "localhost:9000", WithCredential("token:")) - defer source.Close() +func TestSource(t *testing.T) { + t.Parallel() - // connect to server + source := NewSource( + "test-source", + "localhost:9000", + WithCredential("token:"), + WithLogger(ylog.Default()), + WithObserveDataTags(0x22), + WithSourceQuicConfig(core.DefalutQuicConfig), + WithSourceTLSConfig(nil), + ) + + exit := make(chan struct{}) + time.AfterFunc(time.Second, func() { + source.Close() + close(exit) + }) + + source.SetErrorHandler(func(err error) {}) + + source.SetReceiveHandler(func(tag frame.Tag, data []byte) { + assert.Equal(t, uint32(0x22), tag) + assert.Equal(t, []byte("backflow"), data) + }) + + // connect to zipper err := source.Connect() assert.Nil(t, err) - // send data to server + // send data to zipper err = source.Write(0x21, []byte("test")) assert.Nil(t, err) + + // broadcast data to zipper + err = source.Broadcast(0x21, []byte("test")) + assert.Nil(t, err) + + <-exit } diff --git a/zipper_test.go b/zipper_test.go index 8bed06501..ab0cebb5f 100644 --- a/zipper_test.go +++ b/zipper_test.go @@ -5,10 +5,21 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/yomorun/yomo/core" + "github.com/yomorun/yomo/core/ylog" ) func TestZipperRun(t *testing.T) { - zipper, err := NewZipper("zipper", nil, nil) + zipper, err := NewZipper( + "zipper", + nil, + nil, + // WithAuth("token", ""), + WithUpstreamOption(core.ClientOption(WithCredential("token:"))), + WithZipperLogger(ylog.Default()), + WithZipperQuicConfig(core.DefalutQuicConfig), + WithZipperTLSConfig(nil), + ) assert.Nil(t, err) time.Sleep(time.Second) assert.NotNil(t, zipper)