Skip to content

Commit

Permalink
fix: show error message if duplicate sfn name (#582)
Browse files Browse the repository at this point in the history
# Description

Data stream links their connections, ff two identical named `sfn` are
connected to the same zipper, then the preceding `sfn` should be closed.
  • Loading branch information
woorui authored Sep 6, 2023
1 parent 8d4eddf commit 4db38f5
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 37 deletions.
6 changes: 6 additions & 0 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ func (c *Client) handleFrameError(err error, reconnection chan<- struct{}) {
return
}

// If client accepts close signal from server, then exit client program.
if se := new(ErrControllSignal); errors.As(err, &se) {
c.ctxCancel()
return
}

// always attempting to reconnect if an error is encountered,
// the error is mostly network error.
select {
Expand Down
2 changes: 1 addition & 1 deletion core/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,5 @@ func TestConnector(t *testing.T) {
// mockDataStream returns a data stream that only includes an ID and a name.
// This function is used for unit testing purposes.
func mockDataStream(id, name string) DataStream {
return newDataStream(name, id, StreamType(0), nil, []frame.Tag{0}, nil)
return newDataStream(name, id, StreamType(0), nil, []frame.Tag{0}, nil, nil, nil)
}
32 changes: 31 additions & 1 deletion core/control_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func (ss *ServerControlStream) OpenStream(ctx context.Context, handshakeFunc Han
md,
ff.ObserveDataTags,
NewFrameStream(stream, ss.codec, ss.packetReadWriter),
ss,
nil,
)
return dataStream, nil
}
Expand All @@ -142,6 +144,16 @@ func (ss *ServerControlStream) CloseWithError(errString string) error {
return ss.conn.CloseWithError(errString)
}

// Goaway tells client-side connection that the connection goaway and closes it.
func (ss *ServerControlStream) Goaway(errString string) error {
// send GoawayFrame to client.
_ = ss.stream.WriteFrame(&frame.GoawayFrame{
Message: errString,
})
// close the connection.
return ss.CloseWithError(errString)
}

// VerifyAuthentication verify the Authentication from client side.
func (ss *ServerControlStream) VerifyAuthentication(verifyFunc VerifyAuthenticationFunc) (metadata.M, error) {
first, err := ss.stream.ReadFrame()
Expand Down Expand Up @@ -192,6 +204,7 @@ type ClientControlStream struct {
handshakeRejectedFrameChan chan *frame.HandshakeRejectedFrame
acceptStreamResultChan chan acceptStreamResult
logger *slog.Logger
signalChan chan frame.Frame
}

// OpenClientControlStream opens ClientControlStream from addr.
Expand Down Expand Up @@ -229,6 +242,7 @@ func NewClientControlStream(
handshakeRejectedFrameChan: make(chan *frame.HandshakeRejectedFrame, 10),
acceptStreamResultChan: make(chan acceptStreamResult, 10),
logger: logger,
signalChan: make(chan frame.Frame, 1),
}

return controlStream
Expand All @@ -245,8 +259,24 @@ func (cs *ClientControlStream) readFrameLoop() {
return
}
switch ff := f.(type) {

// stream level control signal.
case *frame.HandshakeRejectedFrame:
cs.handshakeRejectedFrameChan <- ff

// connection level control signal.
case *frame.RejectedFrame:
select {
case cs.signalChan <- f:
return
default:
}
case *frame.GoawayFrame:
select {
case cs.signalChan <- f:
return
default:
}
default:
cs.logger.Warn("control stream read unexcepted frame", "frame_type", f.Type().String())
_ = cs.conn.CloseWithError("client read unexcepted frame")
Expand Down Expand Up @@ -406,7 +436,7 @@ func (cs *ClientControlStream) acceptStream(ctx context.Context) (DataStream, er
return nil, err
}

return newDataStream(f.Name, f.ID, StreamType(f.StreamType), md, f.ObserveDataTags, fs), nil
return newDataStream(f.Name, f.ID, StreamType(f.StreamType), md, f.ObserveDataTags, fs, nil, cs.signalChan), nil
}

// CloseWithError closes the client-side control stream.
Expand Down
121 changes: 111 additions & 10 deletions core/data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package core

import (
"context"
"errors"
"io"

"github.com/quic-go/quic-go"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/metadata"
)
Expand Down Expand Up @@ -38,8 +40,10 @@ type dataStream struct {
streamType StreamType
metadata metadata.M
observed []frame.Tag
stream *FrameStream

stream *FrameStream
serverController *ServerControlStream
clientSignalChan <-chan frame.Frame
}

// newDataStream constructures dataStream.
Expand All @@ -50,6 +54,8 @@ func newDataStream(
metadata metadata.M,
observed []frame.Tag,
stream *FrameStream,
serverController *ServerControlStream,
clientSignalChan <-chan frame.Frame,
) DataStream {
return &dataStream{
name: name,
Expand All @@ -58,19 +64,73 @@ func newDataStream(
metadata: metadata,
observed: observed,
stream: stream,

serverController: serverController,
clientSignalChan: clientSignalChan,
}
}

// DataStream implements.
func (s *dataStream) Context() context.Context { return s.stream.Context() }
func (s *dataStream) ID() string { return s.id }
func (s *dataStream) Name() string { return s.name }
func (s *dataStream) Metadata() metadata.M { return s.metadata }
func (s *dataStream) StreamType() StreamType { return s.streamType }
func (s *dataStream) ObserveDataTags() []frame.Tag { return s.observed }
func (s *dataStream) WriteFrame(f frame.Frame) error { return s.stream.WriteFrame(f) }
func (s *dataStream) ReadFrame() (frame.Frame, error) { return s.stream.ReadFrame() }
func (s *dataStream) Close() error { return s.stream.Close() }
func (s *dataStream) Context() context.Context { return s.stream.Context() }
func (s *dataStream) ID() string { return s.id }
func (s *dataStream) Name() string { return s.name }
func (s *dataStream) Metadata() metadata.M { return s.metadata }
func (s *dataStream) StreamType() StreamType { return s.streamType }
func (s *dataStream) ObserveDataTags() []frame.Tag { return s.observed }
func (s *dataStream) Close() error { return s.stream.Close() }

func (s *dataStream) WriteFrame(f frame.Frame) error {
if err := readErrorFromController(s.stream, s.clientSignalChan); err != nil {
return err
}
return s.stream.WriteFrame(f)
}
func (s *dataStream) ReadFrame() (frame.Frame, error) {
type outCh struct {
frame frame.Frame
err error
}

out := make(chan outCh)
go func() {
for signal := range s.clientSignalChan {
switch ff := signal.(type) {
case *frame.GoawayFrame:
_ = s.stream.Close()
out <- outCh{
frame: nil,
err: NewErrControllSignal(ff.Message),
}
return
case *frame.RejectedFrame:
_ = s.stream.Close()
out <- outCh{
frame: nil,
err: NewErrControllSignal(ff.Message),
}
return
}
}
}()
go func() {
f, err := s.stream.ReadFrame()
// return EOF if server-side control stream has been closed.
if IsYomoCloseError(err) {
out <- outCh{
frame: nil,
err: io.EOF,
}
return
}
out <- outCh{
frame: f,
err: err,
}
}()
result := <-out

return result.frame, result.err
}

const (
// StreamTypeSource is stream type "Source".
Expand Down Expand Up @@ -113,3 +173,44 @@ type ContextReadWriteCloser interface {
// The stream.
io.ReadWriteCloser
}

// ErrControllSignal represents the error of controll signal.
type ErrControllSignal struct {
errString string
}

// NewErrControllSignal constructs ErrControllSignal.
func NewErrControllSignal(errString string) *ErrControllSignal {
return &ErrControllSignal{
errString: errString,
}
}

// Error implements error interface.
func (e *ErrControllSignal) Error() string {
return e.errString
}

// readErrorFromController try to read error from controller,if there readan error
// from the controller, the stream read function will return the error and the stream will be closed.
func readErrorFromController(closer io.Closer, ch <-chan frame.Frame) error {
select {
case ex := <-ch:
switch ff := ex.(type) {
case *frame.GoawayFrame:
_ = closer.Close()
return NewErrControllSignal(ff.Message)
case *frame.RejectedFrame:
_ = closer.Close()
return NewErrControllSignal(ff.Message)
}
default:
}
return nil
}

// IsYomoCloseError checks if the error is yomo close error.
func IsYomoCloseError(err error) bool {
qerr := new(quic.ApplicationError)
return errors.As(err, &qerr) && qerr.ErrorCode == YomoCloseErrorCode
}
2 changes: 1 addition & 1 deletion core/data_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestDataStream(t *testing.T) {
// create frame stream.
frameStream := NewFrameStream(mockStream, &byteCodec{}, &bytePacketReadWriter{})

stream := newDataStream(name, id, styp, md, observed, frameStream)
stream := newDataStream(name, id, styp, md, observed, frameStream, nil, nil)

t.Run("StreamInfo", func(t *testing.T) {
assert.Equal(t, id, stream.ID())
Expand Down
17 changes: 14 additions & 3 deletions core/frame/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
// 6. HandshakeAckFrame
// 7. RejectedFrame
// 8. BackflowFrame
// Read frame comments to understand the role of the frame.
//
// Read frame comments to understand the role of the frame.
type Frame interface {
// Type returns the type of frame.
Type() Type
Expand Down Expand Up @@ -118,15 +119,22 @@ func (f *BackflowFrame) Type() Type { return TypeBackflowFrame }

// RejectedFrame is used to reject a ControlStream request.
type RejectedFrame struct {
// Code is the code rejected.
Code uint64
// Message encapsulates the rationale behind the rejection of the request.
Message string
}

// Type returns the type of RejectedFrame.
func (f *RejectedFrame) Type() Type { return TypeRejectedFrame }

// GoawayFrame is is used by server to evict a connection.
type GoawayFrame struct {
// Message contains the reason why the connection be evicted.
Message string
}

// Type returns the type of GoawayFrame.
func (f *GoawayFrame) Type() Type { return TypeGoawayFrame }

const (
TypeAuthenticationFrame Type = 0x03 // TypeAuthenticationFrame is the type of AuthenticationFrame.
TypeAuthenticationAckFrame Type = 0x11 // TypeAuthenticationAckFrame is the type of AuthenticationAckFrame.
Expand All @@ -136,6 +144,7 @@ const (
TypeHandshakeAckFrame Type = 0x29 // TypeHandshakeAckFrame is the type of HandshakeAckFrame.
TypeRejectedFrame Type = 0x39 // TypeRejectedFrame is the type of RejectedFrame.
TypeBackflowFrame Type = 0x2D // TypeBackflowFrame is the type of BackflowFrame.
TypeGoawayFrame Type = 0x2E // TypeGoawayFrame is the type of GoawayFrame.
)

var frameTypeStringMap = map[Type]string{
Expand All @@ -147,6 +156,7 @@ var frameTypeStringMap = map[Type]string{
TypeHandshakeAckFrame: "HandshakeAckFrame",
TypeRejectedFrame: "RejectedFrame",
TypeBackflowFrame: "BackflowFrame",
TypeGoawayFrame: "GoawayFrame",
}

// String returns a human-readable string which represents the frame type.
Expand All @@ -168,6 +178,7 @@ var frameTypeNewFuncMap = map[Type]func() Frame{
TypeHandshakeAckFrame: func() Frame { return new(HandshakeAckFrame) },
TypeRejectedFrame: func() Frame { return new(RejectedFrame) },
TypeBackflowFrame: func() Frame { return new(BackflowFrame) },
TypeGoawayFrame: func() Frame { return new(GoawayFrame) },
}

// NewFrame creates a new frame from Type.
Expand Down
3 changes: 2 additions & 1 deletion core/stream_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func (g *StreamGroup) handleRoute(hf *frame.HandshakeFrame, md metadata.M) (rout
return nil, err
}
if ok {
stream.Close()
// if same name sfn here, close the old connection.
stream.(*dataStream).serverController.Goaway(e.Error())
g.connector.Delete(existsStreamID)
g.logger.Debug("connector remove stream", "stream_id", stream.ID(), "stream_type", stream.StreamType().String(), "stream_name", stream.Name())
}
Expand Down
2 changes: 1 addition & 1 deletion example/0-basic/sfn/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {
// start
err := sfn.Connect()
if err != nil {
slog.Error("[sfn] connect", err)
slog.Error("[sfn] connect", "err", err)
os.Exit(1)
}
// set the error handler function when server error occurs
Expand Down
4 changes: 4 additions & 0 deletions pkg/frame-codec/y3codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func (c *y3codec) Encode(f frame.Frame) ([]byte, error) {
return encodeDataFrame(ff)
case *frame.BackflowFrame:
return encodeBackflowFrame(ff)
case *frame.GoawayFrame:
return encodeGoawayFrame(ff)
default:
return nil, ErrUnknownFrame
}
Expand All @@ -78,6 +80,8 @@ func (c *y3codec) Decode(data []byte, f frame.Frame) error {
return decodeDataFrame(data, ff)
case *frame.BackflowFrame:
return decodeBackflowFrame(data, ff)
case *frame.GoawayFrame:
return decodeGoawayFrame(data, ff)
default:
return ErrUnknownFrame
}
Expand Down
18 changes: 14 additions & 4 deletions pkg/frame-codec/y3codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,22 @@ func TestCodec(t *testing.T) {
args: args{
newF: new(frame.RejectedFrame),
dataF: &frame.RejectedFrame{
Code: 123,
Message: "rejected error",
},
data: []byte{
0xb9, 0x13, 0x1, 0x1, 0x7b, 0x2, 0xe, 0x72, 0x65, 0x6a, 0x65, 0x63, 0x74,
0x65, 0x64, 0x20, 0x65, 0x72, 0x72, 0x6f, 0x72,
data: []byte{0xb9, 0x10, 0x1, 0xe, 0x72, 0x65, 0x6a, 0x65, 0x63, 0x74, 0x65,
0x64, 0x20, 0x65, 0x72, 0x72, 0x6f, 0x72,
},
},
},
{
name: "GoawayFrame",
args: args{
newF: new(frame.GoawayFrame),
dataF: &frame.GoawayFrame{
Message: "goaway error",
},
data: []byte{0xae, 0xe, 0x1, 0xc, 0x67, 0x6f, 0x61, 0x77, 0x61, 0x79, 0x20,
0x65, 0x72, 0x72, 0x6f, 0x72,
},
},
},
Expand Down
Loading

0 comments on commit 4db38f5

Please sign in to comment.