Skip to content

Commit

Permalink
docstrings and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
facundoolano committed Jul 27, 2024
1 parent 41747d9 commit fc21c87
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 17 deletions.
2 changes: 2 additions & 0 deletions accesslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const LOG_COMBINED_PATTERN = `(?P<ip>\S+) - (?P<remote_user>\S+) \[(?P<time>.*?)

var logPattern = regexp.MustCompile(LOG_COMBINED_PATTERN)

// Parse the fields in the nginx access logs since the `until` time, passing them as a map into the `processFun`.
// Processing is interrupted when a log older than `until` is found.
func ProcessAccessLogs(until *time.Time, processFun func(map[string]interface{}) error) error {

// could make sense to try detecting the OS and applying a sensible default accordingly
Expand Down
6 changes: 5 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (dbs *dbSession) Close() {
dbs.db.Close()
}

// TODO doc
// Prepare a transaction to insert a new batch of log entries, returning the time of the last seen log entry.
func (dbs *dbSession) PrepareForUpdate(columns []string) (*time.Time, error) {
// we want to avoid processed files that were already processed in the past. but we still want to add new log entries
// from the most recent files, which may have been extended since we last saw them.
Expand Down Expand Up @@ -116,6 +116,8 @@ func (dbs *dbSession) AddLogEntry(values ...any) error {
return err
}

// If the given processing `err` is nil, commit the log insertion transaction,
// Otherwise roll it back and return the error.
func (dbs *dbSession) FinishUpdate(err error) error {
tx := dbs.insertTx
dbs.insertTx = nil
Expand All @@ -127,6 +129,7 @@ func (dbs *dbSession) FinishUpdate(err error) error {
return tx.Commit()
}

// Build a query from the spec and execute it, returning the results as stringified values.
func (dbs *dbSession) QueryTop(spec *RequestCountSpec) ([]string, [][]string, error) {
queryString, queryArgs := spec.buildQuery()

Expand Down Expand Up @@ -161,6 +164,7 @@ func (dbs *dbSession) QueryTop(spec *RequestCountSpec) ([]string, [][]string, er
return columns, results, rows.Err()
}

// Turn the request count specification into an SQL query.
func (spec *RequestCountSpec) buildQuery() (string, []any) {
queryArgs := []any{}

Expand Down
41 changes: 25 additions & 16 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func main() {
printTopTable(columnNames, rowValues)
}

// Parse the command line arguments into a top requests query specification
func querySpecFromCLI() (*kong.Context, *RequestCountSpec) {
// Parse query spec first, i.e. don't bother with db updates if the command is invalid
fieldNames := make([]string, 0, len(FIELD_NAMES))
Expand Down Expand Up @@ -110,6 +111,11 @@ func querySpecFromCLI() (*kong.Context, *RequestCountSpec) {
return ctx, spec
}

// Parse the -w conditions like "ua=Firefox" and "url=/blog%" into a mapping that can be used to query the database.
// field alias are translated to their canonical column name
// multiple values of the same field are preserved to be used as OR values
// different fields will be treated as AND conditions on the query
// != pairs are treated as 'different than'
func resolveWhereConditions(clauses []string) (map[string][]string, error) {
conditions := make(map[string][]string)

Expand All @@ -132,6 +138,7 @@ func resolveWhereConditions(clauses []string) (map[string][]string, error) {
return conditions, nil
}

// parse duration expressions as 1d or 10s into a date by subtracting them from the Now() time.
func parseDuration(duration string) (time.Time, error) {
t := NowTimeFun().UTC()
if duration != "now" {
Expand Down Expand Up @@ -163,33 +170,35 @@ func parseDuration(duration string) (time.Time, error) {
return t, nil
}

func printTopTable(columnNames []string, rowValues [][]string) {
tab := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0)
fmt.Fprintf(tab, "%s\n", strings.ToUpper(strings.Join(columnNames, "\t")))
for _, row := range rowValues {
fmt.Fprintf(tab, "%s\n", strings.Join(row, "\t"))
}
tab.Flush()
}

// Parse the most recent nginx access.logs and insert the ones not previously seen into the DB.
func loadLogs(dbs *dbSession) error {
// FIXME consolidate field list (duplicated knowledge)
insertFields := []string{"ip", "time", "request_raw", "status", "bytes_sent", "referer", "user_agent_raw", "method", "path", "user_agent", "os", "device", "ua_url", "ua_type"}
dbColumns := []string{"ip", "time", "request_raw", "status", "bytes_sent", "referer", "user_agent_raw", "method", "path", "user_agent", "os", "device", "ua_url", "ua_type"}

// FIXME this API could be improved, why not a single call?
lastSeenTime, err := dbs.PrepareForUpdate(insertFields)
lastSeenTime, err := dbs.PrepareForUpdate(dbColumns)
if err != nil {
return err
}

err = ProcessAccessLogs(lastSeenTime, func(fields map[string]interface{}) error {
queryValues := make([]interface{}, len(insertFields))
for i, field := range insertFields {
queryValues[i] = fields[field]
err = ProcessAccessLogs(lastSeenTime, func(logLineFields map[string]interface{}) error {
queryValues := make([]interface{}, len(dbColumns))
for i, field := range dbColumns {
queryValues[i] = logLineFields[field]
}
return dbs.AddLogEntry(queryValues...)
})

// Rollback or commit before returning, depending on error
return dbs.FinishUpdate(err)
}

// Print the query results as a table
func printTopTable(columnNames []string, rowValues [][]string) {
tab := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0)
fmt.Fprintf(tab, "%s\n", strings.ToUpper(strings.Join(columnNames, "\t")))
for _, value := range rowValues {
// TODO for last item (always the count) do pretty formatting e.g. 1.3K instead of 1301
fmt.Fprintf(tab, "%s\n", strings.Join(value, "\t"))
}
tab.Flush()
}

0 comments on commit fc21c87

Please sign in to comment.