Skip to content

Commit

Permalink
Added Clickhouse Logger (#349)
Browse files Browse the repository at this point in the history
* Added Clickhouse Logger
* resync clickhouse client
* fix linter
* Update README.md

---------

Co-authored-by: dmachard <5562930+dmachard@users.noreply.github.com>
  • Loading branch information
zunnurainbadar and dmachard authored Mar 22, 2024
1 parent 494145c commit c931a14
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
- [`Scalyr`](docs/loggers/logger_scalyr.md)
- [`Redis`](docs/loggers/logger_redis.md) publisher
- [`Kafka`](docs/loggers/logger_kafka.md) producer
- [`Clickhouse`](doc/logger_clickhouse.md) *not yet production ready*
- *Send to security tools*
- [`Falco`](docs/loggers/logger_falco.md)

Expand Down
23 changes: 23 additions & 0 deletions docs/loggers/logger_clickhouse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

# Logger: Clickhouse client

Clickhouse client to remote Clickhouse server

Options:

- `url`: (string) Clickhouse server url
- `user`: (string) Clickhouse database user
- `password`: (string) Clickhouse database user password
- `table`: (string) Clickhouse table name
- `database`: (string) Clickhouse database name

Defaults:

```yaml
clickhouse:
url: "http://localhost:8123"
user: "default"
password: "password"
table: "records"
database: "dnscollector"
```
222 changes: 222 additions & 0 deletions loggers/clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package loggers

import (
"net/http"
"strconv"
"time"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/pkgconfig"
"github.com/dmachard/go-dnscollector/pkgutils"
"github.com/dmachard/go-dnscollector/transformers"
"github.com/dmachard/go-logger"
)

type ClickhouseData struct {
Identity string `json:"identity"`
QueryIP string `json:"query_ip"`
QName string `json:"q_name"`
Operation string `json:"operation"`
Family string `json:"family"`
Protocol string `json:"protocol"`
QType string `json:"q_type"`
RCode string `json:"r_code"`
TimeNSec string `json:"timensec"`
TimeStamp string `json:"timestamp"`
}

type ClickhouseClient struct {
stopProcess chan bool
doneProcess chan bool
stopRun chan bool
doneRun chan bool
inputChan chan dnsutils.DNSMessage
outputChan chan dnsutils.DNSMessage
config *pkgconfig.Config
configChan chan *pkgconfig.Config
logger *logger.Logger
name string
url string
user string
password string
database string
table string
RoutingHandler pkgutils.RoutingHandler
}

func NewClickhouseClient(config *pkgconfig.Config, console *logger.Logger, name string) *ClickhouseClient {
console.Info("[%s] logger=clickhouse - enabled", name)
o := &ClickhouseClient{
stopProcess: make(chan bool),
doneProcess: make(chan bool),
stopRun: make(chan bool),
doneRun: make(chan bool),
inputChan: make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
outputChan: make(chan dnsutils.DNSMessage, config.Loggers.ElasticSearchClient.ChannelBufferSize),
logger: console,
config: config,
name: name,
configChan: make(chan *pkgconfig.Config),
RoutingHandler: pkgutils.NewRoutingHandler(config, console, name),
}
o.ReadConfig()
return o
}

func (o *ClickhouseClient) GetName() string { return o.name }

func (o *ClickhouseClient) SetLoggers(loggers []pkgutils.Worker) {}

func (o *ClickhouseClient) AddDroppedRoute(wrk pkgutils.Worker) {
o.RoutingHandler.AddDroppedRoute(wrk)
}

func (o *ClickhouseClient) AddDefaultRoute(wrk pkgutils.Worker) {
o.RoutingHandler.AddDefaultRoute(wrk)
}

func (o *ClickhouseClient) ReloadConfig(config *pkgconfig.Config) {
o.LogInfo("reload configuration!")
o.configChan <- config
}

func (o *ClickhouseClient) ReadConfig() {
o.url = o.config.Loggers.ClickhouseClient.URL
o.user = o.config.Loggers.ClickhouseClient.User
o.password = o.config.Loggers.ClickhouseClient.Password
o.database = o.config.Loggers.ClickhouseClient.Database
o.table = o.config.Loggers.ClickhouseClient.Table
}

func (o *ClickhouseClient) Channel() chan dnsutils.DNSMessage {
return o.inputChan
}

func (o *ClickhouseClient) GetInputChannel() chan dnsutils.DNSMessage {
return o.inputChan
}

func (o *ClickhouseClient) LogInfo(msg string, v ...interface{}) {
o.logger.Info("["+o.name+"] Clickhouse - "+msg, v...)
}

func (o *ClickhouseClient) LogError(msg string, v ...interface{}) {
o.logger.Error("["+o.name+"] Clickhouse - "+msg, v...)
}

func (o *ClickhouseClient) Stop() {
o.LogInfo("stopping to run...")
o.stopRun <- true
<-o.doneRun

o.LogInfo("stopping to process...")
o.stopProcess <- true
<-o.doneProcess
}

func (o *ClickhouseClient) Run() {
o.LogInfo("running in background...")

// prepare next channels
defaultRoutes, defaultNames := o.RoutingHandler.GetDefaultRoutes()
droppedRoutes, droppedNames := o.RoutingHandler.GetDroppedRoutes()

// prepare transforms
listChannel := []chan dnsutils.DNSMessage{}
listChannel = append(listChannel, o.outputChan)
subprocessors := transformers.NewTransforms(&o.config.OutgoingTransformers, o.logger, o.name, listChannel, 0)

// goroutine to process transformed dns messages
go o.Process()

// loop to process incoming messages
RUN_LOOP:
for {
select {
case <-o.stopRun:
// cleanup transformers
subprocessors.Reset()

o.doneRun <- true
break RUN_LOOP

case cfg, opened := <-o.configChan:
if !opened {
return
}
o.config = cfg
o.ReadConfig()
subprocessors.ReloadConfig(&cfg.OutgoingTransformers)

case dm, opened := <-o.inputChan:
if !opened {
o.LogInfo("input channel closed!")
return
}

// apply tranforms, init dns message with additionnals parts if necessary
subprocessors.InitDNSMessageFormat(&dm)
if subprocessors.ProcessMessage(&dm) == transformers.ReturnDrop {
o.RoutingHandler.SendTo(droppedRoutes, droppedNames, dm)
continue
}

// send to next ?
o.RoutingHandler.SendTo(defaultRoutes, defaultNames, dm)

// send to output channel
o.outputChan <- dm
}
}
o.LogInfo("run terminated")
}

func (o *ClickhouseClient) Process() {
o.LogInfo("ready to process")

PROCESS_LOOP:
for {
select {
case <-o.stopProcess:
o.doneProcess <- true
break PROCESS_LOOP

// incoming dns message to process
case dm, opened := <-o.outputChan:
if !opened {
o.LogInfo("output channel closed!")
return
}
t, err := time.Parse(time.RFC3339, dm.DNSTap.TimestampRFC3339)
timensec := ""
if err == nil {
timensec = strconv.Itoa(int(t.UnixNano()))
}
data := ClickhouseData{
Identity: dm.DNSTap.Identity,
QueryIP: dm.NetworkInfo.QueryIP,
QName: dm.DNS.Qname,
Operation: dm.DNSTap.Operation,
Family: dm.NetworkInfo.Family,
Protocol: dm.NetworkInfo.Protocol,
QType: dm.DNS.Qtype,
RCode: dm.DNS.Rcode,
TimeNSec: timensec,
TimeStamp: strconv.Itoa(int(int64(dm.DNSTap.TimeSec))),
}
// nolint
url := o.url + "?query=INSERT%20INTO%20" + o.database + "." + o.table + "(identity,queryip,qname,operation,family,protocol,qtype,rcode,timensec,timestamp)%20VALUES%20('" + data.Identity + "','" + data.QueryIP + "','" + data.QName + "','" + data.Operation + "','" + data.Family + "','" + data.Protocol + "','" + data.QType + "','" + data.RCode + "','" + data.TimeNSec + "','" + data.TimeStamp + "')"
req, _ := http.NewRequest("POST", url, nil)

req.Header.Add("Accept", "*/*")
req.Header.Add("X-ClickHouse-User", o.user)
req.Header.Add("X-ClickHouse-Key", o.password)

_, errReq := http.DefaultClient.Do(req)
if errReq != nil {
o.LogError(errReq.Error())
}
}
}
o.LogInfo("processing terminated")
}
67 changes: 67 additions & 0 deletions loggers/clickhouse_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package loggers

import (
"bufio"
"net"
"net/http"
"regexp"
"testing"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/pkgconfig"
"github.com/dmachard/go-logger"
)

func Test_ClickhouseClient(t *testing.T) {

testcases := []struct {
mode string
pattern string
}{
{
mode: pkgconfig.ModeJSON,
pattern: "dns.collector",
},
}
cfg := pkgconfig.GetFakeConfig()
cfg.Loggers.ClickhouseClient.URL = "http://127.0.0.1:8123"
cfg.Loggers.ClickhouseClient.User = "default"
cfg.Loggers.ClickhouseClient.Password = "password"
cfg.Loggers.ClickhouseClient.Database = "database"
cfg.Loggers.ClickhouseClient.Table = "table"
fakeRcvr, err := net.Listen("tcp", "127.0.0.1:8123")
if err != nil {
t.Fatal(err)
}
defer fakeRcvr.Close()

for _, tc := range testcases {
t.Run(tc.mode, func(t *testing.T) {
g := NewClickhouseClient(cfg, logger.New(false), "test")

go g.Run()

dm := dnsutils.GetFakeDNSMessage()
g.Channel() <- dm
// accept conn
conn, err := fakeRcvr.Accept()
if err != nil {
t.Fatal(err)
}
defer conn.Close()

// read and parse http request on server side
request, err := http.ReadRequest(bufio.NewReader(conn))
if err != nil {
t.Fatal(err)
}
query := request.URL.Query().Get("query")
conn.Write([]byte(pkgconfig.HTTPOK))

pattern := regexp.MustCompile(tc.pattern)
if !pattern.MatchString(query) {
t.Errorf("clickhouse test error want %s, got: %s", tc.pattern, query)
}
})
}
}
19 changes: 17 additions & 2 deletions pkgconfig/loggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,14 @@ type ConfigLoggers struct {
URL string `yaml:"url"`
ChannelBufferSize int `yaml:"chan-buffer-size"`
} `yaml:"falco"`
ClickhouseClient struct {
Enable bool `yaml:"enable"`
URL string `yaml:"url"`
User string `yaml:"user"`
Password string `yaml:"password"`
Database string `yaml:"database"`
Table string `yaml:"table"`
} `yaml:"clickhouse"`
}

