Skip to content

Commit

Permalink
Implement support for multirange types (#851)
Browse files Browse the repository at this point in the history
  • Loading branch information
elprans committed Nov 16, 2021
1 parent a8fc21e commit d64a44a
Show file tree
Hide file tree
Showing 10 changed files with 341 additions and 35 deletions.
5 changes: 4 additions & 1 deletion asyncpg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ def __init__(self, protocol, transport, loop,
self._server_caps = _detect_server_capabilities(
self._server_version, settings)

self._intro_query = introspection.INTRO_LOOKUP_TYPES
if self._server_version < (14, 0):
self._intro_query = introspection.INTRO_LOOKUP_TYPES_13
else:
self._intro_query = introspection.INTRO_LOOKUP_TYPES

self._reset_query = None
self._proxy = None
Expand Down
125 changes: 124 additions & 1 deletion asyncpg/introspection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0


_TYPEINFO = '''\
_TYPEINFO_13 = '''\
(
SELECT
t.oid AS oid,
Expand Down Expand Up @@ -82,6 +82,129 @@
'''


INTRO_LOOKUP_TYPES_13 = '''\
WITH RECURSIVE typeinfo_tree(
oid, ns, name, kind, basetype, elemtype, elemdelim,
range_subtype, attrtypoids, attrnames, depth)
AS (
SELECT
ti.oid, ti.ns, ti.name, ti.kind, ti.basetype,
ti.elemtype, ti.elemdelim, ti.range_subtype,
ti.attrtypoids, ti.attrnames, 0
FROM
{typeinfo} AS ti
WHERE
ti.oid = any($1::oid[])
UNION ALL
SELECT
ti.oid, ti.ns, ti.name, ti.kind, ti.basetype,
ti.elemtype, ti.elemdelim, ti.range_subtype,
ti.attrtypoids, ti.attrnames, tt.depth + 1
FROM
{typeinfo} ti,
typeinfo_tree tt
WHERE
(tt.elemtype IS NOT NULL AND ti.oid = tt.elemtype)
OR (tt.attrtypoids IS NOT NULL AND ti.oid = any(tt.attrtypoids))
OR (tt.range_subtype IS NOT NULL AND ti.oid = tt.range_subtype)
)
SELECT DISTINCT
*,
basetype::regtype::text AS basetype_name,
elemtype::regtype::text AS elemtype_name,
range_subtype::regtype::text AS range_subtype_name
FROM
typeinfo_tree
ORDER BY
depth DESC
'''.format(typeinfo=_TYPEINFO_13)


_TYPEINFO = '''\
(
SELECT
t.oid AS oid,
ns.nspname AS ns,
t.typname AS name,
t.typtype AS kind,
(CASE WHEN t.typtype = 'd' THEN
(WITH RECURSIVE typebases(oid, depth) AS (
SELECT
t2.typbasetype AS oid,
0 AS depth
FROM
pg_type t2
WHERE
t2.oid = t.oid
UNION ALL
SELECT
t2.typbasetype AS oid,
tb.depth + 1 AS depth
FROM
pg_type t2,
typebases tb
WHERE
tb.oid = t2.oid
AND t2.typbasetype != 0
) SELECT oid FROM typebases ORDER BY depth DESC LIMIT 1)
ELSE NULL
END) AS basetype,
t.typelem AS elemtype,
elem_t.typdelim AS elemdelim,
COALESCE(
range_t.rngsubtype,
multirange_t.rngsubtype) AS range_subtype,
(CASE WHEN t.typtype = 'c' THEN
(SELECT
array_agg(ia.atttypid ORDER BY ia.attnum)
FROM
pg_attribute ia
INNER JOIN pg_class c
ON (ia.attrelid = c.oid)
WHERE
ia.attnum > 0 AND NOT ia.attisdropped
AND c.reltype = t.oid)
ELSE NULL
END) AS attrtypoids,
(CASE WHEN t.typtype = 'c' THEN
(SELECT
array_agg(ia.attname::text ORDER BY ia.attnum)
FROM
pg_attribute ia
INNER JOIN pg_class c
ON (ia.attrelid = c.oid)
WHERE
ia.attnum > 0 AND NOT ia.attisdropped
AND c.reltype = t.oid)
ELSE NULL
END) AS attrnames
FROM
pg_catalog.pg_type AS t
INNER JOIN pg_catalog.pg_namespace ns ON (
ns.oid = t.typnamespace)
LEFT JOIN pg_type elem_t ON (
t.typlen = -1 AND
t.typelem != 0 AND
t.typelem = elem_t.oid
)
LEFT JOIN pg_range range_t ON (
t.oid = range_t.rngtypid
)
LEFT JOIN pg_range multirange_t ON (
t.oid = multirange_t.rngmultitypid
)
)
'''


INTRO_LOOKUP_TYPES = '''\
WITH RECURSIVE typeinfo_tree(
oid, ns, name, kind, basetype, elemtype, elemdelim,
Expand Down
12 changes: 0 additions & 12 deletions asyncpg/protocol/codecs/array.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -858,19 +858,7 @@ cdef arraytext_decode(ConnectionSettings settings, FRBuffer *buf):
return array_decode(settings, buf, <decode_func_ex>&text_decode_ex, NULL)


cdef anyarray_decode(ConnectionSettings settings, FRBuffer *buf):
# Instances of anyarray (or any other polymorphic pseudotype) are
# never supposed to be returned from actual queries.
raise exceptions.ProtocolError(
'unexpected instance of \'anyarray\' type')


cdef init_array_codecs():
register_core_codec(ANYARRAYOID,
NULL,
<decode_func>&anyarray_decode,
PG_FORMAT_BINARY)

# oid[] and text[] are registered as core codecs
# to make type introspection query work
#
Expand Down
24 changes: 18 additions & 6 deletions asyncpg/protocol/codecs/base.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ ctypedef object (*codec_decode_func)(Codec codec,


cdef enum CodecType:
CODEC_UNDEFINED = 0
CODEC_C = 1
CODEC_PY = 2
CODEC_ARRAY = 3
CODEC_COMPOSITE = 4
CODEC_RANGE = 5
CODEC_UNDEFINED = 0
CODEC_C = 1
CODEC_PY = 2
CODEC_ARRAY = 3
CODEC_COMPOSITE = 4
CODEC_RANGE = 5
CODEC_MULTIRANGE = 6


cdef enum ServerDataFormat:
Expand Down Expand Up @@ -95,6 +96,9 @@ cdef class Codec:
cdef encode_range(self, ConnectionSettings settings, WriteBuffer buf,
object obj)

cdef encode_multirange(self, ConnectionSettings settings, WriteBuffer buf,
object obj)

cdef encode_composite(self, ConnectionSettings settings, WriteBuffer buf,
object obj)

Expand All @@ -109,6 +113,8 @@ cdef class Codec:

cdef decode_range(self, ConnectionSettings settings, FRBuffer *buf)

cdef decode_multirange(self, ConnectionSettings settings, FRBuffer *buf)

cdef decode_composite(self, ConnectionSettings settings, FRBuffer *buf)

cdef decode_in_python(self, ConnectionSettings settings, FRBuffer *buf)
Expand Down Expand Up @@ -139,6 +145,12 @@ cdef class Codec:
str schema,
Codec element_codec)

@staticmethod
cdef Codec new_multirange_codec(uint32_t oid,
str name,
str schema,
Codec element_codec)

@staticmethod
cdef Codec new_composite_codec(uint32_t oid,
str name,
Expand Down
56 changes: 54 additions & 2 deletions asyncpg/protocol/codecs/base.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ cdef class Codec:
'range types is not supported'.format(schema, name))
self.encoder = <codec_encode_func>&self.encode_range
self.decoder = <codec_decode_func>&self.decode_range
elif type == CODEC_MULTIRANGE:
if format != PG_FORMAT_BINARY:
raise exceptions.UnsupportedClientFeatureError(
'cannot decode type "{}"."{}": text encoding of '
'range types is not supported'.format(schema, name))
self.encoder = <codec_encode_func>&self.encode_multirange
self.decoder = <codec_decode_func>&self.decode_multirange
elif type == CODEC_COMPOSITE:
if format != PG_FORMAT_BINARY:
raise exceptions.UnsupportedClientFeatureError(
Expand Down Expand Up @@ -122,6 +129,12 @@ cdef class Codec:
codec_encode_func_ex,
<void*>(<cpython.PyObject>self.element_codec))

cdef encode_multirange(self, ConnectionSettings settings, WriteBuffer buf,
object obj):
multirange_encode(settings, buf, obj, self.element_codec.oid,
codec_encode_func_ex,
<void*>(<cpython.PyObject>self.element_codec))

cdef encode_composite(self, ConnectionSettings settings, WriteBuffer buf,
object obj):
cdef:
Expand Down Expand Up @@ -209,6 +222,10 @@ cdef class Codec:
return range_decode(settings, buf, codec_decode_func_ex,
<void*>(<cpython.PyObject>self.element_codec))

cdef decode_multirange(self, ConnectionSettings settings, FRBuffer *buf):
return multirange_decode(settings, buf, codec_decode_func_ex,
<void*>(<cpython.PyObject>self.element_codec))

cdef decode_composite(self, ConnectionSettings settings,
FRBuffer *buf):
cdef:
Expand Down Expand Up @@ -294,7 +311,11 @@ cdef class Codec:
if self.c_encoder is not NULL or self.py_encoder is not None:
return True

elif self.type == CODEC_ARRAY or self.type == CODEC_RANGE:
elif (
self.type == CODEC_ARRAY
or self.type == CODEC_RANGE
or self.type == CODEC_MULTIRANGE
):
return self.element_codec.has_encoder()

elif self.type == CODEC_COMPOSITE:
Expand All @@ -312,7 +333,11 @@ cdef class Codec:
if self.c_decoder is not NULL or self.py_decoder is not None:
return True

elif self.type == CODEC_ARRAY or self.type == CODEC_RANGE:
elif (
self.type == CODEC_ARRAY
or self.type == CODEC_RANGE
or self.type == CODEC_MULTIRANGE
):
return self.element_codec.has_decoder()

elif self.type == CODEC_COMPOSITE:
Expand Down Expand Up @@ -358,6 +383,18 @@ cdef class Codec:
None, None, None, 0)
return codec

@staticmethod
cdef Codec new_multirange_codec(uint32_t oid,
str name,
str schema,
Codec element_codec):
cdef Codec codec
codec = Codec(oid)
codec.init(name, schema, 'multirange', CODEC_MULTIRANGE,
element_codec.format, PG_XFORMAT_OBJECT, NULL, NULL,
None, None, element_codec, None, None, None, 0)
return codec

@staticmethod
cdef Codec new_composite_codec(uint32_t oid,
str name,
Expand Down Expand Up @@ -536,6 +573,21 @@ cdef class DataCodecConfig:
self._derived_type_codecs[oid, elem_codec.format] = \
Codec.new_range_codec(oid, name, schema, elem_codec)

elif ti['kind'] == b'm':
# Multirange type

if not range_subtype_oid:
raise exceptions.InternalClientError(
f'type record missing base type for multirange {oid}')

elem_codec = self.get_codec(range_subtype_oid, PG_FORMAT_ANY)
if elem_codec is None:
elem_codec = self.declare_fallback_codec(
range_subtype_oid, ti['range_subtype_name'], schema)

self._derived_type_codecs[oid, elem_codec.format] = \
Codec.new_multirange_codec(oid, name, schema, elem_codec)

elif ti['kind'] == b'e':
# Enum types are essentially text
self._set_builtin_type_codec(oid, name, schema, 'scalar',
Expand Down
18 changes: 16 additions & 2 deletions asyncpg/protocol/codecs/pgproto.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,9 @@ cdef init_pseudo_codecs():
FDW_HANDLEROID, TSM_HANDLEROID, INTERNALOID, OPAQUEOID,
ANYELEMENTOID, ANYNONARRAYOID, ANYCOMPATIBLEOID,
ANYCOMPATIBLEARRAYOID, ANYCOMPATIBLENONARRAYOID,
ANYCOMPATIBLERANGEOID, PG_DDL_COMMANDOID, INDEX_AM_HANDLEROID,
TABLE_AM_HANDLEROID,
ANYCOMPATIBLERANGEOID, ANYCOMPATIBLEMULTIRANGEOID,
ANYRANGEOID, ANYMULTIRANGEOID, ANYARRAYOID,
PG_DDL_COMMANDOID, INDEX_AM_HANDLEROID, TABLE_AM_HANDLEROID,
]

register_core_codec(ANYENUMOID,
Expand Down Expand Up @@ -330,6 +331,19 @@ cdef init_pseudo_codecs():
<decode_func>pgproto.bytea_decode,
PG_FORMAT_BINARY)

# These two are internal to BRIN index support and are unlikely
# to be sent, but since I/O functions for these exist, add decoders
# nonetheless.
register_core_codec(PG_BRIN_BLOOM_SUMMARYOID,
NULL,
<decode_func>pgproto.bytea_decode,
PG_FORMAT_BINARY)

register_core_codec(PG_BRIN_MINMAX_MULTI_SUMMARYOID,
NULL,
<decode_func>pgproto.bytea_decode,
PG_FORMAT_BINARY)


cdef init_text_codecs():
textoids = [
Expand Down
Loading

0 comments on commit d64a44a

Please sign in to comment.