Improve query performance for ClickHouse plugin #157

Merged · 1 commit · Sep 29, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -67,6 +67,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
- [#150](https://github.com/kobsio/kobs/pull/150): :warning: _Breaking change:_ :warning: The ClickHouse plugin can now only be used together with the [kobsio/fluent-bit-clickhouse](https://github.com/kobsio/fluent-bit-clickhouse) output plugin for [Fluent Bit](https://fluentbit.io). For raw SQL queries against a ClickHouse instance the SQL plugin added in [#149](https://github.com/kobsio/kobs/pull/149) can be used.
- [#152](https://github.com/kobsio/kobs/pull/152): Improve performance for large dashboards and open Application page in gallery view.
- [#155](https://github.com/kobsio/kobs/pull/155): Allow users to get all Applications from all namespaces, by allowing an empty namespace list.
- [#157](https://github.com/kobsio/kobs/pull/157): Improve query performance for ClickHouse plugin.

## [v0.5.0](https://github.com/kobsio/kobs/releases/tag/v0.5.0) (2021-08-03)

1 change: 0 additions & 1 deletion docs/plugins/clickhouse.md
@@ -25,7 +25,6 @@ The following options can be used for a panel with the ClickHouse plugin:
| fields | []string | A list of fields to display in the results table. If this field is omitted, the whole document is displayed in the results table. This field is only available for the `logs` view. | No |
| order | string | Order for the returned logs. Must be `ascending` or `descending`. The default value for this field is `descending`. | No |
| orderBy | string | The name of the field by which the results should be ordered. The default value for this field is `timestamp`. | No |
| maxDocuments | string | The maximum amount of documents, which should be returned. The default value for this field is `1000`. | No |

```yaml
---
44 changes: 8 additions & 36 deletions plugins/clickhouse/clickhouse.go
@@ -45,51 +45,22 @@ func (router *Router) getInstance(name string) *instance.Instance {
// getLogs implements the special handling when the user selected the "logs" option for the "view" configuration. This
// option is intended to be used together with the kobsio/fluent-bit-clickhouse Fluent Bit plugin and provides a custom
// query language to get the logs from ClickHouse.
// Next to the query and time range, a user can also provide a limit and offset to page through all the logs. The limit
// shouldn't be larger than 1000 and if the offset is empty we use 0, which indicates a new query in our React UI.
func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) {
name := chi.URLParam(r, "name")
query := r.URL.Query().Get("query")
order := r.URL.Query().Get("order")
orderBy := r.URL.Query().Get("orderBy")
maxDocuments := r.URL.Query().Get("maxDocuments")
limit := r.URL.Query().Get("limit")
offset := r.URL.Query().Get("offset")
timeStart := r.URL.Query().Get("timeStart")
timeEnd := r.URL.Query().Get("timeEnd")

log.WithFields(logrus.Fields{"name": name, "query": query, "order": order, "orderBy": orderBy, "maxDocuments": maxDocuments, "limit": limit, "offset": offset, "timeStart": timeStart, "timeEnd": timeEnd}).Tracef("getLogs")
log.WithFields(logrus.Fields{"name": name, "query": query, "order": order, "orderBy": orderBy, "timeStart": timeStart, "timeEnd": timeEnd}).Tracef("getLogs")

i := router.getInstance(name)
if i == nil {
errresponse.Render(w, r, nil, http.StatusBadRequest, "Could not find instance name")
return
}

parsedLimit, err := strconv.ParseInt(limit, 10, 64)
if err != nil || parsedLimit > 1000 {
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not parse limit")
return
}

parsedOffset := int64(0)
if offset != "" {
parsedOffset, err = strconv.ParseInt(offset, 10, 64)
if err != nil {
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not parse offset")
return
}
}

parsedMaxDocuments := int64(1000)
if maxDocuments != "" {
parsedMaxDocuments, err = strconv.ParseInt(maxDocuments, 10, 64)
if err != nil {
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not parse maxDocuments")
return
}
}

parsedTimeStart, err := strconv.ParseInt(timeStart, 10, 64)
if err != nil {
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not parse start time")
@@ -118,8 +89,13 @@ func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) {
return
case <-ticker.C:
if f, ok := w.(http.Flusher); ok {
// We do not set the processing status code, so that the queries always return a 200. This is
// necessary because Go doesn't allow setting a new status code once the header has been written.
// See: https://github.com/golang/go/issues/36734
// Because of that we also have to handle errors in the React UI, even when the status code is 200.
// See plugins/clickhouse/src/components/page/Logs.tsx#L64
// w.WriteHeader(http.StatusProcessing)
w.Write([]byte("\n"))
w.WriteHeader(http.StatusContinue)
f.Flush()
}
}
@@ -130,7 +106,7 @@ func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) {
done <- true
}()

documents, fields, count, took, buckets, newOffset, newTimeStart, err := i.GetLogs(r.Context(), query, order, orderBy, parsedMaxDocuments, parsedLimit, parsedOffset, parsedTimeStart, parsedTimeEnd)
documents, fields, count, took, buckets, err := i.GetLogs(r.Context(), query, order, orderBy, parsedTimeStart, parsedTimeEnd)
if err != nil {
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not get logs")
return
@@ -142,16 +118,12 @@ func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) {
Count int64 `json:"count"`
Took int64 `json:"took"`
Buckets []instance.Bucket `json:"buckets"`
Offset int64 `json:"offset"`
TimeStart int64 `json:"timeStart"`
}{
documents,
fields,
count,
took,
buckets,
newOffset,
newTimeStart,
}

render.JSON(w, r, data)
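The keep-alive handling above can be looked at in isolation: the handler writes a newline on a ticker so that proxies and load balancers don't cut the connection while ClickHouse is still working, and because the first write commits the 200 status code, later errors have to be reported inside the body. The sketch below is a minimal, self-contained version of that pattern, not the kobs handler itself; the handler name, the 5 second interval, and the `longQuery` placeholder are assumptions.

```go
package main

import (
	"net/http"
	"time"
)

// longQuery stands in for the long-running ClickHouse query; in kobs this
// would be the call to i.GetLogs.
func longQuery() ([]byte, error) {
	time.Sleep(45 * time.Second)
	return []byte(`{"documents":[]}`), nil
}

func logsHandler(w http.ResponseWriter, r *http.Request) {
	done := make(chan struct{})

	go func() {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				if f, ok := w.(http.Flusher); ok {
					// The first Write implicitly sends "200 OK"; after that the
					// status code can no longer be changed, so errors from the
					// query have to be encoded in the JSON body instead.
					w.Write([]byte("\n"))
					f.Flush()
				}
			}
		}
	}()

	result, err := longQuery()
	close(done)

	if err != nil {
		// A 200 may already be on the wire, so the client has to detect the
		// error from the payload rather than from the status code.
		w.Write([]byte(`{"error":"` + err.Error() + `"}`))
		return
	}

	w.Write(result)
}

func main() {
	http.HandleFunc("/logs", logsHandler)
	http.ListenAndServe(":8080", nil)
}
```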
19 changes: 19 additions & 0 deletions plugins/clickhouse/pkg/instance/helpers.go
@@ -1,5 +1,9 @@
package instance

import (
"sync"
)

// appendIfMissing appends a value to a slice when the value doesn't already exist in the slice.
func appendIfMissing(items []string, item string) []string {
for _, ele := range items {
@@ -22,3 +26,18 @@ func contains(items []string, item string) bool {

return false
}

// parallelize runs the given functions in parallel and waits until all of them have finished.
func parallelize(functions ...func()) {
var waitGroup sync.WaitGroup
waitGroup.Add(len(functions))

defer waitGroup.Wait()

for _, function := range functions {
go func(copy func()) {
defer waitGroup.Done()
copy()
}(function)
}
}
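The new `parallelize` helper blocks until every function passed to it has returned. Its call sites are not part of this diff, so the example below is only a hypothetical usage sketch showing how two independent pieces of work could run concurrently with it; the bucket and document variables are made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// parallelize runs the given functions in parallel and waits until all of
// them have finished (same implementation as in helpers.go).
func parallelize(functions ...func()) {
	var waitGroup sync.WaitGroup
	waitGroup.Add(len(functions))

	defer waitGroup.Wait()

	for _, function := range functions {
		go func(copy func()) {
			defer waitGroup.Done()
			copy()
		}(function)
	}
}

func main() {
	var buckets []int
	var documents []string

	// Hypothetical example: fetch the buckets and the raw documents at the
	// same time and continue once both assignments are done. The WaitGroup
	// inside parallelize guarantees both goroutines finished before we read
	// the slices.
	parallelize(
		func() { buckets = []int{42, 17, 3} },
		func() { documents = []string{"log line 1", "log line 2"} },
	)

	fmt.Println(buckets, documents)
}
```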
144 changes: 70 additions & 74 deletions plugins/clickhouse/pkg/instance/instance.go
@@ -38,12 +38,15 @@ type Instance struct {

// GetLogs parses the given query into the sql syntax, which is then run against the ClickHouse instance. The returned
// rows are converted into a document schema which can be used by our UI.
func (i *Instance) GetLogs(ctx context.Context, query, order, orderBy string, maxDocuments, limit, offset, timeStart, timeEnd int64) ([]map[string]interface{}, []string, int64, int64, []Bucket, int64, int64, error) {
func (i *Instance) GetLogs(ctx context.Context, query, order, orderBy string, timeStart, timeEnd int64) ([]map[string]interface{}, []string, int64, int64, []Bucket, error) {
var count int64
var buckets []Bucket
var documents []map[string]interface{}
var timeConditions string

fields := defaultFields
queryStartTime := time.Now()
interval := (timeEnd - timeStart) / 30

// When the user provides a query, we have to build the additional conditions for the sql query. This is done via
// the parseLogsQuery which is responsible for parsing our simple query language and returning the corresponding
@@ -52,102 +55,99 @@ func (i *Instance) GetLogs(ctx context.Context, query, order, orderBy string, ma
if query != "" {
parsedQuery, err := parseLogsQuery(query)
if err != nil {
return nil, nil, 0, 0, nil, offset, timeStart, err
return nil, nil, 0, 0, nil, err
}

conditions = fmt.Sprintf("AND %s", parsedQuery)
// conditions = parsedQuery
}

// The count of documents and the buckets are only needed for the first query where the offset is 0. For the
// following queries we can reuse the data returned by the first query, because the number of documents shouldn't
// change in the selected time range.
if offset == 0 {
// Now we are creating 30 buckets for the selected time range and count the documents in each bucket. This is
// used to render the distribution chart, which shows how many documents/rows are available within a bucket.
if timeEnd-timeStart > 30 {
interval := (timeEnd - timeStart) / 30
// sqlQueryBuckets := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d second) AS interval_data , count(*) AS count_data FROM %s.logs WHERE timestamp >= ? AND timestamp <= ? %s GROUP BY interval_data SETTINGS skip_unavailable_shards = 1", interval, i.database, conditions)
sqlQueryBuckets := fmt.Sprintf(`SELECT toStartOfInterval(timestamp, INTERVAL %d second) AS interval_data , count(*) AS count_data FROM %s.logs WHERE timestamp >= ? AND timestamp <= ? %s GROUP BY interval_data ORDER BY interval_data WITH FILL FROM toStartOfInterval(FROM_UNIXTIME(%d), INTERVAL %d second) TO toStartOfInterval(FROM_UNIXTIME(%d), INTERVAL %d second) STEP %d SETTINGS skip_unavailable_shards = 1`, interval, i.database, conditions, timeStart, interval, timeEnd, interval, interval)
log.WithFields(logrus.Fields{"query": sqlQueryBuckets, "timeStart": timeStart, "timeEnd": timeEnd}).Tracef("sql buckets query")
rowsBuckets, err := i.client.QueryContext(ctx, sqlQueryBuckets, time.Unix(timeStart, 0), time.Unix(timeEnd, 0))
if err != nil {
return nil, nil, 0, 0, nil, offset, timeStart, err
}
defer rowsBuckets.Close()

for rowsBuckets.Next() {
var intervalData time.Time
var countData int64
// Now we are creating 30 buckets for the selected time range and count the documents in each bucket. This is used
// to render the distribution chart, which shows how many documents/rows are available within a bucket.
if timeEnd-timeStart > 30 {
sqlQueryBuckets := fmt.Sprintf(`SELECT toStartOfInterval(timestamp, INTERVAL %d second) AS interval_data , count(*) AS count_data FROM %s.logs WHERE timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d) %s GROUP BY interval_data ORDER BY interval_data WITH FILL FROM toStartOfInterval(FROM_UNIXTIME(%d), INTERVAL %d second) TO toStartOfInterval(FROM_UNIXTIME(%d), INTERVAL %d second) STEP %d SETTINGS skip_unavailable_shards = 1`, interval, i.database, timeStart, timeEnd, conditions, timeStart, interval, timeEnd, interval, interval)
log.WithFields(logrus.Fields{"query": sqlQueryBuckets}).Tracef("sql query buckets")
rowsBuckets, err := i.client.QueryContext(ctx, sqlQueryBuckets)
if err != nil {
return nil, nil, 0, 0, nil, err
}
defer rowsBuckets.Close()

if err := rowsBuckets.Scan(&intervalData, &countData); err != nil {
return nil, nil, 0, 0, nil, offset, timeStart, err
}
for rowsBuckets.Next() {
var intervalData time.Time
var countData int64

buckets = append(buckets, Bucket{
Interval: intervalData.Unix(),
IntervalFormatted: "",
Count: countData,
// Formatting is handled on the client side.
// IntervalFormatted: intervalData.Format("01-02 15:04:05"),
})
if err := rowsBuckets.Scan(&intervalData, &countData); err != nil {
return nil, nil, 0, 0, nil, err
}

sort.Slice(buckets, func(i, j int) bool {
return buckets[i].Interval < buckets[j].Interval
buckets = append(buckets, Bucket{
Interval: intervalData.Unix(),
IntervalFormatted: "",
Count: countData,
// Formatting is handled on the client side.
// IntervalFormatted: intervalData.Format("01-02 15:04:05"),
})
}

if err := rowsBuckets.Err(); err != nil {
return nil, nil, 0, 0, nil, err
}

// We are only returning the first 1000 documents in buckets of the given limit, to speed up the following
// query to get the documents. For that we are looping through the sorted buckets and using the timestamp
// from the bucket where the sum of all newer buckets contains 1000 documents.
// This new start time is then also returned in the response and can be used for the "load more" call as the
// new start date. In these follow up calls the start time isn't changed again, because we are skipping the
// count and bucket queries.
// The default value of 1000 documents can be overwritten by a user, by providing the maxDocuments parameter
// in the request.
var bucketCount int64
for i := len(buckets) - 1; i >= 0; i-- {
bucketCount = bucketCount + buckets[i].Count
if bucketCount > maxDocuments {
timeStart = buckets[i].Interval
break
sort.Slice(buckets, func(i, j int) bool {
return buckets[i].Interval < buckets[j].Interval
})

// We are only returning the first 1000 documents in buckets of the given limit, to speed up the following query
// to get the documents. For that we are looping through the sorted buckets and using the timestamp from the
// bucket where the sum of all newer buckets contains 1000 documents.
// This new start time is then also returned in the response and can be used for the "load more" call as the new
// start date. In these follow up calls the start time isn't changed again, because we are skipping the count
// and bucket queries.
for i := len(buckets) - 1; i >= 0; i-- {
if count < 1000 && buckets[i].Count > 0 {
if timeConditions == "" {
timeConditions = fmt.Sprintf("(timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", buckets[i].Interval, buckets[i].Interval+interval)
} else {
timeConditions = fmt.Sprintf("%s OR (timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", timeConditions, buckets[i].Interval, buckets[i].Interval+interval)
}
}

for _, bucket := range buckets {
count = count + bucket.Count
}
count = count + buckets[i].Count
}
}

// If the provided max documents option is zero or negative we just return the count and buckets for the provided
// query.
if maxDocuments <= 0 {
return documents, fields, count, time.Now().Sub(queryStartTime).Milliseconds(), buckets, offset + limit, timeStart, nil
log.WithFields(logrus.Fields{"count": count, "buckets": buckets}).Tracef("sql result buckets")

// If the count of documents is 0 we can already return the result, because the following query wouldn't return any
// documents.
if count == 0 {
return documents, fields, count, time.Now().Sub(queryStartTime).Milliseconds(), buckets, nil
}

parsedOrder := parseOrder(order, orderBy)

// Now we are building and executing our sql query. We always return all fields from the logs table, where the
// timestamp of a row is within the selected query range and the parsed query. We also order all the results by the
// timestamp field and limit the results / use an offset for pagination.
sqlQuery := fmt.Sprintf("SELECT %s FROM %s.logs WHERE timestamp >= ? AND timestamp <= ? %s ORDER BY %s LIMIT %d OFFSET %d SETTINGS skip_unavailable_shards = 1", defaultColumns, i.database, conditions, parsedOrder, limit, offset)
log.WithFields(logrus.Fields{"query": sqlQuery, "timeStart": timeStart, "timeEnd": timeEnd}).Tracef("sql query")
rows, err := i.client.QueryContext(ctx, sqlQuery, time.Unix(timeStart, 0), time.Unix(timeEnd, 0))
sqlQueryRawLogs := fmt.Sprintf("SELECT %s FROM %s.logs WHERE (%s) %s ORDER BY %s LIMIT 1000 SETTINGS skip_unavailable_shards = 1", defaultColumns, i.database, timeConditions, conditions, parsedOrder)
log.WithFields(logrus.Fields{"query": sqlQueryRawLogs}).Tracef("sql query raw logs")
rowsRawLogs, err := i.client.QueryContext(ctx, sqlQueryRawLogs)
if err != nil {
return nil, nil, 0, 0, nil, offset, timeStart, err
return nil, nil, 0, 0, nil, err
}
defer rows.Close()
defer rowsRawLogs.Close()

// Now we are going through all the returned rows and passing them to the Row struct. After that we are converting
// each row to a JSON document for the React UI, which contains all the default fields and all the items from the
// fields_string / fields_number array.
// When the offset is 0 (user starts a new query) we are also checking all the fields from the nested fields_string
// and fields_number array and adding them to the fields slice. This slice can then be used by the user in our React
// UI to show only a list of selected fields in the table.
for rows.Next() {
for rowsRawLogs.Next() {
var r Row
if err := rows.Scan(&r.Timestamp, &r.Cluster, &r.Namespace, &r.App, &r.Pod, &r.Container, &r.Host, &r.FieldsString.Key, &r.FieldsString.Value, &r.FieldsNumber.Key, &r.FieldsNumber.Value, &r.Log); err != nil {
return nil, nil, 0, 0, nil, offset, timeStart, err
if err := rowsRawLogs.Scan(&r.Timestamp, &r.Cluster, &r.Namespace, &r.App, &r.Pod, &r.Container, &r.Host, &r.FieldsString.Key, &r.FieldsString.Value, &r.FieldsNumber.Key, &r.FieldsNumber.Value, &r.Log); err != nil {
return nil, nil, 0, 0, nil, err
}

var document map[string]interface{}
@@ -163,29 +163,25 @@ func (i *Instance) GetLogs(ctx context.Context, query, order, orderBy string, ma

for index, field := range r.FieldsNumber.Key {
document[field] = r.FieldsNumber.Value[index]

if offset == 0 {
fields = appendIfMissing(fields, field)
}
fields = appendIfMissing(fields, field)
}

for index, field := range r.FieldsString.Key {
document[field] = r.FieldsString.Value[index]

if offset == 0 {
fields = appendIfMissing(fields, field)
}
fields = appendIfMissing(fields, field)
}

documents = append(documents, document)
}

if err := rows.Err(); err != nil {
return nil, nil, 0, 0, nil, offset, timeStart, err
if err := rowsRawLogs.Err(); err != nil {
return nil, nil, 0, 0, nil, err
}

sort.Strings(fields)
return documents, fields, count, time.Now().Sub(queryStartTime).Milliseconds(), buckets, offset + limit, timeStart, nil
log.WithFields(logrus.Fields{"documents": len(documents)}).Tracef("sql result raw logs")

return documents, fields, count, time.Now().Sub(queryStartTime).Milliseconds(), buckets, nil
}

// New returns a new ClickHouse instance for the given configuration.
Expand Down
Loading