func (c *ConfigLoggers) SetDefault() {
Expand Down Expand Up @@ -500,7 +508,7 @@ func (c *ConfigLoggers) SetDefault() {

c.ElasticSearchClient.Enable = false
c.ElasticSearchClient.Server = "http://127.0.0.1:9200/"
c.ElasticSearchClient.Index = "dnscollector"
c.ElasticSearchClient.Index = ProgName
c.ElasticSearchClient.ChannelBufferSize = 4096
c.ElasticSearchClient.BulkSize = 5242880
c.ElasticSearchClient.FlushInterval = 10
Expand Down Expand Up @@ -546,14 +554,21 @@ func (c *ConfigLoggers) SetDefault() {
c.KafkaProducer.BufferSize = 100
c.KafkaProducer.ConnectTimeout = 5
c.KafkaProducer.FlushInterval = 10
c.KafkaProducer.Topic = "dnscollector"
c.KafkaProducer.Topic = ProgName
c.KafkaProducer.Partition = 0
c.KafkaProducer.ChannelBufferSize = 4096
c.KafkaProducer.Compression = CompressNone

c.FalcoClient.Enable = false
c.FalcoClient.URL = "http://127.0.0.1:9200"
c.FalcoClient.ChannelBufferSize = 65535

c.ClickhouseClient.Enable = false
c.ClickhouseClient.URL = "http://localhost:8123"
c.ClickhouseClient.User = "default"
c.ClickhouseClient.Password = "password"
c.ClickhouseClient.Database = ProgName
c.ClickhouseClient.Table = "records"
}

func (c *ConfigLoggers) GetTags() (ret []string) {
Expand Down
3 changes: 3 additions & 0 deletions pkglinker/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ func InitMultiplexer(mapLoggers map[string]pkgutils.Worker, mapCollectors map[st
if subcfg.Loggers.FalcoClient.Enable && IsLoggerRouted(config, output.Name) {
mapLoggers[output.Name] = loggers.NewFalcoClient(subcfg, logger, output.Name)
}
if subcfg.Loggers.ClickhouseClient.Enable && IsLoggerRouted(config, output.Name) {
mapLoggers[output.Name] = loggers.NewClickhouseClient(subcfg, logger, output.Name)
}
}

// load collectors
Expand Down
3 changes: 3 additions & 0 deletions pkglinker/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ func CreateStanza(stanzaName string, config *pkgconfig.Config, mapCollectors map
if config.Loggers.FalcoClient.Enable {
mapLoggers[stanzaName] = loggers.NewFalcoClient(config, logger, stanzaName)
}
if config.Loggers.ClickhouseClient.Enable {
mapLoggers[stanzaName] = loggers.NewClickhouseClient(config, logger, stanzaName)
}

// register the collector if enabled
if config.Collectors.DNSMessage.Enable {
Expand Down

0 comments on commit c931a14

Please sign in to comment.