Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
Interation-test full Zeek TI consumption & matching
Browse files Browse the repository at this point in the history
  • Loading branch information
0snap committed Mar 8, 2021
1 parent ea2ae4f commit 31517be
Showing 1 changed file with 45 additions and 27 deletions.
72 changes: 45 additions & 27 deletions tests/integration/test_zeek_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
from datetime import datetime
import os
import queue
from stix2 import Indicator, parse
import subprocess
import threading
from threatbus import start as start_threatbus
from threatbus.data import ThreatBusSTIX2Constants
import time
import unittest

from tests.utils import zeek_receiver, zeek_sender
from tests.utils import zmq_receiver, zmq_sender


def RunZeek():
Expand Down Expand Up @@ -38,7 +40,6 @@ def RunZeek():
"-r",
"/trace.pcap",
"/opt/zeek/share/zeek/site/threatbus.zeek",
"--",
"Tenzir::log_operations=F",
]
)
Expand Down Expand Up @@ -72,47 +73,64 @@ def tearDown(self):

def test_intel_sighting_roundtrip(self):
"""
Backend-agnostic roundtrip scenario, that starts a Zeek
subprocess. Zeek is started using the threatbus.zeek "app" script.
The test sends an intelligence item via Threat Bus. The Zeek
subprocess reads a PCAP trace which contains that known threat
intelligence. The integration test subscribes to the sightings topic
and verifies that Zeek reports sighted threat intelligence back.
Backend-agnostic roundtrip scenario, that starts a Zeek subprocess which
activates the threatbus.zeek "app" script.
The test sends an IoC with a malicious hostname via Threat Bus, using
the ZMQ app plugin. Meanwhile, the Zeek subprocess reads a PCAP trace
which contains exactly that malicious hostname from the IoC.
If all goes well, Zeek subscribes to Threat Bus successfully, receives
the IoC and hence reading the PCAP file results in a sighting. Zeek
forwards that sighting to the Threat Bus Zeek plugin, where it is
converted to a valid STIX-2 Sighting.
The integration test subscribes a ZMQ receiver to the `stix2/sighting`
topic and verifies all Zeek communication was handled correctly. I.e.,
Zeek matched the IoC and reported the correct sighting.
"""
# start a receiver that pushes exactly 1 item to a result queue
# Start a ZMQ receiver that subscribes to the `stix2/sighting` topic and
# forward exactly 1 item to a result queue
result_q = queue.Queue()
rec = threading.Thread(
target=zeek_receiver.forward,
args=(1, result_q, "threatbus/sighting"),
target=zmq_receiver.forward,
args=(1, ["stix2/sighting"], result_q),
daemon=True,
)
rec.start()
# spawn a zeek subprocess that uses the `apps/threatbus.zeek` script

# Spawn a Zeek subprocess that runs the `apps/zeek/threatbus.zeek`
# script and reads a prepared PCAP trace that contains a network
# connection to `example.com`
zeek_process = RunZeek()
if not zeek_process:
self.fail("Error starting Zeek. Is it installed?")
self.fail("Error starting Zeek container.")

# Let Zeek start up...
time.sleep(1)

# send a new intelligence item
intel_id = "EXAMPLE.COM.IS.EVIL"
data = {"indicator": "example.com", "intel_type": "DOMAIN"}
intel = broker.zeek.Event("intel", datetime.now(), intel_id, data, "ADD")
zeek_sender.send("threatbus/intel", intel)
# Send a new indicator (IoC) via the ZMQ test-util, which will be
# forwarded to Zeek because Zeek subscribes to `stix2/indicator`
ioc_id = "indicator--42d31a5b-2da0-4bdd-9823-1723a98fc2fb"
ioc = Indicator(
id=ioc_id,
pattern_type="stix",
pattern="[domain-name:value = 'example.com']",
)
zmq_sender.send("stix2/indicator", ioc.serialize(), port=13372, bind=False)

# wait for zeek to report sighting of the intel
sighting = result_q.get(timeout=10)
# Wait for Zeek to ingest the IoC into its Intel framework, read the
# PCAP trace and report back the sighting
raw_msg = result_q.get(timeout=10)
sighting = parse(raw_msg, allow_custom=True)
result_q.task_done()
zeek_process.kill()

self.assertIsNotNone(sighting)
name, args = sighting.name(), sighting.args()[0]
self.assertEqual(len(args), 3)
self.assertTrue(name.endswith("sighting"))
self.assertTrue(type(args[0]).__name__ == "datetime")
self.assertEqual(args[1], intel_id)
self.assertEqual(args[2], {"noisy": False})
self.assertEqual(sighting.sighting_of_ref, ioc_id)
self.assertTrue(
ThreatBusSTIX2Constants.X_THREATBUS_SIGHTING_CONTEXT.value
in sighting.object_properties()
)
self.assertEqual(sighting.x_threatbus_sighting_context, {"noisy": False})

rec.join()
result_q.join()
zeek_process.kill()
self.assertTrue(StopZeek())

0 comments on commit 31517be

Please sign in to comment.