Skip to content

Commit

Permalink
Python >=3.10 required.
Browse files: browse the repository at this point in the history
  • Loading branch information
coady committed Nov 5, 2024
1 parent c5d3d9f commit aef7073
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 220 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.9', '3.10', '3.11', '3.12', '3.13']
python-version: ['3.10', '3.11', '3.12', '3.13']
arrow-version: ['']
include:
- python-version: 3.x
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Unreleased
### Changed
* Python >=3.10 required

## [1.8](https://pypi.org/project/graphique/1.8/) - 2024-11-01
### Changed
Expand Down
14 changes: 7 additions & 7 deletions graphique/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
import json
from collections.abc import Callable, Iterable, Iterator, Mapping
from dataclasses import dataclass
from typing import Optional, Union, get_type_hints
from typing import TypeAlias, get_type_hints
import numpy as np
import pyarrow as pa
import pyarrow.acero as ac
import pyarrow.compute as pc
import pyarrow.dataset as ds
from typing_extensions import Self

Array = Union[pa.Array, pa.ChunkedArray]
Batch = Union[pa.RecordBatch, pa.Table]
Array: TypeAlias = pa.Array | pa.ChunkedArray
Batch: TypeAlias = pa.RecordBatch | pa.Table
bit_any = functools.partial(functools.reduce, operator.or_)
bit_all = functools.partial(functools.reduce, operator.and_)

Expand Down Expand Up @@ -152,7 +152,7 @@ def inner_flatten(self) -> pa.lib.BaseListArray:
offsets = self.values.offsets.take(self.offsets)
return type(self).from_arrays(offsets, self.values.values)

def aggregate(self, **funcs: Optional[pc.FunctionOptions]) -> pa.RecordBatch:
def aggregate(self, **funcs: pc.FunctionOptions | None) -> pa.RecordBatch:
"""Return aggregated scalars by grouping each hash function on the parent indices.
If there are empty or null scalars, then the result must be padded with null defaults and
Expand Down Expand Up @@ -391,7 +391,7 @@ def map_list(self, func: Callable, *args, **kwargs) -> Batch:
return Table.union(self, Table.from_counts(table, counts))

def sort_indices(
self, *names: str, length: Optional[int] = None, null_placement: str = 'at_end'
self, *names: str, length: int | None = None, null_placement: str = 'at_end'
) -> pa.Array:
"""Return indices which would sort the table by columns, optimized for fixed length."""
func = functools.partial(pc.sort_indices, null_placement=null_placement)
Expand All @@ -404,7 +404,7 @@ def sort_indices(
def sort(
self,
*names: str,
length: Optional[int] = None,
length: int | None = None,
indices: str = '',
null_placement: str = 'at_end',
) -> Batch:
Expand Down Expand Up @@ -521,7 +521,7 @@ def flatten(self, indices: str = '') -> Iterator[pa.RecordBatch]:
offset += len(batch)
yield pa.RecordBatch.from_pydict(columns)

def split(self) -> Iterator[Optional[pa.RecordBatch]]:
def split(self) -> Iterator[pa.RecordBatch | None]:
"""Generate tables from splitting list scalars."""
lists = Table.list_fields(self)
scalars = set(self.schema.names) - lists
Expand Down
325 changes: 161 additions & 164 deletions graphique/inputs.py

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions graphique/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import itertools
from collections.abc import Callable, Iterable, Iterator, Mapping, Sized
from datetime import timedelta
from typing import Annotated, Optional, Union, no_type_check
from typing import Annotated, TypeAlias, no_type_check
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
Expand All @@ -25,7 +25,7 @@
from .models import Column, doc_field
from .scalars import Long

Source = Union[ds.Dataset, Nodes, ds.Scanner, pa.Table]
Source: TypeAlias = ds.Dataset | Nodes | ds.Scanner | pa.Table


def references(field) -> Iterator:
Expand Down Expand Up @@ -83,7 +83,7 @@ def select(self, info: Info) -> Source:
return self.source.select(names)
return Nodes.scan(self.source, names)

def to_table(self, info: Info, length: Optional[int] = None) -> pa.Table:
def to_table(self, info: Info, length: int | None = None) -> pa.Table:
"""Return table with only the rows and columns necessary to proceed."""
source = self.select(info)
if isinstance(source, pa.Table):
Expand Down Expand Up @@ -163,7 +163,7 @@ def schema(self) -> Schema:
) # type: ignore

@doc_field
def optional(self) -> Optional[Self]:
def optional(self) -> Self | None:
"""Nullable field to stop error propagation, enabling partial query results.
Will be replaced by client controlled nullability.
Expand Down Expand Up @@ -192,7 +192,7 @@ def any(self, info: Info, length: Long = 1) -> bool:
return len(table) >= length

