Skip to content

Commit

Permalink
feat(sfn): add wait api (#605)
Browse files Browse the repository at this point in the history
# Description

Adding `Wait()` API for `StreamFunction`, which is used to block the
main function until the `StreamFunction` is closed either by the remote
or the local.
  • Loading branch information
woorui authored Sep 7, 2023
1 parent a6f0bb7 commit 084b751
Show file tree
Hide file tree
Showing 12 changed files with 32 additions and 21 deletions.
20 changes: 13 additions & 7 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Client struct {

// ctx and ctxCancel manage the lifecycle of client.
ctx context.Context
ctxCancel context.CancelFunc
ctxCancel context.CancelCauseFunc

writeFrameChan chan frame.Frame
}
Expand All @@ -52,7 +52,7 @@ func NewClient(appName string, connType ClientType, opts ...ClientOption) *Clien
logger.Info("use credential", "credential_name", option.credential.Name())
}

ctx, ctxCancel := context.WithCancel(context.Background())
ctx, ctxCancel := context.WithCancelCause(context.Background())

return &Client{
name: appName,
Expand Down Expand Up @@ -102,7 +102,7 @@ func (c *Client) runBackground(ctx context.Context, addr string, controlStream *
for {
select {
case <-c.ctx.Done():
c.cleanStream(controlStream, nil)
c.cleanStream(controlStream, c.ctx.Err())
return
case <-ctx.Done():
c.cleanStream(controlStream, ctx.Err())
Expand Down Expand Up @@ -135,13 +135,19 @@ func (c *Client) WriteFrame(f frame.Frame) error {

// blockWriteFrame writes frames in block mode, guaranteeing that frames are not lost.
func (c *Client) blockWriteFrame(f frame.Frame) error {
c.writeFrameChan <- f
select {
case <-c.ctx.Done():
return c.ctx.Err()
case c.writeFrameChan <- f:
}
return nil
}

// nonBlockWriteFrame writes frames in non-blocking mode, without guaranteeing that frames will not be lost.
func (c *Client) nonBlockWriteFrame(f frame.Frame) error {
select {
case <-c.ctx.Done():
return c.ctx.Err()
case c.writeFrameChan <- f:
return nil
default:
Expand Down Expand Up @@ -169,7 +175,7 @@ func (c *Client) cleanStream(controlStream *ClientControlStream, err error) {
// Close close the client.
func (c *Client) Close() error {
// break runBackgroud() for-loop.
c.ctxCancel()
c.ctxCancel(fmt.Errorf("%s: local shutdown", c.streamType.String()))

return nil
}
Expand Down Expand Up @@ -269,13 +275,13 @@ func (c *Client) handleFrameError(err error, reconnection chan<- struct{}) {

// exit client program if stream has be closed.
if err == io.EOF {
c.ctxCancel()
c.ctxCancel(fmt.Errorf("%s: remote shutdown", c.streamType.String()))
return
}

// If client accepts close signal from server, then exit client program.
if se := new(ErrControllSignal); errors.As(err, &se) {
c.ctxCancel()
c.ctxCancel(fmt.Errorf("%s: remote shutdown", c.streamType.String()))
return
}

Expand Down
4 changes: 1 addition & 3 deletions example/0-basic/sfn/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,9 @@ func main() {
// set the error handler function when server error occurs
sfn.SetErrorHandler(func(err error) {
slog.Error("[sfn] receive server error", "err", err)
sfn.Close()
os.Exit(1)
})

select {}
sfn.Wait()
}

func handler(ctx serverless.Context) {
Expand Down
2 changes: 1 addition & 1 deletion example/3-multi-sfn/stream-fn-1/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
os.Exit(1)
}

select {}
sfn.Wait()
}

func handler(ctx serverless.Context) {
Expand Down
2 changes: 1 addition & 1 deletion example/3-multi-sfn/stream-fn-2/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {
os.Exit(1)
}

select {}
sfn.Wait()
}

func handler(ctx serverless.Context) {
Expand Down
2 changes: 1 addition & 1 deletion example/3-multi-sfn/stream-fn-3/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {

go SlidingWindowWithTime(observe, SlidingWindowInMS, SlidingTimeInMS, slidingAvg)

select {}
sfn.Wait()
}

func handler(ctx serverless.Context) {
Expand Down
2 changes: 1 addition & 1 deletion example/3-multi-sfn/stream-fn-4/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
os.Exit(1)
}

select {}
sfn.Wait()
}

func handler(ctx serverless.Context) {
Expand Down
2 changes: 1 addition & 1 deletion example/4-cascading-zipper/sfn/sfn_echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func main() {
os.Exit(1)
}

select {}
sfn.Wait()
}

func handler(ctx serverless.Context) {
Expand Down
2 changes: 1 addition & 1 deletion example/5-backflow/sfn-1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func main() {
os.Exit(1)
})

select {}
sfn.Wait()
}

func handler(ctx serverless.Context) {
Expand Down
2 changes: 1 addition & 1 deletion example/5-backflow/sfn-2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func main() {
os.Exit(1)
})

select {}
sfn.Wait()
}

func handler(ctx serverless.Context) {
Expand Down
2 changes: 1 addition & 1 deletion example/6-mesh/stream-fn-db/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func main() {
return
}

select {}
sfn.Wait()
}

func getPort() int {
Expand Down
2 changes: 1 addition & 1 deletion example/6-mesh/stream-fn/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func main() {
// pipe rx stream and rx handler.
rt.Pipe(Handler)

select {}
sfn.Wait()
}

// DataTags observe tag list
Expand Down
11 changes: 9 additions & 2 deletions sfn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
type StreamFunction interface {
// SetObserveDataTags set the data tag list that will be observed
SetObserveDataTags(tag ...uint32)
// Init will initialize the stream function
Init(fn func() error) error
// SetHandler set the handler function, which accept the raw bytes data and return the tag & response
SetHandler(fn core.AsyncHandler) error
// SetErrorHandler set the error handler function when server error occurs
Expand All @@ -26,8 +28,8 @@ type StreamFunction interface {
Connect() error
// Close will close the connection
Close() error
// Init will initialize the stream function
Init(fn func() error) error
// Wait waits sfn to finish.
Wait()
}

// NewStreamFunction create a stream function.
Expand Down Expand Up @@ -191,6 +193,11 @@ func (s *streamFunction) Close() error {
return nil
}

// Wait waits sfn to finish.
func (s *streamFunction) Wait() {
s.client.Wait()
}

// when DataFrame we observed arrived, invoke the user's function
// func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) {
func (s *streamFunction) onDataFrame(dataFrame *frame.DataFrame) {
Expand Down

0 comments on commit 084b751

Please sign in to comment.