Skip to content

Commit

Permalink
feat(transport): implement kcp core functionality
Browse files Browse the repository at this point in the history
Launch and Shutdown methods for kcpCore struct have been implemented,
providing the core functionality for KCP transport. This includes setting
up a KCP listener, accepting sessions, and handling data read from the
sessions. Additionally, context management has been added for graceful
shutdowns.

The TypedActorRef has been enhanced to support type-safe interactions with
KCP sessions, facilitating the registration and data processing of KCP
connections within the vivid actor framework.
  • Loading branch information
kercylan98 committed Jun 15, 2024
1 parent 875f881 commit b47311b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 80 deletions.
126 changes: 46 additions & 80 deletions minotaur/transport/network/kcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,96 +31,62 @@ func newKcpCore(addr string) transport.Network {
type kcpCore struct {
addr string
closed atomic.Bool
ctx context.Context
cancel context.CancelFunc
}

func (k *kcpCore) Launch(ctx context.Context, srv vivid.TypedActorRef[transport.ServerActorExpandTyped]) error {
//TODO implement me
panic("implement me")
k.ctx, k.cancel = context.WithCancel(ctx)
lis, err := kcp.ListenWithOptions(k.addr, nil, 0, 0)
if err != nil {
return err
}
defer func(lis *kcp.Listener) {
_ = lis.Close()
}(lis)
for !k.closed.Load() {
var session *kcp.UDPSession
if session, err = lis.AcceptKCP(); err != nil {
continue
}

// 注册连接
conn := srv.Api().Attach(session, func(packet transport.Packet) error {
_, err = session.Write(packet.GetBytes())
return err
})

// 处理连接数据
go func(ctx context.Context, session *kcp.UDPSession, conn vivid.TypedActorRef[transport.ConnActorExpandTyped]) {
var buf = make([]byte, 1024)
var n int
for {
select {
case <-ctx.Done():
return
default:
if n, err = session.Read(buf); err != nil {
conn.Stop()
return
}
conn.Api().React(transport.NewPacket(buf[:n]))
}
}
}(k.ctx, session, conn)
}
return nil
}

func (k *kcpCore) Shutdown() error {
//TODO implement me
panic("implement me")
k.closed.Store(true)
k.cancel()
return nil
}

func (k *kcpCore) Schema() string {
//TODO implement me
panic("implement me")
return "udp(kcp)"
}

func (k *kcpCore) Address() string {
//TODO implement me
panic("implement me")
return k.addr
}

//
//func (k *kcpCore) OnPreStart(ctx vivids.ActorContext) (err error) {
// k.ctx = ctx
// ctx.Future(func() vivids.Message {
// lis, err := kcp.ListenWithOptions(k.addr, nil, 0, 0)
// if err != nil {
// return err
// }
// defer func(lis *kcp.Listener) {
// _ = lis.Close()
// }(lis)
// for !k.closed.Load() {
// var conn *kcp.UDPSession
// var srvConn server.Conn
// if conn, err = lis.AcceptKCP(); err != nil {
// continue
// }
//
// // 注册连接
// srvConn, err = k.ctx.GetParentActor().Ask(server.ServerNetworkConnectionOpenedEvent{
// Conn: conn,
// ConnectionWriter: func(packet server.Packet) (err error) {
// if _, err = conn.Write(packet.GetBytes()); err != nil {
// return k.ctx.GetParentActor().Tell(server.NetworkConnectionAsyncWriteErrorMessage{
// Conn: srvConn,
// Packet: packet,
// Error: err,
// })
// }
// return
// },
// })
//
// // 处理连接数据
// go func(ctx context.Context, conn *kcp.UDPSession, srvConn server.Conn) {
// var buf = make([]byte, 1024)
// var n int
// for {
// select {
// case <-ctx.Done():
// return
// default:
// if n, err = conn.Read(buf); err != nil {
// srvConn.Close()
// return
// }
// k.controller.ReactPacket(conn, server.NewPacket(buf[:n]))
// }
// }
// }(ctx, conn, srvConn)
// }
//
// return nil
// })
//
// return
//}
//
//func (k *kcpCore) OnReceived(ctx vivids.MessageContext) (err error) {
// switch v := ctx.GetMessage().(type) {
// case error:
// ctx.NotifyTerminated(v)
// }
//
// return
//}
//
//func (k *kcpCore) OnDestroy(ctx vivids.ActorContext) (err error) {
// k.closed.Store(true)
// return nil
//}
3 changes: 3 additions & 0 deletions minotaur/vivid/typed.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ import (
"reflect"
)

// TypedActorRef 是一个类型化的 ActorRef,可以通过 Api 方法获取其类型化的协议
type TypedActorRef[Protocol any] interface {
ActorRef
Api() Protocol
}

// typedWrapper 用于实现 TypedActorRef 接口的包装器
type typedWrapper[Protocol any] struct {
ActorRef
protocol Protocol
}

// Api 获取 ActorRef 的类型化协议
func (m *typedWrapper[Protocol]) Api() Protocol {
return m.protocol
}
Expand Down

0 comments on commit b47311b

Please sign in to comment.