Logstash to Security Lake pipeline #147

Closed
wants to merge 82 commits into from
e6784f3
Adding Python script that receives a continuous json stream over stdi…
f-galland Feb 5, 2024
116b22b
Adding logstash pipeline for python script
f-galland Feb 5, 2024
288c40a
encode_parquet() function fixed to handle lists of dictionaries
f-galland Feb 6, 2024
6ac3c99
Correct error in encode_parquet()
f-galland Feb 6, 2024
4ad01c2
Avoid storing the block ending in the output buffer
f-galland Feb 6, 2024
1638b17
Add comments on handling files and streams with pyarrow for future re…
f-galland Feb 6, 2024
17e5dfb
Add s3 handling reference links
f-galland Feb 6, 2024
0b5adc9
Write parquet directly to bucket
f-galland Feb 6, 2024
10824ed
Added basics of map_to_ocsf() function
f-galland Feb 7, 2024
c81239b
Minor fixes
f-galland Feb 7, 2024
210541d
Map alerts to OCSF as they are read
f-galland Feb 7, 2024
5e3c0fa
Add script to convert Wazuh events to OCSF
AlexRuiz7 Feb 8, 2024
0995134
Add OCSF converter + Parquet encoder + test scripts
AlexRuiz7 Feb 9, 2024
d82ed21
Update .gitignore
AlexRuiz7 Feb 9, 2024
17dac0c
Include the contents of the alert under unmapped
f-galland Feb 8, 2024
a4f74db
Add support for different OCSF schema versions
f-galland Feb 8, 2024
34f295b
Use custom ocsf module to map alerts
f-galland Feb 15, 2024
fd63e9e
Modify script to use converter class
f-galland Feb 15, 2024
d32e06d
Code polish and fix errors
f-galland Feb 16, 2024
ab56e89
Remove unnecessary type declaration from debug flag
f-galland Feb 16, 2024
7fc49e7
Improved parquet encoding
f-galland Feb 16, 2024
67b785f
Initial commit for test env's docker-compose.yml
f-galland Feb 19, 2024
1d8efe3
Merge branch '4.9.0' into logstash-pipe-output
f-galland Feb 19, 2024
0bf697d
Remove sudo references from docker-compose.yml
f-galland Feb 19, 2024
159adcb
Adding Python script that receives a continuous json stream over stdi…
f-galland Feb 5, 2024
6e17aae
Adding logstash pipeline for python script
f-galland Feb 5, 2024
a05c23c
encode_parquet() function fixed to handle lists of dictionaries
f-galland Feb 6, 2024
e04f0d5
Correct error in encode_parquet()
f-galland Feb 6, 2024
93935fc
Avoid storing the block ending in the output buffer
f-galland Feb 6, 2024
1db384c
Add comments on handling files and streams with pyarrow for future re…
f-galland Feb 6, 2024
c60045f
Add s3 handling reference links
f-galland Feb 6, 2024
8949097
Write parquet directly to bucket
f-galland Feb 6, 2024
eb7ace3
Added basics of map_to_ocsf() function
f-galland Feb 7, 2024
3d7b8ff
Minor fixes
f-galland Feb 7, 2024
545f855
Map alerts to OCSF as they are read
f-galland Feb 7, 2024
f753b12
Add script to convert Wazuh events to OCSF
AlexRuiz7 Feb 8, 2024
dcc119e
Add OCSF converter + Parquet encoder + test scripts
AlexRuiz7 Feb 9, 2024
5c5ff24
Update .gitignore
AlexRuiz7 Feb 9, 2024
a39ef90
Include the contents of the alert under unmapped
f-galland Feb 8, 2024
97725bc
Add support for different OCSF schema versions
f-galland Feb 8, 2024
e313572
Use custom ocsf module to map alerts
f-galland Feb 15, 2024
4896d15
Modify script to use converter class
f-galland Feb 15, 2024
7fd25d1
Code polish and fix errors
f-galland Feb 16, 2024
e06203c
Remove unnecessary type declaration from debug flag
f-galland Feb 16, 2024
6826e12
Improved parquet encoding
f-galland Feb 16, 2024
9cfc247
Initial commit for test env's docker-compose.yml
f-galland Feb 19, 2024
324d1f5
Remove sudo references from docker-compose.yml
f-galland Feb 19, 2024
cb5ac73
Add operational Python module to transform events to OCSF
AlexRuiz7 Feb 21, 2024
05ae2d1
Create minimal Docker environment to test and develop the integration.
AlexRuiz7 Feb 22, 2024
17f47ca
Fix events-generator's Inventory starvation
AlexRuiz7 Feb 22, 2024
204948f
Remove files present in #147
AlexRuiz7 Feb 22, 2024
5fcc9a3
Cleanup
AlexRuiz7 Feb 22, 2024
a246410
Add FQDN hostnames to services for certificates creation
AlexRuiz7 Feb 22, 2024
bf3f1ff
Add certificate generator service
f-galland Feb 22, 2024
4279b6e
Add certificate config to docker compose file
f-galland Feb 22, 2024
65b3b10
Use secrets for certificates
f-galland Feb 22, 2024
c0d6d2c
Disable permission handling inside cert's generator entrypoint.sh
f-galland Feb 22, 2024
017a908
Back to using a bind mount for certs
f-galland Feb 22, 2024
2a60c41
Have entrypoint.sh generate certs with 1000:1000 ownership
f-galland Feb 22, 2024
91da2c2
Correct certificate permissions and bind mounting
f-galland Feb 23, 2024
55f0726
Add security initialization variable to compose file
f-galland Feb 23, 2024
454d6fd
Add S3 Ninja (Mock)
AlexRuiz7 Feb 23, 2024
a9f9572
Fix permissions on certs generator entrypoint
f-galland Feb 23, 2024
c854dc5
Add cert generator config file
f-galland Feb 23, 2024
422cf1c
Remove old cert generator dir
f-galland Feb 23, 2024
21f89f8
Set indexer hostname right in pipeline file
f-galland Feb 23, 2024
826f06b
Merging changes from 160-security-lake-logstash-certificates
f-galland Feb 23, 2024
6938a54
Change timestamp field in pipeline
f-galland Feb 26, 2024
118b477
Clean up unneeded files
f-galland Feb 26, 2024
17b71f0
Made script available as volume and changed permissions for quick tes…
f-galland Feb 26, 2024
b9b21a8
Temporary fix on ocsf class for testing purposes
f-galland Feb 27, 2024
81ecb7c
Add function to handle local s3 buckets
f-galland Feb 27, 2024
f4a7336
Allow to set the bucket name from the cli
f-galland Feb 27, 2024
811f940
Renamed script to match convention
f-galland Feb 28, 2024
c421235
Move argparse into its own function
f-galland Feb 28, 2024
3713fb1
Add volumes to mount script and dependencies in compose file
f-galland Feb 28, 2024
f6329f4
Delete integrations/ocsf-mapping.json
f-galland Feb 28, 2024
5cb2c38
Delete integrations/amazon-security-lake/logstash/pipe-output.conf
f-galland Feb 28, 2024
259622b
merging s3 mock changes into pipeline development branch
f-galland Feb 28, 2024
25ba5a0
Merge branch '4.9.0' of github.com:wazuh/wazuh-indexer into logstash-…
AlexRuiz7 Mar 4, 2024
9950369
Remove old files
AlexRuiz7 Mar 4, 2024
b75134e
Merge branch '4.9.0-2.11.1' into logstash-pipe-output
AlexRuiz7 Mar 12, 2024
2 changes: 1 addition & 1 deletion integrations/.gitignore
@@ -3,4 +3,4 @@ opensearch
splunk
common
config
docker/certs
docker/certs
@@ -20,7 +20,6 @@ input {
  }
}


output {
  file {
    path => "/usr/share/logstash/pipeline/indexer-to-file.json"
@@ -16,6 +16,7 @@ input {
      }
    }'
    target => "_source"
    ecs_compatibility => disabled
    schedule => "* * * * *"
  }
}
@@ -26,5 +27,6 @@ output {
    message_format => "%{_source}"
    ttl => "10"
    command => "/usr/bin/env python3 /usr/local/bin/run.py -d"
    # command => "/usr/share/logstash/bin/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --bucketname securitylake --s3endpoint s3.ninja:9000 --s3profile default"
  }
}
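With `message_format => "%{_source}"`, the pipe output hands the script one JSON document per line on stdin. A minimal sketch of that contract (the field names below are illustrative, not taken from a real Wazuh alert):

```python
import json

# One event as the Logstash pipe output would emit it with
# message_format => "%{_source}" (field names are illustrative).
raw_line = '{"agent": {"id": "001"}, "rule": {"level": 7, "description": "sample"}}\n'

# run.py parses each line independently, so one malformed event
# cannot corrupt the rest of the stream.
event = json.loads(raw_line)
print(event["rule"]["level"])  # → 7
```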
2 changes: 1 addition & 1 deletion integrations/amazon-security-lake/requirements.txt
@@ -1,3 +1,3 @@
pyarrow>=10.0.1
parquet-tools>=0.2.15
pydantic==2.6.1
pydantic==2.6.1
146 changes: 145 additions & 1 deletion integrations/amazon-security-lake/run.py
100644 → 100755
@@ -1,7 +1,151 @@
#!/env/bin/python3.9
# vim: bkc=yes bk wb

import transform
import os
import sys
import argparse
import logging
import time
import json
import datetime
import boto3
from pyarrow import parquet, Table, fs


logging.basicConfig(format='%(asctime)s %(message)s',
                    encoding='utf-8', level=logging.DEBUG)

BLOCK_ENDING = {"block_ending": True}


def create_arg_parser():
    parser = argparse.ArgumentParser(
        description='STDIN to Security Lake pipeline')
    parser.add_argument('-d', '--debug', action='store_true',
                        help='Activate debugging')
    parser.add_argument('-b', '--bucketname', type=str, action='store',
                        help='S3 bucket name to write parquet files to')
    parser.add_argument('-e', '--s3endpoint', type=str, action='store', default=None,
                        help='Hostname and port of the S3 destination (defaults to AWS\')')
    parser.add_argument('-i', '--pushinterval', type=int, action='store', default=299,
                        help='Time interval in seconds for pushing data to Security Lake')
    parser.add_argument('-l', '--logoutput', type=str, default="/tmp/stdintosecuritylake.txt",
                        help='File path of the destination file to write to')
    parser.add_argument('-m', '--maxlength', type=int, action='store', default=2000,
                        help='Event number threshold for submission to Security Lake')
    parser.add_argument('-n', '--linebuffer', type=int, action='store',
                        default=100, help='stdin line buffer length')
    parser.add_argument('-p', '--s3profile', type=str, action='store',
                        default='default', help='AWS profile as stored in credentials file')
    parser.add_argument('-s', '--sleeptime', type=int, action='store',
                        default=5, help='Input buffer polling interval')
    return parser
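The parser above can be exercised offline; a reduced sketch with a subset of the same flags, showing how the defaults interact with explicit arguments:

```python
import argparse

# Reduced re-creation of the script's CLI (subset of flags only).
parser = argparse.ArgumentParser(description='STDIN to Security Lake pipeline')
parser.add_argument('-d', '--debug', action='store_true')
parser.add_argument('-b', '--bucketname', type=str)
parser.add_argument('-i', '--pushinterval', type=int, default=299)
parser.add_argument('-m', '--maxlength', type=int, default=2000)

# Flags not passed on the command line fall back to their defaults.
args = parser.parse_args(['-d', '-b', 'securitylake'])
```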


def check_fd_open(file):
    # Note: despite the name, this returns True when the file is closed.
    return file.closed


def s3authenticate(profile, endpoint=None, scheme='https'):
    session = boto3.session.Session(profile_name=profile)
    credentials = session.get_credentials()

    if endpoint is not None:
        scheme = 'http'

    s3fs = fs.S3FileSystem(
        endpoint_override=endpoint,
        access_key=credentials.access_key,
        secret_key=credentials.secret_key,
        scheme=scheme)

    return s3fs


def encode_parquet(event_list, bucketname, filename, filesystem):
    try:
        table = Table.from_pylist(event_list)
        parquet.write_table(table, '{}/{}'.format(bucketname, filename),
                            filesystem=filesystem)
    except Exception as e:
        logging.error(e)
        raise


def map_block(fileobject, length):
    output = []
    for _ in range(length):
        line = fileobject.readline()
        if line == '':
            output.append(BLOCK_ENDING)
            break
        alert = json.loads(line)
        ocsf_mapped_alert = converter.convert(alert)
        output.append(ocsf_mapped_alert)
    return output
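`map_block()` depends on a `converter` instance created elsewhere in the script (outside this diff). Its buffering behavior can be checked in isolation with an in-memory stream and an identity converter; `read_block` and `IdentityConverter` below are stand-ins for illustration, not part of the integration:

```python
import io
import json

BLOCK_ENDING = {"block_ending": True}


class IdentityConverter:
    """Stand-in for the transform converter: returns alerts unchanged."""

    def convert(self, alert):
        return alert


def read_block(fileobject, length, converter):
    output = []
    for _ in range(length):
        line = fileobject.readline()
        if line == '':
            # Empty read means the stream is exhausted; mark the block.
            output.append(BLOCK_ENDING)
            break
        output.append(converter.convert(json.loads(line)))
    return output


# Two events followed by end-of-stream.
stream = io.StringIO('{"id": 1}\n{"id": 2}\n')
block = read_block(stream, 100, IdentityConverter())
```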


def timedelta(reference_timestamp):
    current_time = datetime.datetime.now(datetime.timezone.utc)
    return (current_time - reference_timestamp).total_seconds()


def utctime():
    return datetime.datetime.now(datetime.timezone.utc)
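The main loop flushes the output buffer when either threshold trips: more than `maxlength` events buffered, or more than `pushinterval` seconds elapsed since the last flush. The condition in isolation (defaults mirror the argparse defaults above):

```python
import datetime


def should_flush(buffer_length, loop_start_time, maxlength=2000, pushinterval=299):
    """True when the buffer exceeds the size threshold or the push
    interval has elapsed, mirroring the main loop's flush test."""
    elapsed = (datetime.datetime.now(datetime.timezone.utc)
               - loop_start_time).total_seconds()
    return buffer_length > maxlength or elapsed > pushinterval


start = datetime.datetime.now(datetime.timezone.utc)
```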


if __name__ == "__main__":
    try:
        args = create_arg_parser().parse_args()
        logging.info('BUFFERING STDIN')

        with os.fdopen(sys.stdin.fileno(), 'rt') as stdin:
            output_buffer = []
            loop_start_time = utctime()

            try:
                s3fs = s3authenticate(args.s3profile, args.s3endpoint)
                while True:

                    current_block = map_block(stdin, args.linebuffer)

                    if current_block[-1] == BLOCK_ENDING:
                        output_buffer += current_block[0:-1]
                        time.sleep(args.sleeptime)
                    else:
                        output_buffer += current_block

                    buffer_length = len(output_buffer)

                    if buffer_length == 0:
                        continue

                    elapsed_seconds = timedelta(loop_start_time)

                    if buffer_length > args.maxlength or elapsed_seconds > args.pushinterval:
                        logging.info(
                            'Elapsed seconds: {}'.format(elapsed_seconds))
                        loop_start_time = utctime()
                        timestamp = loop_start_time.strftime('%F_%H.%M.%S')
                        filename = 'wazuh-{}.parquet'.format(timestamp)
                        logging.info(
                            'Writing data to s3://{}/{}'.format(args.bucketname, filename))
                        encode_parquet(
                            output_buffer, args.bucketname, filename, s3fs)
                        output_buffer = []

            except KeyboardInterrupt:
                logging.info("Keyboard Interrupt issued")
                exit(0)

        logging.info('FINISHED RETRIEVING STDIN')

    except Exception as e:
        logging.error("Error running script")
        logging.error(e)
        raise


def _test():
Empty file modified integrations/amazon-security-lake/transform/__init__.py
100644 → 100755
Empty file.
Empty file modified integrations/amazon-security-lake/transform/converter.py
100644 → 100755
Empty file.
Empty file modified integrations/amazon-security-lake/transform/models/__init__.py
100644 → 100755
Empty file.
Empty file modified integrations/amazon-security-lake/transform/models/ocsf.py
100644 → 100755
Empty file.
Empty file modified integrations/amazon-security-lake/transform/models/wazuh.py
100644 → 100755
Empty file.
11 changes: 8 additions & 3 deletions integrations/docker/amazon-security-lake.yml
@@ -1,6 +1,7 @@
version: "3.8"
version: '3.8'
name: "amazon-security-lake"
services:

  events-generator:
    image: wazuh/indexer-events-generator
    build:
@@ -66,7 +67,7 @@ services:
      - "5601" # Expose port 5601 for web access to OpenSearch Dashboards
    environment:
      OPENSEARCH_HOSTS: '["https://wazuh.indexer:9200"]' # Define the OpenSearch nodes that OpenSearch Dashboards will query

  wazuh.integration.security.lake:
    image: wazuh/indexer-security-lake-integration
    build:
@@ -89,7 +90,11 @@ services:
    volumes:
      - ../amazon-security-lake/logstash/pipeline:/usr/share/logstash/pipeline
      - ./certs/root-ca.pem:/usr/share/logstash/root-ca.pem
    # command: tail -f /dev/null
      - ../amazon-security-lake/run.py:/usr/share/logstash/bin/run.py
      - ../amazon-security-lake/transform/:/usr/share/logstash/bin/transform/
      - ../amazon-security-lake/ocsf/:/usr/share/logstash/bin/ocsf/
      - ./credentials:/usr/share/logstash/.aws/credentials
    #command: tail -f /dev/null
    command: /usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-integrator.conf --path.settings /etc/logstash --config.reload.automatic

  s3.ninja: