This repository has been archived by the owner on Jan 13, 2022. It is now read-only.
forked from fightforthefuture/call-congress
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmodels.py
119 lines (93 loc) · 3.71 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import hashlib
import logging
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import func, Column, Integer, String, DateTime
from sqlalchemy.exc import SQLAlchemyError
db = SQLAlchemy()
class Call(db.Model):
__tablename__ = 'calls'
id = Column(Integer, primary_key=True)
timestamp = Column(DateTime)
campaign_id = Column(String(32))
member_id = Column(String(10)) # congress member sunlight identifier
# user attributes
user_id = Column(String(64)) # hashed phone number
zipcode = Column(String(5))
areacode = Column(String(3)) # first 3 digits of phone number
exchange = Column(String(3)) # next 3 digits of phone number
# twilio attributes
call_id = Column(String(40)) # twilio call ID
status = Column(String(25)) # twilio call status
duration = Column(Integer) # twilio call time in seconds
@classmethod
def hash_phone(cls, number):
"""
Takes a phone number and returns a 64 character string
"""
return hashlib.sha256(number).hexdigest()
def __init__(self, campaign_id, member_id, zipcode=None, phone_number=None,
call_id=None, status='unknown', duration=0):
self.timestamp = datetime.now()
self.status = status
self.duration = duration
self.campaign_id = campaign_id
self.member_id = member_id
self.call_id = call_id
if phone_number:
phone_number = phone_number.replace('-', '').replace('.', '')
self.user_id = self.hash_phone(phone_number)
self.areacode = phone_number[:3]
self.exchange = phone_number[3:6]
self.zipcode = zipcode
def __repr__(self):
return '<Call {}-{}-xxxx to {}>'.format(
self.areacode, self.exchange, self.member_id)
def log_call(params, campaign, request):
try:
i = int(request.values.get('call_index'))
kwds = {
'campaign_id': campaign['id'],
'member_id': params['repIds'][i],
'zipcode': params['zipcode'],
'phone_number': params['userPhone'],
'call_id': request.values.get('CallSid', None),
'status': request.values.get('DialCallStatus', 'unknown'),
'duration': request.values.get('DialCallDuration', 0)
}
db.session.add(Call(**kwds))
db.session.commit()
except SQLAlchemyError:
logging.error('Failed to log call:', exc_info=True)
def call_count(campaign_id):
try:
return (db.session.query(func.Count(Call.zipcode))
.filter(Call.campaign_id == campaign_id).all())[0][0]
except SQLAlchemyError:
logging.error('Failed to get call_count:', exc_info=True)
return 0
def call_list(campaign_id, since, limit=50):
try:
calls = (db.session.query(Call)
.order_by(Call.timestamp)
.filter(Call.campaign_id == campaign_id)
.filter(Call.timestamp >= since)
.limit(limit).all())
return calls
except SQLAlchemyError:
logging.error('Failed to get call_list:', exc_info=True)
return 0
def aggregate_stats(campaign_id):
zipcodes = (db.session.query(Call.zipcode, func.Count(Call.zipcode))
.filter(Call.campaign_id == campaign_id)
.group_by(Call.zipcode).all())
reps = (db.session.query(Call.member_id, func.Count(Call.member_id))
.filter(Call.campaign_id == campaign_id)
.group_by(Call.member_id).all())
return {
'campaign': campaign_id,
'calls': {
'zipcodes': dict(tuple(z) for z in zipcodes),
'reps': dict(tuple(r) for r in reps)
}
}