Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:AequilibraE/aequilibrae into ped…
Browse files Browse the repository at this point in the history
…ro/messaging
  • Loading branch information
pveigadecamargo committed Oct 25, 2024
2 parents d2ecf52 + b4e760a commit 53cd626
Show file tree
Hide file tree
Showing 35 changed files with 601 additions and 588 deletions.
2 changes: 1 addition & 1 deletion aequilibrae/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from aequilibrae import project

from aequilibrae.distribution import Ipf, GravityApplication, GravityCalibration, SyntheticGravityModel
from aequilibrae.matrix import AequilibraeMatrix, AequilibraeData
from aequilibrae.matrix import AequilibraeMatrix
from aequilibrae import distribution
from aequilibrae.paths.network_skimming import NetworkSkimming
from aequilibrae.paths.traffic_class import TrafficClass
Expand Down
179 changes: 36 additions & 143 deletions aequilibrae/distribution/cython/ipf_core.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import multiprocessing as mp
cimport cython
import numpy as np
from cython.parallel import parallel, prange
from libc.stdlib cimport malloc, free
from libc.stdlib cimport calloc, free
from libc.string cimport memset


def ipf_core(seed_matrix, target_productions, target_attractions, max_iterations=200, tolerance=0.001, cores = 0, warn=True):
Expand Down Expand Up @@ -32,44 +33,34 @@ def ipf_core(seed_matrix, target_productions, target_attractions, max_iterations
cdef double [:] prod_tgt = target_productions
cdef double [:] prod_factor = factor_prod

cdef double [:] attr_tot = mat_attr_tot
cdef double [::1] attr_tot = np.ascontiguousarray(mat_attr_tot)
cdef double [:] attr_tgt = target_attractions
cdef double [:] attr_factor = factor_attr

# This uses a Cython fused type cython.floating to defer to the float or double method without duplicated code
if seed_matrix.dtype == np.float32:
iter, err = _ipf_core32(seed_matrix, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus)
iter, err = _fratar[float](seed_matrix, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus)
else:
iter, err = _ipf_core64(seed_matrix, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus)
iter, err = _fratar[double](seed_matrix, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus)

if err > tolerance and warn:
warnings.warn(f"Could not reach convergence in {iter} iterations: {err}")
return iter, err

cdef _ipf_core64(seed_matrix, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus):

cdef double [:, :] flows = seed_matrix
return _fratar(flows, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus)


cdef _ipf_core32(seed_matrix, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus):

cdef float [:, :] flows_single = seed_matrix
return _fratar_sing(flows_single, prod_tot, prod_tgt, prod_factor, attr_tot, attr_tgt, attr_factor, max_iter, toler, cpus)


@cython.wraparound(False)
@cython.embedsignature(True)
@cython.boundscheck(False)
cdef _fratar(double[:, :] flows,
double[:] prod_tot,
double[:] prod_tgt,
double[:] prod_factor,
double[:] attr_tot,
double[:] attr_tgt,
double[:] attr_factor,
int max_iter,
double toler,
int cpus) noexcept:
cdef object _fratar(cython.floating[:, :] flows,
double[:] prod_tot,
double[:] prod_tgt,
double[:] prod_factor,
double[::1] attr_tot,
double[:] attr_tgt,
double[:] attr_factor,
int max_iter,
double toler,
int cpus):

cdef double err = 1.0
cdef int iter = 0
Expand All @@ -91,7 +82,6 @@ cdef _fratar(double[:, :] flows,
for j in range(J):
flows[i, j] = flows[i, j] * prod_factor[i]


_total_attra(flows, prod_tgt, attr_tot, cpus)
err = _factors(attr_tgt, attr_tot, attr_factor, cpus)
for i in prange(I, nogil=True, num_threads=cpus):
Expand All @@ -106,29 +96,33 @@ cdef _fratar(double[:, :] flows,

return iter, err - 1


@cython.wraparound(False)
@cython.embedsignature(True)
@cython.boundscheck(False)
cpdef void _total_attra(double[:, :] flows,
cpdef void _total_attra(cython.floating[:, :] flows,
double[:] prod_tgt,
double[:] attr_tot,
double[::1] attr_tot,
int cpus) noexcept:
cdef long i, j, jk
cdef double *local_buf
cdef long I = flows.shape[0]
cdef long J = flows.shape[1]

cdef long long i, j, jk
cdef double *local_buf
cdef long long I = flows.shape[0]
cdef long long J = flows.shape[1]
# Zero the provided total buffer, a IEEE 64bit floating point value represents its
# 0.0 with all 0 bytes.
memset(&(attr_tot[0]), 0, attr_tot.shape[0] * sizeof(double))

# Computes factors
with nogil, parallel( num_threads=cpus):
local_buf = <double *> malloc(sizeof(double) * J)
for j in range(J):
local_buf[j] = 0
attr_tot[j] = 0
with nogil, parallel(num_threads=cpus):
# A thread-local buffer is used to optimize access to cache.
# This makes the IPF roughly 2x faster, with this function being 8x faster
local_buf = <double *> calloc(J, sizeof(double))

for i in prange(I):
if prod_tgt[i] == 0:
continue

for jk in range(J):
# for jk in array_of_indices_of_non_zeros:
local_buf[jk] += flows[i, jk]
Expand All @@ -143,7 +137,7 @@ cpdef void _total_attra(double[:, :] flows,
@cython.wraparound(False)
@cython.embedsignature(True)
@cython.boundscheck(False)
cpdef void _total_prods(double[:, :] flows,
cpdef void _total_prods(cython.floating[:, :] flows,
double[:] prod_tgt,
double[:] prod_tot,
int cpus) noexcept nogil:
Expand All @@ -157,13 +151,15 @@ cpdef void _total_prods(double[:, :] flows,
prod_tot[i] = 0
if prod_tgt[i] == 0:
continue

for j in range(J):
prod_tot[i] += flows[i, j]


@cython.wraparound(False)
@cython.embedsignature(True)
@cython.boundscheck(False)
@cython.cdivision(True)
cpdef double _factors(double[:] target,
double[:] total,
double[:] factor,
Expand All @@ -183,9 +179,11 @@ cpdef double _factors(double[:] target,
factor[i] = target[i] / total[i]
return err


@cython.wraparound(False)
@cython.embedsignature(True)
@cython.boundscheck(False)
@cython.cdivision(True)
cpdef double _calc_err(double[:] p_factor,
double[:] a_factor) noexcept:

Expand All @@ -205,108 +203,3 @@ cpdef double _calc_err(double[:] p_factor,
err = err if err > 1 / a_factor[j] else 1 / a_factor[j]
err = err if err > a_factor[j] else a_factor[j]
return err


@cython.wraparound(False)
@cython.embedsignature(True)
@cython.boundscheck(False)
cdef _fratar_sing(float[:, :] flows,
double[:] prod_tot,
double[:] prod_tgt,
double[:] prod_factor,
double[:] attr_tot,
double[:] attr_tgt,
double[:] attr_factor,
int max_iter,
double toler,
int cpus) noexcept:

cdef double err = 1.0
cdef int iter = 0
cdef long long i, j
cdef long long I = flows.shape[0]
cdef long long J = flows.shape[1]

# we zero everyone that needs to be zero to be able to skip them in the future
for i in prange(I, nogil=True):
for j in range(J):
if prod_tgt[i] + attr_tgt[j] == 0:
flows[i, j] = 0

for iter in range(max_iter):
_total_prods_sing(flows, prod_tgt, prod_tot, cpus)
err = _factors(prod_tgt, prod_tot, prod_factor, cpus)
for i in prange(I, nogil=True, num_threads=cpus):
if prod_tgt[i] > 0:
for j in range(J):
flows[i, j] = flows[i, j] * prod_factor[i]


_total_attra_sing(flows, prod_tgt, attr_tot, cpus)
err = _factors(attr_tgt, attr_tot, attr_factor, cpus)
for i in prange(I, nogil=True, num_threads=cpus):
if prod_tgt[i] > 0:
for j in range(J):
flows[i, j] = flows[i, j] * attr_factor[j]

# FUNCTION TO COMPUTE THE ERROR
err = _calc_err(prod_factor, attr_factor)
if (err - 1) < toler:
break

return iter, err - 1


@cython.wraparound(False)
@cython.embedsignature(True)
@cython.boundscheck(False)
cpdef void _total_attra_sing(float[:, :] flows,
double[:] prod_tgt,
double[:] attr_tot,
int cpus) noexcept:

cdef long long i, j, jk
cdef double *local_buf
cdef long long I = flows.shape[0]
cdef long long J = flows.shape[1]

# Computes factors
with nogil, parallel( num_threads=cpus):
local_buf = <double *> malloc(sizeof(double) * J)
for j in range(J):
local_buf[j] = 0
attr_tot[j] = 0

for i in prange(I):
if prod_tgt[i] == 0:
continue
for jk in range(J):
# for jk in array_of_indices_of_non_zeros:
local_buf[jk] += flows[i, jk]

with gil:
for j in range(J):
attr_tot[j] += local_buf[j]

free(local_buf)


@cython.wraparound(False)
@cython.embedsignature(True)
@cython.boundscheck(False)
cpdef void _total_prods_sing(float[:, :] flows,
double[:] prod_tgt,
double[:] prod_tot,
int cpus) noexcept nogil:

cdef long long i, j
cdef long long I = flows.shape[0]
cdef long long J = flows.shape[1]

# Calculate the row totals (prods) from the flows
for i in prange(I, num_threads=cpus):
prod_tot[i] = 0
if prod_tgt[i] == 0:
continue
for j in range(J):
prod_tot[i] += flows[i, j]
Loading

0 comments on commit 53cd626

Please sign in to comment.