Skip to content

Commit

Permalink
feat(vector): add support for processing Python log files (#291)
Browse files: browse the repository at this point in the history
  • Loading branch information
lwpk110 authored Dec 18, 2024
1 parent 7f74bf2 commit d210053
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 4 deletions.
81 changes: 77 additions & 4 deletions pkg/productlogging/vector.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -100,10 +100,16 @@ sources:
- {{.LogDir}}*/*.log4j2.xml
line_delimiter: "\r\n"
files_airlift:
type: "file"
include:
- "{{.LogDir}}*/*.airlift.json"
files_py:
type: file
include:
- {{.LogDir}}*/*.py.json
files_airlift:
type: "file"
include:
- "{{.LogDir}}*/*.airlift.json"
transforms:
processed_files_stdout:
inputs:
Expand Down Expand Up @@ -321,6 +327,73 @@ transforms:
{{"}}"}}
{{"}}"}}
processed_files_py:
inputs:
- files_py
type: remap
source: |
raw_message = string!(.message)
.timestamp = now()
.logger = ""
.level = "INFO"
.message = ""
.errors = []
parsed_event, err = parse_json(raw_message)
if err != null {{"{{"}}
error = "JSON not parsable: " + err
.errors = push(.errors, error)
log(error, level: "warn")
.message = raw_message
{{"}}"}} else if !is_object(parsed_event) {{"{{"}}
error = "Parsed event is not a JSON object."
.errors = push(.errors, error)
log(error, level: "warn")
.message = raw_message
{{"}}"}} else {{"{{"}}
event = object!(parsed_event)
asctime, err = string(event.asctime)
if err == null {{"{{"}}
parsed_timestamp, err = parse_timestamp(asctime, "%F %T,%3f")
if err == null {{"{{"}}
.timestamp = parsed_timestamp
{{"}}"}} else {{"{{"}}
.errors = push(.errors, "Timestamp not parsable, using current time instead: "+ err)
{{"}}"}}
{{"}}"}} else {{"{{"}}
.errors = push(.errors, "Timestamp not found, using current time instead.")
{{"}}"}}
.logger, err = string(event.name)
if err != null || is_empty(.logger) {{"{{"}}
.errors = push(.errors, "Logger not found.")
{{"}}"}}
level, err = string(event.levelname)
if err != null {{"{{"}}
.errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
{{"}}"}} else if level == "DEBUG" {{"{{"}}
.level = "DEBUG"
{{"}}"}} else if level == "INFO" {{"{{"}}
.level = "INFO"
{{"}}"}} else if level == "WARNING" {{"{{"}}
.level = "WARN"
{{"}}"}} else if level == "ERROR" {{"{{"}}
.level = "ERROR"
{{"}}"}} else if level == "CRITICAL" {{"{{"}}
.level = "FATAL"
{{"}}"}} else {{"{{"}}
.errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
{{"}}"}}
.message, err = string(event.message)
if err != null || is_empty(.message) {{"{{"}}
.errors = push(.errors, "Message not found.")
{{"}}"}}
{{"}}"}}
processed_files_airlift:
inputs:
- files_airlift
Expand Down
74 changes: 74 additions & 0 deletions pkg/productlogging/vector_test.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -55,10 +55,16 @@ sources:
- /kubedoop/log/*/*.log4j2.xml
line_delimiter: "\r\n"
files_py:
type: file
include:
- /kubedoop/log/*/*.py.json
files_airlift:
type: "file"
include:
- "/kubedoop/log/*/*.airlift.json"
transforms:
processed_files_stdout:
inputs:
Expand Down Expand Up @@ -276,6 +282,73 @@ transforms:
}}
}}
processed_files_py:
inputs:
- files_py
type: remap
source: |
raw_message = string!(.message)
.timestamp = now()
.logger = ""
.level = "INFO"
.message = ""
.errors = []
parsed_event, err = parse_json(raw_message)
if err != null {{
error = "JSON not parsable: " + err
.errors = push(.errors, error)
log(error, level: "warn")
.message = raw_message
}} else if !is_object(parsed_event) {{
error = "Parsed event is not a JSON object."
.errors = push(.errors, error)
log(error, level: "warn")
.message = raw_message
}} else {{
event = object!(parsed_event)
asctime, err = string(event.asctime)
if err == null {{
parsed_timestamp, err = parse_timestamp(asctime, "%F %T,%3f")
if err == null {{
.timestamp = parsed_timestamp
}} else {{
.errors = push(.errors, "Timestamp not parsable, using current time instead: "+ err)
}}
}} else {{
.errors = push(.errors, "Timestamp not found, using current time instead.")
}}
.logger, err = string(event.name)
if err != null || is_empty(.logger) {{
.errors = push(.errors, "Logger not found.")
}}
level, err = string(event.levelname)
if err != null {{
.errors = push(.errors, "Level not found, using \"" + .level + "\" instead.")
}} else if level == "DEBUG" {{
.level = "DEBUG"
}} else if level == "INFO" {{
.level = "INFO"
}} else if level == "WARNING" {{
.level = "WARN"
}} else if level == "ERROR" {{
.level = "ERROR"
}} else if level == "CRITICAL" {{
.level = "FATAL"
}} else {{
.errors = push(.errors, "Level \"" + level + "\" unknown, using \"" + .level + "\" instead.")
}}
.message, err = string(event.message)
if err != null || is_empty(.message) {{
.errors = push(.errors, "Message not found.")
}}
}}
processed_files_airlift:
inputs:
- files_airlift
Expand Down Expand Up @@ -310,6 +383,7 @@ sinks:
type: vector
address: "localhost:8080"
`
print(actualYaml)
assert.Equal(t, expectYaml, actualYaml)
assert.NoError(t, err)
}

0 comments on commit d210053

Please sign in to comment.