-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupdate_dynamodb.R
118 lines (103 loc) · 4.45 KB
/
update_dynamodb.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env Rscript
# This script is run at ~midnight every night. It reads the last 24 hours of
# bus location data from the AWS RDS database, calculates average speeds for
# each segment, and updates the DynamoDB table.
# The DynamoDB table can then be queried for all KCM routes in a GeoJSON file
# together with their speed histories.
#
# library() (not require()) so a missing package stops the run immediately
# instead of failing later with "could not find function".
library(DBI)
library(odbc)
library(paws)
# Query the RDS database for every active_trips_study row whose collectedtime
# lies in [start_time, end_time] (inclusive, via SQL BETWEEN).
# secure:     credential vector read from secure.rds; elements 2 and 3 are
#             passed as the database user id and password.
# start_time: lower bound on collectedtime, whole epoch seconds.
# end_time:   upper bound on collectedtime, whole epoch seconds.
# Returns the query result as a data.frame.
fetch_daily_results <- function(secure, start_time, end_time) {
  rds_db <- dbConnect(
    odbc(),
    Driver = "PostgreSQL Unicode",
    Server = "kcm-gtfs-production.c3rbzvtfbrdb.us-east-2.rds.amazonaws.com",
    Database = "kcm-gtfs",
    UID = secure[2],
    PWD = secure[3],
    Port = 5432
  )
  # Disconnect even if the query throws, so failed runs do not leak
  # connections.
  on.exit(dbDisconnect(rds_db), add = TRUE)
  # "%.0f" keeps large epoch values out of scientific notation (paste0() would
  # render a double such as 1.7e9 as "1.7e+09").
  query_text <- sprintf(
    "SELECT * FROM active_trips_study WHERE collectedtime BETWEEN %.0f AND %.0f;",
    start_time, end_time
  )
  dbGetQuery(rds_db, query_text)
}

# Download the King County Metro GTFS feed, extract trips.txt and return its
# route_id, trip_id and trip_short_name columns.
# Writes google_transit.zip and google_transit/trips.txt into the working
# directory, overwriting earlier copies.
fetch_gtfs_trips <- function(
    url = "http://metro.kingcounty.gov/GTFS/google_transit.zip") {
  # mode = "wb" keeps the zip intact on Windows; it has no effect elsewhere.
  download.file(url, "google_transit.zip", mode = "wb")
  unzip("google_transit.zip", files = "trips.txt", exdir = "google_transit")
  # read.csv rather than read.table: read.table treats ' as a quote and # as a
  # comment start, so an apostrophe or '#' in any text field corrupts parsing.
  # "UTF-8-BOM" strips a leading byte-order mark that would otherwise mangle
  # the first column name.
  gtfs_trips <- read.csv(
    "google_transit/trips.txt",
    stringsAsFactors = FALSE,
    fileEncoding = "UTF-8-BOM"
  )
  gtfs_trips[, c("route_id", "trip_id", "trip_short_name")]
}

# Turn raw location pings into per-segment speeds.
# Keeps one row per (tripid, locationtime), orders each trip chronologically
# and pairs every ping with the previous ping of the same trip. The speed is
# (tripdistance change) / (locationtime change), rounded to one decimal place.
# The "m_s" name assumes tripdistance is in metres and locationtime in
# seconds -- TODO confirm against the data collector.
# Returns a data.frame with columns tripid and avg_speed_m_s.
compute_segment_speeds <- function(daily_results) {
  # Only these columns feed the calculation. Dropping the rest means an NA in
  # an unrelated SELECT * column can no longer discard a valid segment.
  pings <- daily_results[, c("tripid", "locationtime", "tripdistance"),
                         drop = FALSE]
  pings <- pings[!duplicated(pings[, c("tripid", "locationtime")]), ,
                 drop = FALSE]
  pings <- pings[order(pings$tripid, pings$locationtime), , drop = FALSE]

  # Value of x one row earlier, NA for the first row. Safe on empty input.
  lag_one <- function(x) {
    n <- length(x)
    if (n == 0L) {
      return(x)
    }
    x[c(NA_integer_, seq_len(n - 1L))]
  }
  pings$prev_tripid <- lag_one(pings$tripid)
  pings$prev_locationtime <- lag_one(pings$locationtime)
  pings$prev_tripdistance <- lag_one(pings$tripdistance)

  # Drop the first row and any row touching an NA, then the rows whose
  # previous ping belongs to a different trip (each trip's first ping).
  pings <- pings[complete.cases(pings), , drop = FALSE]
  pings <- pings[pings$tripid == pings$prev_tripid, , drop = FALSE]

  dist_diff <- pings$tripdistance - pings$prev_tripdistance
  time_diff <- pings$locationtime - pings$prev_locationtime
  speed <- dist_diff / time_diff
  # Negative speeds (distance going backwards) are discarded. is.finite() also
  # drops NaN/Inf, which would otherwise become all-NA rows or an Inf mean.
  keep <- is.finite(speed) & speed >= 0
  data.frame(
    tripid = pings$tripid[keep],
    avg_speed_m_s = round(speed[keep], 1),
    stringsAsFactors = FALSE
  )
}

