Skip to content
This repository has been archived by the owner on Jul 11, 2024. It is now read-only.

feat!: use discordgateway for shard implementation #466

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/andersfylling/disgord
go 1.13

require (
github.com/andersfylling/discordgateway v0.5.0
github.com/andersfylling/snowflake/v5 v5.0.1
github.com/klauspost/compress v1.15.1 // indirect
go.uber.org/atomic v1.9.0
Expand Down
12 changes: 10 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
github.com/andersfylling/discordgateway v0.5.0 h1:qA7XrY5hxbXlkRVIatCAgNC4+LzSMTGVKSs7K0ntAn0=
github.com/andersfylling/discordgateway v0.5.0/go.mod h1:qsA/yRPd60t6jW2A28xH5SpxyxxFLnYJ23IOBpEfeWg=
github.com/andersfylling/snowflake/v5 v5.0.1 h1:unXbYSij6tRCGJzoLz9zl3nJsqd9hu7bbYSgB8K8/i0=
github.com/andersfylling/snowflake/v5 v5.0.1/go.mod h1:AdhrB+kewjnQInv8cR7ABe2SGoVXh79njnipUnz1HFc=
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 h1:GKTyiRCL6zVf5wWaqKnf+7Qs6GbEPfd4iMOitWzXJx8=
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8/go.mod h1:spo1JLcs67NmW1aVLEgtA8Yy1elc+X8y5SRW1sFW4Og=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -17,10 +21,12 @@ github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD87
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
Expand Down Expand Up @@ -64,6 +70,7 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand All @@ -86,6 +93,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
238 changes: 238 additions & 0 deletions shard_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package disgord

import (
"context"
"errors"
"fmt"
"github.com/andersfylling/discordgateway"
"github.com/andersfylling/discordgateway/closecode"
"github.com/andersfylling/discordgateway/command"
"github.com/andersfylling/discordgateway/event"
"github.com/andersfylling/discordgateway/gatewayshard"
"github.com/andersfylling/discordgateway/intent"
discordgatewaylog "github.com/andersfylling/discordgateway/log"
"github.com/andersfylling/discordgateway/opcode"
"github.com/andersfylling/disgord/internal/constant"
"github.com/andersfylling/disgord/internal/gateway"
"github.com/andersfylling/disgord/internal/logger"
"github.com/andersfylling/disgord/json"
"golang.org/x/sync/errgroup"
"log"
"os"
"runtime"
"strings"
"sync"
)

type ShardManager interface {
Connect(ctx context.Context) error
Disconnect()
SendCommand(ctx context.Context, cmd GatewayCommand) error
}

type GatewayCommand interface {
GuildID() Snowflake
OperationCode() uint
CommandCode() uint
}

func runShard(shard *gatewayshard.Shard, workChan chan<- *gatewayshard.Shard) error {
websocketUrl := fmt.Sprintf("wss://gateway.discord.gg/?v=%d&encoding=%s", constant.DiscordVersion, strings.ToLower(constant.Encoding))
if _, err := shard.Dial(context.Background(), websocketUrl); err != nil {
return fmt.Errorf("failed to open websocket connection. %w", err)
}

// process websocket messages as they arrive and trigger the handler whenever relevant
if err := shard.EventLoop(context.Background()); err != nil {
reconnect := true

var discordErr *discordgateway.DiscordError
if errors.As(err, &discordErr) {
reconnect = discordErr.CanReconnect()
}

if reconnect {
log.Info(fmt.Errorf("reconnecting: %w", err))
if err := shard.PrepareForReconnect(); err != nil {
return fmt.Errorf("failed to prepare for reconnect: %w", err)
}
workChan <- shard
return nil
}

return err
}

return errors.New("unexpected error from shards event loop, no error was returned")
}

type BasicShardManager struct {
ShardIDs []uint
ShardCount uint
Intents Intent
BotToken string
Log Logger
Client Session

shardingConfiguredByUser bool
setupOnce sync.Once
shards []*gatewayshard.Shard
identityProperties *discordgateway.IdentifyConnectionProperties
}

func (sm *BasicShardManager) setupShardDetails() error {
gatewayBotInfo, err := sm.getGatewayBotInfo()
if err != nil {
return err
}

for i := 0; i < int(gatewayBotInfo.Shards); i++ {
sm.ShardIDs = append(sm.ShardIDs, uint(i))
}
sm.ShardCount = gatewayBotInfo.Shards
// TODO: rate limits
return nil
}

func (sm *BasicShardManager) setup() error {
discordgatewaylog.LogInstance = sm.Log

if int(sm.ShardCount) < len(sm.ShardIDs) {
return errors.New("shard count is less than the number of specified shard ids")
}
if sm.ShardCount > 0 && len(sm.ShardIDs) == 0 {
return errors.New("shard ids must be specified when using setting shard count")
}
if sm.ShardIDs != nil {
sm.shardingConfiguredByUser = true
} else {
if err := sm.setupShardDetails(); err != nil {
return err
}
}

sm.identityProperties = &discordgateway.IdentifyConnectionProperties{
OS: runtime.GOOS,
Browser: "github.com/andersfylling/discordgateway v0",
Device: "github.com/andersfylling/disgord v0",
}

return sm.setupShards()
}

func (sm *BasicShardManager) setupShards() error {
intents := intent.Type(sm.Intents)
for i := range sm.ShardIDs {
shardID := discordgateway.ShardID(sm.ShardIDs[i])
shard, err := gatewayshard.NewShard(shardID, sm.BotToken, nil,
discordgateway.WithIntents(intents),
discordgateway.WithIdentifyConnectionProperties(sm.identityProperties),
discordgateway.WithShardCount(sm.ShardCount),
)
if err != nil {
return err
}

sm.shards = append(sm.shards, shard)
}

return nil
}

func (sm *BasicShardManager) getGatewayBotInfo() (*gateway.GatewayBot, error) {
return sm.Client.Gateway().GetBot()
}

func (sm *BasicShardManager) Connect(ctx context.Context) error {
for {
err := sm.connect(ctx)
var discordErr *discordgateway.DiscordError
if errors.As(err, &discordErr) && discordErr.CloseCode == closecode.ShardingRequired {
// figure out if the sharding information was specified by user or discord
if sm.shardingConfiguredByUser {
// user has to fix this
return err
}

// otherwise we automatically increment the number of shards
if err = sm.setupShardDetails(); err != nil {
return err
}
if err = sm.setupShards(); err != nil {
return err
}
} else {
return err
}
}
}

func (sm *BasicShardManager) connect(parentCtx context.Context) (err error) {
sm.setupOnce.Do(func() {
err = sm.setup()
})
if err != nil {
return fmt.Errorf("failed to setup shard manager: %w", err)
}

g, ctx := errgroup.WithContext(parentCtx)
workChan := make(chan *gatewayshard.Shard, len(sm.shards))
for _, shard := range sm.shards {
workChan <- shard
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case shard, ok := <-workChan:
if !ok {
return errors.New("work channel for shard manager unexpectedly closed")
}
if err := runShard(shard, workChan); err != nil {
return err
}
}
}
})
}

return g.Wait()
}

func (sm *BasicShardManager) Disconnect() {
for i := range sm.shards {
_ = sm.shards[i].Close()
}
}

func (sm *BasicShardManager) SendCommand(parentCtx context.Context, cmd GatewayCommand) error {
data, err := json.Marshal(cmd)
if err != nil {
return err
}

if !cmd.GuildID().IsZero() {
shardID := discordgateway.DeriveShardID(uint64(cmd.GuildID()), uint(len(sm.shards)))
shard := sm.shards[shardID]
return shard.Write(command.Type(cmd.CommandCode()), data)
}

g, ctx := errgroup.WithContext(parentCtx)
workChan := make(chan *gatewayshard.Shard, len(sm.shards))
for i := range sm.shards {
workChan <- sm.shards[i]
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
case shard, ok := <-workChan:
if !ok {
return errors.New("work chan for sending command suddenly closed")
}
return shard.Write(command.Type(cmd.OperationCode()), data)
}
})
}

return g.Wait()
}