Skip to content

Commit

Permalink
Add auto reading; fix release packet queue
Browse files Browse the repository at this point in the history
  • Loading branch information
robinbraemer committed Dec 28, 2023
1 parent 65464e6 commit aff012f
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 43 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21.0
require (
github.com/agext/levenshtein v1.2.3
github.com/dboslee/lru v0.0.1
github.com/edwingeng/deque/v2 v2.1.1
github.com/gammazero/deque v0.2.1
github.com/go-faker/faker/v4 v4.1.1
github.com/go-logr/logr v1.2.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ github.com/dboslee/lru v0.0.1/go.mod h1:vDIFJHUqr1vdYKAdG9x3r+zFWP0i9uJqQWpB6nSu
github.com/df-mc/atomic v1.10.0 h1:0ZuxBKwR/hxcFGorKiHIp+hY7hgY+XBTzhCYD2NqSEg=
github.com/df-mc/atomic v1.10.0/go.mod h1:Gw9rf+rPIbydMjA329Jn4yjd/O2c/qusw3iNp4tFGSc=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/edwingeng/deque/v2 v2.1.1 h1:+xjC3TnaeMPLZMi7QQf9jN2K00MZmTwruApqplbL9IY=
github.com/edwingeng/deque/v2 v2.1.1/go.mod h1:HukI8CQe9KDmZCcURPZRYVYjH79Zy2tIjTF9sN3Bgb0=
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
Expand Down
98 changes: 71 additions & 27 deletions pkg/edition/java/netmc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,14 @@ type MinecraftConn interface { // TODO convert to exported struct as this interf
// when calling SwitchSessionHandler on the same registry.
AddSessionHandler(*state.Registry, SessionHandler)

// SetAutoReading sets whether the connection should automatically read packets from the underlying connection.
// Default is true.
SetAutoReading(bool)

StateChanger
PacketWriter

Reader() Reader // Only use if you know what you are doing!
}

// Closed returns true if the connection is closed.
Expand Down Expand Up @@ -140,16 +146,17 @@ func NewMinecraftConn(

ctx, cancel := context.WithCancel(ctx)
c := &minecraftConn{
log: log,
c: base,
ctx: ctx,
cancelCtx: cancel,
rd: NewReader(base, in, readTimeout, log),
wr: NewWriter(base, out, writeTimeout, compressionLevel, log),
state: state.Handshake,
protocol: version.Minecraft_1_7_2.Protocol,
connType: phase.Undetermined,
direction: direction,
log: log,
c: base,
ctx: ctx,
cancelCtx: cancel,
rd: NewReader(base, in, readTimeout, log),
wr: NewWriter(base, out, writeTimeout, compressionLevel, log),
state: state.Handshake,
protocol: version.Minecraft_1_7_2.Protocol,
connType: phase.Undetermined,
direction: direction,
autoReading: newStateControl(true),
}
c.sessionHandlerMu.sessionHandlers = make(map[*state.Registry]SessionHandler)
return c, c.startReadLoop
Expand All @@ -165,6 +172,8 @@ type minecraftConn struct {
rd Reader
wr Writer

autoReading *stateControl // Whether the connection should automatically read packets from the underlying connection.

ctx context.Context // is canceled when connection closed
cancelCtx context.CancelFunc
closeOnce sync.Once // Makes sure the connection is closed once, while blocking proceeding calls.
Expand Down Expand Up @@ -192,6 +201,9 @@ func (c *minecraftConn) startReadLoop() {
defer func() { _ = c.closeKnown(false) }()

next := func() bool {
// Wait until auto reading is enabled, if not already
c.autoReading.Wait()

// Read next packet from underlying connection.
packetCtx, err := c.rd.ReadPacket()
if err != nil {
Expand Down Expand Up @@ -234,6 +246,13 @@ func (c *minecraftConn) startReadLoop() {
}
}

func (c *minecraftConn) Reader() Reader { return c.rd }

func (c *minecraftConn) SetAutoReading(enabled bool) {
c.log.V(1).Info("update auto reading", "enabled", enabled)
c.autoReading.SetState(enabled)
}

func (c *minecraftConn) Context() context.Context { return c.ctx }

func (c *minecraftConn) Flush() error {
Expand Down Expand Up @@ -266,6 +285,15 @@ func (c *minecraftConn) Write(payload []byte) (err error) {
}

func (c *minecraftConn) BufferPacket(packet proto.Packet) (err error) {
return c.bufferPacket(packet, true)
}

// bufferNoQueue is a helper func to buffer a packet without queuing it.
func (c *minecraftConn) bufferNoQueue(packet proto.Packet) error {
return c.bufferPacket(packet, false)
}

func (c *minecraftConn) bufferPacket(packet proto.Packet, queue bool) (err error) {
if Closed(c) {
return ErrClosedConn
}
Expand All @@ -274,8 +302,9 @@ func (c *minecraftConn) BufferPacket(packet proto.Packet) (err error) {
c.closeOnWriteErr(err, "bufferPacket", fmt.Sprintf("%T", packet))
}
}()
if c.playPacketQueue.Queue(packet) {
if queue && c.playPacketQueue.Queue(packet) {
// Packet was queued, don't write it now
c.log.V(1).Info("queued packet", "packet", fmt.Sprintf("%T", packet))
return nil
}
_, err = c.wr.WritePacket(packet)
Expand Down Expand Up @@ -324,6 +353,8 @@ var ErrClosedConn = errors.New("connection is closed")
func (c *minecraftConn) closeKnown(markKnown bool) (err error) {
alreadyClosed := true
c.closeOnce.Do(func() {
defer c.SetAutoReading(true) // free the read loop in case auto reading is disabled

alreadyClosed = false
if markKnown {
c.knownDisconnect.Store(true)
Expand Down Expand Up @@ -433,7 +464,9 @@ func (c *minecraftConn) SetState(s *state.Registry) {

c.mu.Unlock()

c.log.V(1).Info("update state", "previous", prevState, "new", s)
if prevState != s {
c.log.V(1).Info("update state", "previous", prevState, "new", s)
}
}

// ensurePlayPacketQueue ensures the play packet queue is activated or deactivated
Expand All @@ -449,7 +482,7 @@ func (c *minecraftConn) ensurePlayPacketQueue(newState state.State) {

// Remove the play packet queue if it exists
if c.playPacketQueue != nil {
if err := c.playPacketQueue.ReleaseQueue(c); err != nil {
if err := c.playPacketQueue.ReleaseQueue(c.bufferNoQueue, c.Flush); err != nil {
c.log.Error(err, "error releasing play packet queue")
}
c.playPacketQueue = nil
Expand All @@ -474,42 +507,47 @@ func (c *minecraftConn) ActiveSessionHandler() SessionHandler {
return c.sessionHandlerMu.activeSessionHandler
}

func (c *minecraftConn) SetActiveSessionHandler(registry *state.Registry, handler SessionHandler) {
func (c *minecraftConn) AddSessionHandler(registry *state.Registry, handler SessionHandler) {
if registry == nil {
panic("registry must not be nil")
}
if handler == nil {
panic("handler must not be nil")
}

c.sessionHandlerMu.Lock()
defer c.sessionHandlerMu.Unlock()

if c.sessionHandlerMu.activeSessionHandler != nil {
c.sessionHandlerMu.activeSessionHandler.Deactivated()
if registry == c.State() {
// Handler would overwrite the current handler
c.log.Info("AddSessionHandler: session handler already exists for state", "state", registry.String())
return
}

c.sessionHandlerMu.sessionHandlers[registry] = handler
c.sessionHandlerMu.activeSessionHandler = handler
c.SetState(registry)
handler.Activated()
c.log.V(1).WithName("AddSessionHandler").
Info("added session handler", "state", registry.String(), "handler", fmt.Sprintf("%T", handler))
}

func (c *minecraftConn) AddSessionHandler(registry *state.Registry, handler SessionHandler) {
func (c *minecraftConn) SetActiveSessionHandler(registry *state.Registry, handler SessionHandler) {
if registry == nil {
panic("registry must not be nil")
}
if handler == nil {
panic("handler must not be nil")
}

c.sessionHandlerMu.Lock()
defer c.sessionHandlerMu.Unlock()

if registry == c.State() {
// Handler would overwrite the current handler
c.log.Info("AddSessionHandler: session handler already exists for state", "state", registry.String())
return
if c.sessionHandlerMu.activeSessionHandler != nil {
c.sessionHandlerMu.activeSessionHandler.Deactivated()
}

c.sessionHandlerMu.sessionHandlers[registry] = handler
c.sessionHandlerMu.activeSessionHandler = handler
c.SetState(registry)
handler.Activated()

c.log.V(1).WithName("SetActiveSessionHandler").
Info("set session handler", "state", registry.String(), "handler", fmt.Sprintf("%T", handler))
}

func (c *minecraftConn) SwitchSessionHandler(registry *state.Registry) bool {
Expand All @@ -526,7 +564,10 @@ func (c *minecraftConn) SwitchSessionHandler(registry *state.Registry) bool {
}

if c.sessionHandlerMu.activeSessionHandler == handler {
c.SetState(registry)

// The handler is already active, no need to switch
c.log.V(1).WithName("SwitchSessionHandler").Info("session handler already active, no need to switch", "state", registry.String(), "handler", fmt.Sprintf("%T", handler))
return true
}

Expand All @@ -538,6 +579,9 @@ func (c *minecraftConn) SwitchSessionHandler(registry *state.Registry) bool {
c.SetState(registry)
handler.Activated()

c.log.V(1).WithName("SwitchSessionHandler").
Info("switched session handler", "state", registry.String(), "handler", fmt.Sprintf("%T", handler))

return true
}

Expand Down
45 changes: 45 additions & 0 deletions pkg/edition/java/netmc/state_control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package netmc

import "sync"

// stateControl struct holds the state and a condition variable.
type stateControl struct {
state bool // state represents the condition to be met.
cond *sync.Cond // cond is a condition variable associated with a mutex.
}

// newStateControl function initializes a new stateControl with the initial state and a condition variable.
func newStateControl(initial bool) *stateControl {
sc := &stateControl{
state: initial,
cond: sync.NewCond(&sync.Mutex{}), // Initialize a new condition variable with an associated mutex.
}
return sc
}

// State method returns the current state.
// It locks the mutex before reading the state to ensure thread-safety.
func (s *stateControl) State() bool {
s.cond.L.Lock() // Lock the mutex before reading the state.
defer s.cond.L.Unlock() // Unlock the mutex after reading the state.
return s.state
}

// Wait method waits for the state to become true.
// It locks the mutex to ensure that the condition check and the wait operation are atomic.
func (s *stateControl) Wait() {
s.cond.L.Lock() // Lock the mutex before checking the condition.
for !s.state { // If the state is not true, wait for it to become true.
s.cond.Wait() // Wait releases the lock, suspends the goroutine, and reacquires the lock when it wakes up.
}
s.cond.L.Unlock() // Unlock the mutex after the condition is met.
}

// SetState method sets the state and signals all waiting goroutines.
// It locks the mutex to ensure that the state change and the signal operation are atomic.
func (s *stateControl) SetState(state bool) {
s.cond.L.Lock() // Lock the mutex before changing the state.
s.state = state // Change the state.
s.cond.Broadcast() // Signal all waiting goroutines that the condition might be met.
s.cond.L.Unlock() // Unlock the mutex after changing the state and signaling the goroutines.
}
21 changes: 9 additions & 12 deletions pkg/edition/java/proto/util/queue/packet_queue.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package queue

import (
"github.com/gammazero/deque"
"github.com/edwingeng/deque/v2"
"go.minekube.com/gate/pkg/edition/java/proto/state"
"go.minekube.com/gate/pkg/gate/proto"
)
Expand All @@ -24,7 +24,7 @@ type PlayPacketQueue struct {
func NewPlayPacketQueue(version proto.Protocol, direction proto.Direction) *PlayPacketQueue {
return &PlayPacketQueue{
registry: state.FromDirection(direction, state.Config, version),
queue: deque.New[proto.Packet](),
queue: deque.NewDeque[proto.Packet](),
protocolVersion: version,
}
}
Expand All @@ -43,28 +43,25 @@ func (h *PlayPacketQueue) Queue(packet proto.Packet) bool {
return false
}

// PacketBuffer is a packet buffer that can flush packets to an underlying packet writer.
type PacketBuffer interface {
BufferPacket(proto.Packet) error
Flush() error
}

// ReleaseQueue releases all packets in the queue to the sink packet writer.
// It iterates over the queue, buffering each packet and flushing the sink.
func (h *PlayPacketQueue) ReleaseQueue(sink PacketBuffer) error {
func (h *PlayPacketQueue) ReleaseQueue(
buffer func(proto.Packet) error,
flush func() error,
) error {
if h == nil {
return nil
}
var ok bool
for h.queue.Len() > 0 {
for h.queue.Len() != 0 {
packet := h.queue.PopFront()
if err := sink.BufferPacket(packet); err != nil {
if err := buffer(packet); err != nil {
return err
}
ok = true
}
if ok {
return sink.Flush()
return flush()
}
return nil
}
6 changes: 5 additions & 1 deletion pkg/edition/java/proxy/session_backend_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,12 @@ func (b *backendConfigSessionHandler) handleFinishedUpdate(p *config.FinishedUpd
player := b.serverConn.player
configHandler := b.playerConfigSessionHandler

smc.SetState(state.Play)
smc.SetAutoReading(false)
// Even when not auto reading messages are still decoded. Decode them with the correct state
smc.Reader().SetState(state.Play)
configHandler.handleBackendFinishUpdate(b.serverConn, p, func() {
defer smc.SetAutoReading(true)

if b.serverConn == player.connectedServer() {
if !smc.SwitchSessionHandler(state.Play) {
err := errors.New("failed to switch session handler")
Expand Down
4 changes: 2 additions & 2 deletions pkg/edition/java/proxy/session_backend_login.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,9 @@ func (b *backendLoginSessionHandler) handleServerLoginSuccess() {
_ = serverMc.WritePacket(pkt)
}
if csh, ok := player.MinecraftConn.ActiveSessionHandler().(*clientPlaySessionHandler); ok {
// TODO 1.20.2+ serverMc.SetAutoReading(false)
serverMc.SetAutoReading(false)
csh.doSwitch().DoWhenTrue(func() {
// TODO 1.20.2+ serverMc.SetAutoReading(true)
serverMc.SetAutoReading(true)
})
}
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/edition/java/proxy/session_backend_play.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"go.minekube.com/gate/pkg/edition/java/proto/packet/config"
"go.minekube.com/gate/pkg/edition/java/proto/state"
"reflect"
"regexp"
"time"
Expand Down Expand Up @@ -136,7 +137,12 @@ func (b *backendPlaySessionHandler) handleDisconnect(p *packet.Disconnect) {
}

func (b *backendPlaySessionHandler) handleStartUpdate(_ *config.StartUpdate) {
// TODO set decoder state to Config (need MinecraftConnection as struct and expose decoder)
smc, ok := b.serverConn.ensureConnected()
if !ok {
return
}
smc.SetAutoReading(false)
smc.Reader().SetState(state.Config)
b.serverConn.player.switchToConfigState()
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/edition/java/proxy/session_backend_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func (b *backendTransitionSessionHandler) handleJoinGame(pc *proto.PacketContext

// The goods are in hand! We got JoinGame.
// Let's transition completely to the new state.
smc.SetAutoReading(false)
connectedEvent := &ServerConnectedEvent{
player: b.serverConn.player,
server: b.serverConn.server,
Expand Down Expand Up @@ -244,6 +245,9 @@ func (b *backendTransitionSessionHandler) handleJoinGame(pc *proto.PacketContext
}
smc.SetActiveSessionHandler(state.Play, backendPlay)

// Clean up disabling auto-read while the connected event was being processed.
smc.SetAutoReading(true)

// Now set the connected server.
b.serverConn.player.setConnectedServer(b.serverConn)

Expand Down
Loading

0 comments on commit aff012f

Please sign in to comment.