Skip to content

Commit

Permalink
Turns the pooling bucket into a non-pointer
Browse files Browse the repository at this point in the history
  • Loading branch information
waybackarchiver committed Aug 28, 2022
1 parent 5064b55 commit f6c67bd
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 23 deletions.
17 changes: 9 additions & 8 deletions pooling/pooling.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Bucket struct {
elapsed uint64

// An object that will perform exactly one action.
once sync.Once
once *sync.Once
}

func newResource(id int) *resource {
Expand Down Expand Up @@ -119,7 +119,7 @@ func (p *Pool) Roll() {
continue
}

if b := p.bucket(); b != nil {
if b, has := p.bucket(); has {
go b.once.Do(func() {
p.do(b)
})
Expand All @@ -128,7 +128,7 @@ func (p *Pool) Roll() {
}

// Pub puts wayback requests to the resource pool
func (p *Pool) Put(b *Bucket) {
func (p *Pool) Put(b Bucket) {
// Inserts a new bucket at the front of queue.
p.mutex.Lock()
p.staging.PushFront(b)
Expand Down Expand Up @@ -166,7 +166,7 @@ func (p *Pool) push(r *resource) error {
return nil
}

func (p *Pool) do(b *Bucket) error {
func (p *Pool) do(b Bucket) error {
atomic.AddInt32(&p.processing, 1)
defer func() {
atomic.AddInt32(&p.waiting, -1)
Expand Down Expand Up @@ -223,13 +223,14 @@ func (p *Pool) do(b *Bucket) error {
return nil
}

func (p *Pool) bucket() *Bucket {
func (p *Pool) bucket() (b Bucket, ok bool) {
p.mutex.Lock()
defer p.mutex.Unlock()

if b, ok := p.staging.PopBack().(*Bucket); ok {
return b
if b, ok = p.staging.PopBack().(Bucket); ok {
b.once = new(sync.Once)
return b, ok
}

return nil
return
}
17 changes: 10 additions & 7 deletions pooling/pooling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package pooling // import "github.com/wabarc/wayback/pooling"
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -39,7 +40,7 @@ func TestRoll(t *testing.T) {
for i < capacity {
ch := make(chan struct{}, 1)
go func(i int) {
bucket := &Bucket{
bucket := Bucket{
Request: func(_ context.Context) error {
time.Sleep(time.Millisecond)
return nil
Expand Down Expand Up @@ -87,7 +88,7 @@ func TestTimeout(t *testing.T) {
for i < capacity {
ch := make(chan struct{}, 1)
go func(i int) {
bucket := &Bucket{
bucket := Bucket{
Request: func(_ context.Context) error {
time.Sleep(10 * time.Millisecond)
return nil
Expand Down Expand Up @@ -122,8 +123,10 @@ func TestMaxRetries(t *testing.T) {
}
logger.SetLogLevel(logger.LevelFatal)

bucket := &Bucket{
var elapsed uint64
bucket := Bucket{
Request: func(_ context.Context) error {
atomic.AddUint64(&elapsed, 1)
return errors.New("process request failed")
},
Fallback: func(_ context.Context) error {
Expand All @@ -133,13 +136,13 @@ func TestMaxRetries(t *testing.T) {

maxRetries := uint64(3)
p := New(context.Background(), 1)
p.timeout = time.Microsecond
p.timeout = time.Second
p.maxRetries = maxRetries
go p.Roll()
p.Put(bucket)
p.Close()
if bucket.elapsed != maxRetries {
t.Fatalf("Unexpected max retries got %d instead of %d", bucket.elapsed, maxRetries)
if elapsed != maxRetries {
t.Fatalf("Unexpected max retries got %d instead of %d", elapsed, maxRetries)
}
}

Expand All @@ -155,7 +158,7 @@ func TestFallback(t *testing.T) {

want := "foo"
fall := ""
bucket := &Bucket{
bucket := Bucket{
Request: func(_ context.Context) error {
return errors.New("some error")
},
Expand Down
2 changes: 1 addition & 1 deletion service/discord/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (d *Discord) process(m *discord.MessageCreate) (err error) {
logger.Error("reply queue failed: %v", err)
return
}
bucket := &pooling.Bucket{
bucket := pooling.Bucket{
Request: func(ctx context.Context) error {
logger.Debug("content: %v", urls)
if err := d.wayback(ctx, m, urls); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion service/httpd/httpd.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (web *web) handle(pool *pooling.Pool) http.Handler {
web.router.HandleFunc("/offline.html", web.showOfflinePage).Methods(http.MethodGet)

web.router.HandleFunc("/wayback", func(w http.ResponseWriter, r *http.Request) {
bucket := &pooling.Bucket{
bucket := pooling.Bucket{
Request: func(ctx context.Context) error {
if err := web.process(ctx, w, r); err != nil {
logger.Error("httpd: process retrying: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion service/mastodon/mastodon.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (m *Mastodon) Serve() error {
m.archiving[n.Status.ID] = true
m.Unlock()
metrics.IncrementWayback(metrics.ServiceMastodon, metrics.StatusRequest)
bucket := &pooling.Bucket{
bucket := pooling.Bucket{
Request: func(ctx context.Context) error {
if err := m.process(ctx, n.ID, n.Status); err != nil {
logger.Error("process failure, notification: %#v, error: %v", n, err)
Expand Down
2 changes: 1 addition & 1 deletion service/matrix/matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (m *Matrix) Serve() error {
return
}
metrics.IncrementWayback(metrics.ServiceMatrix, metrics.StatusRequest)
bucket := &pooling.Bucket{
bucket := pooling.Bucket{
Request: func(ctx context.Context) error {
if err := m.process(ctx, ev); err != nil {
logger.Error("process request failure, error: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion service/relaychat/relaychat.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (i *IRC) Serve() error {
i.conn.AddCallback("PRIVMSG", func(ev *irc.Event) {
go func(ev *irc.Event) {
metrics.IncrementWayback(metrics.ServiceIRC, metrics.StatusRequest)
bucket := &pooling.Bucket{
bucket := pooling.Bucket{
Request: func(ctx context.Context) error {
if err := i.process(ctx, ev); err != nil {
logger.Error("process failure, message: %s, error: %v", ev.Message(), err)
Expand Down
2 changes: 1 addition & 1 deletion service/slack/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (s *Slack) process(ev *event) (err error) {
logger.Error("reply queue failed: %v", err)
return
}
bucket := &pooling.Bucket{
bucket := pooling.Bucket{
Request: func(ctx context.Context) error {
if err := s.wayback(ctx, ev, urls); err != nil {
logger.Error("archives failed: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion service/telegram/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (t *Telegram) process(message *telegram.Message) (err error) {
if err != nil {
return errors.Wrap(err, "reply message failed")
}
bucket := &pooling.Bucket{
bucket := pooling.Bucket{
Request: func(ctx context.Context) error {
_, err := t.bot.Edit(request, "Archiving...")
if err != nil && err != telegram.ErrSameMessageContent {
Expand Down
2 changes: 1 addition & 1 deletion service/twitter/twitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (t *Twitter) Serve() error {
}
go func(event twitter.DirectMessageEvent) {
metrics.IncrementWayback(metrics.ServiceTwitter, metrics.StatusRequest)
bucket := &pooling.Bucket{
bucket := pooling.Bucket{
Request: func(ctx context.Context) error {
if err := t.process(ctx, event); err != nil {
logger.Error("process failure, message: %#v, error: %v", event.Message, err)
Expand Down

0 comments on commit f6c67bd

Please sign in to comment.