Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Cache metric ID and forward the metric info to series writer
Browse files Browse the repository at this point in the history
This is an optimization to remove the need to fetch metric ID and
table name when storing new series data since we already fetch
this info in a prior step of the process.
  • Loading branch information
antekresic committed Jan 31, 2022
1 parent c8b4335 commit fd0bffb
Show file tree
Hide file tree
Showing 14 changed files with 221 additions and 150 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ We use the following categories for changes:
### Added
- Add support to instrument Promscale's Otel GRPC server with Prometheus metrics [#1061]

### Changed
- Optimized series ID creation by caching metric ID [#1062]

### Fixed
- Fix broken `promscale_packager` telemetry field for docker envs [#1077]
- Fix compression of old chunks thus reducing storage requirements [#1081]
Expand Down
19 changes: 17 additions & 2 deletions pkg/migrations/migration_files_generated.go

Large diffs are not rendered by default.

35 changes: 13 additions & 22 deletions pkg/migrations/sql/idempotent/base.sql
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_label_key(TEXT) to prom_w
-- This uses some pretty heavy locks so use sparingly.
-- locks: label_key_position, data table, series partition (in view creation),
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_new_pos_for_key(
metric_name text, key_name_array text[], is_for_exemplar boolean)
metric_name text, metric_table name, key_name_array text[], is_for_exemplar boolean)
RETURNS int[]
AS $func$
DECLARE
Expand All @@ -486,7 +486,6 @@ DECLARE
key_name text;
next_position int;
max_position int;
metric_table name;
position_table_name text;
BEGIN
If is_for_exemplar THEN
Expand Down Expand Up @@ -516,9 +515,6 @@ BEGIN

-- Lock tables for exclusiveness.
IF NOT is_for_exemplar THEN
SELECT table_name
FROM SCHEMA_CATALOG.get_or_create_metric_table_name(get_new_pos_for_key.metric_name)
INTO metric_table;
--lock as for ALTER TABLE because we are in effect changing the schema here
--also makes sure the next_position below is correct in terms of concurrency
EXECUTE format('LOCK TABLE prom_data_series.%I IN SHARE UPDATE EXCLUSIVE MODE', metric_table);
Expand Down Expand Up @@ -602,9 +598,9 @@ SECURITY DEFINER
--search path must be set for security definer
SET search_path = pg_temp;
--redundant given schema settings but extra caution for security definers
REVOKE ALL ON FUNCTION SCHEMA_CATALOG.get_new_pos_for_key(text, text[], boolean) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_new_pos_for_key(text, text[], boolean) TO prom_reader; -- For exemplars querying.
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_new_pos_for_key(text, text[], boolean) TO prom_writer;
REVOKE ALL ON FUNCTION SCHEMA_CATALOG.get_new_pos_for_key(text, name, text[], boolean) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_new_pos_for_key(text, name, text[], boolean) TO prom_reader; -- For exemplars querying.
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_new_pos_for_key(text, name, text[], boolean) TO prom_writer;

--should only be called after a check that that the label doesn't exist
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_new_label_id(key_name text, value_name text, OUT id INT)
Expand Down Expand Up @@ -771,7 +767,9 @@ AS $$
AND lkp.key = get_or_create_label_key_pos.key
UNION ALL
SELECT
(SCHEMA_CATALOG.get_new_pos_for_key(get_or_create_label_key_pos.metric_name, array[get_or_create_label_key_pos.key], false))[1]
(SCHEMA_CATALOG.get_new_pos_for_key(get_or_create_label_key_pos.metric_name, m.table_name, array[get_or_create_label_key_pos.key], false))[1]
FROM
SCHEMA_CATALOG.get_or_create_metric_table_name(get_or_create_label_key_pos.metric_name) m
LIMIT 1
$$
LANGUAGE SQL VOLATILE;
Expand Down Expand Up @@ -934,7 +932,7 @@ COMMENT ON FUNCTION SCHEMA_CATALOG.get_or_create_label_array(text, text[], text[
IS 'converts a metric name, array of keys, and array of values to a label array';
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_label_array(TEXT, text[], text[]) TO prom_writer;

CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_label_ids(metric_name TEXT, label_keys text[], label_values text[])
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_label_ids(metric_name TEXT, metric_table NAME, label_keys text[], label_values text[])
RETURNS TABLE(pos int[], id int[], label_key text[], label_value text[]) AS $$
WITH cte as (
SELECT
Expand All @@ -958,17 +956,17 @@ RETURNS TABLE(pos int[], id int[], label_key text[], label_value text[]) AS $$
case when count(*) = count(known_pos) Then
array_agg(known_pos)
else
SCHEMA_CATALOG.get_new_pos_for_key(get_or_create_label_ids.metric_name, array_agg(key_str), false)
SCHEMA_CATALOG.get_new_pos_for_key(get_or_create_label_ids.metric_name, get_or_create_label_ids.metric_table, array_agg(key_str), false)
end as poss,
array_agg(label_id) as label_ids,
array_agg(key_str) as keys,
array_agg(val_str) as vals
FROM cte
$$
LANGUAGE SQL VOLATILE;
COMMENT ON FUNCTION SCHEMA_CATALOG.get_or_create_label_ids(text, text[], text[])
COMMENT ON FUNCTION SCHEMA_CATALOG.get_or_create_label_ids(text, name, text[], text[])
IS 'converts a metric name, array of keys, and array of values to a list of label ids';
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_label_ids(TEXT, text[], text[]) TO prom_writer;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_label_ids(TEXT, NAME, text[], text[]) TO prom_writer;


-- Returns ids, keys and values for a label_array
Expand Down Expand Up @@ -1169,16 +1167,9 @@ LANGUAGE PLPGSQL VOLATILE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_series_id_for_kv_array(TEXT, text[], text[]) TO prom_writer;



CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_series_id_for_label_array(metric_name TEXT, larray SCHEMA_PROM.label_array, OUT table_name NAME, OUT series_id BIGINT)
CREATE OR REPLACE FUNCTION SCHEMA_CATALOG.get_or_create_series_id_for_label_array(metric_id INT, table_name NAME, larray SCHEMA_PROM.label_array, OUT series_id BIGINT)
AS $func$
DECLARE
metric_id int;
BEGIN
--need to make sure the series partition exists
SELECT mtn.id, mtn.table_name FROM SCHEMA_CATALOG.get_or_create_metric_table_name(metric_name) mtn
INTO metric_id, table_name;

EXECUTE format($query$
WITH existing AS (
SELECT
Expand All @@ -1201,7 +1192,7 @@ BEGIN
END
$func$
LANGUAGE PLPGSQL VOLATILE;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_series_id_for_label_array(TEXT, SCHEMA_PROM.label_array) TO prom_writer;
GRANT EXECUTE ON FUNCTION SCHEMA_CATALOG.get_or_create_series_id_for_label_array(INT, NAME, SCHEMA_PROM.label_array) TO prom_writer;

--
-- Parameter manipulation functions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP FUNCTION IF EXISTS SCHEMA_CATALOG.get_new_pos_for_key(text, text[], boolean);
DROP FUNCTION IF EXISTS SCHEMA_CATALOG.get_or_create_label_ids(TEXT, text[], text[]);
DROP FUNCTION IF EXISTS SCHEMA_CATALOG.get_or_create_series_id_for_label_array(TEXT, SCHEMA_PROM.label_array);
24 changes: 12 additions & 12 deletions pkg/pgmodel/ingestor/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
const maxInsertStmtPerTxn = 100

type copyRequest struct {
data *pendingBuffer
table string
data *pendingBuffer
info *pgmodel.MetricInfo
}

var (
Expand All @@ -39,25 +39,25 @@ var (

type copyBatch []copyRequest

func (reqs copyBatch) VisitSeries(callBack func(s *pgmodel.Series) error) error {
func (reqs copyBatch) VisitSeries(callBack func(info *pgmodel.MetricInfo, s *pgmodel.Series) error) error {
for _, req := range reqs {
insertables := req.data.batch.Data()
for i := range insertables {
if err := callBack(insertables[i].Series()); err != nil {
if err := callBack(req.info, insertables[i].Series()); err != nil {
return err
}
}
}
return nil
}

func (reqs copyBatch) VisitExemplar(callBack func(s *pgmodel.PromExemplars) error) error {
func (reqs copyBatch) VisitExemplar(callBack func(info *pgmodel.MetricInfo, s *pgmodel.PromExemplars) error) error {
for _, req := range reqs {
insertables := req.data.batch.Data()
for i := range insertables {
exemplar, ok := insertables[i].(*pgmodel.PromExemplars)
if ok {
if err := callBack(exemplar); err != nil {
if err := callBack(req.info, exemplar); err != nil {
return err
}
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func runCopier(conn pgxconn.PgxConn, in chan readRequest, sw *seriesWriter, elf

// sort to prevent deadlocks on table locks
sort.Slice(insertBatch, func(i, j int) bool {
return insertBatch[i].table < insertBatch[j].table
return insertBatch[i].info.TableName < insertBatch[j].info.TableName
})

err := persistBatch(ctx, conn, sw, elf, insertBatch)
Expand Down Expand Up @@ -235,7 +235,7 @@ func tryRecovery(ctx context.Context, conn pgxconn.PgxConn, err error, req copyR
pgErr, ok := err.(*pgconn.PgError)
if !ok {
errMsg := err.Error()
log.Warn("msg", fmt.Sprintf("unexpected error while inserting to %s", req.table), "err", errMsg)
log.Warn("msg", fmt.Sprintf("unexpected error while inserting to %s", req.info.TableName), "err", errMsg)
return err
}

Expand All @@ -244,7 +244,7 @@ func tryRecovery(ctx context.Context, conn pgxconn.PgxConn, err error, req copyR
return handleDecompression(ctx, conn, req, minTime)
}

log.Warn("msg", fmt.Sprintf("unexpected postgres error while inserting to %s", req.table), "err", pgErr.Error())
log.Warn("msg", fmt.Sprintf("unexpected postgres error while inserting to %s", req.info.TableName), "err", pgErr.Error())
return pgErr
}

Expand All @@ -260,7 +260,7 @@ func retryAfterDecompression(ctx context.Context, conn pgxconn.PgxConn, req copy
ctx, span := tracer.Default().Start(ctx, "retry-after-decompression")
defer span.End()
var (
table = req.table
table = req.info.TableName
minTime = model.Time(minTimeInt).Time()
)
//how much faster are we at ingestion than wall-clock time?
Expand Down Expand Up @@ -402,7 +402,7 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyRequest
totalExemplars += numExemplars
if hasSamples {
numRowsPerInsert = append(numRowsPerInsert, numSamples)
batch.Queue("SELECT "+schema.Catalog+".insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])", req.table, timeSamples, valSamples, seriesIdSamples)
batch.Queue("SELECT "+schema.Catalog+".insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])", req.info.TableName, timeSamples, valSamples, seriesIdSamples)
}
if hasExemplars {
// We cannot send 2-D [][]TEXT to postgres via the pgx.encoder. For this and easier querying reasons, we create a
Expand All @@ -413,7 +413,7 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, reqs ...copyRequest
return fmt.Errorf("setting prom_api.label_value_array[] value: %w", err), lowestMinTime
}
numRowsPerInsert = append(numRowsPerInsert, numExemplars)
batch.Queue("SELECT "+schema.Catalog+".insert_exemplar_row($1::NAME, $2::TIMESTAMPTZ[], $3::BIGINT[], $4::"+schema.Prom+".label_value_array[], $5::DOUBLE PRECISION[])", req.table, timeExemplars, seriesIdExemplars, labelValues, valExemplars)
batch.Queue("SELECT "+schema.Catalog+".insert_exemplar_row($1::NAME, $2::TIMESTAMPTZ[], $3::BIGINT[], $4::"+schema.Prom+".label_value_array[], $5::DOUBLE PRECISION[])", req.info.TableName, timeExemplars, seriesIdExemplars, labelValues, valExemplars)
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/pgmodel/ingestor/exemplar_label_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
)

const (
getExemplarLabelPositions = "SELECT * FROM " + schema.Catalog + ".get_new_pos_for_key($1::TEXT, $2::TEXT[], true)"
getExemplarLabelPositions = "SELECT * FROM " + schema.Catalog + ".get_new_pos_for_key($1::TEXT, $2::NAME, $3::TEXT[], true)"
)

type ExemplarVisitor interface {
VisitExemplar(func(s *model.PromExemplars) error) error
VisitExemplar(func(info *model.MetricInfo, s *model.PromExemplars) error) error
}

type ExemplarLabelFormatter struct {
Expand All @@ -39,7 +39,7 @@ func (t *ExemplarLabelFormatter) orderExemplarLabelValues(ev ExemplarVisitor) er
pendingIndexes []positionPending
)

err := ev.VisitExemplar(func(row *model.PromExemplars) error {
err := ev.VisitExemplar(func(info *model.MetricInfo, row *model.PromExemplars) error {
labelKeyIndex, entryExists := t.exemplarKeyPosCache.GetLabelPositions(row.Series().MetricName())
if entryExists {
//make sure all positions exist
Expand All @@ -53,7 +53,7 @@ func (t *ExemplarLabelFormatter) orderExemplarLabelValues(ev ExemplarVisitor) er
// Allocate a batch only if required. If the cache does the job, why to waste on allocs.
batch = t.conn.NewBatch()
}
batch.Queue(getExemplarLabelPositions, row.Series().MetricName(), unorderedLabelKeys)
batch.Queue(getExemplarLabelPositions, row.Series().MetricName(), info.TableName, unorderedLabelKeys)
pendingIndexes = append(pendingIndexes, positionPending{ // One-to-One relation with queries
exemplarRef: row,
metricName: row.Series().MetricName(),
Expand Down
Loading

0 comments on commit fd0bffb

Please sign in to comment.