-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathto_object_arrays.pyx
94 lines (79 loc) · 3.49 KB
/
to_object_arrays.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# cython: language_level=3, boundscheck=False, nonecheck=False, optimize.unpack_method_calls=True
# cython: warn.maybe_uninitialized=True
# distutils: language = c++
from typing import Any, List, Sequence, Tuple
import asyncpg
import numpy as np
cimport cython
from cpython cimport PyObject
from numpy cimport ndarray
cdef extern from "asyncpg_recordobj.h":
PyObject *ApgRecord_GET_ITEM(PyObject *, int)
cdef extern from "Python.h":
# Re-declared here with `nogil` added (instead of using `from cpython cimport ...`).
# These macros read directly from the internal ob_item array and perform no type or bounds checks.
PyObject *PyList_GET_ITEM(PyObject *, Py_ssize_t) nogil
PyObject *PyTuple_GET_ITEM(PyObject *, Py_ssize_t) nogil
@cython.boundscheck(False)
def to_object_arrays_split(rows: List[Sequence[Any]],
                           typed_indexes: Sequence[int],
                           obj_indexes: Sequence[int],
                           ) -> Tuple[np.ndarray, np.ndarray]:
    """
    Split a list of rows into two column-major object arrays.

    The type of the first row selects the extraction strategy for *all* rows:
    `asyncpg.Record` and `tuple` (including subclasses) are read in place through
    unchecked C macros; any other row type is converted with `tuple(row)` and
    indexed normally. Mixing row types in one call is therefore not supported.

    Parameters
    ----------
    rows : list of sequences, N rows of K items
        Rows to be converted. Must be a `list`. Each row must be of equal length
        and of the same kind as the first row, otherwise, the results are undefined.
    typed_indexes : array of integers
        Sequence of integer indexes in each row in `rows` that select the first result.
    obj_indexes : array of integers
        Sequence of integer indexes in each row in `rows` that select the second result.

    Returns
    -------
    (np.ndarray[object, ndim=2], np.ndarray[object, ndim=2])
        The first array has shape `(len(typed_indexes), len(rows))`; its j-th line holds
        the column of `rows` chosen by `typed_indexes[j]`.
        The second array has shape `(len(obj_indexes), len(rows))`; its j-th line holds
        the column of `rows` chosen by `obj_indexes[j]`.

    Raises
    ------
    TypeError
        If `rows` is not a list.
    IndexError
        If the rows are records or tuples and an index is negative or not smaller
        than the length of the first row.
    """
    cdef:
        Py_ssize_t i, j, size, cols_typed, cols_obj, width, index
        ndarray[object, ndim=2] result_typed
        ndarray[object, ndim=2] result_obj
        PyObject *record
        # Py_ssize_t matches np.intp on every platform, unlike C long vs. numpy's int.
        Py_ssize_t[:] typed_indexes_arr
        Py_ssize_t[:] obj_indexes_arr
        bint is_record
        object first, row

    # An explicit check instead of `assert`: asserts are skipped under `python -O`,
    # and PyList_GET_ITEM below must never see anything but a real list.
    if not isinstance(rows, list):
        raise TypeError(f"rows must be a list, got {type(rows).__name__}")

    typed_indexes_arr = np.asarray(typed_indexes, dtype=np.intp)
    obj_indexes_arr = np.asarray(obj_indexes, dtype=np.intp)
    size = len(rows)
    cols_typed = typed_indexes_arr.shape[0]
    cols_obj = obj_indexes_arr.shape[0]

    result_typed = np.empty((cols_typed, size), dtype=object)
    result_obj = np.empty((cols_obj, size), dtype=object)
    if size == 0:
        return result_typed, result_obj

    first = rows[0]
    is_record = isinstance(first, asyncpg.Record)
    if is_record or isinstance(first, tuple):
        # The GET_ITEM macros do no bounds checking, so validate the requested
        # indexes once against the first row before touching raw memory.
        width = len(first)
        for j in range(cols_typed):
            index = typed_indexes_arr[j]
            if index < 0 or index >= width:
                raise IndexError(
                    f"typed_indexes[{j}] = {index} is out of range for rows of length {width}"
                )
        for j in range(cols_obj):
            index = obj_indexes_arr[j]
            if index < 0 or index >= width:
                raise IndexError(
                    f"obj_indexes[{j}] = {index} is out of range for rows of length {width}"
                )

        if is_record:
            for i in range(size):
                # Borrowed reference; `rows` keeps the record alive.
                record = PyList_GET_ITEM(<PyObject *>rows, i)
                for j in range(cols_typed):
                    result_typed[j, i] = <object>ApgRecord_GET_ITEM(record, <int>typed_indexes_arr[j])
                for j in range(cols_obj):
                    result_obj[j, i] = <object>ApgRecord_GET_ITEM(record, <int>obj_indexes_arr[j])
        else:
            for i in range(size):
                # Borrowed reference; `rows` keeps the tuple alive.
                record = PyList_GET_ITEM(<PyObject *>rows, i)
                for j in range(cols_typed):
                    result_typed[j, i] = <object>PyTuple_GET_ITEM(record, typed_indexes_arr[j])
                for j in range(cols_obj):
                    result_obj[j, i] = <object>PyTuple_GET_ITEM(record, obj_indexes_arr[j])
    else:
        # Generic fallback: convert each row to a tuple and use checked Python indexing.
        for i in range(size):
            row = tuple(rows[i])
            for j in range(cols_typed):
                result_typed[j, i] = row[typed_indexes_arr[j]]
            for j in range(cols_obj):
                result_obj[j, i] = row[obj_indexes_arr[j]]

    return result_typed, result_obj