From 7c2834a3c9302756396732985f93306e707723f8 Mon Sep 17 00:00:00 2001 From: hulk Date: Fri, 12 Jul 2024 14:06:00 +0800 Subject: [PATCH] Add support of publish/consume with job attributes (#227) This PR introduces the new API: PublishJob and RePublishJob to allow the use of the job attributes and mark the old API as deprecated. But we don't add the new API for the batch publish since we think it's not a rigorous implementation and don't encourage users to use that. --- client/client.go | 48 ++++++++++++++++++++++++++++++++++++++----- client/client_test.go | 25 ++++++++++++++++++++++ client/setup_test.go | 4 ++-- 3 files changed, 70 insertions(+), 7 deletions(-) diff --git a/client/client.go b/client/client.go index d230641..37c0d9a 100644 --- a/client/client.go +++ b/client/client.go @@ -23,6 +23,17 @@ type Job struct { TTL int64 `json:"ttl"` ElapsedMS int64 `json:"elapsed_ms"` RemainTries int64 `json:"remain_tries"` + Attributes map[string]string +} + +type JobRequest struct { + Queue string `json:"queue"` + ID string `json:"job_id"` + Data []byte `json:"data"` + TTL uint32 `json:"ttl"` + Tries uint16 `json:"tries"` + Delay uint32 `json:"delay"` + Attributes map[string]string `json:"attributes"` } type LmstfyClient struct { @@ -111,27 +122,48 @@ func (c *LmstfyClient) getReq(method, relativePath string, query url.Values, bod return } +// Deprecated: Use PublishJob instead +// // Publish a new job to the queue. // - ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL. // - tries is the maximum times the job can be fetched. // - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied. func (c *LmstfyClient) Publish(queue string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) { - return c.publish(queue, "", data, ttlSecond, tries, delaySecond) + return c.publish(queue, "", data, nil, ttlSecond, tries, delaySecond) } +func (c *LmstfyClient) PublishJob(job *JobRequest) (jobID string, e error) { + return c.publish(job.Queue, "", job.Data, job.Attributes, job.TTL, job.Tries, job.Delay) +} + +// Deprecated: Use RePublishJob instead +// // RePublish delete(ack) the job of the queue and publish the job again. // - ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL. // - tries is the maximum times the job can be fetched. // - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied. func (c *LmstfyClient) RePublish(job *Job, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) { - return c.publish(job.Queue, job.ID, job.Data, ttlSecond, tries, delaySecond) + return c.publish(job.Queue, job.ID, job.Data, nil, ttlSecond, tries, delaySecond) +} + +func (c *LmstfyClient) RePublishJob(job *JobRequest) (jobID string, e error) { + return c.publish(job.Queue, jobID, job.Data, job.Attributes, job.TTL, job.Tries, job.Delay) } -func (c *LmstfyClient) publish(queue, ackJobID string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) { +func (c *LmstfyClient) publish( + queue, + ackJobID string, + data []byte, + attributes map[string]string, + ttlSecond uint32, + tries uint16, + delaySecond uint32, +) (jobID string, e error) { query := url.Values{} query.Add("ttl", strconv.FormatUint(uint64(ttlSecond), 10)) query.Add("tries", strconv.FormatUint(uint64(tries), 10)) query.Add("delay", strconv.FormatUint(uint64(delaySecond), 10)) + retryCount := 0 relativePath := queue if ackJobID != "" { @@ -145,6 +177,12 @@ RETRY: Reason: err.Error(), } } + if len(attributes) > 0 { + req.Header.Add("Enable-Job-Version", "YES") + for k, v := range attributes { + req.Header.Add(fmt.Sprintf("Job-Attr-%s", strings.ToTitle(k)), v) + } + } resp, err := c.httpCli.Do(req) if err != nil { @@ -364,7 +402,7 @@ func (c *LmstfyClient) consume(queue string, ttrSecond, timeoutSecond uint32, fr RequestID: resp.Header.Get("X-Request-ID"), } } - respBytes, err := ioutil.ReadAll(resp.Body) + respBytes, err := io.ReadAll(resp.Body) if err != nil { return nil, &APIError{ Type: ResponseErr, @@ -502,7 +540,7 @@ func (c *LmstfyClient) batchConsume(queues []string, count, ttrSecond, timeoutSe // Consume from multiple queues with priority. // The order of the queues in the params implies the priority. eg. -// ConsumeFromQueues(120, 5, "queue-a", "queue-b", "queue-c") +// ConsumeFromQueues(120, 5, "queue-a", "queue-b", "queue-c") // if all the queues have jobs to be fetched, the job in `queue-a` will be return. func (c *LmstfyClient) ConsumeFromQueues(ttrSecond, timeoutSecond uint32, queues ...string) (job *Job, e error) { return c.consumeFromQueues(ttrSecond, timeoutSecond, false, queues...) diff --git a/client/client_test.go b/client/client_test.go index 76f970a..233d108 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -5,6 +5,8 @@ import ( "encoding/json" "testing" "time" + + "github.com/stretchr/testify/require" ) func TestParseSchemeFromURL(t *testing.T) { @@ -121,6 +123,29 @@ func TestLmstfyClient_Consume(t *testing.T) { } } +func TestLmstfyClient_PublishWithAttributes(t *testing.T) { + cli := NewLmstfyClient(Host, Port, Namespace, Token) + jobID, _ := cli.PublishJob(&JobRequest{ + Queue: "test-publish-attributes", + Data: []byte("hello"), + TTL: 10, + Tries: 10, + Delay: 0, + Attributes: map[string]string{ + "hello": "world", + "foo": "bar", + }, + }) + job, err := cli.Consume("test-publish-attributes", 10, 3) + require.NoError(t, err) + require.NotNil(t, job) + require.Equal(t, jobID, job.ID) + require.Equal(t, "hello", string(job.Data)) + require.Len(t, job.Attributes, 2) + require.Equal(t, "world", job.Attributes["hello"]) + require.Equal(t, "bar", job.Attributes["foo"]) +} + func TestLmstfyClient_BatchConsume(t *testing.T) { cli := NewLmstfyClient(Host, Port, Namespace, Token) jobMap := map[string]bool{} diff --git a/client/setup_test.go b/client/setup_test.go index 7160fff..e9fbd43 100644 --- a/client/setup_test.go +++ b/client/setup_test.go @@ -4,7 +4,7 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "os" "testing" @@ -53,7 +53,7 @@ func setup(CONF *config.Config) { if resp.StatusCode != http.StatusCreated { panic("Failed to create testing token") } - respBytes, err := ioutil.ReadAll(resp.Body) + respBytes, err := io.ReadAll(resp.Body) if err != nil { panic("Failed to create testing token") }