Commit
Add python documentation for Kafka integration. (#1160)
* Add python documentation for Kafka integration.

* Added generated .json doc files that were left out of date in main.

* Adding :return: and :raises: to the doc; changing to raising ValueErrors where appropriate.

* Followup to review comments.
jcferretti authored Aug 31, 2021
1 parent 36b4c2d commit b181861
Showing 2 changed files with 214 additions and 68 deletions.
@@ -87,9 +87,6 @@ def _custom_avroSchemaToColumnDefinitions(schema, mapping:dict = None):
if mapping is None:
return _java_type_.avroSchemaToColumnDefinitions(schema)

if not isinstance(mapping, dict):
raise Exception("argument 'mapping' has to be of dict type, instead got " +
str(mapping) + " of type " + type(mapping).__name__)
field_names = jpy.array('java.lang.String', mapping.keys())
column_names = jpy.array('java.lang.String', mapping.values())
mapping = _java_type_.fieldNameMappingFromParallelArrays(field_names, column_names)
@@ -112,27 +109,56 @@ def consumeToTable(
value = None,
table_type = 'stream'
):
if not isinstance(kafka_config, dict):
raise Exception("argument 'kafka_config' has to be of type dict, instead got " + str(kafka_config))
"""
Consume from Kafka to a Deephaven table.
:param kafka_config: Dictionary with properties to configure the associated kafka consumer and
also the resulting table. Once the table-specific properties are stripped, the result is
passed to the org.apache.kafka.clients.consumer.KafkaConsumer constructor; pass any
KafkaConsumer specific desired configuration here.
:param topic: The topic name
:param partitions: Either a sequence of integer partition numbers or the predefined constant
ALL_PARTITIONS for all partitions.
:param offsets: Either a dict mapping partition numbers to offset numbers, or one of the predefined constants
ALL_PARTITIONS_SEEK_TO_BEGINNING, ALL_PARTITIONS_SEEK_TO_END or ALL_PARTITIONS_DONT_SEEK.
If a dict, the values may be one of the predefined constants SEEK_TO_BEGINNING, SEEK_TO_END
or DONT_SEEK.
:param key: A specification for how to map the Key field in Kafka messages. This should be
the result of calling one of the methods simple, avro or json in this module,
or None to obtain a single column specified in the kafka_config param via the
keys 'deephaven.key.column.name' for column name and 'deephaven.key.column.type' for
the column type; both should have string values associated with them.
:param value: A specification for how to map the Value field in Kafka messages. This should be
the result of calling one of the methods simple, avro or json in this module,
or None to obtain a single column specified in the kafka_config param via the
keys 'deephaven.value.column.name' for column name and 'deephaven.value.column.type' for
the column type; both should have string values associated with them.
:param table_type: A string specifying the resulting table type: one of 'stream' (default), 'append',
'stream_map' or 'append_map'.
:return: A Deephaven live table that will update based on Kafka messages consumed for the given topic.
:raises: ValueError or TypeError if arguments provided can't be processed.
"""

if not _isStr(topic):
raise Exception("argument 'topic' has to be of str type, instead got " + topic)
raise ValueError("argument 'topic' has to be of str type, instead got " + topic)

if partitions is None:
partitions = ALL_PARTITIONS
elif isinstance(partitions, collections.Sequence):
try:
jarr = jpy.array('int', partitions)
except Exception as e:
raise Exception(
raise ValueError(
"when not one of the predefined constants, keyword argument 'partitions' has to " +
"represent a sequence of integer partition values >= 0"
"represent a sequence of integer partition with values >= 0, instead got " +
str(partitions) + " of type " + type(partitions).__name__
) from e
partitions = _java_type_.partitionFilterFromArray(jarr)
elif not isinstance(partitions, jpy.JType):
raise Exception("argument 'partitions' has to be of str or sequence type, " +
"or a predefined compatible constant, instead got partitions " +
str(partitions) + " of type " + type(partitions).__name__)
raise TypeError(
"argument 'partitions' has to be of str or sequence type, " +
"or a predefined compatible constant, instead got partitions " +
str(partitions) + " of type " + type(partitions).__name__)

if offsets is None:
offsets = ALL_PARTITIONS_DONT_SEEK
@@ -142,54 +168,70 @@ def consumeToTable(
offsets_array = jpy.array('long', offsets.values())
offsets = _java_type_.partitionToOffsetFromParallelArrays(partitions_array, offsets_array)
except Exception as e:
raise Exception(
raise ValueError(
"when of type dict, keyword argument 'offsets' has to map " +
"numeric partitions to either numeric offsets, or one of the constants { " +
"SEEK_TO_BEGINNING, DONT_SEEK, SEEK_TO_END }," +
"instead got offsets=" + str(offsets)
) from e
elif not isinstance(offsets, jpy.JType):
raise Exception(
raise TypeError(
"value " + str(offsets) + " of type " + type(offsets).__name__ +
" not recognized for argument 'offsets'; only str, dict or predefined constants allowed")
" not recognized for argument 'offsets'; only str, dict like, or predefined constants allowed")

if key is None:
key = FROM_PROPERTIES
if value is None:
value = FROM_PROPERTIES
if key is IGNORE and value is IGNORE:
raise Exception(
"at least one argument for 'key' or 'value' must be different from the default IGNORE")
raise ValueError(
"at least one argument for 'key' or 'value' must be different from IGNORE")

if not _isStr(table_type):
raise Exception("argument 'table_type' expected to be of type str, instead got " +
str(table_type) + " of type " + type(table_type).__name__)
raise TypeError(
"argument 'table_type' expected to be of type str, instead got " +
str(table_type) + " of type " + type(table_type).__name__)
table_type_enum = _java_type_.friendlyNameToTableType(table_type)
if table_type_enum is None:
raise Exception("unknown value " + table_type + " for argument 'table_type'")
raise ValueError("unknown value " + table_type + " for argument 'table_type'")

kafka_config = _dictToProperties(kafka_config)
return _java_type_.consumeToTable(kafka_config, topic, partitions, offsets, key, value, table_type_enum)
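
# Usage sketch (hypothetical broker, topic and column names; assumes this module is
# imported as `ck` and the Deephaven type helpers as `dh`):
#
#   result = ck.consumeToTable(
#       {'bootstrap.servers': 'broker:9092'},
#       topic='orders',
#       partitions=ck.ALL_PARTITIONS,
#       offsets={0: ck.SEEK_TO_BEGINNING, 1: 1024},
#       key=ck.simple('Symbol'),
#       value=ck.json([('Price', dh.double)]),
#       table_type='append')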


@_passThrough
def avro(schema, schema_version:str = None, mapping:dict = None, mapping_only:dict = None):
"""
Specify an Avro schema to use when consuming a Kafka stream to a Deephaven table.
:param schema: Either an Avro schema object or a string specifying a schema name for a schema
registered in a Confluent compatible Schema Server. When the latter is provided, the
associated kafka_config dict in the call to consumeToTable should include the key
'schema.registry.url' with the associated value of the Schema Server URL for fetching the schema
definition.
:param schema_version: If a string schema name is provided, the version to fetch from schema
service; if not specified, a default of 'latest' is assumed.
:param mapping: A dict representing a string to string mapping from Avro field name to Deephaven table
column name; the fields mentioned in the mapping will have their column names defined by it; any other
fields not mentioned in the mapping will use the same Avro field name for Deephaven table column
name. Note that only one parameter between mapping and mapping_only can be provided.
:param mapping_only: A dict representing a string to string mapping from Avro field name to Deephaven
table column name; the fields mentioned in the mapping will have their column names defined by it;
any other fields not mentioned in the mapping will be ignored and will not be present in the resulting
table. Note that only one parameter between mapping and mapping_only can be provided.
:return: A Kafka Key or Value spec object to use in a call to consumeToTable.
:raises: ValueError, TypeError or Exception if arguments provided can't be processed.
"""
if mapping is not None and mapping_only is not None:
raise Exception(
"only one argument between 'mapping' and " +
"'mapping_only' expected, instead got both")
if mapping is not None:
have_mapping = True
if not isinstance(mapping, dict):
raise Exception("'mapping' argument is expected to be of dict type, " +
"instead got " + str(mapping) + " of type " + type(mapping).__name__)
# when providing 'mapping', field names not given are mapped as identity
mapping = _dictToFun(mapping, default_value=IDENTITY)
elif mapping_only is not None:
have_mapping = True
if not isinstance(mapping_only, dict):
raise Exception("'mapping_only' argument is expected to be of dict type, " +
"instead found " + str(mapping_only) + " of type " + type(mapping_only).__name__)
# when providing 'mapping_only', fields not given are ignored.
mapping = _dictToFun(mapping_only, default_value=None)
else:
@@ -199,15 +241,18 @@ def avro(schema, schema_version:str = None, mapping:dict = None, mapping_only:di
if schema_version is None:
schema_version = "latest"
elif not _isStr(schema_version):
raise Exception("argument 'schema_version' should be of str type, instead got " +
str(schema_version) + " of type " + type(schema_version).__name__)
raise TypeError(
"argument 'schema_version' should be of str type, instead got " +
str(schema_version) + " of type " + type(schema_version).__name__)
elif isinstance(schema, _avro_schema_jtype_):
have_actual_schema = True
if schema_version is not None:
raise Exception("argument 'schema_version' is only expected if schema is of str type")
raise Exception(
"argument 'schema_version' is only expected if schema is of str type")
else:
raise Exception("'schema' argument expected to be of either " +
"str type or avro schema type, instead got " + str(schema))
raise TypeError(
"'schema' argument expected to be of either " +
"str type or avro schema type, instead got " + str(schema))

if have_mapping:
if have_actual_schema:
@@ -223,17 +268,30 @@ def avro(schema, schema_version:str = None, mapping:dict = None, mapping_only:di

@_passThrough
def json(col_defs, mapping:dict = None):
"""
Specify how to use JSON data when consuming a Kafka stream to a Deephaven table.
:param col_defs: A sequence of tuples specifying names and types for columns to be
created on the resulting Deephaven table. Tuples contain two elements, a
string for column name and a Deephaven type for column data type.
:param mapping: A dict mapping JSON field names to column names defined in the col_defs
argument. If not present or None, a 1:1 mapping between JSON fields and Deephaven
table column names is assumed.
:return: A Kafka Key or Value spec object to use in a call to consumeToTable.
:raises: ValueError, TypeError or Exception if arguments provided can't be processed.
"""
if not isinstance(col_defs, collections.Sequence) or _isStr(col_defs):
raise Exception("'col_defs' argument needs to be a sequence of tuples, instead got " +
str(col_defs) + " of type " + type(col_defs).__name__)
raise TypeError(
"'col_defs' argument needs to be a sequence of tuples, instead got " +
str(col_defs) + " of type " + type(col_defs).__name__)
try:
col_defs = dh._colDefs(col_defs)
except Exception as e:
raise Exception("could not create column definitions from " + str(col_defs)) from e
if mapping is None:
return _java_type_.jsonSpec(col_defs)
if not isinstance(mapping, dict):
raise Exception(
raise TypeError(
"argument 'mapping' is expected to be of dict type, " +
"instead got " + str(mapping) + " of type " + type(mapping).__name__)
mapping = _dictToMap(mapping)
@@ -242,13 +300,28 @@ def json(col_defs, mapping:dict = None):

@_passThrough
def simple(column_name:str, data_type:dh.DataType = None):
"""
Specify a single-column mapping for the Key or Value field when consuming a Kafka stream to a Deephaven table.
:param column_name: A string specifying the Deephaven column name to use.
:param data_type: A Deephaven type specifying the column data type to use.
:return: A Kafka Key or Value spec object to use in a call to consumeToTable.
:raises: TypeError if arguments provided can't be processed.
"""
if not _isStr(column_name):
raise Exception("'column_name' argument needs to be of str type, instead got " + str(column_name))
raise TypeError(
"'column_name' argument needs to be of str type, instead got " + str(column_name))
if data_type is None:
return _java_type_.simpleSpec(column_name)
return _java_type_.simpleSpec(column_name, _jclassFromType(data_type))
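
# Example sketch (hypothetical column names): expose the raw Kafka Key or Value as a
# single column; omitting data_type uses the single-argument simpleSpec overload:
#
#   key_spec = simple('Symbol')
#   value_spec = simple('Price', dh.double)  # assumes dh.double is available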


@_passThrough
def streamTableToAppendTable(t):
"""
Creates an 'append' table from a 'stream' table.
:param t: The 'stream' table input.
:return: The resulting 'append' table.
"""
return _stream_table_tools_.streamToAppendOnlyTable(t)
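
# Example sketch (variable name hypothetical): accumulate every consumed row in an
# append-only table instead of keeping only the rows from the latest update cycle:
#
#   appending = streamTableToAppendTable(streaming_result)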
