From cdbf3df9e948e16de1cc271a71852f2c0c6a79f5 Mon Sep 17 00:00:00 2001 From: "weilong.pwl" Date: Tue, 15 Aug 2023 11:47:57 +0800 Subject: [PATCH] add query demo --- .../query_demo/simple_demo_with_query.go | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 example/consumer/query_demo/simple_demo_with_query.go diff --git a/example/consumer/query_demo/simple_demo_with_query.go b/example/consumer/query_demo/simple_demo_with_query.go new file mode 100644 index 00000000..64076203 --- /dev/null +++ b/example/consumer/query_demo/simple_demo_with_query.go @@ -0,0 +1,49 @@ +package main + +import ( + "fmt" + "os" + "os/signal" + "syscall" + + sls "github.com/aliyun/aliyun-log-go-sdk" + consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer" + "github.com/go-kit/kit/log/level" +) + +// README : +// This is a very simple example of pulling data from your logstore and printing it for consumption, including pre-handling for logs. + +func main() { + option := consumerLibrary.LogHubConfig{ + Endpoint: "", + AccessKeyID: "", + AccessKeySecret: "", + Project: "", + Logstore: "", + ConsumerGroupName: "", + ConsumerName: "", + // This options is used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed. + // Could be "begin", "end", "specific time format in time stamp", it's log receiving time. + CursorPosition: consumerLibrary.BEGIN_CURSOR, + // Query is for log pre-handling before return to client, more info refer to https://help.aliyun.com/zh/sls/user-guide/rule-based-consumption + Query: "* | where cast(body_bytes_sent as bigint) > 14000", + } + + consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process) + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + consumerWorker.Start() + if _, ok := <-ch; ok { + level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName) + consumerWorker.StopAndWait() + } +} + +// Fill in your consumption logic here, and be careful not to change the parameters of the function and the return value, +// otherwise you will report errors. +func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) { + fmt.Println(shardId, logGroupList) + checkpointTracker.SaveCheckPoint(false) + return "", nil +}