diff --git a/go.mod b/go.mod index fcf5f2e3..c67ff138 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21.0 require ( github.com/agext/levenshtein v1.2.3 github.com/dboslee/lru v0.0.1 + github.com/edwingeng/deque/v2 v2.1.1 github.com/gammazero/deque v0.2.1 github.com/go-faker/faker/v4 v4.1.1 github.com/go-logr/logr v1.2.4 diff --git a/go.sum b/go.sum index b31a7441..dd12dd28 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,8 @@ github.com/dboslee/lru v0.0.1/go.mod h1:vDIFJHUqr1vdYKAdG9x3r+zFWP0i9uJqQWpB6nSu github.com/df-mc/atomic v1.10.0 h1:0ZuxBKwR/hxcFGorKiHIp+hY7hgY+XBTzhCYD2NqSEg= github.com/df-mc/atomic v1.10.0/go.mod h1:Gw9rf+rPIbydMjA329Jn4yjd/O2c/qusw3iNp4tFGSc= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/edwingeng/deque/v2 v2.1.1 h1:+xjC3TnaeMPLZMi7QQf9jN2K00MZmTwruApqplbL9IY= +github.com/edwingeng/deque/v2 v2.1.1/go.mod h1:HukI8CQe9KDmZCcURPZRYVYjH79Zy2tIjTF9sN3Bgb0= github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= diff --git a/pkg/edition/java/netmc/connection.go b/pkg/edition/java/netmc/connection.go index 2a104738..23e04e58 100644 --- a/pkg/edition/java/netmc/connection.go +++ b/pkg/edition/java/netmc/connection.go @@ -63,8 +63,14 @@ type MinecraftConn interface { // TODO convert to exported struct as this interf // when calling SwitchSessionHandler on the same registry. AddSessionHandler(*state.Registry, SessionHandler) + // SetAutoReading sets whether the connection should automatically read packets from the underlying connection. + // Default is true. + SetAutoReading(bool) + StateChanger PacketWriter + + Reader() Reader // Only use if you know what you are doing! } // Closed returns true if the connection is closed. @@ -140,16 +146,17 @@ func NewMinecraftConn( ctx, cancel := context.WithCancel(ctx) c := &minecraftConn{ - log: log, - c: base, - ctx: ctx, - cancelCtx: cancel, - rd: NewReader(base, in, readTimeout, log), - wr: NewWriter(base, out, writeTimeout, compressionLevel, log), - state: state.Handshake, - protocol: version.Minecraft_1_7_2.Protocol, - connType: phase.Undetermined, - direction: direction, + log: log, + c: base, + ctx: ctx, + cancelCtx: cancel, + rd: NewReader(base, in, readTimeout, log), + wr: NewWriter(base, out, writeTimeout, compressionLevel, log), + state: state.Handshake, + protocol: version.Minecraft_1_7_2.Protocol, + connType: phase.Undetermined, + direction: direction, + autoReading: newStateControl(true), } c.sessionHandlerMu.sessionHandlers = make(map[*state.Registry]SessionHandler) return c, c.startReadLoop @@ -165,6 +172,8 @@ type minecraftConn struct { rd Reader wr Writer + autoReading *stateControl // Whether the connection should automatically read packets from the underlying connection. + ctx context.Context // is canceled when connection closed cancelCtx context.CancelFunc closeOnce sync.Once // Makes sure the connection is closed once, while blocking proceeding calls. @@ -192,6 +201,9 @@ func (c *minecraftConn) startReadLoop() { defer func() { _ = c.closeKnown(false) }() next := func() bool { + // Wait until auto reading is enabled, if not already + c.autoReading.Wait() + // Read next packet from underlying connection. packetCtx, err := c.rd.ReadPacket() if err != nil { @@ -234,6 +246,13 @@ func (c *minecraftConn) startReadLoop() { } } +func (c *minecraftConn) Reader() Reader { return c.rd } + +func (c *minecraftConn) SetAutoReading(enabled bool) { + c.log.V(1).Info("update auto reading", "enabled", enabled) + c.autoReading.SetState(enabled) +} + func (c *minecraftConn) Context() context.Context { return c.ctx } func (c *minecraftConn) Flush() error { @@ -266,6 +285,15 @@ func (c *minecraftConn) Write(payload []byte) (err error) { } func (c *minecraftConn) BufferPacket(packet proto.Packet) (err error) { + return c.bufferPacket(packet, true) +} + +// bufferNoQueue is a helper func to buffer a packet without queuing it. +func (c *minecraftConn) bufferNoQueue(packet proto.Packet) error { + return c.bufferPacket(packet, false) +} + +func (c *minecraftConn) bufferPacket(packet proto.Packet, queue bool) (err error) { if Closed(c) { return ErrClosedConn } @@ -274,8 +302,9 @@ func (c *minecraftConn) BufferPacket(packet proto.Packet) (err error) { c.closeOnWriteErr(err, "bufferPacket", fmt.Sprintf("%T", packet)) } }() - if c.playPacketQueue.Queue(packet) { + if queue && c.playPacketQueue.Queue(packet) { // Packet was queued, don't write it now + c.log.V(1).Info("queued packet", "packet", fmt.Sprintf("%T", packet)) return nil } _, err = c.wr.WritePacket(packet) @@ -324,6 +353,8 @@ var ErrClosedConn = errors.New("connection is closed") func (c *minecraftConn) closeKnown(markKnown bool) (err error) { alreadyClosed := true c.closeOnce.Do(func() { + defer c.SetAutoReading(true) // free the read loop in case auto reading is disabled + alreadyClosed = false if markKnown { c.knownDisconnect.Store(true) @@ -433,7 +464,9 @@ func (c *minecraftConn) SetState(s *state.Registry) { c.mu.Unlock() - c.log.V(1).Info("update state", "previous", prevState, "new", s) + if prevState != s { + c.log.V(1).Info("update state", "previous", prevState, "new", s) + } } // ensurePlayPacketQueue ensures the play packet queue is activated or deactivated @@ -449,7 +482,7 @@ func (c *minecraftConn) ensurePlayPacketQueue(newState state.State) { // Remove the play packet queue if it exists if c.playPacketQueue != nil { - if err := c.playPacketQueue.ReleaseQueue(c); err != nil { + if err := c.playPacketQueue.ReleaseQueue(c.bufferNoQueue, c.Flush); err != nil { c.log.Error(err, "error releasing play packet queue") } c.playPacketQueue = nil @@ -474,42 +507,47 @@ func (c *minecraftConn) ActiveSessionHandler() SessionHandler { return c.sessionHandlerMu.activeSessionHandler } -func (c *minecraftConn) SetActiveSessionHandler(registry *state.Registry, handler SessionHandler) { +func (c *minecraftConn) AddSessionHandler(registry *state.Registry, handler SessionHandler) { if registry == nil { panic("registry must not be nil") } + if handler == nil { + panic("handler must not be nil") + } c.sessionHandlerMu.Lock() defer c.sessionHandlerMu.Unlock() - if c.sessionHandlerMu.activeSessionHandler != nil { - c.sessionHandlerMu.activeSessionHandler.Deactivated() + if registry == c.State() { + // Handler would overwrite the current handler + c.log.Info("AddSessionHandler: session handler already exists for state", "state", registry.String()) + return } c.sessionHandlerMu.sessionHandlers[registry] = handler - c.sessionHandlerMu.activeSessionHandler = handler - c.SetState(registry) - handler.Activated() + c.log.V(1).WithName("AddSessionHandler"). + Info("added session handler", "state", registry.String(), "handler", fmt.Sprintf("%T", handler)) } -func (c *minecraftConn) AddSessionHandler(registry *state.Registry, handler SessionHandler) { +func (c *minecraftConn) SetActiveSessionHandler(registry *state.Registry, handler SessionHandler) { if registry == nil { panic("registry must not be nil") } - if handler == nil { - panic("handler must not be nil") - } c.sessionHandlerMu.Lock() defer c.sessionHandlerMu.Unlock() - if registry == c.State() { - // Handler would overwrite the current handler - c.log.Info("AddSessionHandler: session handler already exists for state", "state", registry.String()) - return + if c.sessionHandlerMu.activeSessionHandler != nil { + c.sessionHandlerMu.activeSessionHandler.Deactivated() } c.sessionHandlerMu.sessionHandlers[registry] = handler + c.sessionHandlerMu.activeSessionHandler = handler + c.SetState(registry) + handler.Activated() + + c.log.V(1).WithName("SetActiveSessionHandler"). + Info("set session handler", "state", registry.String(), "handler", fmt.Sprintf("%T", handler)) } func (c *minecraftConn) SwitchSessionHandler(registry *state.Registry) bool { @@ -526,7 +564,10 @@ func (c *minecraftConn) SwitchSessionHandler(registry *state.Registry) bool { } if c.sessionHandlerMu.activeSessionHandler == handler { + c.SetState(registry) + // The handler is already active, no need to switch + c.log.V(1).WithName("SwitchSessionHandler").Info("session handler already active, no need to switch", "state", registry.String(), "handler", fmt.Sprintf("%T", handler)) return true } @@ -538,6 +579,9 @@ func (c *minecraftConn) SwitchSessionHandler(registry *state.Registry) bool { c.SetState(registry) handler.Activated() + c.log.V(1).WithName("SwitchSessionHandler"). + Info("switched session handler", "state", registry.String(), "handler", fmt.Sprintf("%T", handler)) + return true } diff --git a/pkg/edition/java/netmc/state_control.go b/pkg/edition/java/netmc/state_control.go new file mode 100644 index 00000000..5aec9091 --- /dev/null +++ b/pkg/edition/java/netmc/state_control.go @@ -0,0 +1,45 @@ +package netmc + +import "sync" + +// stateControl struct holds the state and a condition variable. +type stateControl struct { + state bool // state represents the condition to be met. + cond *sync.Cond // cond is a condition variable associated with a mutex. +} + +// newStateControl function initializes a new stateControl with the initial state and a condition variable. +func newStateControl(initial bool) *stateControl { + sc := &stateControl{ + state: initial, + cond: sync.NewCond(&sync.Mutex{}), // Initialize a new condition variable with an associated mutex. + } + return sc +} + +// State method returns the current state. +// It locks the mutex before reading the state to ensure thread-safety. +func (s *stateControl) State() bool { + s.cond.L.Lock() // Lock the mutex before reading the state. + defer s.cond.L.Unlock() // Unlock the mutex after reading the state. + return s.state +} + +// Wait method waits for the state to become true. +// It locks the mutex to ensure that the condition check and the wait operation are atomic. +func (s *stateControl) Wait() { + s.cond.L.Lock() // Lock the mutex before checking the condition. + for !s.state { // If the state is not true, wait for it to become true. + s.cond.Wait() // Wait releases the lock, suspends the goroutine, and reacquires the lock when it wakes up. + } + s.cond.L.Unlock() // Unlock the mutex after the condition is met. +} + +// SetState method sets the state and signals all waiting goroutines. +// It locks the mutex to ensure that the state change and the signal operation are atomic. +func (s *stateControl) SetState(state bool) { + s.cond.L.Lock() // Lock the mutex before changing the state. + s.state = state // Change the state. + s.cond.Broadcast() // Signal all waiting goroutines that the condition might be met. + s.cond.L.Unlock() // Unlock the mutex after changing the state and signaling the goroutines. +} diff --git a/pkg/edition/java/proto/util/queue/packet_queue.go b/pkg/edition/java/proto/util/queue/packet_queue.go index 72168efb..3f7ab1b8 100644 --- a/pkg/edition/java/proto/util/queue/packet_queue.go +++ b/pkg/edition/java/proto/util/queue/packet_queue.go @@ -1,7 +1,7 @@ package queue import ( - "github.com/gammazero/deque" + "github.com/edwingeng/deque/v2" "go.minekube.com/gate/pkg/edition/java/proto/state" "go.minekube.com/gate/pkg/gate/proto" ) @@ -24,7 +24,7 @@ type PlayPacketQueue struct { func NewPlayPacketQueue(version proto.Protocol, direction proto.Direction) *PlayPacketQueue { return &PlayPacketQueue{ registry: state.FromDirection(direction, state.Config, version), - queue: deque.New[proto.Packet](), + queue: deque.NewDeque[proto.Packet](), protocolVersion: version, } } @@ -43,28 +43,25 @@ func (h *PlayPacketQueue) Queue(packet proto.Packet) bool { return false } -// PacketBuffer is a packet buffer that can flush packets to an underlying packet writer. -type PacketBuffer interface { - BufferPacket(proto.Packet) error - Flush() error -} - // ReleaseQueue releases all packets in the queue to the sink packet writer. // It iterates over the queue, buffering each packet and flushing the sink. -func (h *PlayPacketQueue) ReleaseQueue(sink PacketBuffer) error { +func (h *PlayPacketQueue) ReleaseQueue( + buffer func(proto.Packet) error, + flush func() error, +) error { if h == nil { return nil } var ok bool - for h.queue.Len() > 0 { + for h.queue.Len() != 0 { packet := h.queue.PopFront() - if err := sink.BufferPacket(packet); err != nil { + if err := buffer(packet); err != nil { return err } ok = true } if ok { - return sink.Flush() + return flush() } return nil } diff --git a/pkg/edition/java/proxy/session_backend_config.go b/pkg/edition/java/proxy/session_backend_config.go index 1825b1d6..264e8723 100644 --- a/pkg/edition/java/proxy/session_backend_config.go +++ b/pkg/edition/java/proxy/session_backend_config.go @@ -129,8 +129,12 @@ func (b *backendConfigSessionHandler) handleFinishedUpdate(p *config.FinishedUpd player := b.serverConn.player configHandler := b.playerConfigSessionHandler - smc.SetState(state.Play) + smc.SetAutoReading(false) + // Even when not auto reading messages are still decoded. Decode them with the correct state + smc.Reader().SetState(state.Play) configHandler.handleBackendFinishUpdate(b.serverConn, p, func() { + defer smc.SetAutoReading(true) + if b.serverConn == player.connectedServer() { if !smc.SwitchSessionHandler(state.Play) { err := errors.New("failed to switch session handler") diff --git a/pkg/edition/java/proxy/session_backend_login.go b/pkg/edition/java/proxy/session_backend_login.go index fafff4f5..d1884c04 100644 --- a/pkg/edition/java/proxy/session_backend_login.go +++ b/pkg/edition/java/proxy/session_backend_login.go @@ -345,9 +345,9 @@ func (b *backendLoginSessionHandler) handleServerLoginSuccess() { _ = serverMc.WritePacket(pkt) } if csh, ok := player.MinecraftConn.ActiveSessionHandler().(*clientPlaySessionHandler); ok { - // TODO 1.20.2+ serverMc.SetAutoReading(false) + serverMc.SetAutoReading(false) csh.doSwitch().DoWhenTrue(func() { - // TODO 1.20.2+ serverMc.SetAutoReading(true) + serverMc.SetAutoReading(true) }) } } diff --git a/pkg/edition/java/proxy/session_backend_play.go b/pkg/edition/java/proxy/session_backend_play.go index 7419f12e..5b9469ad 100644 --- a/pkg/edition/java/proxy/session_backend_play.go +++ b/pkg/edition/java/proxy/session_backend_play.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "go.minekube.com/gate/pkg/edition/java/proto/packet/config" + "go.minekube.com/gate/pkg/edition/java/proto/state" "reflect" "regexp" "time" @@ -136,7 +137,12 @@ func (b *backendPlaySessionHandler) handleDisconnect(p *packet.Disconnect) { } func (b *backendPlaySessionHandler) handleStartUpdate(_ *config.StartUpdate) { - // TODO set decoder state to Config (need MinecraftConnection as struct and expose decoder) + smc, ok := b.serverConn.ensureConnected() + if !ok { + return + } + smc.SetAutoReading(false) + smc.Reader().SetState(state.Config) b.serverConn.player.switchToConfigState() } diff --git a/pkg/edition/java/proxy/session_backend_transition.go b/pkg/edition/java/proxy/session_backend_transition.go index c1706925..9a17421d 100644 --- a/pkg/edition/java/proxy/session_backend_transition.go +++ b/pkg/edition/java/proxy/session_backend_transition.go @@ -199,6 +199,7 @@ func (b *backendTransitionSessionHandler) handleJoinGame(pc *proto.PacketContext // The goods are in hand! We got JoinGame. // Let's transition completely to the new state. + smc.SetAutoReading(false) connectedEvent := &ServerConnectedEvent{ player: b.serverConn.player, server: b.serverConn.server, @@ -244,6 +245,9 @@ func (b *backendTransitionSessionHandler) handleJoinGame(pc *proto.PacketContext } smc.SetActiveSessionHandler(state.Play, backendPlay) + // Clean up disabling auto-read while the connected event was being processed. + smc.SetAutoReading(true) + // Now set the connected server. b.serverConn.player.setConnectedServer(b.serverConn) diff --git a/pkg/edition/java/proxy/session_client_play.go b/pkg/edition/java/proxy/session_client_play.go index c045b0fb..5323b91b 100644 --- a/pkg/edition/java/proxy/session_client_play.go +++ b/pkg/edition/java/proxy/session_client_play.go @@ -784,6 +784,7 @@ func (c *clientPlaySessionHandler) handleFinishUpdate(p *config.FinishedUpdate) err := errors.New("failed to switch session handler") c.log.Error(err, "expected to switch session handler to config state") } + smc.SetAutoReading(true) }() } c.configSwitchDone.SetTrue()