# Join segment speeds to their GTFS routes and average them per
# (route_id, trip_short_name), rounded to one decimal place.
# trip_short_name values "LOCAL" and "EXPRESS" are shortened to "L" and "E".
# Returns a data.frame with columns route_id, trip_short_name, avg_speed_m_s
# (zero rows when nothing matched).
aggregate_route_speeds <- function(segment_speeds, gtfs_trips) {
  joined <- merge(
    segment_speeds[, c("tripid", "avg_speed_m_s"), drop = FALSE],
    gtfs_trips,
    by.x = "tripid",
    by.y = "trip_id"
  )
  joined <- joined[, c("route_id", "trip_short_name", "avg_speed_m_s"),
                   drop = FALSE]
  joined <- joined[complete.cases(joined), , drop = FALSE]
  if (nrow(joined) == 0L) {
    # aggregate() errors on zero rows; there is nothing to average.
    return(joined)
  }
  # The formula interface averages only the speed column. Passing the whole
  # frame also "averaged" the character columns (NA plus warnings).
  daily_agg <- aggregate(
    avg_speed_m_s ~ route_id + trip_short_name,
    data = joined,
    FUN = mean
  )
  short_codes <- c(LOCAL = "L", EXPRESS = "E")
  hit <- daily_agg$trip_short_name %in% names(short_codes)
  daily_agg$trip_short_name[hit] <-
    unname(short_codes[daily_agg$trip_short_name[hit]])
  daily_agg$avg_speed_m_s <- round(daily_agg$avg_speed_m_s, 1)
  daily_agg
}

# Write each row of daily_agg to DynamoDB: avg_speed is overwritten with the
# day's speed, and the same value is appended to the historic_speeds list.
# Every row is attempted even if some fail. Any failures are then reported
# together in a single error, so the run still ends unsuccessfully.
# daily_agg: data.frame with route_id, trip_short_name, avg_speed_m_s columns.
# dynamo_db: a paws DynamoDB client.
# Returns the number of rows processed, invisibly.
push_route_speeds <- function(daily_agg, dynamo_db,
                              table_name = "KCM_Bus_Routes", pause = 0.3) {
  # DynamoDB transmits N values as decimal strings. Format them explicitly so
  # e.g. 100000 is never sent as "1e+05".
  as_number_string <- function(x) {
    format(x, scientific = FALSE, trim = TRUE, digits = 15)
  }
  n_rows <- nrow(daily_agg)
  errors <- character(n_rows)
  for (i in seq_len(n_rows)) {
    key_route <- as_number_string(daily_agg$route_id[i])
    key_code <- as.character(daily_agg$trip_short_name[i])
    speed <- as_number_string(daily_agg$avg_speed_m_s[i])
    errors[i] <- tryCatch(
      {
        dynamo_db$update_item(
          TableName = table_name,
          Key = list(
            route_id = list(N = key_route),
            local_express_code = list(S = key_code)
          ),
          UpdateExpression =
            "SET avg_speed = :speed, #hs = list_append(#hs, :vals)",
          ExpressionAttributeNames = list(`#hs` = "historic_speeds"),
          ExpressionAttributeValues = list(
            `:speed` = list(N = speed),
            `:vals` = list(L = list(list(N = speed)))
          )
        )
        ""
      },
      error = function(e) {
        sprintf("%s/%s: %s", key_route, key_code, conditionMessage(e))
      }
    )
    # Pause between writes (presumably to stay within the table's write
    # capacity -- confirm).
    Sys.sleep(pause)
  }
  failed <- errors[nzchar(errors)]
  if (length(failed) > 0L) {
    stop(
      sprintf("DynamoDB update failed for %d of %d route(s):\n",
              length(failed), n_rows),
      paste(failed, collapse = "\n"),
      call. = FALSE
    )
  }
  invisible(n_rows)
}

# Nightly entry point: pull the last 24 hours of bus locations from RDS,
# compute the average speed of each (route_id, LOCAL/EXPRESS) pair, and write
# it to the KCM_Bus_Routes DynamoDB table.
# Reads secure.rds from the working directory. Elements 2-3 are the database
# user id and password; elements 4-5 are the AWS access key id and secret key.
# Returns 1 when all DynamoDB updates succeed; otherwise stops with an error
# listing the routes that failed.
main <- function() {
  secure <- readRDS("secure.rds")
  # Whole epoch seconds, kept as a double so it cannot overflow in 2038 the
  # way as.integer() would.
  end_time <- trunc(as.numeric(Sys.time()))
  start_time <- end_time - 24 * 60 * 60

  daily_results <- fetch_daily_results(secure, start_time, end_time)
  gtfs_trips <- fetch_gtfs_trips()
  segment_speeds <- compute_segment_speeds(daily_results)
  daily_agg <- aggregate_route_speeds(segment_speeds, gtfs_trips)

  # Credentials for paws are supplied through the process environment.
  Sys.setenv(
    AWS_ACCESS_KEY_ID = secure[4],
    AWS_SECRET_ACCESS_KEY = secure[5],
    AWS_REGION = "us-east-2"
  )
  dynamo_db <- paws::dynamodb()
  push_route_speeds(daily_agg, dynamo_db)
  1
}
# Program entry point: run the nightly update when the script is executed.
# main() returns 1, which Rscript prints as "[1] 1" on success.
main()