Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
Add rabbitmq sender for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
0snap committed Nov 3, 2020
1 parent 8597cdc commit 996d3ef
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions tests/utils/rabbitmq_sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from datetime import datetime
import json
import pika
import uuid
from threatbus.data import Intel, IntelData, IntelType, Operation, IntelEncoder

## Dummy intel data
intel_id = "intel-42"
indicator = "6.6.6.6"
intel_type = IntelType.IPSRC
operation = Operation.ADD
intel_data = IntelData(indicator, intel_type, foo=23, more_args="MORE ARGS")
intel = Intel(
datetime.strptime("2020-11-02 17:00:00", "%Y-%m-%d %H:%M:%S"),
intel_id,
intel_data,
operation,
)

intel_json = json.dumps(intel, cls=IntelEncoder)

## rabbitmq
host = "localhost"
port = "5672"
vhost = "/"
credentials = pika.PlainCredentials("guest", "guest")
conn_params = pika.ConnectionParameters(host, port, vhost, credentials)

connection = pika.BlockingConnection(conn_params)
channel = connection.channel()

for i in range(100):
channel.basic_publish(exchange="threatbus.intel", routing_key="", body=intel_json)

0 comments on commit 996d3ef

Please sign in to comment.