Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of publish/consume with job attributes in client side #227

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ type Job struct {
TTL int64 `json:"ttl"`
ElapsedMS int64 `json:"elapsed_ms"`
RemainTries int64 `json:"remain_tries"`
Attributes map[string]string
}

type JobRequest struct {
Queue string `json:"queue"`
ID string `json:"job_id"`
Data []byte `json:"data"`
TTL uint32 `json:"ttl"`
Tries uint16 `json:"tries"`
Delay uint32 `json:"delay"`
Attributes map[string]string `json:"attributes"`
}

type LmstfyClient struct {
Expand Down Expand Up @@ -111,27 +122,48 @@ func (c *LmstfyClient) getReq(method, relativePath string, query url.Values, bod
return
}

// Deprecated: Use PublishJob instead
//
// Publish a new job to the queue.
// - ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL.
// - tries is the maximum times the job can be fetched.
// - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied.
func (c *LmstfyClient) Publish(queue string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) {
return c.publish(queue, "", data, ttlSecond, tries, delaySecond)
return c.publish(queue, "", data, nil, ttlSecond, tries, delaySecond)
}

func (c *LmstfyClient) PublishJob(job *JobRequest) (jobID string, e error) {
return c.publish(job.Queue, "", job.Data, job.Attributes, job.TTL, job.Tries, job.Delay)
}

// Deprecated: Use RePublishJob instead
//
// RePublish delete(ack) the job of the queue and publish the job again.
// - ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL.
// - tries is the maximum times the job can be fetched.
// - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied.
func (c *LmstfyClient) RePublish(job *Job, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) {
return c.publish(job.Queue, job.ID, job.Data, ttlSecond, tries, delaySecond)
return c.publish(job.Queue, job.ID, job.Data, nil, ttlSecond, tries, delaySecond)
}

func (c *LmstfyClient) RePublishJob(job *JobRequest) (jobID string, e error) {
return c.publish(job.Queue, jobID, job.Data, job.Attributes, job.TTL, job.Tries, job.Delay)
}

func (c *LmstfyClient) publish(queue, ackJobID string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) {
func (c *LmstfyClient) publish(
queue,
ackJobID string,
data []byte,
attributes map[string]string,
ttlSecond uint32,
tries uint16,
delaySecond uint32,
) (jobID string, e error) {
query := url.Values{}
query.Add("ttl", strconv.FormatUint(uint64(ttlSecond), 10))
query.Add("tries", strconv.FormatUint(uint64(tries), 10))
query.Add("delay", strconv.FormatUint(uint64(delaySecond), 10))

retryCount := 0
relativePath := queue
if ackJobID != "" {
Expand All @@ -145,6 +177,12 @@ RETRY:
Reason: err.Error(),
}
}
if len(attributes) > 0 {
req.Header.Add("Enable-Job-Version", "YES")
for k, v := range attributes {
req.Header.Add(fmt.Sprintf("Job-Attr-%s", strings.ToTitle(k)), v)
}
}

resp, err := c.httpCli.Do(req)
if err != nil {
Expand Down Expand Up @@ -364,7 +402,7 @@ func (c *LmstfyClient) consume(queue string, ttrSecond, timeoutSecond uint32, fr
RequestID: resp.Header.Get("X-Request-ID"),
}
}
respBytes, err := ioutil.ReadAll(resp.Body)
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return nil, &APIError{
Type: ResponseErr,
Expand Down Expand Up @@ -502,7 +540,7 @@ func (c *LmstfyClient) batchConsume(queues []string, count, ttrSecond, timeoutSe

// Consume from multiple queues with priority.
// The order of the queues in the params implies the priority. eg.
// ConsumeFromQueues(120, 5, "queue-a", "queue-b", "queue-c")
// ConsumeFromQueues(120, 5, "queue-a", "queue-b", "queue-c")
// if all the queues have jobs to be fetched, the job in `queue-a` will be return.
func (c *LmstfyClient) ConsumeFromQueues(ttrSecond, timeoutSecond uint32, queues ...string) (job *Job, e error) {
return c.consumeFromQueues(ttrSecond, timeoutSecond, false, queues...)
Expand Down
25 changes: 25 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestParseSchemeFromURL(t *testing.T) {
Expand Down Expand Up @@ -121,6 +123,29 @@ func TestLmstfyClient_Consume(t *testing.T) {
}
}

func TestLmstfyClient_PublishWithAttributes(t *testing.T) {
cli := NewLmstfyClient(Host, Port, Namespace, Token)
jobID, _ := cli.PublishJob(&JobRequest{
Queue: "test-publish-attributes",
Data: []byte("hello"),
TTL: 10,
Tries: 10,
Delay: 0,
Attributes: map[string]string{
"hello": "world",
"foo": "bar",
},
})
job, err := cli.Consume("test-publish-attributes", 10, 3)
require.NoError(t, err)
require.NotNil(t, job)
require.Equal(t, jobID, job.ID)
require.Equal(t, "hello", string(job.Data))
require.Len(t, job.Attributes, 2)
require.Equal(t, "world", job.Attributes["hello"])
require.Equal(t, "bar", job.Attributes["foo"])
}

func TestLmstfyClient_BatchConsume(t *testing.T) {
cli := NewLmstfyClient(Host, Port, Namespace, Token)
jobMap := map[string]bool{}
Expand Down
4 changes: 2 additions & 2 deletions client/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net/http"
"os"
"testing"
Expand Down Expand Up @@ -53,7 +53,7 @@ func setup(CONF *config.Config) {
if resp.StatusCode != http.StatusCreated {
panic("Failed to create testing token")
}
respBytes, err := ioutil.ReadAll(resp.Body)
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
panic("Failed to create testing token")
}
Expand Down
Loading