Skip to content

Commit

Permalink
Add retry strategy for publish
Browse files Browse the repository at this point in the history
  • Loading branch information
waybackarchiver committed Apr 10, 2022
1 parent 45be284 commit d2b082e
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 47 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go 1.17

require (
github.com/bwmarrin/discordgo v0.23.3-0.20210627161652-421e14965030
github.com/cenkalti/backoff/v4 v4.1.2
github.com/cretz/bine v0.2.0
github.com/davecgh/go-spew v1.1.1
github.com/dghubble/go-twitter v0.0.0-20201011215211-4b180d0cc78d
Expand Down Expand Up @@ -55,7 +56,6 @@ require (
github.com/bitly/go-simplejson v0.5.0 // indirect
github.com/btcsuite/btcd v0.22.0-beta // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cheggaaa/pb/v3 v3.0.8 // indirect
github.com/chromedp/cdproto v0.0.0-20220217222649-d8c14a5c6edf // indirect
Expand Down
12 changes: 7 additions & 5 deletions publish/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/wabarc/logger"
"github.com/wabarc/wayback"
"github.com/wabarc/wayback/config"
"github.com/wabarc/wayback/errors"
"github.com/wabarc/wayback/metrics"
"github.com/wabarc/wayback/reduxer"
"github.com/wabarc/wayback/service"
Expand All @@ -18,6 +19,8 @@ import (
discord "github.com/bwmarrin/discordgo"
)

var _ Publisher = (*discordBot)(nil)

type discordBot struct {
bot *discord.Session
}
Expand All @@ -42,12 +45,11 @@ func NewDiscord(bot *discord.Session) *discordBot {

// Publish publish text to the Discord channel of given cols and args.
// A context should contain a `reduxer.Reduxer` via `publish.PubBundle` struct.
func (d *discordBot) Publish(ctx context.Context, cols []wayback.Collect, args ...string) {
func (d *discordBot) Publish(ctx context.Context, cols []wayback.Collect, args ...string) error {
metrics.IncrementPublish(metrics.PublishDiscord, metrics.StatusRequest)

if len(cols) == 0 {
logger.Warn("collects empty")
return
return errors.New("publish to discord: collects empty")
}

rdx, art, err := extract(ctx, cols)
Expand All @@ -58,10 +60,10 @@ func (d *discordBot) Publish(ctx context.Context, cols []wayback.Collect, args .
var body = render.ForPublish(&render.Discord{Cols: cols, Data: rdx}).String()
if d.toChannel(art, body) {
metrics.IncrementPublish(metrics.PublishDiscord, metrics.StatusSuccess)
return
return nil
}
metrics.IncrementPublish(metrics.PublishDiscord, metrics.StatusFailure)
return
return errors.New("publish to discord failed")
}

// toChannel for publish to message to Discord channel,
Expand Down
12 changes: 7 additions & 5 deletions publish/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import (
"github.com/wabarc/logger"
"github.com/wabarc/wayback"
"github.com/wabarc/wayback/config"
"github.com/wabarc/wayback/errors"
"github.com/wabarc/wayback/metrics"
"github.com/wabarc/wayback/template/render"
)

var _ Publisher = (*gitHub)(nil)

type gitHub struct {
client *github.Client
}
Expand Down Expand Up @@ -44,12 +47,11 @@ func NewGitHub(httpClient *http.Client) *gitHub {

// Publish publish markdown text to the GitHub issues of given cols and args.
// A context should contain a `reduxer.Reduxer` via `publish.PubBundle` struct.
func (gh *gitHub) Publish(ctx context.Context, cols []wayback.Collect, args ...string) {
func (gh *gitHub) Publish(ctx context.Context, cols []wayback.Collect, args ...string) error {
metrics.IncrementPublish(metrics.PublishGithub, metrics.StatusRequest)

if len(cols) == 0 {
logger.Warn("collects empty")
return
return errors.New("publish to github: collects empty")
}

rdx, _, err := extract(ctx, cols)
Expand All @@ -65,10 +67,10 @@ func (gh *gitHub) Publish(ctx context.Context, cols []wayback.Collect, args ...s

if gh.toIssues(ctx, head, body) {
metrics.IncrementPublish(metrics.PublishGithub, metrics.StatusSuccess)
return
return nil
}
metrics.IncrementPublish(metrics.PublishGithub, metrics.StatusFailure)
return
return errors.New("publish to github failed")
}

func (gh *gitHub) toIssues(ctx context.Context, head, body string) bool {
Expand Down
15 changes: 9 additions & 6 deletions publish/mastodon.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@ package publish // import "github.com/wabarc/wayback/publish"
import (
"context"

mstdn "github.com/mattn/go-mastodon"
"github.com/wabarc/logger"
"github.com/wabarc/wayback"
"github.com/wabarc/wayback/config"
"github.com/wabarc/wayback/errors"
"github.com/wabarc/wayback/metrics"
"github.com/wabarc/wayback/template/render"

mstdn "github.com/mattn/go-mastodon"
)

var _ Publisher = (*mastodon)(nil)

type mastodon struct {
client *mstdn.Client
}
Expand All @@ -40,16 +44,15 @@ func NewMastodon(client *mstdn.Client) *mastodon {

// Publish publish toot to the Mastodon of given cols and args.
// A context should contain a `reduxer.Reduxer` via `publish.PubBundle` struct.
func (m *mastodon) Publish(ctx context.Context, cols []wayback.Collect, args ...string) {
func (m *mastodon) Publish(ctx context.Context, cols []wayback.Collect, args ...string) error {
var id string
if len(args) > 1 {
id = args[1]
}
metrics.IncrementPublish(metrics.PublishMstdn, metrics.StatusRequest)

if len(cols) == 0 {
logger.Warn("collects empty")
return
return errors.New("publish to mastodon: collects empty")
}

rdx, _, err := extract(ctx, cols)
Expand All @@ -60,10 +63,10 @@ func (m *mastodon) Publish(ctx context.Context, cols []wayback.Collect, args ...
var txt = render.ForPublish(&render.Mastodon{Cols: cols, Data: rdx}).String()
if m.ToMastodon(ctx, txt, id) {
metrics.IncrementPublish(metrics.PublishMstdn, metrics.StatusSuccess)
return
return nil
}
metrics.IncrementPublish(metrics.PublishMstdn, metrics.StatusFailure)
return
return errors.New("publish to mastodon failed")
}

func (m *mastodon) ToMastodon(ctx context.Context, text, id string) bool {
Expand Down
15 changes: 9 additions & 6 deletions publish/matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ import (
"github.com/wabarc/logger"
"github.com/wabarc/wayback"
"github.com/wabarc/wayback/config"
"github.com/wabarc/wayback/errors"
"github.com/wabarc/wayback/metrics"
"github.com/wabarc/wayback/template/render"
matrix "maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"

matrix "maunium.net/go/mautrix"
)

var _ Publisher = (*matrixBot)(nil)

type matrixBot struct {
client *matrix.Client
}
Expand Down Expand Up @@ -51,12 +55,11 @@ func NewMatrix(client *matrix.Client) *matrixBot {

// Publish publish text to the Matrix room of given cols and args.
// A context should contain a `reduxer.Reduxer` via `publish.PubBundle` struct.
func (m *matrixBot) Publish(ctx context.Context, cols []wayback.Collect, args ...string) {
func (m *matrixBot) Publish(ctx context.Context, cols []wayback.Collect, args ...string) error {
metrics.IncrementPublish(metrics.PublishMatrix, metrics.StatusRequest)

if len(cols) == 0 {
logger.Warn("collects empty")
return
return errors.New("publish to matrix: collects empty")
}

rdx, _, err := extract(ctx, cols)
Expand All @@ -67,10 +70,10 @@ func (m *matrixBot) Publish(ctx context.Context, cols []wayback.Collect, args ..
var body = render.ForPublish(&render.Matrix{Cols: cols, Data: rdx}).String()
if m.toRoom(body) {
metrics.IncrementPublish(metrics.PublishMatrix, metrics.StatusSuccess)
return
return nil
}
metrics.IncrementPublish(metrics.PublishMatrix, metrics.StatusFailure)
return
return errors.New("publish to matrix failed")
}

func (m *matrixBot) toRoom(body string) bool {
Expand Down
23 changes: 20 additions & 3 deletions publish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"text/template"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/dghubble/go-twitter/twitter"
"github.com/wabarc/helper"
"github.com/wabarc/logger"
Expand Down Expand Up @@ -43,6 +44,10 @@ const (
FlagMatrix // FlagMatrix publish from matrix service
FlagSlack // FlagSlack publish from slack service
FlagIRC // FlagIRC publish from relaychat service

initialInterval = 10 * time.Second
maxElapsedTime = 30 * time.Second
maxRetries = 3
)

var maxDelayTime = 10
Expand All @@ -55,7 +60,7 @@ type PubBundle struct{}
// Publish publish message to serveral media platforms, e.g. Telegram channel, GitHub Issues, etc.
// The cols must either be a []wayback.Collect, args use for specific service.
type Publisher interface {
Publish(ctx context.Context, cols []wayback.Collect, args ...string)
Publish(ctx context.Context, cols []wayback.Collect, args ...string) error
}

// String returns the flag as a string.
Expand Down Expand Up @@ -86,8 +91,10 @@ func process(ctx context.Context, pub Publisher, cols []wayback.Collect, args ..
time.Sleep(w)
}

pub.Publish(ctx, part, args...)
return nil
action := func() error {
return pub.Publish(ctx, part, args...)
}
return doRetry(action)
})
}
if err := g.Wait(); err != nil {
Expand Down Expand Up @@ -252,3 +259,13 @@ func extract(ctx context.Context, cols []wayback.Collect) (rdx reduxer.Reduxer,
}
return rdx, art, errors.New("invalid reduxer")
}

func doRetry(o backoff.Operation) error {
exp := backoff.NewExponentialBackOff()
exp.InitialInterval = initialInterval
exp.MaxElapsedTime = maxElapsedTime
exp.Reset()
b := backoff.WithMaxRetries(exp, maxRetries)

return backoff.Retry(o, b)
}
15 changes: 9 additions & 6 deletions publish/relaychat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@ import (
"context"
"crypto/tls"

irc "github.com/thoj/go-ircevent"
"github.com/wabarc/logger"
"github.com/wabarc/wayback"
"github.com/wabarc/wayback/config"
"github.com/wabarc/wayback/errors"
"github.com/wabarc/wayback/metrics"
"github.com/wabarc/wayback/template/render"

irc "github.com/thoj/go-ircevent"
)

var _ Publisher = (*ircBot)(nil)

type ircBot struct {
conn *irc.Connection
}
Expand All @@ -41,21 +45,20 @@ func NewIRC(conn *irc.Connection) *ircBot {

// Publish publish text to IRC channel of given cols and args.
// A context should contain a `reduxer.Reduxer` via `publish.PubBundle` struct.
func (i *ircBot) Publish(ctx context.Context, cols []wayback.Collect, args ...string) {
func (i *ircBot) Publish(ctx context.Context, cols []wayback.Collect, args ...string) error {
metrics.IncrementPublish(metrics.PublishIRC, metrics.StatusRequest)

if len(cols) == 0 {
logger.Warn("collects empty")
return
return errors.New("publish to irc: collects empty")
}

var txt = render.ForPublish(&render.Relaychat{Cols: cols}).String()
if i.toChannel(ctx, txt) {
metrics.IncrementPublish(metrics.PublishIRC, metrics.StatusSuccess)
return
return nil
}
metrics.IncrementPublish(metrics.PublishIRC, metrics.StatusFailure)
return
return errors.New("publish to irc failed")
}

func (i *ircBot) toChannel(_ context.Context, text string) bool {
Expand Down
12 changes: 7 additions & 5 deletions publish/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/wabarc/logger"
"github.com/wabarc/wayback"
"github.com/wabarc/wayback/config"
"github.com/wabarc/wayback/errors"
"github.com/wabarc/wayback/metrics"
"github.com/wabarc/wayback/reduxer"
"github.com/wabarc/wayback/service"
Expand All @@ -18,6 +19,8 @@ import (
slack "github.com/slack-go/slack"
)

var _ Publisher = (*slackBot)(nil)

type slackBot struct {
bot *slack.Client
}
Expand All @@ -44,12 +47,11 @@ func NewSlack(bot *slack.Client) *slackBot {

// Publish publish text to the Slack channel of given cols and args.
// A context should contains a `reduxer.Reduxer` via `publish.PubBundle` struct.
func (s *slackBot) Publish(ctx context.Context, cols []wayback.Collect, args ...string) {
func (s *slackBot) Publish(ctx context.Context, cols []wayback.Collect, args ...string) error {
metrics.IncrementPublish(metrics.PublishSlack, metrics.StatusRequest)

if len(cols) == 0 {
logger.Warn("collects empty")
return
return errors.New("publish to slack: collects empty")
}

rdx, art, err := extract(ctx, cols)
Expand All @@ -61,10 +63,10 @@ func (s *slackBot) Publish(ctx context.Context, cols []wayback.Collect, args ...
var body = render.ForPublish(&render.Slack{Cols: cols, Data: rdx}).String()
if s.toChannel(art, head, body) {
metrics.IncrementPublish(metrics.PublishSlack, metrics.StatusSuccess)
return
return nil
}
metrics.IncrementPublish(metrics.PublishSlack, metrics.StatusFailure)
return
return errors.New("publish to slack failed")
}

// toChannel for publish to message to Slack channel,
Expand Down
12 changes: 7 additions & 5 deletions publish/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/wabarc/logger"
"github.com/wabarc/wayback"
"github.com/wabarc/wayback/config"
"github.com/wabarc/wayback/errors"
"github.com/wabarc/wayback/metrics"
"github.com/wabarc/wayback/reduxer"
"github.com/wabarc/wayback/service"
Expand All @@ -18,6 +19,8 @@ import (
telegram "gopkg.in/telebot.v3"
)

var _ Publisher = (*telegramBot)(nil)

type telegramBot struct {
bot *telegram.Bot
}
Expand Down Expand Up @@ -45,12 +48,11 @@ func NewTelegram(bot *telegram.Bot) *telegramBot {

// Publish publish text to the Telegram channel of given cols and args.
// A context should contain a `reduxer.Reduxer` via `publish.PubBundle` struct.
func (t *telegramBot) Publish(ctx context.Context, cols []wayback.Collect, args ...string) {
func (t *telegramBot) Publish(ctx context.Context, cols []wayback.Collect, args ...string) error {
metrics.IncrementPublish(metrics.PublishChannel, metrics.StatusRequest)

if len(cols) == 0 {
logger.Warn("collects empty")
return
return errors.New("publish to telegram: collects empty")
}

rdx, art, err := extract(ctx, cols)
Expand All @@ -62,10 +64,10 @@ func (t *telegramBot) Publish(ctx context.Context, cols []wayback.Collect, args
var body = render.ForPublish(&render.Telegram{Cols: cols, Data: rdx}).String()
if t.toChannel(art, head, body) {
metrics.IncrementPublish(metrics.PublishChannel, metrics.StatusSuccess)
return
return nil
}
metrics.IncrementPublish(metrics.PublishChannel, metrics.StatusFailure)
return
return errors.New("publish to telegram failed")
}

// toChannel for publish to message to Telegram channel,
Expand Down
Loading

0 comments on commit d2b082e

Please sign in to comment.