
Add Map-reduce functionality #87

Merged: 15 commits, Apr 14, 2023
5 changes: 3 additions & 2 deletions README.md
@@ -9,7 +9,7 @@
A fast text scanner/regex extractor and realtime summarizer. Quickly search, reformat and visualize text files
such as logs, csv, json, etc.

-Supports various CLI-based graphing and metric formats (filter (grep-like), histogram, table, bargraph, heatmap).
+Supports various CLI-based graphing and metric formats (filter (grep-like), histogram, table, bargraph, heatmap, reduce).

`rare` is a play on "more" and "less", but can also stand for "realtime aggregated regular expressions".

@@ -19,7 +19,7 @@ See [rare.zdyn.net](https://rare.zdyn.net) or the [docs/ folder](docs/) for the

## Features

-* Multiple summary formats including: filter (like grep), histogram, bar graphs, tables, heatmaps, and numerical analysis
+* Multiple summary formats including: filter (like grep), histogram, bar graphs, tables, heatmaps, reduce, and numerical analysis
* File glob expansions (eg `/var/log/*` or `/var/log/*/*.log`) and `-R`
* Optional gzip decompression (with `-z`)
* Following `-f` or re-open following `-F` (use `--poll` to poll, and `--tail` to tail)
@@ -41,6 +41,7 @@ Output formats include:
* `heatmap` will display a color-coded version of the strength of a cell in a dense format
* `bargraph` will create either a stacked or non-stacked bargraph based on 2 dimensions
* `analyze` will use the key as a numeric value and compute mean/median/mode/stddev/percentiles
+* `reduce` allows evaluating data using expressions, and grouping/sorting the output

More details on various output formats and aggregators (including examples) can be found in [aggregators](docs/usage/aggregators.md)
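The `reduce` format added above can be made concrete with a small sketch. This is not rare's implementation (rare compiles user-supplied expressions such as `{sumi {.} {0}}`); it is a minimal model of the same idea, with hypothetical sample data: group matches by a key, fold each group with an accumulator, then sort the group keys for display.

```go
package main

import (
	"fmt"
	"sort"
)

// groupSum folds one value per match into a per-group total,
// loosely mirroring `rare reduce -g {key} -a {sumi {.} {val}}`.
func groupSum(keys []string, vals []int) map[string]int {
	totals := make(map[string]int)
	for i, k := range keys {
		totals[k] += vals[i]
	}
	return totals
}

func main() {
	// Hypothetical extracted matches: HTTP status -> bytes sent.
	totals := groupSum(
		[]string{"200", "404", "200", "500"},
		[]int{120, 30, 80, 5},
	)

	// Sort group keys so the displayed order is stable.
	keys := make([]string, 0, len(totals))
	for k := range totals {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s: %d\n", k, totals[k])
	}
}
```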

3 changes: 2 additions & 1 deletion cmd/analyze.go
@@ -83,7 +83,8 @@ func analyzeCommand() *cli.Command {
Description: `Treat every extracted expression as a numerical input, and run analysis
on that input. Will extract mean, median, mode, min, max. If specifying --extra
will also extract std deviation, and quantiles`,
-Action: analyzeFunction,
+Action: analyzeFunction,
+Category: cmdCatAnalyze,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "extra",
3 changes: 2 additions & 1 deletion cmd/bargraph.go
@@ -55,7 +55,8 @@ func bargraphCommand() *cli.Command {
the bargraph can collapse and stack data in different formats. The key data format
is {$ a b [c]}, where a is the base-key, b is the optional sub-key, and c is the increment
(defaults to 1)`,
-Action: bargraphFunction,
+Action: bargraphFunction,
+Category: cmdCatVisualize,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "stacked",
7 changes: 7 additions & 0 deletions cmd/commands.go
@@ -2,13 +2,20 @@ package cmd

import "github.com/urfave/cli/v2"

+var (
+cmdCatAnalyze = "Analyze"
+cmdCatVisualize = "Visualize"
+cmdCatHelp = "Help"
+)

var commands []*cli.Command = []*cli.Command{
filterCommand(),
histogramCommand(),
heatmapCommand(),
bargraphCommand(),
analyzeCommand(),
tabulateCommand(),
+reduceCommand(),
docsCommand(),
expressionCommand(),
}
1 change: 1 addition & 0 deletions cmd/docs.go
@@ -70,6 +70,7 @@ func docsCommand() *cli.Command {
Usage: "Access detailed documentation",
ArgsUsage: "[doc]",
Action: docsFunction,
+Category: cmdCatHelp,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "no-pager",
1 change: 1 addition & 0 deletions cmd/expressions.go
@@ -124,6 +124,7 @@ func expressionCommand() *cli.Command {
ArgsUsage: "<expression>",
Aliases: []string{"exp"},
Action: expressionFunction,
+Category: cmdCatHelp,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "benchmark",
5 changes: 3 additions & 2 deletions cmd/filter.go
@@ -68,8 +68,9 @@ func filterCommand() *cli.Command {
Usage: "Filter incoming results with search criteria, and output raw matches",
Description: `Filters incoming results by a regex, and output the match of a single line
or an extracted expression.`,
-Aliases: []string{"f"},
-Action: filterFunction,
+Aliases: []string{"f"},
+Action: filterFunction,
+Category: cmdCatAnalyze,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "line",
3 changes: 2 additions & 1 deletion cmd/heatmap.go
@@ -61,7 +61,8 @@ func heatmapCommand() *cli.Command {
Description: `Creates a dense 2D visual of extracted data. Each character
represents a single data-point, and can create an alternative visualization to
a table. Unicode and color support required for effective display`,
-Action: heatmapFunction,
+Action: heatmapFunction,
+Category: cmdCatVisualize,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "delim",
1 change: 1 addition & 0 deletions cmd/histo.go
@@ -89,6 +89,7 @@ func histogramCommand() *cli.Command {
Action: histoFunction,
Aliases: []string{"histo", "h"},
ArgsUsage: helpers.DefaultArgumentDescriptor,
+Category: cmdCatVisualize,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "all",
202 changes: 202 additions & 0 deletions cmd/reduce.go
@@ -0,0 +1,202 @@
package cmd

import (
"fmt"
"rare/cmd/helpers"
"rare/pkg/aggregation"
"rare/pkg/aggregation/sorting"
"rare/pkg/color"
"rare/pkg/expressions/stdlib"
"rare/pkg/logger"
"rare/pkg/multiterm/termrenderers"
"strings"

"github.com/urfave/cli/v2"
)

func reduceFunction(c *cli.Context) error {
var (
accumExprs = c.StringSlice("accumulator")
groupExpr = c.StringSlice("group")
defaultInitial = c.String("initial")
table = c.Bool("table")
sort = c.String("sort")
sortReverse = c.Bool("sort-reverse")
rowCount = c.Int("rows")
colCount = c.Int("cols")
)

vt := helpers.BuildVTermFromArguments(c)
batcher := helpers.BuildBatcherFromArguments(c)
extractor := helpers.BuildExtractorFromArguments(c, batcher)

aggr := aggregation.NewAccumulatingGroup(stdlib.NewStdKeyBuilder())

// Set up groups
for _, group := range groupExpr {
name, val := parseKeyValue(group)
if err := aggr.AddGroupExpr(name, val); err != nil {
logger.Fatalf("Error compiling group expression %s: %s", group, err)
}
}

// Set up expressions
maxKeylen := 0
for _, expr := range accumExprs {
name, initial, val := parseKeyValInitial(expr, defaultInitial)
if err := aggr.AddDataExpr(name, val, initial); err != nil {
logger.Fatalf("Error compiling expression %s: %s", expr, err)
} else {
if len(name) > maxKeylen {
maxKeylen = len(name)
}
}
}

// Set up sorting
var sorter = sorting.ByContextual()
if sortReverse {
sorter = sorting.Reverse(sorter)
}
if sort != "" {
if err := aggr.SetSort(sort); err != nil {
logger.Fatalf("Error setting sort: %s", err)
}
}

// run the aggregation
if aggr.GroupColCount() > 0 || table {
// Table output
table := termrenderers.NewTable(vt, colCount, rowCount)

// write header (will never shift)
{
rowBuf := make([]string, aggr.ColCount())
for i, groupCol := range aggr.GroupCols() {
rowBuf[i] = color.Wrap(color.Underline+color.BrightYellow, groupCol)
}
for i, dataCol := range aggr.DataCols() {
rowBuf[aggr.GroupColCount()+i] = color.Wrap(color.Underline+color.BrightBlue, dataCol)
}
table.WriteRow(0, rowBuf...)
}

helpers.RunAggregationLoop(extractor, aggr, func() {
// write data
for i, group := range aggr.Groups(sorter) {
rowBuf := make([]string, aggr.ColCount())
data := aggr.Data(group)
for idx, item := range group.Parts() {
rowBuf[idx] = color.Wrap(color.BrightWhite, item)
}
copy(rowBuf[aggr.GroupColCount():], data)
table.WriteRow(i+1, rowBuf...)
}

// write footer
table.WriteFooter(0, helpers.FWriteExtractorSummary(extractor, aggr.ParseErrors(),
fmt.Sprintf("(R: %d; C: %d)", aggr.DataCount(), aggr.ColCount())))
table.WriteFooter(1, batcher.StatusString())
})
} else {
// Simple output
helpers.RunAggregationLoop(extractor, aggr, func() {
items := aggr.Data("")
colNames := aggr.DataCols()
for idx, expr := range items {
vt.WriteForLine(idx, colNames[idx]+strings.Repeat(" ", maxKeylen-len(colNames[idx]))+": "+expr)
}
vt.WriteForLine(len(items), helpers.FWriteExtractorSummary(extractor, aggr.ParseErrors()))
vt.WriteForLine(len(items)+1, batcher.StatusString())
})
}

vt.Close()

return helpers.DetermineErrorState(batcher, extractor, aggr)
}

func parseKeyValInitial(s, defaultInitial string) (key, initial, val string) {
eqSep := strings.IndexByte(s, '=')
if eqSep < 0 {
return s, defaultInitial, s
}
k := s[:eqSep]
v := s[eqSep+1:]

initialSep := strings.IndexByte(k, ':')
if initialSep >= 0 {
return k[:initialSep], k[initialSep+1:], v
}
return k, defaultInitial, v
}

func reduceCommand() *cli.Command {
cmd := helpers.AdaptCommandForExtractor(cli.Command{
Name: "reduce",
Action: reduceFunction,
Usage: "Aggregate the results of a query based on an expression, pulling customized summary from the extracted data",
Aliases: []string{"r"},
Category: cmdCatVisualize,
Flags: []cli.Flag{
&cli.StringSliceFlag{
Name: "accumulator",
Aliases: []string{"a"},
Usage: "Specify one or more expressions to execute for each match. `{.}` is the accumulator. Syntax: `[name[:initial]=]expr`",
},
&cli.StringSliceFlag{
Name: "group",
Aliases: []string{"g"},
Usage: "Specifies one or more expressions to group on. Syntax: `[name=]expr`",
},
&cli.StringFlag{
Name: "initial",
Usage: "Specify the default initial value for any accumulators that don't specify",
Value: "0",
},
&cli.BoolFlag{
Name: "table",
Usage: "Force output to be a table, even when there are no groups",
},
&cli.IntFlag{
Name: "num",
Aliases: []string{"rows", "n"},
Usage: "Number of elements to display",
Value: 20,
},
&cli.IntFlag{
Name: "cols",
Usage: "Number of columns to display",
Value: 10,
},
&cli.StringFlag{
Name: "sort",
Usage: "Specify an expression to sort groups by. Will sort result in alphanumeric order",
DefaultText: "group key",
},
&cli.BoolFlag{
Name: "sort-reverse",
Usage: "Reverses sort order",
},
helpers.SnapshotFlag,
},
})

// Rewrite the default extraction to output array rather than {0} match
{
didInject := false
for _, flag := range cmd.Flags {
if slice, ok := flag.(*cli.StringSliceFlag); ok && slice.Name == "extract" {
slice.Value = cli.NewStringSlice("{@}")
didInject = true
break
}
}

if !didInject { // To catch issues in tests
panic("Unable to inject extract change")
}
}

return cmd
}
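The `accumulator` and `initial` flags above describe a fold: each accumulator starts at its initial value (the `:initial` part of the flag, else `--initial`, default `"0"`), and `{.}` names the running value on each match. A minimal standalone sketch of that semantics (not rare's expression engine, and with hypothetical sample values):

```go
package main

import "fmt"

// fold seeds the accumulator with an initial value and applies
// step once per match — the shape of `-a "name:initial=expr"`,
// where the expression sees the running value as {.}.
func fold(initial int, matches []int, step func(acc, v int) int) int {
	acc := initial
	for _, v := range matches {
		acc = step(acc, v)
	}
	return acc
}

func main() {
	matches := []int{3, 9, 20} // hypothetical extracted numbers

	// Equivalent in spirit to -a "test={sumi {.} {0}}"
	sum := fold(0, matches, func(acc, v int) int { return acc + v })
	fmt.Println("test:", sum) // prints "test: 32"
}
```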
48 changes: 48 additions & 0 deletions cmd/reduce_test.go
@@ -0,0 +1,48 @@
package cmd

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestReduce(t *testing.T) {
out, eout, err := testCommandCapture(reduceCommand(),
`-m (\d+) --snapshot -a "test={sumi {.} {0}}" testdata/log.txt`)
assert.NoError(t, err)
assert.Empty(t, eout)
assert.Equal(t, "test: 32\nMatched: 3 / 6\n96 B (0 B/s) \n", out)
}

func TestReduceBasics(t *testing.T) {
testCommandSet(t, reduceCommand(),
`-m (\d+) -g {0} -a {0} testdata/log.txt`,
`-m (\d+) -g {0} -a a={0} --sort {a} --sort-reverse testdata/log.txt`)
}

func TestParseKIV(t *testing.T) {
k, i, v := parseKeyValInitial("abc", "init")
assert.Equal(t, "abc", k)
assert.Equal(t, "init", i)
assert.Equal(t, "abc", v)

k, i, v = parseKeyValInitial("abc=efg", "init")
assert.Equal(t, "abc", k)
assert.Equal(t, "init", i)
assert.Equal(t, "efg", v)

k, i, v = parseKeyValInitial("=efg", "init")
assert.Equal(t, "", k)
assert.Equal(t, "init", i)
assert.Equal(t, "efg", v)

k, i, v = parseKeyValInitial("abc:=efg", "init")
assert.Equal(t, "abc", k)
assert.Equal(t, "", i)
assert.Equal(t, "efg", v)

k, i, v = parseKeyValInitial("abc:1=efg", "init")
assert.Equal(t, "abc", k)
assert.Equal(t, "1", i)
assert.Equal(t, "efg", v)
}
3 changes: 2 additions & 1 deletion cmd/tabulate.go
@@ -107,7 +107,8 @@ func tabulateCommand() *cli.Command {
Description: `Summarizes the extracted data as a 2D data table.
The expression key data format is {$ a b [c]}, where a is the column key,
b is the rowkey, and optionally c is the increment value (Default: 1)`,
-Action: tabulateFunction,
+Action: tabulateFunction,
+Category: cmdCatVisualize,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "delim",
Binary file modified docs/cli-help.md
5 changes: 4 additions & 1 deletion docs/images/README.md
@@ -3,7 +3,7 @@
## Commands

```bash
-nvm use ---lts
+nvm use --lts
npm install -g terminalizer
terminalizer record -k output.yml
terminalizer render -o temp.gif output.yml
@@ -66,4 +66,7 @@ rare heatmap -m '\[(.+?)\].*" (\d+)' -e "{timeattr {time {1}} yearweek}" -e "{2}

rare analyze -m '(\d{3}) (\d+)' -e '{2}' -i '{neq {1} 200}' access.log

+### Reduce http data
+rare reduce -m "(\d{3}) (\d+)" -g "http={1}" -a "total={sumi {.} {2}}" -a "count={sumi {.} 1}" -a "avg={divi {total} {count}}" --sort="-{avg}" access.log

```
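The `rare reduce` example above keeps three accumulators per status-code group: `total`, `count`, and the derived `avg`. A standalone sketch of the same bookkeeping over hypothetical access-log captures (rare itself evaluates the `{sumi ...}`/`{divi ...}` expressions instead):

```go
package main

import "fmt"

// stats mirrors total={sumi {.} {2}} and count={sumi {.} 1};
// avg={divi {total} {count}} is derived from the other two.
type stats struct{ total, count int }

func accumulate(codes []string, bytes []int) map[string]stats {
	byCode := make(map[string]stats)
	for i, c := range codes {
		s := byCode[c]
		s.total += bytes[i]
		s.count++
		byCode[c] = s
	}
	return byCode
}

func main() {
	byCode := accumulate(
		[]string{"200", "200", "404"}, // hypothetical {1} captures
		[]int{1000, 3000, 150},        // hypothetical {2} captures
	)
	for code, s := range byCode {
		fmt.Printf("http=%s total=%d count=%d avg=%d\n",
			code, s.total, s.count, s.total/s.count)
	}
}
```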
Binary file added docs/images/rare-reduce.gif