Skip to content

Commit

Permalink
Remove remaining hardcoded field lists (#6)
Browse files Browse the repository at this point in the history
* add explicit derived field list

* parse format at the beginning

* generate create table columns dynamically

* fix bug
  • Loading branch information
facundoolano authored Jul 29, 2024
1 parent 68976be commit 81a4bed
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 34 deletions.
34 changes: 12 additions & 22 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type dbSession struct {
const DB_DATE_LAYOUT = "2006-01-02 15:04:05-07:00"

// Open or create the database at the given path.
func InitDB(dbPath string) (*dbSession, error) {
func InitDB(dbPath string, fields []*LogField) (*dbSession, error) {
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return nil, err
Expand All @@ -38,29 +38,19 @@ func InitDB(dbPath string) (*dbSession, error) {
}

// TODO consider adding indexes according to expected queries
// FIXME build this dynamically based on the log format columns
sqlStmt := `

var columns string
for _, field := range fields {
columns += fmt.Sprintf("%s %s,\n", field.ColumnName, field.ColumnSpec)
}

sqlStmt := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS access_logs (
id INTEGER NOT NULL PRIMARY KEY,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ip TEXT,
time TIMESTAMP NOT NULL,
request_raw TEXT NOT NULL,
user_agent_raw TEXT,
status INTEGER,
referer TEXT COLLATE NOCASE,
method TEXT COLLATE NOCASE,
path TEXT,
user_agent TEXT COLLATE NOCASE,
os TEXT COLLATE NOCASE,
device TEXT COLLATE NOCASE,
ua_url TEXT,
ua_type TEXT COLLATE NOCASE
);
`
%s
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);`, columns)

_, err = db.Exec(sqlStmt)
return &dbSession{db: db}, err
}
Expand Down
16 changes: 10 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ func main() {
logFormat = envLogFormat
}

logFormatRegex, fields := ParseFormat(logFormat)

ctx, spec := querySpecFromCLI()
dbs, err := InitDB(dbPath)
dbs, err := InitDB(dbPath, fields)
ctx.FatalIfErrorf(err)
defer dbs.Close()

err = loadLogs(logFormat, logPathPattern, dbs)
err = loadLogs(logFormatRegex, fields, logPathPattern, dbs)
ctx.FatalIfErrorf(err)

columnNames, rowValues, err := dbs.QueryTop(spec)
Expand Down Expand Up @@ -172,21 +174,23 @@ func parseDuration(duration string) (time.Time, error) {
}

// Parse the most recent nginx access.logs and insert the ones not previously seen into the DB.
func loadLogs(logFormat string, logPathPattern string, dbs *dbSession) error {
func loadLogs(logFormatRegex *regexp.Regexp, fields []*LogField, logPathPattern string, dbs *dbSession) error {
logFiles, err := filepath.Glob(logPathPattern)
if err != nil {
return err
}

// FIXME consolidate field list (duplicated knowledge)
dbColumns := []string{"ip", "time", "request_raw", "status", "referer", "user_agent_raw", "method", "path", "user_agent", "os", "device", "ua_url", "ua_type"}
dbColumns := make([]string, len(fields))
for i, field := range fields {
dbColumns[i] = field.ColumnName
}

lastSeenTime, err := dbs.PrepareForUpdate(dbColumns)
if err != nil {
return err
}

err = ProcessAccessLogs(logFormat, logFiles, lastSeenTime, func(logLineFields map[string]string) error {
err = ProcessAccessLogs(logFormatRegex, logFiles, lastSeenTime, func(logLineFields map[string]string) error {
queryValues := make([]interface{}, len(dbColumns))
for i, field := range dbColumns {
queryValues[i] = logLineFields[field]
Expand Down
5 changes: 3 additions & 2 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,12 @@ func runCommand(t *testing.T, logs string, cliArgs []string) ([]string, [][]stri
os.Args = append([]string{"ngtop"}, cliArgs...)
_, spec := querySpecFromCLI()

dbs, err := InitDB(dbFile.Name())
logFormatRegex, fields := ParseFormat(DEFAULT_LOG_FORMAT)
dbs, err := InitDB(dbFile.Name(), fields)
assertEqual(t, err, nil)
defer dbs.Close()

err = loadLogs(DEFAULT_LOG_FORMAT, logFile.Name(), dbs)
err = loadLogs(logFormatRegex, fields, logFile.Name(), dbs)
assertEqual(t, err, nil)
columnNames, rowValues, err := dbs.QueryTop(spec)
assertEqual(t, err, nil)
Expand Down
38 changes: 34 additions & 4 deletions parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type LogField struct {
// TODO
Parse func(string) string
// TODO
DerivedFields []string
// TODO
ParseDerivedFields func(string) map[string]string
}

Expand All @@ -45,12 +47,14 @@ var KNOWN_FIELDS = []LogField{
LogFormatVar: "request",
ColumnName: "request_raw",
ColumnSpec: "TEXT",
DerivedFields: []string{"path", "method", "referer"},
ParseDerivedFields: parseRequestDerivedFields,
},
{
LogFormatVar: "http_user_agent",
ColumnName: "user_agent_raw",
ColumnSpec: "TEXT",
DerivedFields: []string{"user_agent", "os", "device", "ua_type", "ua_url"},
ParseDerivedFields: parseUserAgentDerivedFields,
},
{
Expand Down Expand Up @@ -111,6 +115,8 @@ var KNOWN_FIELDS = []LogField{

var LOGVAR_TO_FIELD = map[string]*LogField{}
var COLUMN_NAME_TO_FIELD = map[string]*LogField{}

// TODO revisit, may be better to do this at main instead
var CLI_NAME_TO_FIELD = map[string]*LogField{}

func init() {
Expand All @@ -130,13 +136,11 @@ const LOG_DATE_LAYOUT = "02/Jan/2006:15:04:05 -0700"
// Parse the fields in the nginx access logs since the `until` time, passing them as a map into the `processFun`.
// Processing is interrupted when a log older than `until` is found.
func ProcessAccessLogs(
logFormat string,
logFormatRegex *regexp.Regexp,
logFiles []string,
until *time.Time,
processFun func(map[string]string) error,
) error {
logPattern := formatToRegex(logFormat)

var untilStr string
if until != nil {
untilStr = until.Format(DB_DATE_LAYOUT)
Expand All @@ -162,7 +166,7 @@ func ProcessAccessLogs(
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
values, err := parseLogLine(logPattern, line)
values, err := parseLogLine(logFormatRegex, line)
if err != nil {
return err
}
Expand All @@ -188,6 +192,32 @@ func ProcessAccessLogs(
return nil
}

// ParseFormat compiles the given log format into a regular expression and
// returns it together with the fields that a matching log line yields.
//
// The returned slice holds, for every named capture group of the regex, the
// field registered under that name in COLUMN_NAME_TO_FIELD followed by the
// fields named in its DerivedFields. Fields appear in order of first
// occurrence and without duplicates, so the same format always produces the
// same field order (ranging over a map would not guarantee that).
//
// Names that have no entry in COLUMN_NAME_TO_FIELD are skipped rather than
// dereferenced or returned as nil pointers.
func ParseFormat(format string) (*regexp.Regexp, []*LogField) {
	regex := formatToRegex(format)

	names := regex.SubexpNames()
	// seen tracks which names were already appended, to drop duplicates
	// while keeping the first-occurrence order in fields.
	seen := make(map[string]struct{}, len(names))
	fields := make([]*LogField, 0, len(names))

	// add appends the field registered under name unless it is unknown or
	// was already added, and returns the field (nil when unknown).
	add := func(name string) *LogField {
		field, ok := COLUMN_NAME_TO_FIELD[name]
		if !ok || field == nil {
			return nil
		}
		if _, dup := seen[name]; !dup {
			seen[name] = struct{}{}
			fields = append(fields, field)
		}
		return field
	}

	// pick the subset of fields deduced from the regex, plus their derived fields
	for _, name := range names {
		// SubexpNames reports "" for unnamed groups (including the whole match)
		if name == "" {
			continue
		}
		field := add(name)
		if field == nil {
			continue
		}
		for _, derived := range field.DerivedFields {
			add(derived)
		}
	}

	return regex, fields
}

// TODO
func formatToRegex(format string) *regexp.Regexp {
chars := []rune(format)
Expand Down

0 comments on commit 81a4bed

Please sign in to comment.