Add Python module to implement Amazon Security Lake integration #186

Merged (9 commits) on Apr 9, 2024
27 changes: 24 additions & 3 deletions integrations/README.md
@@ -22,11 +22,32 @@ docker compose -f ./docker/amazon-security-lake.yml up -d

This docker compose project will bring a *wazuh-indexer* node, a *wazuh-dashboard* node,
a *logstash* node and our event generator. On the one hand, the event generator will push events
-constantly to the indexer. On the other hand, logstash will constantly query for new data and
-deliver it to the integration Python program, also present in that node. Finally, the integration
-module will prepare and send the data to the Amazon Security Lake's S3 bucket.
+constantly to the indexer, on the `wazuh-alerts-4.x-sample` index by default (refer to the [events
+generator](./tools/events-generator/README.md) documentation for customization options).
+On the other hand, logstash will constantly query for new data and deliver it to the integration
+Python program, also present in that node. Finally, the integration module will prepare and send the
+data to the Amazon Security Lake S3 bucket.
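
To verify that the generator is actually feeding the index, you can query the indexer directly. A minimal check, assuming the default demo credentials (`admin:admin`) and that the indexer is reachable on `localhost:9200` (both are assumptions, adjust to your deployment):

```console
curl -sk -u admin:admin "https://localhost:9200/wazuh-alerts-4.x-sample/_count?pretty"
```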
<!-- TODO continue with S3 credentials setup -->
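
The exact S3 credentials setup is still pending, but a possible sketch uses the Logstash keystore created by `setup.sh`. The variable names `AWS_KEY`, `AWS_SECRET`, `AWS_REGION` and `AWS_BUCKET` are the ones referenced by the `indexer-to-s3` pipeline; each command prompts interactively for the value:

```console
/usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add AWS_KEY
/usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add AWS_SECRET
/usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add AWS_REGION
/usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add AWS_BUCKET
```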

Attach a terminal to the container and start the integration by running Logstash, as follows:

```console
/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-integrator.conf --path.settings /etc/logstash
```

Alternatively, the unprocessed data can be sent to a file or to an S3 bucket:

```console
/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-file.conf --path.settings /etc/logstash
/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-s3.conf --path.settings /etc/logstash
```

All three pipelines are configured to fetch the latest data from the *wazuh-indexer* every minute. In
the case of `indexer-to-file`, the data is written at the same pace, whereas with `indexer-to-s3`, the
data is uploaded to the bucket every 5 minutes.
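
As a quick sanity check, you can watch the file produced by the `indexer-to-file` pipeline and confirm that new batches appear roughly every minute (the path below is taken from the pipeline configuration shipped in this PR):

```console
tail -f /var/log/logstash/indexer-to-file-*.log
```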

For development or debugging purposes, you may want to enable hot-reload, test the configuration, or
debug these pipelines by using the `--config.reload.automatic`, `--config.test_and_exit` or `--debug`
flags, respectively.
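
For instance, to run the integrator pipeline with hot-reload enabled:

```console
/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-integrator.conf --path.settings /etc/logstash --config.reload.automatic
```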

For production usage, follow the instructions on our documentation page about this matter
(_when-its-done_).

9 changes: 7 additions & 2 deletions integrations/amazon-security-lake/Dockerfile
@@ -17,7 +17,7 @@ RUN pip install -r /app/requirements.txt
FROM python:3.9
ENV LOGSTASH_KEYSTORE_PASS="SecretPassword"
# Add the application source code.
-COPY --chown=logstash:logstash . /home/app
+COPY --chown=logstash:logstash ./src /home/app
# Add execution permissions.
RUN chmod a+x /home/app/run.py
# Copy the application's dependencies.
@@ -38,4 +38,9 @@ RUN chown --recursive logstash:logstash /usr/share/logstash /etc/logstash /var/l
USER logstash
# Copy and run the setup.sh script to create and configure a keystore for Logstash.
COPY --chown=logstash:logstash logstash/setup.sh /usr/share/logstash/bin/setup.sh
RUN bash /usr/share/logstash/bin/setup.sh

# Disable ECS compatibility
RUN echo "pipeline.ecs_compatibility: disabled" >> /etc/logstash/logstash.yml

WORKDIR /home/app
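
After changing the Dockerfile or the Python sources, the integration image can presumably be rebuilt through the same compose project used in the README (a sketch; the compose file path is the one shown above, and `--build` simply forces an image rebuild):

```console
docker compose -f ./docker/amazon-security-lake.yml up -d --build
```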
@@ -15,14 +15,20 @@ input {
      }
    }
  }'
    target => "_source"
    schedule => "* * * * *"
  }
}


output {
  stdout {
    id => "output.stdout"
    codec => json_lines
  }
  file {
-    path => "/usr/share/logstash/pipeline/indexer-to-file.json"
+    id => "output.file"
+    path => "/var/log/logstash/indexer-to-file-%{+YYYY-MM-dd-HH}.log"
+    file_mode => 0644
    codec => json_lines
  }
}
@@ -15,16 +15,19 @@ input {
      }
    }
  }'
    target => "_source"
    schedule => "* * * * *"
  }
}

output {
-  pipe {
-    id => "securityLake"
-    message_format => "%{_source}"
-    ttl => "10"
-    command => "/usr/bin/env python3 /usr/local/bin/run.py -d"
-  }
  stdout {
    id => "output.stdout"
    codec => json_lines
  }
+  pipe {
+    id => "output.integrator"
+    ttl => "10"
+    command => "/env/bin/python3 /usr/share/logstash/amazon-security-lake/run.py"
+    # command => "/usr/share/logstash/amazon-security-lake/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --bucketname securitylake --s3endpoint s3.ninja:9000 --s3profile default"
+  }
}
@@ -15,21 +15,31 @@ input {
      }
    }
  }'
    target => "_source"
-    schedule => "* * * * *"
+    schedule => "5/* * * * *"
  }
}

output {
-  stdout { codec => rubydebug }
+  stdout {
+    id => "output.stdout"
+    codec => json_lines
+  }
  s3 {
-    access_key_id => "<aws-access-key>"
-    secret_access_key => "<aws-secret-key>"
-    region => "<your-region>"
+    id => "output.s3"
+    access_key_id => "${AWS_KEY}"
+    secret_access_key => "${AWS_SECRET}"
+    region => "${AWS_REGION}"
+    endpoint => "http://s3.ninja:9000"
+    bucket => "${AWS_BUCKET}"
+    codec => "json"
    retry_count => 0
    validate_credentials_on_root_bucket => false
    prefix => "%{+YYYY}/%{+MM}/%{+dd}"
    server_side_encryption => true
    server_side_encryption_algorithm => "AES256"
-    bucket => "wazuh-indexer-amazon-security-lake-bucket"
    canned_acl => "bucket-owner-full-control"
-    codec => "json"
+    additional_settings => {
+      "force_path_style" => true
+    }
    time_file => 5
  }
}
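
To check that objects are landing in the bucket behind the S3-compatible endpoint, something like the following should work, assuming the AWS CLI is installed and `AWS_BUCKET` resolves to the bucket name in your shell (the endpoint is the `s3.ninja` one configured above):

```console
aws --endpoint-url http://s3.ninja:9000 s3 ls "s3://${AWS_BUCKET}/" --recursive
```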
3 changes: 2 additions & 1 deletion integrations/amazon-security-lake/requirements.txt
@@ -1,3 +1,4 @@
pyarrow>=10.0.1
parquet-tools>=0.2.15
pydantic==2.6.1
boto3==1.34.46
26 changes: 0 additions & 26 deletions integrations/amazon-security-lake/run.py

This file was deleted.

1 change: 1 addition & 0 deletions integrations/amazon-security-lake/src/parquet/__init__.py
@@ -0,0 +1 @@
import parquet.parquet
122 changes: 122 additions & 0 deletions integrations/amazon-security-lake/src/run.py
@@ -0,0 +1,122 @@
#!/env/bin/python3
# vim: bkc=yes bk wb

import sys
import os
import datetime
import transform
import pyarrow as pa
import pyarrow.parquet as pq
import logging
import boto3
from botocore.exceptions import ClientError

# NOTE work in progress
def upload_file(table, file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket

    :param table: PyArrow table with events data
    :param file_name: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name. If not specified then file_name is used
    :return: True if file was uploaded, else False
    """

    client = boto3.client(
        service_name='s3',
        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
        region_name=os.environ['AWS_REGION'],
        endpoint_url='http://s3.ninja:9000',
    )

    # If S3 object_name was not specified, use file_name
    if object_name is None:
        object_name = os.path.basename(file_name)

    # Upload the file
    try:
        with open(file_name, 'rb') as data:
            client.put_object(Bucket=bucket, Key=object_name, Body=data)
    except ClientError as e:
        logging.error(e)
        return False
    return True


def main():
    '''Main function'''
    # Get current timestamp
    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()

    # Generate filenames
    filename_raw = f"/tmp/integrator-raw-{timestamp}.json"
    filename_ocsf = f"/tmp/integrator-ocsf-{timestamp}.json"
    filename_parquet = f"/tmp/integrator-ocsf-{timestamp}.parquet"

    # 1. Extract data
    # ================
    raw_data = []
    for line in sys.stdin:
        raw_data.append(line)

        # Echo piped data
        with open(filename_raw, "a") as fd:
            fd.write(line)

    # 2. Transform data
    # ================
    # a. Transform to OCSF
    ocsf_data = []
    for line in raw_data:
        try:
            event = transform.converter.from_json(line)
            ocsf_event = transform.converter.to_detection_finding(event)
            ocsf_data.append(ocsf_event.model_dump())

            # Temporary disk storage
            with open(filename_ocsf, "a") as fd:
                fd.write(str(ocsf_event) + "\n")
        except AttributeError as e:
            print("Error transforming line to OCSF")
            print(event)
            print(e)

    # b. Encode as Parquet
    try:
        table = pa.Table.from_pylist(ocsf_data)
        pq.write_table(table, filename_parquet)
    except AttributeError as e:
        print("Error encoding data to parquet")
        print(e)

    # 3. Load data (upload to S3)
    # ================
    if upload_file(table, filename_parquet, os.environ['AWS_BUCKET']):
        # Remove /tmp files
        pass


def _test():
    ocsf_event = {}
    with open("./wazuh-event.sample.json", "r") as fd:
        # Load from file descriptor
        for raw_event in fd:
            try:
                event = transform.converter.from_json(raw_event)
                print("")
                print("-- Wazuh event Pydantic model")
                print("")
                print(event.model_dump())
                ocsf_event = transform.converter.to_detection_finding(event)
                print("")
                print("-- Converted to OCSF")
                print("")
                print(ocsf_event.model_dump())

            except KeyError as e:
                raise e


if __name__ == '__main__':
    main()
    # _test()
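
As a quick manual check of the module outside Logstash, one can pipe the sample event shipped with this PR into the script. The paths below follow the Dockerfile layout above and are assumptions (the sample file is expected under `/home/app`, and the `AWS_*` environment variables must be set for the upload step to succeed):

```console
cat /home/app/wazuh-event.sample.json | /env/bin/python3 /home/app/run.py
```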
@@ -90,9 +90,16 @@ def to_detection_finding(event: models.wazuh.Event) -> models.ocsf.DetectionFind
    )


-def from_json(event: dict) -> models.wazuh.Event:
+# def from_json(event: dict) -> models.wazuh.Event:
+#     # Needs to be a string, bytes or bytearray
+#     try:
+#         return models.wazuh.Event.model_validate_json(json.dumps(event))
+#     except pydantic.ValidationError as e:
+#         print(e)
+
+def from_json(event: str) -> models.wazuh.Event:
    # Needs to be a string, bytes or bytearray
    try:
-        return models.wazuh.Event.model_validate_json(json.dumps(event))
+        return models.wazuh.Event.model_validate_json(event)
    except pydantic.ValidationError as e:
        print(e)
@@ -6,27 +6,27 @@


class Mitre(pydantic.BaseModel):
-    technique: typing.List[str] = []
-    id: typing.List[str] = ""
-    tactic: typing.List[str] = []
+    technique: typing.List[str] = ["N/A"]
+    id: typing.List[str] = ["N/A"]
+    tactic: typing.List[str] = ["N/A"]


class Rule(pydantic.BaseModel):
    firedtimes: int = 0
-    description: str = ""
+    description: str = "N/A"
    groups: typing.List[str] = []
-    id: str = ""
+    id: str = "N/A"
    mitre: Mitre = Mitre()
    level: int = 0
    nist_800_53: typing.List[str] = []


class Decoder(pydantic.BaseModel):
-    name: str
+    name: str = "N/A"


class Input(pydantic.BaseModel):
-    type: str
+    type: str = "N/A"


class Agent(pydantic.BaseModel):
@@ -39,9 +39,9 @@ class Manager(pydantic.BaseModel):


class Event(pydantic.BaseModel):
-    rule: Rule = {}
-    decoder: Decoder = {}
-    input: Input = {}
+    rule: Rule = Rule()
+    decoder: Decoder = Decoder()
+    input: Input = Input()
    id: str = ""
    full_log: str = ""
    agent: Agent = {}
@@ -0,0 +1 @@
{"cluster":{"name":"wazuh-cluster","node":"wazuh-manager"},"agent":{"id":"003","ip":"10.0.0.180","name":"ip-10-0-0-180.us-west-1.compute.internal"},"@timestamp":"2024-03-14T12:57:05.730Z","data":{"audit":{"exe":"/usr/sbin/sshd","type":"NORMAL","cwd":"/home/wazuh","file":{"name":"/var/sample"},"success":"yes","command":"ssh"}},"@version":"1","manager":{"name":"wazuh-manager"},"location":"","decoder":{},"id":"1580123327.49031","predecoder":{},"timestamp":"2024-03-14T12:57:05.730+0000","rule":{"description":"Audit: Command: /usr/sbin/ssh","firedtimes":3,"level":3,"id":"80791","mail":false,"groups":["audit","audit_command"],"gdpr":["IV_30.1.g"]}}