Skip to content

Commit

Permalink
fix: wrap cmds in Conn.TxPipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Nov 22, 2022
1 parent 0884e48 commit 5053db2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
15 changes: 5 additions & 10 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,21 +725,13 @@ func (c *Conn) Process(ctx context.Context, cmd Cmder) error {
return err
}

func (c *Conn) processPipeline(ctx context.Context, cmds []Cmder) error {
return c.hooks.processPipeline(ctx, cmds)
}

func (c *Conn) processTxPipeline(ctx context.Context, cmds []Cmder) error {
return c.hooks.processTxPipeline(ctx, cmds)
}

func (c *Conn) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(ctx, fn)
}

func (c *Conn) Pipeline() Pipeliner {
pipe := Pipeline{
exec: c.processPipeline,
exec: c.hooks.processPipeline,
}
pipe.init()
return &pipe
Expand All @@ -752,7 +744,10 @@ func (c *Conn) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmd
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Conn) TxPipeline() Pipeliner {
pipe := Pipeline{
exec: c.processTxPipeline,
exec: func(ctx context.Context, cmds []Cmder) error {
cmds = wrapMultiExec(ctx, cmds)
return c.hooks.processTxPipeline(ctx, cmds)
},
}
pipe.init()
return &pipe
Expand Down
22 changes: 22 additions & 0 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,3 +446,25 @@ var _ = Describe("Client context cancelation", func() {
Expect(err).To(BeIdenticalTo(context.Canceled))
})
})

var _ = Describe("Conn", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(redisOptions())
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})

AfterEach(func() {
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})

It("TxPipeline", func() {
tx := client.Conn().TxPipeline()
tx.SwapDB(ctx, 0, 2)
tx.SwapDB(ctx, 1, 0)
_, err := tx.Exec(ctx)
Expect(err).NotTo(HaveOccurred())
})
})

0 comments on commit 5053db2

Please sign in to comment.