Skip to content

Commit

Permalink
feat: add API to set time unit in a metric
Browse files Browse the repository at this point in the history
This will allow reusing the same client for writing data to different tables with different time units.
  • Loading branch information
SergeTupchiy committed Jan 23, 2024
1 parent a2f5de8 commit e471ea6
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 45 deletions.
55 changes: 19 additions & 36 deletions src/greptimedb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,23 @@
-export([start_client/1, stop_client/1, write_batch/2, write/3, write_stream/1,
async_write/4, async_write_batch/3, is_alive/1, is_alive/2, ddl/1]).

-export_type([metric/0, point/0, timeunit/0]).

-type table() :: atom() | binary() | list().
-type dbname() :: atom() | binary() | list().
-type timeunit() :: ns | us| ms | s | nanosecond | microsecond | millisecond | second.
-type metric() :: table()
| {dbname(), table()}
| #{dbname => dbname(), table := table(), timeunit => timeunit()}.
-type point() :: #{tags => map(),
fields => map(),
timestamp => integer()}.

-spec start_client(list()) ->
{ok, Client :: map()} |
{error, {already_started, Client :: map()}} |
{error, Reason :: term()}.

start_client(Options0) ->
Pool = proplists:get_value(pool, Options0),
Options = lists:keydelete(protocol, 1, lists:keydelete(pool, 1, Options0)),
Expand All @@ -41,30 +54,15 @@ start_client(Options0) ->
%% @doc Write points to the metric table, return the result.
-spec write(Client, Metric, Points) -> {ok, term()} | {error, term()}
when Client :: map(),
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()}.
Metric :: metric(),
Points :: [point()].
write(Client, Metric, Points) ->
write_batch(Client, [{Metric, Points}]).

%% @doc Write a batch of data points to the database, return the result.
-spec write_batch(Client, MetricAndPoints) -> {ok, term()} | {error, term()}
when Client :: map(),
MetricAndPoints :: [MetricAndPoint],
MetricAndPoint :: {Metric, Points},
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()}.
MetricAndPoints :: [{metric(), [point()]}].
write_batch(Client, MetricAndPoints) ->
try
Request = greptimedb_encoder:insert_requests(Client, MetricAndPoints),
Expand All @@ -89,31 +87,16 @@ write_stream(Client) ->
%% @doc Send an async request to write points to the metric table. The callback is evaluated when an error happens or response is received.
-spec async_write(Client, Metric, Points, ResultCallback) -> ok | {error, term()}
when Client :: map(),
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()},
Metric :: metric(),
Points :: [point()],
ResultCallback :: {function(), list()}.
async_write(Client, Metric, Points, ResultCallback) ->
async_write_batch(Client, [{Metric, Points}], ResultCallback).

%% @doc Send a batch of async request. The callback is evaluated when an error happens or response is received.
-spec async_write_batch(Client, MetricAndPoints, ResultCallback) -> ok | {error, term()}
when Client :: map(),
MetricAndPoints :: [MetricAndPoint],
MetricAndPoint :: {Metric, Points},
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()},
MetricAndPoints :: [{metric(), [point()]}],
ResultCallback :: {function(), list()}.
async_write_batch(Client, MetricAndPoints, ResultCallback) ->
Request = greptimedb_encoder:insert_requests(Client, MetricAndPoints),
Expand Down
32 changes: 23 additions & 9 deletions src/greptimedb_encoder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,44 @@ insert_requests(#{cli_opts := Options} = _Client, [], DbName, Inserts) ->
#{dbname => DbName, authorization => #{auth_scheme => Scheme}}
end,
#{header => Header, request => {inserts, #{inserts => Inserts}}};
insert_requests(#{cli_opts := Options} = Client, [{Table, Points} | T], PrevDbName, Inserts) ->
{DbName, Insert} = insert_request(Options, Table, Points),
insert_requests(#{cli_opts := Options} = Client, [{Metric, Points} | T], PrevDbName, Inserts) ->
{DbName, Insert} = insert_request(Options, metric(Options, Metric), Points),
case PrevDbName of
unknown ->
insert_requests(Client, T, DbName, [Insert | Inserts]);
Name when Name == DbName ->
insert_requests(Client, T, Name, [Insert | Inserts])
end.

insert_request(Options, {DbName, Table}, Points) ->
Timeunit = proplists:get_value(timeunit, Options, ms),
insert_request(_Options, #{dbname := DbName, table := Table, timeunit := Timeunit}, Points) ->
RowCount = length(Points),
Columns =
lists:map(fun(Column) -> pad_null_mask(Column, RowCount) end, collect_columns(Timeunit, Points)),
Columns = lists:map(fun(Column) -> pad_null_mask(Column, RowCount) end,
collect_columns(Timeunit, Points)),
{DbName,
#{table_name => Table,
columns => Columns,
row_count => RowCount}};
insert_request(Options, Table, Points) ->
insert_request(Options, {?DEFAULT_DBNAME, Table}, Points).
row_count => RowCount}}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

metric(Options, Metric) ->
metric_with_default(default_metric(Options), Metric).

default_metric(Options) ->
#{dbname => ?DEFAULT_DBNAME,
timeunit => proplists:get_value(timeunit, Options, ms)}.

%% table is required
metric_with_default(Default, #{table := _} = Metric) ->
maps:merge(Default, Metric);
%% backward compatibility
metric_with_default(Default, {DbName, Table}) ->
Default#{dbname => DbName, table => Table};
metric_with_default(Default, Table) when is_atom(Table); is_list(Table); is_binary(Table) ->
Default#{table => Table}.

collect_columns(Timeunit, Points) ->
collect_columns(Timeunit, Points, []).

Expand Down
21 changes: 21 additions & 0 deletions test/greptimedb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ all() ->
[t_write,
t_write_stream,
t_insert_requests,
t_insert_requests_with_timeunit,
t_write_failure,
t_write_batch,
t_bench_perf,
Expand Down Expand Up @@ -107,6 +108,26 @@ t_insert_requests(_) ->
end,
ok.

t_insert_requests_with_timeunit(_) ->
TsNano = 1705946037724448346,
Points = [#{fields => #{<<"temperature">> => 1},
tags =>
#{<<"from">> => <<"mqttx_4b963a8e">>,
<<"host">> => <<"serverA">>,
<<"qos">> => "0",
<<"device">> => <<"NO.1">>,
<<"region">> => <<"hangzhou">>},
timestamp => TsNano}],
AuthInfo = {basic, #{username => "test", password => "test"}},
Client = #{cli_opts => [{auth, AuthInfo}, {timeunit, second}]},
Metric = #{table => "Test", timeunit => nanosecond},
Request = greptimedb_encoder:insert_requests(Client, [{Metric, Points}]),
#{header := #{dbname := _DbName, authorization := _Auth},
request := {inserts, #{inserts := [#{columns := Columns}]}}} = Request,
{value, TimestampColumn} =
lists:search(fun(C) -> maps:get(column_name, C) == <<"greptime_timestamp">> end, Columns),
?assertEqual([TsNano], maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))).

t_write_failure(_) ->
Metric = <<"temperatures">>,
Points =
Expand Down

0 comments on commit e471ea6

Please sign in to comment.