Skip to content

Commit

Permalink
Remove pending buffer when stream closed
Browse files Browse the repository at this point in the history
See #239
  • Loading branch information
jerry-tao authored and enobufs committed Nov 18, 2022
1 parent fc1168c commit c0159aa
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 130 deletions.
25 changes: 19 additions & 6 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,9 @@ type Association struct {
delayedAckTriggered bool
immediateAckTriggered bool

name string
log logging.LeveledLogger
name string
log logging.LeveledLogger
streamVersion uint32
}

// Config collects the arguments to createAssociation construction into
Expand Down Expand Up @@ -1368,6 +1369,7 @@ func (a *Association) createStream(streamIdentifier uint16, accept bool) *Stream
streamIdentifier: streamIdentifier,
reassemblyQueue: newReassemblyQueue(streamIdentifier),
log: a.log,
version: atomic.AddUint32(&a.streamVersion, 1),
name: fmt.Sprintf("%d:%s", streamIdentifier, a.name),
}

Expand Down Expand Up @@ -2088,10 +2090,14 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
dataLen := uint32(len(c.userData))
if dataLen == 0 {
sisToReset = append(sisToReset, c.streamIdentifier)
err := a.pendingQueue.pop(c)
if err != nil {
a.log.Errorf("failed to pop from pending queue: %s", err.Error())
}
a.popPendingDataChunksToDrop(c)
continue
}

s, ok := a.streams[c.streamIdentifier]

if !ok || s.State() > StreamStateOpen || s.version != c.streamVersion {
a.popPendingDataChunksToDrop(c)
continue
}

Expand Down Expand Up @@ -2123,6 +2129,13 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
return chunks, sisToReset
}

// popPendingDataChunksToDrop discards a pending DATA chunk that will never be
// sent (e.g. its stream was closed or reset), logging if the removal fails.
func (a *Association) popPendingDataChunksToDrop(c *chunkPayloadData) {
	if err := a.pendingQueue.pop(c); err != nil {
		a.log.Errorf("failed to pop from pending queue: %s", err.Error())
	}
}

// bundleDataChunksIntoPackets packs DATA chunks into packets. It tries to bundle
// DATA chunks into a packet so long as the resulting packet size does not exceed
// the path MTU.
Expand Down
3 changes: 2 additions & 1 deletion chunk_payload_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type chunkPayloadData struct {
// chunk is still in the inflight queue
retransmit bool

head *chunkPayloadData // link to the head of the fragment
head *chunkPayloadData // link to the head of the fragment
streamVersion uint32
}

const (
Expand Down
51 changes: 31 additions & 20 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ const (

// StreamState is an enum for SCTP Stream state field
// This field identifies the state of stream.
type StreamState int
type StreamState int32

// StreamState enums
const (
StreamStateOpen StreamState = iota // Stream object starts with StreamStateOpen
StreamStateClosing // Outgoing stream is being reset
StreamStateClosing // Stream is closed by remote
StreamStateClosed // Stream has been closed
)

Expand Down Expand Up @@ -71,6 +71,7 @@ type Stream struct {
state StreamState
log logging.LeveledLogger
name string
version uint32
}

// StreamIdentifier returns the Stream identifier associated to the stream.
Expand Down Expand Up @@ -296,6 +297,7 @@ func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPa
copy(userData, raw[i:i+fragmentSize])

chunk := &chunkPayloadData{
streamVersion: s.version,
streamIdentifier: s.streamIdentifier,
userData: userData,
unordered: unordered,
Expand Down Expand Up @@ -338,16 +340,22 @@ func (s *Stream) Close() error {
s.lock.Lock()
defer s.lock.Unlock()

s.log.Debugf("[%s] Close: state=%s", s.name, s.state.String())
state := s.State()
s.log.Debugf("[%s] Close: state=%s", s.name, state.String())

if s.state == StreamStateOpen {
if s.readErr == nil {
s.state = StreamStateClosing
} else {
s.state = StreamStateClosed
}
s.log.Debugf("[%s] state change: open => %s", s.name, s.state.String())
switch state {
case StreamStateOpen:
s.SetState(StreamStateClosed)
s.log.Debugf("[%s] state change: open => closed", s.name)
s.readErr = io.EOF
s.readNotifier.Broadcast()
return s.streamIdentifier, true
case StreamStateClosing:
s.SetState(StreamStateClosed)
s.log.Debugf("[%s] state change: closing => closed", s.name)
return s.streamIdentifier, true
case StreamStateClosed:
return s.streamIdentifier, false
}
return s.streamIdentifier, false
}(); resetOutbound {
Expand Down Expand Up @@ -434,7 +442,8 @@ func (s *Stream) onInboundStreamReset() {
s.lock.Lock()
defer s.lock.Unlock()

s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, s.state.String())
state := s.State()
s.log.Debugf("[%s] onInboundStreamReset: state=%s", s.name, state.String())

// No more inbound data to read. Unblock the read with io.EOF.
// This should cause DCEP layer (datachannel package) to call Close() which
Expand All @@ -445,19 +454,21 @@ func (s *Stream) onInboundStreamReset() {
// outgoing stream. When the peer sees that an incoming stream was
// reset, it also resets its corresponding outgoing stream. Once this
// is completed, the data channel is closed.
if state == StreamStateOpen {
s.log.Debugf("[%s] state change: open => closing", s.name)
s.SetState(StreamStateClosing)
}

s.readErr = io.EOF
s.readNotifier.Broadcast()

if s.state == StreamStateClosing {
s.log.Debugf("[%s] state change: closing => closed", s.name)
s.state = StreamStateClosed
}
}

// State return the stream state.
// State atomically returns the stream state.
//
// NOTE(review): the scraped diff left the pre-refactor mutex-guarded body
// (RLock/RUnlock + `return s.state`) merged with the new atomic load,
// producing dead, unreachable code. The post-commit implementation is the
// lock-free atomic read below, matching SetState's atomic store.
func (s *Stream) State() StreamState {
	return StreamState(atomic.LoadInt32((*int32)(&s.state)))
}

// SetState atomically sets the stream state.
func (s *Stream) SetState(newState StreamState) {
	statePtr := (*int32)(&s.state)
	atomic.StoreInt32(statePtr, int32(newState))
}
Loading

0 comments on commit c0159aa

Please sign in to comment.