IoTDB

IoTDB sink connector

Description

Used to write data to IoTDB.

Key features

IoTDB supports exactly-once semantics through idempotent writes. If two records have the same key and timestamp, the newer record overwrites the older one.
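The overwrite rule can be illustrated with a toy in-memory model. This is only a sketch, not the IoTDB storage engine: the `store` dict and the `write` helper are hypothetical, and treating the "key" as the pair of device and measurement is an assumption.

```python
# Toy model of idempotent writes. `store` and `write` are hypothetical names;
# the key is assumed to be (device, measurement) plus the timestamp.
store = {}


def write(device, measurement, timestamp, value):
    """Write one point; the same device, measurement and timestamp overwrites."""
    store[(device, measurement, timestamp)] = value


# First delivery of a record.
write("root.test_group.device_a", "temperature", 1664035200001, 36.1)
# The same record is delivered again, for example after a retry.
write("root.test_group.device_a", "temperature", 1664035200001, 36.1)

# The replay did not create a duplicate point.
print(len(store))  # 1
```

Because a replayed write lands on the same slot, delivering a record more than once leaves the stored data unchanged.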

:::tip

There is a Thrift version conflict between IoTDB and Spark. To resolve it, run rm -f $SPARK_HOME/jars/libthrift* and then cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/.

:::

Options

| name                        | type    | required | default value              |
|-----------------------------|---------|----------|----------------------------|
| node_urls                   | list    | yes      | -                          |
| username                    | string  | yes      | -                          |
| password                    | string  | yes      | -                          |
| key_device                  | string  | yes      | -                          |
| key_timestamp               | string  | no       | processing time            |
| key_measurement_fields      | array   | no       | exclude device & timestamp |
| storage_group               | string  | no       | -                          |
| batch_size                  | int     | no       | 1024                       |
| batch_interval_ms           | int     | no       | -                          |
| max_retries                 | int     | no       | -                          |
| retry_backoff_multiplier_ms | int     | no       | -                          |
| max_retry_backoff_ms        | int     | no       | -                          |
| default_thrift_buffer_size  | int     | no       | -                          |
| max_thrift_frame_size       | int     | no       | -                          |
| zone_id                     | string  | no       | -                          |
| enable_rpc_compression      | boolean | no       | -                          |
| connection_timeout_in_ms    | int     | no       | -                          |
| common-options              |         | no       | -                          |

node_urls [list]

The IoTDB cluster addresses, in the format ["host:port", ...]

username [string]

The IoTDB username

password [string]

The IoTDB password

key_device [string]

The name of the SeaTunnelRow field that holds the IoTDB deviceId

key_timestamp [string]

The name of the SeaTunnelRow field that holds the IoTDB timestamp. If not specified, the processing time is used as the timestamp

key_measurement_fields [array]

The names of the SeaTunnelRow fields to write as IoTDB measurements. If not specified, all fields except the device and timestamp fields are written

storage_group [string]

The storage group (path prefix) to prepend to the device name

Example: deviceId = ${storage_group} + "." + ${key_device}
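The concatenation rule above can be sketched as a small helper. The function name `build_device_id` is hypothetical, and passing the device value through unchanged when no storage group is configured is an assumption based on the examples in this document, where the device field already holds a full path.

```python
def build_device_id(device_value, storage_group=None):
    """Join the optional storage group and the device value with a dot.

    `device_value` is the content of the row field named by key_device.
    """
    if storage_group:
        return storage_group + "." + device_value
    # Assumption: without a storage group, the device value is used as-is.
    return device_value


print(build_device_id("device_a", "root.test_group"))
# root.test_group.device_a
print(build_device_id("root.test_group.device_a"))
# root.test_group.device_a
```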

batch_size [int]

The maximum number of records buffered before a write. Buffered records are flushed to IoTDB when their count reaches batch_size or when the elapsed time reaches batch_interval_ms

batch_interval_ms [int]

The maximum time (in ms) to buffer records before a write. Buffered records are flushed to IoTDB when their count reaches batch_size or when the elapsed time reaches batch_interval_ms
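The two flush triggers can be sketched as follows. This is a simplified model, not the connector's code: the `BatchBuffer` class, its `flush` callback and its injectable `clock` are all hypothetical, and the interval is only checked when a record is added, whereas a real sink would more likely use a background timer.

```python
import time


class BatchBuffer:
    """Buffer records and flush on size or elapsed time (illustrative only)."""

    def __init__(self, batch_size=1024, batch_interval_ms=None,
                 flush=print, clock=time.monotonic):
        self.batch_size = batch_size
        self.batch_interval_ms = batch_interval_ms
        self.flush = flush      # called with the list of buffered records
        self.clock = clock      # returns seconds; injectable for testing
        self.records = []
        self.last_flush = clock()

    def _should_flush(self):
        # Trigger 1: the buffer holds batch_size records.
        if len(self.records) >= self.batch_size:
            return True
        # Trigger 2: batch_interval_ms has elapsed since the last flush.
        if self.batch_interval_ms is None:
            return False
        elapsed_ms = (self.clock() - self.last_flush) * 1000
        return elapsed_ms >= self.batch_interval_ms

    def add(self, record):
        self.records.append(record)
        if self._should_flush():
            self.flush_now()

    def flush_now(self):
        if self.records:
            self.flush(list(self.records))
            self.records.clear()
        self.last_flush = self.clock()
```

With `batch_size = 1024` and `batch_interval_ms = 1000`, a busy stream flushes every 1024 records, while a slow stream still flushes roughly once per second.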

max_retries [int]

The number of times to retry a failed flush

retry_backoff_multiplier_ms [int]

Used as the multiplier when generating the next backoff delay

max_retry_backoff_ms [int]

The maximum amount of time (in ms) to wait before retrying a request to IoTDB
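This document does not state the exact formula that combines the three retry options, so the following is only a sketch of one common scheme: exponential backoff with a cap. The function name `backoff_delays_ms` and the doubling rule are assumptions, not the connector's confirmed behavior.

```python
def backoff_delays_ms(max_retries, retry_backoff_multiplier_ms,
                      max_retry_backoff_ms):
    """Return the wait before each retry, in milliseconds.

    Assumed rule: the delay doubles on every attempt, starting from the
    multiplier, and never exceeds max_retry_backoff_ms.
    """
    delays = []
    for attempt in range(max_retries):
        delay = retry_backoff_multiplier_ms * (2 ** attempt)
        delays.append(min(delay, max_retry_backoff_ms))
    return delays


print(backoff_delays_ms(5, 100, 1000))  # [100, 200, 400, 800, 1000]
```

Under this assumed rule, the cap keeps a long run of failures from producing unbounded waits.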

default_thrift_buffer_size [int]

The initial Thrift buffer size of the IoTDB client

max_thrift_frame_size [int]

The maximum Thrift frame size of the IoTDB client

zone_id [string]

The java.time.ZoneId used by the IoTDB client

enable_rpc_compression [boolean]

Whether to enable RPC compression in the IoTDB client

connection_timeout_in_ms [int]

The maximum time (in ms) to wait when connecting to IoTDB

common options

Common sink plugin parameters. See Sink Common Options for details

Examples

Case1

Common options:

sink {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    batch_size = 1024
    batch_interval_ms = 1000
  }
}

When key_device is set to device_name, for example:

sink {
  IoTDB {
    ...
    key_device = "device_name"
  }
}

The upstream SeaTunnelRow data has the following format:

| device_name              | field_1 | field_2 |
|--------------------------|---------|---------|
| root.test_group.device_a | 1001    | 1002    |
| root.test_group.device_b | 2001    | 2002    |
| root.test_group.device_c | 3001    | 3002    |

The data written to IoTDB has the following format:

IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+----------+-----------+
|                    Time|                  Device|   field_1|    field_2|
+------------------------+------------------------+----------+-----------+
|2022-09-26T17:50:01.201Z|root.test_group.device_a|      1001|       1002|
|2022-09-26T17:50:01.202Z|root.test_group.device_b|      2001|       2002|
|2022-09-26T17:50:01.203Z|root.test_group.device_c|      3001|       3002|
+------------------------+------------------------+----------+-----------+

Case2

When you set key_device, key_timestamp, and key_measurement_fields, for example:

sink {
  IoTDB {
    ...
    key_device = "device_name"
    key_timestamp = "ts"
    key_measurement_fields = ["temperature", "moisture"]
  }
}

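The upstream SeaTunnelRow data has the following format:

| ts            | device_name              | field_1 | field_2 | temperature | moisture |
|---------------|--------------------------|---------|---------|-------------|----------|
| 1664035200001 | root.test_group.device_a | 1001    | 1002    | 36.1        | 100      |
| 1664035200001 | root.test_group.device_b | 2001    | 2002    | 36.2        | 101      |
| 1664035200001 | root.test_group.device_c | 3001    | 3002    | 36.3        | 102      |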

The data written to IoTDB has the following format:

IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+
|                    Time|                  Device|   temperature|   moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-24T16:00:00.001Z|root.test_group.device_a|          36.1|        100|
|2022-09-24T16:00:00.001Z|root.test_group.device_b|          36.2|        101|
|2022-09-24T16:00:00.001Z|root.test_group.device_c|          36.3|        102|
+------------------------+------------------------+--------------+-----------+
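The field selection in Case1 and Case2 can be sketched as a mapping from one upstream row to a device, a timestamp, and a set of measurements. This is a minimal sketch that models a row as a plain dict; the function name `to_iotdb_record` is hypothetical and the code is not the connector's implementation.

```python
import time


def to_iotdb_record(row, key_device, key_timestamp=None,
                    key_measurement_fields=None):
    """Split a row (a dict here) into device, timestamp and measurements."""
    device = row[key_device]
    if key_timestamp is None:
        # No timestamp field configured: fall back to processing time (ms).
        timestamp = int(time.time() * 1000)
    else:
        timestamp = row[key_timestamp]
    if key_measurement_fields is None:
        # Default: every field except the device and timestamp fields.
        excluded = {key_device, key_timestamp}
        key_measurement_fields = [name for name in row if name not in excluded]
    measurements = {name: row[name] for name in key_measurement_fields}
    return device, timestamp, measurements


row = {
    "ts": 1664035200001,
    "device_name": "root.test_group.device_a",
    "field_1": 1001,
    "field_2": 1002,
    "temperature": 36.1,
    "moisture": 100,
}
# Case2 settings: only temperature and moisture are written.
print(to_iotdb_record(row, "device_name", "ts", ["temperature", "moisture"]))
```

With the Case2 settings, field_1 and field_2 are dropped because they are not listed in key_measurement_fields. With only key_device set, as in Case1, every remaining field becomes a measurement.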