@doc_field
def size(self) -> Optional[Long]:
def size(self) -> Long | None:
"""buffer size in bytes; null if table is not loaded"""
return getattr(self.source, 'nbytes', None)

Expand All @@ -203,7 +203,7 @@ def size(self) -> Optional[Long]:
)
def column(
self, info: Info, name: list[str], cast: str = '', safe: bool = True
) -> Optional[Column]:
) -> Column | None:
"""Return column of any type by name.
This is typically only needed for aliased or casted columns.
Expand All @@ -222,7 +222,7 @@ def column(
reverse="reverse order after slicing; forces a copy",
)
def slice(
self, info: Info, offset: Long = 0, length: Optional[Long] = None, reverse: bool = False
self, info: Info, offset: Long = 0, length: Long | None = None, reverse: bool = False
) -> Self:
"""Return zero-copy slice of table.
Expand Down Expand Up @@ -301,7 +301,7 @@ def sort(
self,
info: Info,
by: list[str],
length: Optional[Long] = None,
length: Long | None = None,
null_placement: str = 'at_end',
) -> Self:
"""Return table slice sorted by specified columns.
Expand Down Expand Up @@ -396,7 +396,7 @@ def flatten(self, info: Info, indices: str = '') -> Self:
return type(self)(self.add_metric(info, table, mode='batch'))

@doc_field
def tables(self, info: Info) -> list[Optional[Self]]: # type: ignore
def tables(self, info: Info) -> list[Self | None]: # type: ignore
"""Return a list of tables by splitting list columns.
At least one list column must be referenced, and all list columns must have the same lengths.
Expand Down Expand Up @@ -477,7 +477,7 @@ def join(
info: Info,
right: str,
keys: list[str],
right_keys: Optional[list[str]] = None,
right_keys: list[str] | None = None,
join_type: str = 'left outer',
left_suffix: str = '',
right_suffix: str = '',
Expand Down
9 changes: 4 additions & 5 deletions graphique/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from collections.abc import Iterable, Mapping
from datetime import timedelta
from keyword import iskeyword
from typing import Optional
import pyarrow.dataset as ds
import strawberry.asgi
from strawberry import Info, UNSET
Expand All @@ -33,7 +32,7 @@ def get_results(self) -> dict:
return {'metrics': metrics}

@staticmethod
def duration(data: dict) -> Optional[str]:
def duration(data: dict) -> str | None:
return data['duration'] and str(timedelta(microseconds=data['duration'] / 1e3))


Expand Down Expand Up @@ -87,12 +86,12 @@ def implemented(root: Source, name: str = '', keys: Iterable = ()):
prefix = to_camel_case(name.title())

namespace = {name: strawberry.field(default=UNSET, name=name) for name in types}
annotations = {name: Optional[Column.registry[types[name]]] for name in types}
annotations = {name: Column.registry[types[name]] | None for name in types}
cls = type(prefix + 'Columns', (), dict(namespace, __annotations__=annotations))
Columns = strawberry.type(cls, description="fields for each column")

namespace = {name: strawberry.field(default=UNSET, name=name) for name in types}
annotations = {name: Optional[Column if cls is list else cls] for name, cls in types.items()}
annotations = {name: (Column if cls is list else cls) | None for name, cls in types.items()}
cls = type(prefix + 'Row', (), dict(namespace, __annotations__=annotations))
Row = strawberry.type(cls, description="scalar fields")

Expand All @@ -104,7 +103,7 @@ def columns(self, info: Info) -> Columns: # type: ignore
"""fields for each column"""
return Columns(**super().columns(info))

def row(self, info: Info, index: Long = 0) -> Optional[Row]: # type: ignore
def row(self, info: Info, index: Long = 0) -> Row | None: # type: ignore
"""Return scalar values at index."""
row = super().row(info, index)
for name, value in row.items():
Expand Down
53 changes: 26 additions & 27 deletions graphique/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
GraphQL output types and resolvers.
"""

from __future__ import annotations
import functools
import inspect
from collections.abc import Callable
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from typing import Annotated, Generic, Optional, TypeVar, TYPE_CHECKING, get_args
from typing import Annotated, Generic, TypeVar, TYPE_CHECKING, get_args
import pyarrow as pa
import pyarrow.compute as pc
import strawberry
Expand All @@ -27,7 +28,7 @@ def selections(*fields) -> set:
return {selection.name for field in fields for selection in field.selections}


def doc_field(func: Optional[Callable] = None, **kwargs: str) -> StrawberryField:
def doc_field(func: Callable | None = None, **kwargs: str) -> StrawberryField:
"""Return strawberry field with argument and docstring descriptions."""
if func is None:
return functools.partial(doc_field, **kwargs) # type: ignore
Expand Down Expand Up @@ -77,12 +78,12 @@ def size(self) -> Long:
return self.array.nbytes

@classmethod
def cast(cls, array: pa.ChunkedArray) -> 'Column':
def cast(cls, array: pa.ChunkedArray) -> Column:
"""Return typed column based on array type."""
return cls.registry[py_type(array.type)](array)

@classmethod
def fromscalar(cls, scalar: pa.ListScalar) -> Optional['Column']:
def fromscalar(cls, scalar: pa.ListScalar) -> Column | None:
return None if scalar.values is None else cls.cast(pa.chunked_array([scalar.values]))

@compute_field
Expand All @@ -105,7 +106,7 @@ def __init__(self, array, counts=pa.array([])):
self.array, self.counts = array, counts.to_pylist()

@doc_field
def values(self) -> list[Optional[T]]:
def values(self) -> list[T | None]:
"""list of values"""
return self.array.to_pylist()

Expand All @@ -126,7 +127,7 @@ def unique(self, info: Info) -> Set[T]:
return Set(self.array.unique())

@doc_field
def value(self, index: Long = 0) -> Optional[T]:
def value(self, index: Long = 0) -> T | None:
"""scalar value at index"""
return self.array[index].as_py()

Expand All @@ -139,23 +140,23 @@ def drop_null(self) -> list[T]:
@strawberry.type(name='Column', description="column of ordinal values")
class OrdinalColumn(NominalColumn[T]):
@compute_field
def first(self, skip_nulls: bool = True, min_count: int = 0) -> Optional[T]:
def first(self, skip_nulls: bool = True, min_count: int = 0) -> T | None:
return pc.first(self.array, skip_nulls=skip_nulls, min_count=min_count).as_py()

@compute_field
def last(self, skip_nulls: bool = True, min_count: int = 0) -> Optional[T]:
def last(self, skip_nulls: bool = True, min_count: int = 0) -> T | None:
return pc.last(self.array, skip_nulls=skip_nulls, min_count=min_count).as_py()

@compute_field
def min(self, skip_nulls: bool = True, min_count: int = 0) -> Optional[T]:
def min(self, skip_nulls: bool = True, min_count: int = 0) -> T | None:
return pc.min(self.array, skip_nulls=skip_nulls, min_count=min_count).as_py()

@compute_field
def max(self, skip_nulls: bool = True, min_count: int = 0) -> Optional[T]:
def max(self, skip_nulls: bool = True, min_count: int = 0) -> T | None:
return pc.max(self.array, skip_nulls=skip_nulls, min_count=min_count).as_py()

@compute_field
def index(self, value: T, start: Long = 0, end: Optional[Long] = None) -> Long:
def index(self, value: T, start: Long = 0, end: Long | None = None) -> Long:
return C.index(self.array, value, start, end)

@compute_field
Expand All @@ -175,15 +176,15 @@ def mode(self, n: int = 1, skip_nulls: bool = True, min_count: int = 0) -> Set[T
return Set(*pc.mode(self.array, n, skip_nulls=skip_nulls, min_count=min_count).flatten())

@compute_field
def sum(self, skip_nulls: bool = True, min_count: int = 0) -> Optional[T]:
def sum(self, skip_nulls: bool = True, min_count: int = 0) -> T | None:
return pc.sum(self.array, skip_nulls=skip_nulls, min_count=min_count).as_py()

@compute_field
def product(self, skip_nulls: bool = True, min_count: int = 0) -> Optional[T]:
def product(self, skip_nulls: bool = True, min_count: int = 0) -> T | None:
return pc.product(self.array, skip_nulls=skip_nulls, min_count=min_count).as_py()

@compute_field
def mean(self, skip_nulls: bool = True, min_count: int = 0) -> Optional[float]:
def mean(self, skip_nulls: bool = True, min_count: int = 0) -> float | None:
return pc.mean(self.array, skip_nulls=skip_nulls, min_count=min_count).as_py()

@compute_field
Expand All @@ -195,13 +196,11 @@ def indices_nonzero(self) -> list[Long]:
@strawberry.type(name='Column', description="column of floats or decimals")
class RatioColumn(IntervalColumn[T]):
@compute_field
def stddev(self, ddof: int = 0, skip_nulls: bool = True, min_count: int = 0) -> Optional[float]:
def stddev(self, ddof: int = 0, skip_nulls: bool = True, min_count: int = 0) -> float | None:
return pc.stddev(self.array, ddof=ddof, skip_nulls=skip_nulls, min_count=min_count).as_py()

@compute_field
def variance(
self, ddof: int = 0, skip_nulls: bool = True, min_count: int = 0
) -> Optional[float]:
def variance(self, ddof: int = 0, skip_nulls: bool = True, min_count: int = 0) -> float | None:
options = {'skip_nulls': skip_nulls, 'min_count': min_count}
return pc.variance(self.array, ddof=ddof, **options).as_py()

Expand All @@ -212,7 +211,7 @@ def quantile(
interpolation: str = 'linear',
skip_nulls: bool = True,
min_count: int = 0,
) -> list[Optional[float]]:
) -> list[float | None]:
options = {'skip_nulls': skip_nulls, 'min_count': min_count}
return pc.quantile(self.array, q=q, interpolation=interpolation, **options).to_pylist()

Expand All @@ -224,7 +223,7 @@ def tdigest(
buffer_size: int = 500,
skip_nulls: bool = True,
min_count: int = 0,
) -> list[Optional[float]]:
) -> list[float | None]:
options = {'buffer_size': buffer_size, 'skip_nulls': skip_nulls, 'min_count': min_count}
return pc.tdigest(self.array, q=q, delta=delta, **options).to_pylist()

Expand All @@ -233,11 +232,11 @@ def tdigest(
@strawberry.type(name='eanColumn', description="column of booleans")
class BooleanColumn(IntervalColumn[T]):
@compute_field
def any(self, skip_nulls: bool = True, min_count: int = 1) -> Optional[bool]:
def any(self, skip_nulls: bool = True, min_count: int = 1) -> bool | None:
return pc.any(self.array, skip_nulls=skip_nulls, min_count=min_count).as_py()

@compute_field
def all(self, skip_nulls: bool = True, min_count: int = 1) -> Optional[bool]:
def all(self, skip_nulls: bool = True, min_count: int = 1) -> bool | None:
return pc.all(self.array, skip_nulls=skip_nulls, min_count=min_count).as_py()


Expand All @@ -247,7 +246,7 @@ class IntColumn(RatioColumn[T]):
@doc_field
def take_from(
self, info: Info, field: str
) -> Optional[Annotated['Dataset', strawberry.lazy('.interface')]]:
) -> Annotated['Dataset', strawberry.lazy('.interface')] | None:
"""Select indices from a table on the root Query type."""
root = getattr(info.root_value, field)
return type(root)(root.select(info).take(self.array.combine_chunks()))
Expand All @@ -257,12 +256,12 @@ def take_from(
@strawberry.type(description="column of lists")
class ListColumn(Column):
@doc_field
def value(self, index: Long = 0) -> Optional[Column]:
def value(self, index: Long = 0) -> Column | None:
"""scalar column at index"""
return self.fromscalar(self.array[index])

@doc_field
def values(self) -> list[Optional[Column]]:
def values(self) -> list[Column | None]:
"""list of columns"""
return list(map(self.fromscalar, self.array))

Expand All @@ -280,7 +279,7 @@ def flatten(self) -> Column:
@strawberry.type(description="column of structs")
class StructColumn(Column):
@doc_field
def value(self, index: Long = 0) -> Optional[dict]:
def value(self, index: Long = 0) -> dict | None:
"""scalar json object at index"""
return self.array[index].as_py()

Expand All @@ -290,6 +289,6 @@ def names(self) -> list[str]:
return [field.name for field in self.array.type]

@doc_field(name="field name(s); multiple names access nested fields")
def column(self, name: list[str]) -> Optional[Column]:
def column(self, name: list[str]) -> Column | None:
"""Return struct field as a column."""
return self.cast(pc.struct_field(self.array, name))
Loading

0 comments on commit aef7073

Please sign in to comment.