This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Merge pull request #61
Improve message distribution performance
0snap authored Nov 18, 2020
2 parents b7ffeee + ba36620 commit 2672c7e
Showing 31 changed files with 1,501 additions and 837 deletions.
27 changes: 27 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,33 @@ Every entry has a category for which we use the following visual abbreviations:

## Unreleased

- 🎁 The `zmq-app` and `zeek` plugins now use the Unix `select` system call for
improved performance during message passing. The previous approach added a
constant delay to every message and did not scale. The new approach saves at
least that constant delay *per message*. For ZeroMQ publishing we observed a
speedup of approximately 183x for 100k events.
[#61](https://github.com/tenzir/threatbus/pull/61)

- 🎁 The `rabbitmq` backbone plugin now uses an asynchronous
[SelectConnection](https://pika.readthedocs.io/en/stable/modules/adapters/select.html)
instead of a blocking one. We measured a speedup of approximately 1.2x for
100k events.
[#61](https://github.com/tenzir/threatbus/pull/61)

- 🎁 Threat Bus now has a controlled shutdown. Pressing ctrl+c first shuts down
backbone plugins, then app plugins, and lastly Threat Bus itself.
[#61](https://github.com/tenzir/threatbus/pull/61)

- ⚠️ There is a new base class for implementing plugin threads. Plugin
developers should extend the new `StoppableWorker` for every plugin. Threat
Bus and all plugins in this repository now extend that class.
[#61](https://github.com/tenzir/threatbus/pull/61)

- ⚠️ Threat Bus and all plugins now use
[multiprocessing.JoinableQueue](https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.JoinableQueue)
for message passing.
[#61](https://github.com/tenzir/threatbus/pull/61)

- 🎁 The `zmq-app` plugin now supports synchronous heartbeats. With heartbeats,
both Threat Bus and the connected apps can mutually ensure that the connected
party is still alive.
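
The entries above about `StoppableWorker` and `multiprocessing.JoinableQueue` can be illustrated with a minimal sketch. The class below is a hypothetical stand-in written for this example (`ExampleStoppableWorker`, `run_demo`, and all attribute names are assumptions), not the Threat Bus base class itself. It only shows the general pattern: a thread that polls a joinable queue with a timeout so it can notice a stop request.

```python
import queue
import threading
from multiprocessing import JoinableQueue


class ExampleStoppableWorker(threading.Thread):
    """Hypothetical stoppable plugin thread; not the actual Threat Bus class."""

    def __init__(self, inq, handled):
        super().__init__(daemon=True)
        self._stop_event = threading.Event()
        self.inq = inq
        self.handled = handled

    def stop(self):
        # Ask the worker loop to exit at its next iteration.
        self._stop_event.set()

    def running(self):
        return not self._stop_event.is_set()

    def run(self):
        while self.running():
            try:
                # A timeout keeps the loop responsive to stop requests.
                msg = self.inq.get(block=True, timeout=0.1)
            except queue.Empty:
                continue
            self.handled.append(msg)
            # Mark the message as processed so that inq.join() can return.
            self.inq.task_done()


def run_demo(messages):
    inq = JoinableQueue()
    handled = []
    worker = ExampleStoppableWorker(inq, handled)
    worker.start()
    for message in messages:
        inq.put(message)
    inq.join()     # block until every message was acknowledged via task_done()
    worker.stop()  # controlled shutdown of the worker thread
    worker.join()
    return handled
```

Calling `run_demo(["a", "b", "c"])` returns the handled messages once the queue is drained and the worker has stopped. Draining the queue before stopping the worker is one possible shutdown order; the changelog only states the order in which plugin groups are stopped.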
7 changes: 7 additions & 0 deletions apps/vast/CHANGELOG.md
@@ -12,6 +12,13 @@ Every entry has a category for which we use the following visual abbreviations:

## Unreleased

- 🎁 `pyvast-threatbus` now uses asynchronous background tasks to query VAST
concurrently. VAST queries were executed sequentially prior to this change.
This can boost query throughput by up to the number of allowed concurrent
background tasks. Users can control the maximum number of concurrent
background tasks with the new `max_background_tasks` configuration option
(`--max-background-tasks` on the command line).
[#61](https://github.com/tenzir/threatbus/pull/61)

- 🎁 The Python app to connect [VAST](https://github.com/tenzir/vast) with
Threat Bus is now packaged and published on [PyPI](https://pypi.org/). You can
install the package via `pip install pyvast-threatbus`.
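
The concurrency limit described in the first entry above can be sketched with an `asyncio.Semaphore`. This is a simplified illustration, not the code from `pyvast_threatbus.py`: `bounded_query`, `run_queries`, and the `stats` dictionary are hypothetical names, and `asyncio.sleep` stands in for the actual VAST query.

```python
import asyncio


async def bounded_query(semaphore, indicator, stats):
    # At most `max_background_tasks` coroutines are inside this block at once.
    async with semaphore:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # placeholder for a real query
        stats["active"] -= 1
        return f"sighting for {indicator}"


async def run_queries(indicators, max_background_tasks):
    # Create the semaphore inside the event loop where it is used.
    semaphore = asyncio.Semaphore(max_background_tasks)
    stats = {"active": 0, "peak": 0}
    tasks = [
        asyncio.create_task(bounded_query(semaphore, indicator, stats))
        for indicator in indicators
    ]
    results = await asyncio.gather(*tasks)
    return results, stats["peak"]
```

Running `asyncio.run(run_queries(["ioc-0", "ioc-1", "ioc-2", "ioc-3"], 2))` schedules all four tasks immediately, but no more than two of them hold the semaphore at any time. The recorded peak makes that bound observable.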
2 changes: 2 additions & 0 deletions apps/vast/config.yaml.example
@@ -10,3 +10,5 @@ unflatten: true
transform_context: fever alertify --alert-prefix 'MY PREFIX' --extra-key my-ioc --ioc %ioc
# optional. remove the field if you simply want to report back sightings to Threat Bus
sink: STDOUT
# limits the number of concurrent background tasks for querying VAST
max_background_tasks: 100
1 change: 0 additions & 1 deletion apps/vast/message_mapping.py
@@ -1,4 +1,3 @@
from datetime import datetime
from dateutil import parser as dateutil_parser
from ipaddress import ip_address
import json
174 changes: 121 additions & 53 deletions apps/vast/pyvast_threatbus.py
@@ -24,8 +24,11 @@

logger = logging.getLogger(__name__)
matcher_name = None
async_tasks = [] # list of all running async tasks of the app
p2p_topic = None # the p2p topic that was given to the app upon successful subscription
# list of all running async tasks of the bridge
async_tasks = []
# the p2p topic that was given to the vast-bridge upon successful subscription
p2p_topic = None
max_open_tasks = None


def setup_logging(level):
@@ -53,6 +56,7 @@ def validate_config(config: confuse.Subview):
config["retro_match"].get(bool)
config["retro_match_max_events"].get(int)
config["unflatten"].get(bool)
config["max_background_tasks"].get(int)

# fallback values for the optional arguments
config["transform_context"].add(None)
@@ -71,13 +75,14 @@ def cancel_async_tasks():


async def start(
cmd: str,
vast_binary: str,
vast_endpoint: str,
zmq_endpoint: str,
snapshot: int,
retro_match: bool,
retro_match_max_events: int,
unflatten: bool,
max_open_files: int,
transform_cmd: str = None,
sink: str = None,
):
@@ -92,12 +97,15 @@ async def start(
@param retro_match Boolean flag to use retro-matching over live-matching
@param retro_match_max_events Max amount of retro match results
@param unflatten Boolean flag to unflatten JSON when received from VAST
@param max_open_files The maximum number of concurrent background tasks for VAST queries.
@param transform_cmd The command to use to transform Sighting context with
@param sink Forward sighting context to this sink (subprocess) instead of
reporting back to Threat Bus
"""
global logger, async_tasks, p2p_topic
vast = VAST(binary=cmd, endpoint=vast_endpoint, logger=logger)
global logger, async_tasks, p2p_topic, max_open_tasks
# needs to be created inside the same eventloop where it is used
max_open_tasks = asyncio.Semaphore(max_open_files)
vast = VAST(binary=vast_binary, endpoint=vast_endpoint, logger=logger)
assert await vast.test_connection() is True, "Cannot connect to VAST"

logger.debug(f"Calling Threat Bus management endpoint {zmq_endpoint}")
@@ -140,7 +148,7 @@ async def start(
async_tasks.append(
asyncio.create_task(
match_intel(
cmd,
vast_binary,
vast_endpoint,
intel_queue,
sightings_queue,
@@ -153,7 +161,9 @@ async def start(

if not retro_match:
async_tasks.append(
asyncio.create_task(live_match_vast(cmd, vast_endpoint, sightings_queue))
asyncio.create_task(
live_match_vast(vast_binary, vast_endpoint, sightings_queue)
)
)

atexit.register(cancel_async_tasks)
@@ -196,8 +206,84 @@ async def receive(pub_endpoint: str, topic: str, intel_queue: asyncio.Queue):
await asyncio.sleep(0.05) # free event loop for other tasks


async def retro_match_vast(
vast_binary,
vast_endpoint,
retro_match_max_events,
intel,
sightings_queue,
unflatten,
):
"""
Turns the given intel into a valid VAST query and forwards all query
results (sightings) to the sightings_queue.
@param vast_binary The vast binary command to use with PyVAST
@param vast_endpoint The endpoint of a running vast node ('host:port')
@param retro_match_max_events Max amount of retro match results
@param intel The IoC to query VAST for
@param sightings_queue The queue to put new sightings into
@param unflatten Boolean flag to unflatten JSON when received from VAST
"""
query = to_vast_query(intel)
if not query:
return
global logger, max_open_tasks
async with max_open_tasks:
vast = VAST(binary=vast_binary, endpoint=vast_endpoint, logger=logger)
proc = await vast.export(max_events=retro_match_max_events).json(query).exec()
reported = 0
while not proc.stdout.at_eof():
line = (await proc.stdout.readline()).decode().rstrip()
if line:
sighting = query_result_to_threatbus_sighting(line, intel, unflatten)
if not sighting:
logger.error(f"Could not parse VAST query result: {line}")
continue
reported += 1
await sightings_queue.put(sighting)
logger.debug(f"Retro-matched {reported} sighting(s) for intel: {intel}")


async def ingest_vast_ioc(vast_binary, vast_endpoint, intel):
"""
Ingests the given intel as IoC into a VAST matcher.
@param vast_binary The vast binary command to use with PyVAST
@param vast_endpoint The endpoint of a running vast node ('host:port')
@param intel The IoC to ingest into VAST
"""
global logger
ioc = to_vast_ioc(intel)
if not ioc:
logger.error(f"Unable to convert Intel to VAST compatible IoC: {intel}")
return
vast = VAST(binary=vast_binary, endpoint=vast_endpoint, logger=logger)
proc = await vast.import_().json(type="intel.indicator").exec(stdin=ioc)
await proc.wait()
logger.debug(f"Ingested intel for live matching: {intel}")


async def remove_vast_ioc(vast_binary, vast_endpoint, intel):
"""
Removes the given intel as IoC from a VAST matcher.
@param vast_binary The vast binary command to use with PyVAST
@param vast_endpoint The endpoint of a running vast node ('host:port')
@param intel The IoC to remove from VAST
"""
global logger, matcher_name
intel_type = get_vast_intel_type(intel)
ioc = get_ioc(intel)
if not ioc or not intel_type:
logger.error(
f"Cannot remove intel with missing intel_type or indicator: {intel}"
)
return
vast = VAST(binary=vast_binary, endpoint=vast_endpoint, logger=logger)
await vast.matcher().ioc_remove(matcher_name, ioc, intel_type).exec()
logger.debug(f"Removed indicator {intel}")


async def match_intel(
cmd: str,
vast_binary: str,
vast_endpoint: str,
intel_queue: asyncio.Queue,
sightings_queue: asyncio.Queue,
@@ -208,16 +294,15 @@ async def match_intel(
"""
Reads from the intel_queue and matches all IoCs, either via VAST's
live-matching or retro-matching.
@param cmd The vast binary command to use with PyVAST
@param vast_binary The vast binary command to use with PyVAST
@param vast_endpoint The endpoint of a running vast node ('host:port')
@param intel_queue The queue to read new IoCs from
@param sightings_queue The queue to put new sightings into
@param retro_match Boolean flag to use retro-matching over live-matching
@param retro_match_max_events Max amount of retro match results
@param unflatten Boolean flag to unflatten JSON when received from VAST
"""
global logger
vast = VAST(binary=cmd, endpoint=vast_endpoint, logger=logger)
global logger
while True:
msg = await intel_queue.get()
try:
@@ -232,66 +317,40 @@ async def match_intel(
continue
if intel.operation == Operation.ADD:
if retro_match:
query = to_vast_query(intel)
if not query:
continue
proc = (
await vast.export(max_events=retro_match_max_events)
.json(query)
.exec()
asyncio.create_task(
retro_match_vast(
vast_binary,
vast_endpoint,
retro_match_max_events,
intel,
sightings_queue,
unflatten,
)
)
reported = 0
while not proc.stdout.at_eof():
line = (await proc.stdout.readline()).decode().rstrip()
if line:
sighting = query_result_to_threatbus_sighting(
line, intel, unflatten
)
if not sighting:
logger.error(f"Could not parse VAST query result: {line}")
continue
reported += 1
await sightings_queue.put(sighting)
logger.debug(f"Retro-matched {reported} sighting(s) for intel: {intel}")
else:
ioc = to_vast_ioc(intel)
if not ioc:
logger.error(
f"Unable to convert Intel to VAST compatible IoC: {intel}"
)
continue
proc = await vast.import_().json(type="intel.indicator").exec(stdin=ioc)
await proc.wait()
logger.debug(f"Ingested intel for live matching: {intel}")
asyncio.create_task(ingest_vast_ioc(vast_binary, vast_endpoint, intel))
elif intel.operation == Operation.REMOVE:
if retro_match:
continue
intel_type = get_vast_intel_type(intel)
ioc = get_ioc(intel)
if not ioc or not intel_type:
logger.error(
f"Cannot remove intel with missing intel_type or indicator: {intel}"
)
continue
global matcher_name
await vast.matcher().ioc_remove(matcher_name, ioc, intel_type).exec()
logger.debug(f"Removed indicator {intel}")
asyncio.create_task(remove_vast_ioc(vast_binary, vast_endpoint, intel))
else:
logger.warning(f"Unsupported operation for indicator: {intel}")
intel_queue.task_done()


async def live_match_vast(cmd: str, vast_endpoint: str, sightings_queue: asyncio.Queue):
async def live_match_vast(
vast_binary: str, vast_endpoint: str, sightings_queue: asyncio.Queue
):
"""
Starts a VAST matcher. Enqueues all matches from VAST to the
sightings_queue.
@param cmd The VAST binary command to use with PyVAST
@param vast_binary The VAST binary command to use with PyVAST
@param vast_endpoint The endpoint of a running VAST node
@param sightings_queue The queue to put new sightings into
@param retro_match Boolean flag to use retro-matching over live-matching
"""
global logger, matcher_name
vast = VAST(binary=cmd, endpoint=vast_endpoint, logger=logger)
vast = VAST(binary=vast_binary, endpoint=vast_endpoint, logger=logger)
matcher_name = "threatbus-" + "".join(random.choice(letters) for i in range(10))
proc = await vast.matcher().start(name=matcher_name).exec()
while True:
@@ -565,6 +624,14 @@ def main():
default=None,
help="If sink is specified, sightings are not reported back to Threat Bus. Instead, the context of a sighting (only the contents without the Threat Bus specific sighting structure) is forwarded to the specified sink via a UNIX pipe. This option takes a command line string to use and invokes it as direct subprocess without shell / globbing support.",
)
parser.add_argument(
"--max-background-tasks",
"-U",
dest="max-background-tasks",
default=100,
type=int,
help="Controls the maximum number of concurrent background tasks for VAST queries. Default is 100.",
)
args = parser.parse_args()

config = confuse.Configuration("pyvast-threatbus")
@@ -589,6 +656,7 @@ def main():
config["retro_match"].get(),
config["retro_match_max_events"].get(),
config["unflatten"].get(),
config["max_background_tasks"].get(),
config["transform_context"].get(),
config["sink"].get(),
)
2 changes: 1 addition & 1 deletion apps/vast/test_message_mapping.py
@@ -1,4 +1,4 @@
from datetime import datetime, timedelta, timezone
from datetime import datetime, timezone
import unittest
import json

7 changes: 3 additions & 4 deletions plugins/apps/threatbus_cif3/message_mapping.py
@@ -20,7 +20,8 @@


def map_to_cif(intel: Intel, logger, confidence, tags, tlp, group):
"""Maps an Intel item to a CIFv3 compatible indicator format.
"""
Maps an Intel item to a CIFv3 compatible indicator format.
@param intel The item to map
@return the mapped intel item or None
"""
@@ -49,10 +50,8 @@ def map_to_cif(intel: Intel, logger, confidence, tags, tlp, group):
}

try:
ii = Indicator(**ii)
return Indicator(**ii)
except InvalidIndicator as e:
logger.error(f"Invalid CIF indicator {e}")
except Exception as e:
logger.error(f"CIF indicator error: {e}")

return ii