From 06effc12118d2ebc1d3640dcf75cfdc6bee4d39b Mon Sep 17 00:00:00 2001 From: git-hulk Date: Fri, 12 Jul 2024 11:57:29 +0800 Subject: [PATCH] Add support of publish/consume with job attributes --- client/client.go | 48 ++++++++++++++++++++++++++++++++++++++----- client/client_test.go | 25 ++++++++++++++++++++++ client/setup_test.go | 4 ++-- 3 files changed, 70 insertions(+), 7 deletions(-) diff --git a/client/client.go b/client/client.go index d2306414..37c0d9a6 100644 --- a/client/client.go +++ b/client/client.go @@ -23,6 +23,17 @@ type Job struct { TTL int64 `json:"ttl"` ElapsedMS int64 `json:"elapsed_ms"` RemainTries int64 `json:"remain_tries"` + Attributes map[string]string +} + +type JobRequest struct { + Queue string `json:"queue"` + ID string `json:"job_id"` + Data []byte `json:"data"` + TTL uint32 `json:"ttl"` + Tries uint16 `json:"tries"` + Delay uint32 `json:"delay"` + Attributes map[string]string `json:"attributes"` } type LmstfyClient struct { @@ -111,27 +122,48 @@ func (c *LmstfyClient) getReq(method, relativePath string, query url.Values, bod return } +// Deprecated: Use PublishJob instead +// // Publish a new job to the queue. // - ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL. // - tries is the maximum times the job can be fetched. // - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied. func (c *LmstfyClient) Publish(queue string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) { - return c.publish(queue, "", data, ttlSecond, tries, delaySecond) + return c.publish(queue, "", data, nil, ttlSecond, tries, delaySecond) } +func (c *LmstfyClient) PublishJob(job *JobRequest) (jobID string, e error) { + return c.publish(job.Queue, "", job.Data, job.Attributes, job.TTL, job.Tries, job.Delay) +} + +// Deprecated: Use RePublishJob instead +// // RePublish delete(ack) the job of the queue and publish the job again. // - ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL. // - tries is the maximum times the job can be fetched. // - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied. func (c *LmstfyClient) RePublish(job *Job, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) { - return c.publish(job.Queue, job.ID, job.Data, ttlSecond, tries, delaySecond) + return c.publish(job.Queue, job.ID, job.Data, nil, ttlSecond, tries, delaySecond) +} + +func (c *LmstfyClient) RePublishJob(job *JobRequest) (jobID string, e error) { + return c.publish(job.Queue, jobID, job.Data, job.Attributes, job.TTL, job.Tries, job.Delay) } -func (c *LmstfyClient) publish(queue, ackJobID string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) { +func (c *LmstfyClient) publish( + queue, + ackJobID string, + data []byte, + attributes map[string]string, + ttlSecond uint32, + tries uint16, + delaySecond uint32, +) (jobID string, e error) { query := url.Values{} query.Add("ttl", strconv.FormatUint(uint64(ttlSecond), 10)) query.Add("tries", strconv.FormatUint(uint64(tries), 10)) query.Add("delay", strconv.FormatUint(uint64(delaySecond), 10)) + retryCount := 0 relativePath := queue if ackJobID != "" { @@ -145,6 +177,12 @@ RETRY: Reason: err.Error(), } } + if len(attributes) > 0 { + req.Header.Add("Enable-Job-Version", "YES") + for k, v := range attributes { + req.Header.Add(fmt.Sprintf("Job-Attr-%s", strings.ToTitle(k)), v) + } + } resp, err := c.httpCli.Do(req) if err != nil { @@ -364,7 +402,7 @@ func (c *LmstfyClient) consume(queue string, ttrSecond, timeoutSecond uint32, fr RequestID: resp.Header.Get("X-Request-ID"), } } - respBytes, err := ioutil.ReadAll(resp.Body) + respBytes, err := io.ReadAll(resp.Body) if err != nil { return nil, &APIError{ Type: ResponseErr, @@ -502,7 +540,7 @@ func (c *LmstfyClient) batchConsume(queues []string, count, ttrSecond, timeoutSe // Consume from multiple queues with priority. // The order of the queues in the params implies the priority. eg. -// ConsumeFromQueues(120, 5, "queue-a", "queue-b", "queue-c") +// ConsumeFromQueues(120, 5, "queue-a", "queue-b", "queue-c") // if all the queues have jobs to be fetched, the job in `queue-a` will be return. func (c *LmstfyClient) ConsumeFromQueues(ttrSecond, timeoutSecond uint32, queues ...string) (job *Job, e error) { return c.consumeFromQueues(ttrSecond, timeoutSecond, false, queues...) diff --git a/client/client_test.go b/client/client_test.go index 76f970a1..233d1084 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -5,6 +5,8 @@ import ( "encoding/json" "testing" "time" + + "github.com/stretchr/testify/require" ) func TestParseSchemeFromURL(t *testing.T) { @@ -121,6 +123,29 @@ func TestLmstfyClient_Consume(t *testing.T) { } } +func TestLmstfyClient_PublishWithAttributes(t *testing.T) { + cli := NewLmstfyClient(Host, Port, Namespace, Token) + jobID, _ := cli.PublishJob(&JobRequest{ + Queue: "test-publish-attributes", + Data: []byte("hello"), + TTL: 10, + Tries: 10, + Delay: 0, + Attributes: map[string]string{ + "hello": "world", + "foo": "bar", + }, + }) + job, err := cli.Consume("test-publish-attributes", 10, 3) + require.NoError(t, err) + require.NotNil(t, job) + require.Equal(t, jobID, job.ID) + require.Equal(t, "hello", string(job.Data)) + require.Len(t, job.Attributes, 2) + require.Equal(t, "world", job.Attributes["hello"]) + require.Equal(t, "bar", job.Attributes["foo"]) +} + func TestLmstfyClient_BatchConsume(t *testing.T) { cli := NewLmstfyClient(Host, Port, Namespace, Token) jobMap := map[string]bool{} diff --git a/client/setup_test.go b/client/setup_test.go index 7160fff4..e9fbd434 100644 --- a/client/setup_test.go +++ b/client/setup_test.go @@ -4,7 +4,7 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "os" "testing" @@ -53,7 +53,7 @@ func setup(CONF *config.Config) { if resp.StatusCode != http.StatusCreated { panic("Failed to create testing token") } - respBytes, err := ioutil.ReadAll(resp.Body) + respBytes, err := io.ReadAll(resp.Body) if err != nil { panic("Failed to create testing token") }