Merge pull request #798 from cal-itp/avg-speeds
Avg speeds
tiffanychu90 authored Jun 30, 2023
2 parents 9a1b3bd + 6a780bc commit 32b2c54
Showing 11 changed files with 1,600 additions and 179 deletions.
2 changes: 1 addition & 1 deletion _shared_utils/shared_utils/schedule_rt_utils.py
@@ -262,7 +262,7 @@ def sample_schedule_feed_key_to_organization_crosswalk(
**kwargs,
) -> pd.DataFrame:
"""
From scedule data, using feed_key as primary key,
From schedule data, using feed_key as primary key,
grab the gtfs_dataset_key associated.
Pass this through function to attach quartet data identifier columns
and organization info.
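For context, a minimal pandas sketch of the crosswalk pattern this docstring describes, using feed_key as the join key; the DataFrames and values below are illustrative, not taken from the repo:

import pandas as pd

# Hypothetical schedule rows keyed on feed_key.
schedule = pd.DataFrame({
    "feed_key": ["abc123", "def456"],
    "trip_id": ["t1", "t2"],
})

# Hypothetical crosswalk carrying gtfs_dataset_key plus organization info.
crosswalk = pd.DataFrame({
    "feed_key": ["abc123", "def456"],
    "gtfs_dataset_key": ["k1", "k2"],
    "organization_name": ["Agency A", "Agency B"],
    "organization_source_record_id": ["rec-1", "rec-2"],
})

# feed_key is the primary key, so a left merge attaches the
# gtfs_dataset_key and organization columns to every schedule row.
schedule_with_org = schedule.merge(crosswalk, on="feed_key", how="left")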
13 changes: 10 additions & 3 deletions high_quality_transit_areas/D1_assemble_hqta_points.py
@@ -128,9 +128,16 @@ def add_agency_names_hqta_details(

crosswalk = get_agency_info(feeds_df, analysis_date)

NAMES_DICT = dict(zip(crosswalk.feed_key, crosswalk.organization_name))
B64_DICT = dict(zip(crosswalk.feed_key, crosswalk.base64_url))
ORG_DICT = dict(zip(crosswalk.feed_key, crosswalk.organization_source_record_id))
NAMES_DICT = dict(zip(
crosswalk.feed_key, crosswalk.organization_name
))
B64_DICT = dict(zip(
crosswalk.feed_key, crosswalk.base64_url
))
ORG_DICT = dict(zip(
crosswalk.feed_key, crosswalk.organization_source_record_id
))

gdf = gdf.assign(
agency_name_primary = gdf.feed_key_primary.map(NAMES_DICT),
agency_name_secondary = gdf.feed_key_secondary.map(NAMES_DICT),
32 changes: 21 additions & 11 deletions rt_segment_speeds/scripts/A0_preprocessing.py
@@ -1,21 +1,23 @@
"""
Pre-processing vehicle positions.
Drop all RT trips with less than 5 min of data.
Drop all RT trips with less than 10 min of data.
"""
import dask.dataframe as dd
import datetime
import geopandas as gpd
import gcsfs
import numpy as np
import pandas as pd
import sys

from loguru import logger

#from shared_utils import utils
from segment_speed_utils import helpers
from segment_speed_utils.project_vars import (SEGMENT_GCS, analysis_date,
CONFIG_PATH)

fs = gcsfs.GCSFileSystem()

def trip_time_elapsed(
ddf: dd.DataFrame,
group_cols: list,
@@ -25,17 +27,19 @@ def trip_time_elapsed(
Group by trip and calculate the time elapsed (max_time-min_time)
for RT vp observed.
"""
min_time = (ddf.groupby(group_cols)
min_time = (ddf.groupby(group_cols, observed=True)
[timestamp_col]
.min()
.dropna()
.reset_index()
.rename(columns = {timestamp_col: "min_time"})
)


max_time = (ddf.groupby(group_cols)
max_time = (ddf.groupby(group_cols, observed=True)
[timestamp_col]
.max()
.dropna()
.reset_index()
.rename(columns = {timestamp_col: "max_time"})
)
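A minimal pandas sketch of the filter described above (RT trips with under 10 minutes of observed vp get dropped); the toy data and the 10-minute threshold expressed in seconds are assumptions for illustration:

import pandas as pd

vp = pd.DataFrame({
    "gtfs_dataset_key": ["a", "a", "b", "b"],
    "trip_id": ["t1", "t1", "t2", "t2"],
    "location_timestamp_local": pd.to_datetime([
        "2023-06-28 08:00", "2023-06-28 08:15",
        "2023-06-28 09:00", "2023-06-28 09:04",
    ]),
})

group_cols = ["gtfs_dataset_key", "trip_id"]

# max - min timestamp per trip, mirroring trip_time_elapsed above.
elapsed = (
    vp.groupby(group_cols)["location_timestamp_local"]
    .agg(["min", "max"])
    .assign(trip_time_sec=lambda df: (df["max"] - df["min"]).dt.total_seconds())
    .reset_index()
)

# Keep only trips with at least 10 minutes of observed vp.
valid_trips = elapsed[elapsed.trip_time_sec >= 10 * 60][group_cols]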
@@ -112,7 +116,9 @@ def pare_down_vp_to_valid_trips(
).sort_values(
["gtfs_dataset_key", "trip_id",
"location_timestamp_local"]
).drop_duplicates().reset_index(drop=True)
).drop_duplicates(
subset=["gtfs_dataset_key", "trip_id", "location_timestamp_local"]
).reset_index(drop=True)

# Let's convert to tabular now, make use of partitioning
# We want to break up sjoins, so we can wrangle it to points on-the-fly
@@ -122,9 +128,15 @@ def pare_down_vp_to_valid_trips(
vp_idx = usable_vp.index.astype("int32")
).drop(columns = "geometry")


# Either use dask (which kills kernel here) or remove the existing folder of output
# https://stackoverflow.com/questions/69092126/is-it-possible-to-change-the-output-filenames-when-saving-as-partitioned-parquet
if fs.exists(f"{SEGMENT_GCS}{EXPORT_FILE}_{analysis_date}/"):
fs.rm(f"{SEGMENT_GCS}{EXPORT_FILE}_{analysis_date}/", recursive=True)

usable_vp.to_parquet(
f"{SEGMENT_GCS}{EXPORT_FILE}_{analysis_date}",
partition_cols = ["gtfs_dataset_key"]
partition_cols = ["gtfs_dataset_key"],
)
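The comment above points out that a partitioned parquet write does not replace an existing output folder, hence the delete-before-write; a rough sketch of that pattern with gcsfs and a hypothetical bucket path:

import gcsfs
import pandas as pd

fs = gcsfs.GCSFileSystem()

# Hypothetical output folder; partitioned writes create one sub-folder per
# partition value, so stale partitions linger unless the folder is removed.
out_path = "gs://my-bucket/usable_vp_2023-06-28/"

if fs.exists(out_path):
    fs.rm(out_path, recursive=True)

df = pd.DataFrame({
    "gtfs_dataset_key": ["a", "a", "b"],
    "vp_idx": [0, 1, 2],
})

# Requires GCS credentials; to_parquet hands partition_cols to pyarrow.
df.to_parquet(out_path, partition_cols=["gtfs_dataset_key"])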


@@ -140,15 +152,13 @@ def pare_down_vp_to_valid_trips(

start = datetime.datetime.now()

# Doesn't matter which dictionary to use
# We're operating on same vp, and it's only in the next stage
# that which segments used matters
ROUTE_SEG_DICT = helpers.get_parameters(CONFIG_PATH, "route_segments")
STOP_SEG_DICT = helpers.get_parameters(CONFIG_PATH, "stop_segments")

time1 = datetime.datetime.now()

pare_down_vp_to_valid_trips(
analysis_date,
dict_inputs = ROUTE_SEG_DICT
dict_inputs = STOP_SEG_DICT
)

logger.info(f"pare down vp")
13 changes: 5 additions & 8 deletions rt_segment_speeds/scripts/A2_valid_vehicle_positions.py
@@ -57,11 +57,11 @@ def identify_stop_segment_cases(


def merge_usable_vp_with_sjoin_vpidx(
shape_cases: np.ndarray,
shape_cases: list,
usable_vp_file: str,
sjoin_results_file: str,
segment_identifier_cols: list,
grouping_col: str
grouping_col: str,
**kwargs
) -> dd.DataFrame:
"""
Grab all the usable vp (with lat/lon columns), filter it down to
@@ -73,13 +73,11 @@ def merge_usable_vp_with_sjoin_vpidx(
SEGMENT_GCS,
usable_vp_file,
file_type = "df",
partitioned = True
partitioned = True,
**kwargs
).set_index("vp_idx")

# Grab our results of vp_idx joined to segments
seg_seq_col = [c for c in segment_identifier_cols
if c != grouping_col][0]

vp_to_seg = dd.read_parquet(
f"{SEGMENT_GCS}vp_sjoin/{sjoin_results_file}",
filters = [[(grouping_col, "in", shape_cases)]],
@@ -125,7 +123,6 @@ def pare_down_vp_by_segment(
normal_shapes,
f"{USABLE_VP}_{analysis_date}",
f"{INPUT_FILE_PREFIX}_{analysis_date}",
SEGMENT_IDENTIFIER_COLS,
GROUPING_COL
)

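A small sketch of the dask read pattern used above, where partition filters and extra read kwargs narrow the sjoin results down to the shapes of interest; the path and column names here are placeholders:

import dask.dataframe as dd

sjoin_path = "gs://my-bucket/vp_sjoin/vp_stop_segment_2023-06-28"
shape_cases = ["shape1", "shape2"]

# filters= prunes partitions/row groups down to the listed shapes, and
# columns= trims the read to just what the merge needs.
vp_to_seg = dd.read_parquet(
    sjoin_path,
    filters=[[("shape_array_key", "in", shape_cases)]],
    columns=["vp_idx", "shape_array_key", "stop_sequence"],
)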
16 changes: 15 additions & 1 deletion rt_segment_speeds/scripts/A3_loop_inlining.py
@@ -180,6 +180,7 @@ def find_vp_direction_vector(
suffixes = ('_start', '_end')
).sort_values(trip_group_cols).reset_index(drop=True)

# Use 2 geoseries, the first point and the last point
first_series = gpd.points_from_xy(
df_wide.x_start, df_wide.y_start,
crs="EPSG:4326"
@@ -190,11 +191,13 @@
crs="EPSG:4326"
).to_crs(crs)

# Input 2 series to get a direction for each element-pair
direction_vector = [
wrangle_shapes.get_direction_vector(start, end)
for start, end in zip(first_series, last_series)
]

# Normalize vector by Pythagorean Theorem to get values between -1 and 1
vector_normalized = [wrangle_shapes.get_normalized_vector(i)
for i in direction_vector]

@@ -203,6 +206,8 @@
vp_vector = vector_normalized
)

# Take the dot product.
# positive = same direction; 0 = orthogonal; negative = opposite direction
dot_result = [wrangle_shapes.dot_product(vec1, vec2) for vec1, vec2 in
zip(results.segments_vector, results.vp_vector)]
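A self-contained numpy sketch of the direction check the comments above describe: build a vector from the first and last point, normalize it, then take the dot product. The wrangle_shapes helpers are not reproduced here, so the math is written out directly with made-up coordinates:

import numpy as np

def direction_vector(start_xy, end_xy):
    # Vector pointing from the first point toward the last point.
    return (end_xy[0] - start_xy[0], end_xy[1] - start_xy[1])

def normalize(vec):
    # Divide by the vector length (Pythagorean theorem) so each
    # component lands between -1 and 1.
    length = np.sqrt(vec[0] ** 2 + vec[1] ** 2)
    return (vec[0] / length, vec[1] / length)

segment_vec = normalize(direction_vector((0, 0), (100, 0)))  # runs east
vp_vec = normalize(direction_vector((10, 5), (80, 2)))       # roughly east

# Dot product > 0: same direction; 0: orthogonal; < 0: opposite direction.
dot = segment_vec[0] * vp_vec[0] + segment_vec[1] * vp_vec[1]
same_direction = dot > 0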

Expand All @@ -219,6 +224,13 @@ def find_errors_in_segment_groups(
segment_identifier_cols: list,
) -> dd.DataFrame:
"""
For each sjoin result for each segment-trip:
(1) find the direction the segment is running
(2) use the mean timestamp to divide sjoin results into 2 groups
(3) for each group, find the first/last vp
(4) find the direction of each group of vp for segment-trip
(5) as long as vp are running in same direction as segment (dot product > 0),
keep those observations.
"""
group_cols = segment_identifier_cols + ["trip_id"]
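A rough pandas sketch of steps (2)-(5) above for a single segment-trip: split the vp at the mean timestamp, take a direction per group, and keep groups running the same way as the segment. All values are made up, and the segment is assumed to run east:

import pandas as pd

# vp for one segment-trip, ordered in time.
vp = pd.DataFrame({
    "location_timestamp_local": pd.to_datetime([
        "2023-06-28 08:00", "2023-06-28 08:02",
        "2023-06-28 08:04", "2023-06-28 08:06",
    ]),
    "x": [0, 10, 60, 70],
    "y": [0, 1, 2, 1],
})

# (2) the mean timestamp splits the sjoin results into two groups.
cutoff = vp.location_timestamp_local.mean()
vp["vp_group"] = (vp.location_timestamp_local > cutoff).map(
    {False: "first_half", True: "second_half"}
)

# (3)-(4) first/last vp per group give that group's direction vector.
def group_vector(g):
    first, last = g.iloc[0], g.iloc[-1]
    return pd.Series({"dx": last.x - first.x, "dy": last.y - first.y})

group_vectors = vp.groupby("vp_group", sort=False).apply(group_vector)

# (5) keep groups whose dot product with the segment vector is positive.
segment_vec = (1, 0)
keep = group_vectors[
    group_vectors.dx * segment_vec[0] + group_vectors.dy * segment_vec[1] > 0
]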

@@ -265,6 +277,9 @@ def pare_down_vp_for_special_cases(
dict_inputs: dict = {}
):
"""
For special shapes, include a direction check: each
batch of vp has its direction generated, and that is compared against
the direction the segment is running.
"""
USABLE_VP = dict_inputs["stage1"]
INPUT_FILE_PREFIX = dict_inputs["stage2"]
Expand All @@ -282,7 +297,6 @@ def pare_down_vp_for_special_cases(
special_shapes,
f"{USABLE_VP}_{analysis_date}",
f"{INPUT_FILE_PREFIX}_{analysis_date}",
SEGMENT_IDENTIFIER_COLS,
GROUPING_COL
)

Expand Down
145 changes: 0 additions & 145 deletions rt_segment_speeds/scripts/C2_stop_level_metrics.py

This file was deleted.

