-
Notifications
You must be signed in to change notification settings - Fork 0
/
user_posting_emulation_streaming.py
78 lines (55 loc) · 2.69 KB
/
user_posting_emulation_streaming.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import awsdb
import requests
from time import sleep
import random
from multiprocessing import Process
import boto3
import json
from sqlalchemy import text
import datetime
# Seed the module-level PRNG with a fixed value so every run replays the same
# sequence of sleep durations and row offsets chosen by the posting loop.
random.seed(a=100)
# Single connector instance; run_infinite_post_data_loop() asks it for a
# database engine.
new_connector = awsdb.AWSDBConnector()
def streaming_data(request_type, invoke_url, dict_results, partition_key, timeout=10):
    """Send one database row to a Kinesis stream through an API Gateway proxy.

    The stream name is taken from the second-to-last path segment of
    ``invoke_url`` (``.../streams/<stream_name>/record``). The row is wrapped
    in the JSON envelope the proxy is called with::

        {"StreamName": ..., "Data": {...row...}, "PartitionKey": ...}

    Args:
        request_type: HTTP method to use, e.g. ``'PUT'``.
        invoke_url: Full API Gateway URL of the stream's ``record`` resource.
        dict_results: One row as a column -> value mapping. It is NOT
            modified; ``datetime``/``date`` values are stringified in a copy so
            the payload is JSON-serialisable.
        partition_key: Kinesis partition key for the record.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The ``requests.Response`` when the request completed (whatever its
        status code), or ``None`` if ``requests`` raised; the error is printed.
    """
    # Work on a copy so the caller's row is left untouched. datetime and date
    # values are not JSON-serialisable, so render them as strings. datetime is
    # checked first because it is a subclass of date.
    record = {}
    for key, value in dict_results.items():
        if isinstance(value, datetime.datetime):
            value = value.strftime("%Y-%m-%d %H:%M:%S")
        elif isinstance(value, datetime.date):
            value = value.strftime("%Y-%m-%d")
        record[key] = value

    # e.g. .../streams/streaming-1232252d77df-user/record -> streaming-1232252d77df-user
    stream_name = invoke_url.split("/")[-2]
    payload = json.dumps({
        "StreamName": stream_name,
        "Data": record,
        "PartitionKey": partition_key,
    })
    headers = {'Content-Type': 'application/json'}

    try:
        # Without a timeout a stalled gateway would block the caller forever.
        response = requests.request(
            request_type, invoke_url, headers=headers, data=payload, timeout=timeout
        )
    except requests.exceptions.RequestException as errex:
        print(errex)
        print("Exception request")
        return None

    print(response.status_code)
    if not response.ok:
        # A bare status code is hard to act on; show the gateway's error body too.
        print(response.text)
    return response
def _fetch_row(connection, table, offset):
    """Return the single row at ``offset`` in ``table`` as a dict, or None if there is none.

    ``table`` is interpolated directly into the SQL, so it must be one of the
    trusted table names hard-coded in this module. There is no ORDER BY, so
    which row sits at a given offset is up to the database.
    """
    query = text(f"SELECT * FROM {table} LIMIT {int(offset)}, 1")
    row = connection.execute(query).first()
    return None if row is None else dict(row._mapping)


def run_infinite_post_data_loop():
    """Stream random pin/geo/user rows to their Kinesis streams forever.

    Each iteration sleeps 0 or 1 seconds and picks a random offset in
    [0, 11000]. It reads the row at that offset from ``pinterest_data``,
    ``geolocation_data`` and ``user_data``, then sends each row to its own
    stream via ``streaming_data`` (user, then geo, then pin).

    If any table has no row at the chosen offset, the iteration is skipped.
    Previously that either raised NameError on the first pass or silently
    re-sent the previous iteration's rows.
    """
    stream_url_prefix = ('https://qynzmaevn3.execute-api.us-east-1.amazonaws.com/'
                         'final_streaming_data/streams/streaming-1232252d77df-')
    # Obtain the engine once. Calling create_db_connector() every iteration
    # presumably builds (and abandons) a fresh engine and pool each time round.
    engine = new_connector.create_db_connector()
    while True:
        sleep(random.randrange(0, 2))
        random_row = random.randint(0, 11000)
        with engine.connect() as connection:
            pin_result = _fetch_row(connection, "pinterest_data", random_row)
            geo_result = _fetch_row(connection, "geolocation_data", random_row)
            user_result = _fetch_row(connection, "user_data", random_row)
        if pin_result is None or geo_result is None or user_result is None:
            # Offset past the end of at least one table: nothing consistent to send.
            continue
        for suffix, record in (("user", user_result), ("geo", geo_result), ("pin", pin_result)):
            streaming_data('PUT', f"{stream_url_prefix}{suffix}/record", record, f"{suffix}_partition")
if __name__ == "__main__":
    # Announce start-up before entering the loop: run_infinite_post_data_loop()
    # never returns, so a print placed after it could never run.
    print('Working')
    run_infinite_post_data_loop()