From 853ebf2130d7c5cc1d480c799c324bf409ce7540 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Sun, 9 Jan 2022 17:57:03 +0100 Subject: [PATCH 1/2] WIP --- go.mod | 1 + go.sum | 12 +++- shard_manager.go | 146 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 157 insertions(+), 2 deletions(-) create mode 100644 shard_manager.go diff --git a/go.mod b/go.mod index e096f824..918af096 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/andersfylling/disgord go 1.13 require ( + github.com/andersfylling/discordgateway v0.5.0 github.com/andersfylling/snowflake/v5 v5.0.1 github.com/klauspost/compress v1.13.6 // indirect go.uber.org/atomic v1.9.0 diff --git a/go.sum b/go.sum index 4ee9a680..b8e795eb 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ +github.com/andersfylling/discordgateway v0.5.0 h1:qA7XrY5hxbXlkRVIatCAgNC4+LzSMTGVKSs7K0ntAn0= +github.com/andersfylling/discordgateway v0.5.0/go.mod h1:qsA/yRPd60t6jW2A28xH5SpxyxxFLnYJ23IOBpEfeWg= github.com/andersfylling/snowflake/v5 v5.0.1 h1:unXbYSij6tRCGJzoLz9zl3nJsqd9hu7bbYSgB8K8/i0= github.com/andersfylling/snowflake/v5 v5.0.1/go.mod h1:AdhrB+kewjnQInv8cR7ABe2SGoVXh79njnipUnz1HFc= +github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 h1:GKTyiRCL6zVf5wWaqKnf+7Qs6GbEPfd4iMOitWzXJx8= +github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8/go.mod h1:spo1JLcs67NmW1aVLEgtA8Yy1elc+X8y5SRW1sFW4Og= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -17,10 +21,12 @@ github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD87 github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= -github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= -github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= +github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= +github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= +github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= @@ -64,6 +70,7 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -86,6 +93,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/shard_manager.go b/shard_manager.go new file mode 100644 index 00000000..a9bc52eb --- /dev/null +++ b/shard_manager.go @@ -0,0 +1,146 @@ +package disgord + +import ( + "context" + "errors" + "fmt" + "github.com/andersfylling/discordgateway" + "github.com/andersfylling/discordgateway/event" + "github.com/andersfylling/discordgateway/gatewayshard" + "github.com/andersfylling/discordgateway/intent" + discordgatewaylog "github.com/andersfylling/discordgateway/log" + "github.com/andersfylling/disgord/internal/constant" + "github.com/andersfylling/disgord/internal/logger" + "golang.org/x/sync/errgroup" + "log" + "os" + "runtime" + "strings" + "sync" +) + +type ShardManager interface { + Connect() error + Disconnect() + SendCommand(cmd GatewayCommand) error +} + +type GatewayCommand interface { + GuildID() Snowflake + OperationCode() uint +} + +func runShard(shard *gatewayshard.Shard, workChan chan<- *gatewayshard.Shard) error { + websocketUrl := fmt.Sprintf("wss://gateway.discord.gg/?v=%d&encoding=%s", constant.DiscordVersion, strings.ToLower(constant.Encoding)) + if _, err := shard.Dial(context.Background(), websocketUrl); err != nil { + return fmt.Errorf("failed to open websocket connection. %w", err) + } + + // process websocket messages as they arrive and trigger the handler whenever relevant + if err := shard.EventLoop(context.Background()); err != nil { + reconnect := true + + var discordErr *discordgateway.DiscordError + if errors.As(err, &discordErr) { + reconnect = discordErr.CanReconnect() + } + + if reconnect { + log.Info(fmt.Errorf("reconnecting: %w", err)) + if err := shard.PrepareForReconnect(); err != nil { + return fmt.Errorf("failed to prepare for reconnect: %w", err) + } + workChan <- shard + return nil + } + + return err + } + + return errors.New("unexpected error from shards event loop, no error was returned") +} + +type BasicShardManager struct { + ShardIDs []uint + ShardCount uint + Intents Intent + BotToken string + Log Logger + + setupOnce sync.Once + shards []*gatewayshard.Shard +} + +func (sm *BasicShardManager) setup() error { + discordgatewaylog.LogInstance = sm.Log + + id := &discordgateway.IdentifyConnectionProperties{ + OS: runtime.GOOS, + Browser: "github.com/andersfylling/discordgateway v0", + Device: "github.com/andersfylling/disgord v0", + } + intents := intent.Type(sm.Intents) + + for i := range sm.ShardIDs { + shardID := discordgateway.ShardID(sm.ShardIDs[i]) + shard, err := gatewayshard.NewShard(shardID, sm.BotToken, nil, + discordgateway.WithIntents(intents), + discordgateway.WithIdentifyConnectionProperties(id), + ) + if err != nil { + log.Fatal(err) + } + + sm.shards = append(sm.shards, shard) + } + + return nil +} + +func (sm *BasicShardManager) Connect(ctx context.Context) error{ + for { + err := sm.connect(ctx) + if err + } + +} + +func (sm *BasicShardManager) connect(parentCtx context.Context) (err error) { + sm.setupOnce.Do(func () { + err = sm.setup() + }) + if err != nil { + return fmt.Errorf("failed to setup shard manager: %w", err) + } + + g, ctx := errgroup.WithContext(parentCtx) + workChan := make(chan *gatewayshard.Shard, len(sm.shards)) + for _, shard := range sm.shards { + workChan <- shard + g.Go(func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case shard, ok := <-workChan: + if !ok { + return errors.New("work channel for shard manager unexpectedly closed") + } + if err := RunShard(shard, workChan); err != nil { + return err + } + } + } + }) + } + + return g.Wait() +} + +func (sm *BasicShardManager) Disconnect() error { + +} + +func (sm *BasicShardManager) SendCommand(cmd GatewayCommand) error { + +} \ No newline at end of file From 84d615eeb5b91d64702bca46e5769a68a1de2b98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 10 Jan 2022 01:40:14 +0100 Subject: [PATCH 2/2] feat: add auto scaling of shards --- shard_manager.go | 134 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 113 insertions(+), 21 deletions(-) diff --git a/shard_manager.go b/shard_manager.go index a9bc52eb..079e0f5e 100644 --- a/shard_manager.go +++ b/shard_manager.go @@ -5,12 +5,17 @@ import ( "errors" "fmt" "github.com/andersfylling/discordgateway" + "github.com/andersfylling/discordgateway/closecode" + "github.com/andersfylling/discordgateway/command" "github.com/andersfylling/discordgateway/event" "github.com/andersfylling/discordgateway/gatewayshard" "github.com/andersfylling/discordgateway/intent" discordgatewaylog "github.com/andersfylling/discordgateway/log" + "github.com/andersfylling/discordgateway/opcode" "github.com/andersfylling/disgord/internal/constant" + "github.com/andersfylling/disgord/internal/gateway" "github.com/andersfylling/disgord/internal/logger" + "github.com/andersfylling/disgord/json" "golang.org/x/sync/errgroup" "log" "os" @@ -20,14 +25,15 @@ import ( ) type ShardManager interface { - Connect() error + Connect(ctx context.Context) error Disconnect() - SendCommand(cmd GatewayCommand) error + SendCommand(ctx context.Context, cmd GatewayCommand) error } type GatewayCommand interface { GuildID() Snowflake OperationCode() uint + CommandCode() uint } func runShard(shard *gatewayshard.Shard, workChan chan<- *gatewayshard.Shard) error { @@ -61,34 +67,70 @@ func runShard(shard *gatewayshard.Shard, workChan chan<- *gatewayshard.Shard) er } type BasicShardManager struct { - ShardIDs []uint + ShardIDs []uint ShardCount uint - Intents Intent - BotToken string - Log Logger + Intents Intent + BotToken string + Log Logger + Client Session + + shardingConfiguredByUser bool + setupOnce sync.Once + shards []*gatewayshard.Shard + identityProperties *discordgateway.IdentifyConnectionProperties +} + +func (sm *BasicShardManager) setupShardDetails() error { + gatewayBotInfo, err := sm.getGatewayBotInfo() + if err != nil { + return err + } - setupOnce sync.Once - shards []*gatewayshard.Shard + for i := 0; i < int(gatewayBotInfo.Shards); i++ { + sm.ShardIDs = append(sm.ShardIDs, uint(i)) + } + sm.ShardCount = gatewayBotInfo.Shards + // TODO: rate limits + return nil } func (sm *BasicShardManager) setup() error { discordgatewaylog.LogInstance = sm.Log - id := &discordgateway.IdentifyConnectionProperties{ + if int(sm.ShardCount) < len(sm.ShardIDs) { + return errors.New("shard count is less than the number of specified shard ids") + } + if sm.ShardCount > 0 && len(sm.ShardIDs) == 0 { + return errors.New("shard ids must be specified when using setting shard count") + } + if sm.ShardIDs != nil { + sm.shardingConfiguredByUser = true + } else { + if err := sm.setupShardDetails(); err != nil { + return err + } + } + + sm.identityProperties = &discordgateway.IdentifyConnectionProperties{ OS: runtime.GOOS, Browser: "github.com/andersfylling/discordgateway v0", Device: "github.com/andersfylling/disgord v0", } - intents := intent.Type(sm.Intents) + return sm.setupShards() +} + +func (sm *BasicShardManager) setupShards() error { + intents := intent.Type(sm.Intents) for i := range sm.ShardIDs { shardID := discordgateway.ShardID(sm.ShardIDs[i]) shard, err := gatewayshard.NewShard(shardID, sm.BotToken, nil, discordgateway.WithIntents(intents), - discordgateway.WithIdentifyConnectionProperties(id), + discordgateway.WithIdentifyConnectionProperties(sm.identityProperties), + discordgateway.WithShardCount(sm.ShardCount), ) if err != nil { - log.Fatal(err) + return err } sm.shards = append(sm.shards, shard) @@ -97,16 +139,36 @@ func (sm *BasicShardManager) setup() error { return nil } -func (sm *BasicShardManager) Connect(ctx context.Context) error{ +func (sm *BasicShardManager) getGatewayBotInfo() (*gateway.GatewayBot, error) { + return sm.Client.Gateway().GetBot() +} + +func (sm *BasicShardManager) Connect(ctx context.Context) error { for { err := sm.connect(ctx) - if err - } + var discordErr *discordgateway.DiscordError + if errors.As(err, &discordErr) && discordErr.CloseCode == closecode.ShardingRequired { + // figure out if the sharding information was specified by user or discord + if sm.shardingConfiguredByUser { + // user has to fix this + return err + } + // otherwise we automatically increment the number of shards + if err = sm.setupShardDetails(); err != nil { + return err + } + if err = sm.setupShards(); err != nil { + return err + } + } else { + return err + } + } } func (sm *BasicShardManager) connect(parentCtx context.Context) (err error) { - sm.setupOnce.Do(func () { + sm.setupOnce.Do(func() { err = sm.setup() }) if err != nil { @@ -126,7 +188,7 @@ func (sm *BasicShardManager) connect(parentCtx context.Context) (err error) { if !ok { return errors.New("work channel for shard manager unexpectedly closed") } - if err := RunShard(shard, workChan); err != nil { + if err := runShard(shard, workChan); err != nil { return err } } @@ -137,10 +199,40 @@ func (sm *BasicShardManager) connect(parentCtx context.Context) (err error) { return g.Wait() } -func (sm *BasicShardManager) Disconnect() error { - +func (sm *BasicShardManager) Disconnect() { + for i := range sm.shards { + _ = sm.shards[i].Close() + } } -func (sm *BasicShardManager) SendCommand(cmd GatewayCommand) error { +func (sm *BasicShardManager) SendCommand(parentCtx context.Context, cmd GatewayCommand) error { + data, err := json.Marshal(cmd) + if err != nil { + return err + } + + if !cmd.GuildID().IsZero() { + shardID := discordgateway.DeriveShardID(uint64(cmd.GuildID()), uint(len(sm.shards))) + shard := sm.shards[shardID] + return shard.Write(command.Type(cmd.CommandCode()), data) + } -} \ No newline at end of file + g, ctx := errgroup.WithContext(parentCtx) + workChan := make(chan *gatewayshard.Shard, len(sm.shards)) + for i := range sm.shards { + workChan <- sm.shards[i] + g.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + case shard, ok := <-workChan: + if !ok { + return errors.New("work chan for sending command suddenly closed") + } + return shard.Write(command.Type(cmd.OperationCode()), data) + } + }) + } + + return g.Wait() +}