-
Notifications
You must be signed in to change notification settings - Fork 0
/
helpers.py
151 lines (123 loc) · 5.17 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import streamlit as st
import pandas as pd
from elasticsearch import Elasticsearch
from streamlit_folium import folium_static
import folium
from datetime import datetime
# Connect to Elasticsearch
# NOTE(review): hard-coded local endpoint with no authentication — presumably a
# development setup; confirm before deploying anywhere shared.
es = Elasticsearch("http://localhost:9200")
@st.cache_data
def query_data(query_body, index="my_app_scans", size=1000):
    """Run an Elasticsearch query and return the hits as a DataFrame.

    Parameters
    ----------
    query_body : dict
        Elasticsearch query DSL body.
    index : str, optional
        Index to search. Defaults to "my_app_scans" (the app's scan index),
        preserving the original hard-coded behavior.
    size : int, optional
        Maximum number of hits to fetch. Defaults to 1000.

    Returns
    -------
    pandas.DataFrame
        Flattened hits (columns like "_source.business_id") with duplicate
        businesses removed.
    """
    res = es.search(index=index, body=query_body, size=size)
    # Flatten the nested hit documents into dotted "_source.*" columns.
    df = pd.json_normalize(res["hits"]["hits"])
    # Keep one row per business. NOTE(review): this also collapses repeat scans
    # of the same business, which the scan-history views (business_id_search /
    # device_id_search) may not want — confirm intended behavior.
    df = df.drop_duplicates(subset=["_source.business_id"])
    return df
@st.cache_data
def common_data_process(df):
    """Prepare search results for map rendering and table display.

    Parameters
    ----------
    df : pandas.DataFrame
        Flattened Elasticsearch hits (columns prefixed with "_source.").

    Returns
    -------
    tuple[pandas.DataFrame, pandas.DataFrame]
        ``(map_df, table_df)`` — a frame with plain "latitude"/"longitude"
        columns for the map, and a narrowed table with human-readable headers.
    """
    # The map helpers expect plain "latitude"/"longitude" column names.
    df = df.rename(columns={"_source.latitude": "latitude",
                            "_source.longitude": "longitude"})
    df = df.filter(items=["_source.business_id", "_source.business_name",
                          "_source.business_address", "_source.city",
                          "_source.zip", "latitude", "longitude"], axis=1)
    # Narrow to the columns shown in the results table.
    table_df = df.filter(items=["_source.business_id", "_source.business_name",
                                "_source.business_address", "_source.zip"], axis=1)
    # Human-readable headers for rendering.
    table_df = table_df.rename(columns={
        "_source.business_id": "Business ID",
        "_source.business_name": "Business Name",
        "_source.business_address": "Business Address",
        "_source.zip": "Postal Code"})
    # (debug print of table_df.dtypes removed)
    return df, table_df
@st.cache_data
def epoch_to_timestamp(df):
    """Convert epoch columns to formatted local-time strings, newest first.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain "_source.user_birth_date" and "_source.scan_timestamp"
        columns holding numeric epoch values.

    Returns
    -------
    pandas.DataFrame
        A new frame (the input is not mutated) with both columns formatted as
        strings and rows sorted by scan timestamp descending.
    """
    # Work on a copy so the caller's frame is not mutated in place.
    df = df.copy()
    # The divisors convert the stored integers to seconds: birth dates appear
    # to be stored at 1e12x resolution and scan timestamps at 1e9x
    # (nanoseconds) — TODO confirm units against the ingest pipeline.
    df['_source.user_birth_date'] = df['_source.user_birth_date'].apply(
        lambda x: datetime.fromtimestamp(x / 1_000_000_000_000).strftime('%Y-%m-%d'))
    df['_source.scan_timestamp'] = df['_source.scan_timestamp'].apply(
        lambda x: datetime.fromtimestamp(x / 1_000_000_000).strftime('%Y-%m-%d %H:%M:%S'))
    # '%Y-%m-%d %H:%M:%S' strings sort lexicographically == chronologically,
    # so sorting the formatted column still yields newest-first.
    df = df.sort_values(by=['_source.scan_timestamp'], ascending=False)
    return df
@st.cache_data
def get_folium_map(df):
    """Render a folium map with one marker per business row in *df*.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain "latitude", "longitude", "_source.business_name" and
        "_source.business_id" columns; the first row centers the map.
    """
    # Guard: with no results there is nothing to center the map on
    # (df.iloc[0] would raise IndexError).
    if df.empty:
        return
    # NOTE(review): @st.cache_data on a function that only renders and returns
    # nothing is unusual — confirm the intended caching semantics.
    # Named "business_map" to avoid shadowing the builtin `map`.
    business_map = folium.Map(location=[df.iloc[0]['latitude'],
                                        df.iloc[0]['longitude']], zoom_start=15)
    # Add one marker per result row, with a popup identifying the business.
    for _, row in df.iterrows():
        folium.Marker([row['latitude'], row['longitude']],
                      popup=f"{row['_source.business_name']} <br> ID= {row['_source.business_id']}").add_to(business_map)
    folium_static(business_map)
@st.cache_data
def free_text_search(text):
    """Search all fields for *text*; return (map-ready frame, display table)."""
    # Query every field with a simple query string.
    hits = query_data({
        "query": {
            "simple_query_string": {
                "query": text
            }
        }
    })
    # Normalize coordinate column names and build the display table.
    return common_data_process(hits)
@st.cache_data
def postal_code_search(postal_code):
    """Find businesses in a postal code; return (map-ready frame, display table)."""
    # Match on the zip field only.
    hits = query_data({
        "query": {
            "match": {
                "zip": postal_code
            }
        }
    })
    # Normalize coordinate column names and build the display table.
    return common_data_process(hits)
@st.cache_data
def business_id_search(business_id):
    """Return the scan-history table for a single business ID."""
    # Restrict the query-string search to the business_id field, requiring
    # every term to match.
    search_body = {
        "query": {
            "simple_query_string": {
                "query": business_id,
                "fields": ["business_id"],
                "default_operator": "AND"
            }
        }
    }
    hits = query_data(search_body)
    # Keep only the per-scan columns we display.
    scans = hits.filter(items=['_source.scan_timestamp', '_source.deviceID',
                               '_source.user_name', '_source.user_birth_date'], axis=1)
    # Epoch values -> formatted timestamp strings, newest first.
    scans = epoch_to_timestamp(scans)
    # Human-readable headers for rendering.
    return scans.rename(columns={
        "_source.scan_timestamp": "Scan Timestamp",
        "_source.deviceID": "Device ID",
        "_source.user_name": "User Name",
        "_source.user_birth_date": "Birth Date"})
@st.cache_data
def device_id_search(device_id):
    """Return (full frame, display table) of scans performed by one device."""
    # Match on the deviceID field only.
    hits = query_data({
        "query": {
            "match": {
                "deviceID": device_id
            }
        }
    })
    # Map-friendly coordinate column names.
    hits = hits.rename(columns={"_source.latitude": "latitude",
                                "_source.longitude": "longitude"})
    # Epoch values -> formatted timestamp strings, newest first.
    hits = epoch_to_timestamp(hits)
    # Columns shown in the results table.
    table = hits.filter(items=['_source.scan_timestamp', '_source.business_id',
                               '_source.business_name', '_source.business_address',
                               'longitude', 'latitude'], axis=1)
    # Human-readable headers for rendering.
    table = table.rename(columns={
        "_source.scan_timestamp": "Scan Timestamp",
        "_source.business_id": "Business ID",
        "_source.business_name": "Business Name",
        "_source.business_address": "Business Address"})
    return hits, table