Commit 68544da: Merge branch 'main' into gather-sort

akuzm authored Dec 19, 2024
2 parents f0a3308 + 9a85dd9
Showing 28 changed files with 935 additions and 740 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/apt-arm-packages.yaml
@@ -15,7 +15,7 @@ name: APT ARM64 packages
 jobs:
   apt_tests:
     name: APT ARM64 ${{ matrix.image }} PG${{ matrix.pg }}
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     strategy:
       fail-fast: false
       matrix:
280 changes: 153 additions & 127 deletions README.md

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/cross_module_fn.h
@@ -132,6 +132,9 @@ typedef struct CrossModuleFunctions
     PGFunction decompress_chunk;
     void (*decompress_batches_for_insert)(const ChunkInsertState *state, TupleTableSlot *slot);
     bool (*decompress_target_segments)(HypertableModifyState *ht_state);
+    int (*hypercore_decompress_update_segment)(Relation relation, const ItemPointer ctid,
+                                               TupleTableSlot *slot, Snapshot snapshot,
+                                               ItemPointer new_tid);
     /* The compression functions below are not installed in SQL as part of create extension;
      * They are installed and tested during testing scripts. They are exposed in cross-module
      * functions because they may be very useful for debugging customer problems if the sql
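The new member follows the extension's usual cross-module dispatch pattern: the TSL library registers its implementation in tsl_cm_functions (see the tsl/src/init.c hunk below), and core code calls through the ts_cm_functions table. A minimal sketch of such a call site, with hypothetical variable names:

    ItemPointerData new_tid;
    int ntuples = ts_cm_functions->hypercore_decompress_update_segment(rel,
                                                                       conflict_tid,
                                                                       slot,
                                                                       snapshot,
                                                                       &new_tid);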
15 changes: 5 additions & 10 deletions src/nodes/chunk_dispatch/chunk_dispatch.c
@@ -168,22 +168,16 @@ ts_chunk_dispatch_decompress_batches_for_insert(ChunkDispatch *dispatch, ChunkIn
 {
     if (cis->chunk_compressed)
     {
-        OnConflictAction onconflict_action = ts_chunk_dispatch_get_on_conflict_action(dispatch);
-
-        if (cis->use_tam && onconflict_action != ONCONFLICT_UPDATE)
-        {
-            /* With our own TAM, a unique index covers both the compressed and
-             * non-compressed data, so there is no need to decompress anything
-             * when doing inserts. */
-        }
         /*
          * If this is an INSERT into a compressed chunk with UNIQUE or
          * PRIMARY KEY constraints we need to make sure any batches that could
          * potentially lead to a conflict are in the decompressed chunk so
          * postgres can do proper constraint checking.
          */
-        else if (ts_cm_functions->decompress_batches_for_insert)
+        if (ts_cm_functions->decompress_batches_for_insert)
         {
+            OnConflictAction onconflict_action = ts_chunk_dispatch_get_on_conflict_action(dispatch);
+
             ts_cm_functions->decompress_batches_for_insert(cis, slot);
 
             /* mark rows visible */
@@ -445,7 +439,8 @@ chunk_dispatch_exec(CustomScanState *node)
                                                on_chunk_insert_state_changed,
                                                state);
 
-    ts_chunk_dispatch_decompress_batches_for_insert(dispatch, cis, slot);
+    if (!cis->use_tam)
+        ts_chunk_dispatch_decompress_batches_for_insert(dispatch, cis, slot);
 
     MemoryContextSwitchTo(old);
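Taken together with the hypertable_modify.c change below, the insert path now splits by access method, roughly as in this condensed sketch (not verbatim source):

    if (!cis->use_tam)
        /* heap-backed chunks: proactively decompress any batches that
         * could produce a unique-constraint conflict */
        ts_chunk_dispatch_decompress_batches_for_insert(dispatch, cis, slot);
    /* Hypercore TAM chunks: the unique index already covers compressed and
     * non-compressed data, so conflicts are handled later, in
     * ExecOnConflictUpdate, by decompressing just the conflicting segment. */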
34 changes: 34 additions & 0 deletions src/nodes/hypertable_modify.c
@@ -5,6 +5,7 @@
  */
 #include <postgres.h>
 #include <access/tupdesc.h>
+#include <access/xact.h>
 #include <catalog/pg_attribute.h>
 #include <catalog/pg_type.h>
 #include <executor/execPartition.h>
@@ -31,6 +32,7 @@
 #include "hypertable_modify.h"
 #include "nodes/chunk_append/chunk_append.h"
 #include "nodes/chunk_dispatch/chunk_dispatch.h"
+#include "utils.h"
 
 static void fireASTriggers(ModifyTableState *node);
 static void fireBSTriggers(ModifyTableState *node);
@@ -2391,6 +2393,38 @@ ExecOnConflictUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
         ExecWithCheckOptions(WCO_RLS_CONFLICT_CHECK, resultRelInfo, existing, mtstate->ps.state);
     }
 
+    /*
+     * If the target relation is using Hypercore TAM, the conflict resolution
+     * index might point to a compressed segment containing the conflicting
+     * row. It is possible to decompress the segment immediately so that the
+     * update can proceed on the decompressed row.
+     */
+    if (ts_is_hypercore_am(resultRelInfo->ri_RelationDesc->rd_rel->relam))
+    {
+        ItemPointerData new_tid;
+        int ntuples =
+            ts_cm_functions->hypercore_decompress_update_segment(resultRelInfo->ri_RelationDesc,
+                                                                 conflictTid,
+                                                                 existing,
+                                                                 context->estate->es_snapshot,
+                                                                 &new_tid);
+
+        if (ntuples > 0)
+        {
+            /*
+             * The conflicting row was decompressed, so must update the
+             * conflictTid to point to the decompressed row.
+             */
+            ItemPointerCopy(&new_tid, conflictTid);
+            /*
+             * Since data was decompressed, the command counter was
+             * incremented to make it visible. Make sure the executor uses the
+             * latest command ID to see the changes.
+             */
+            context->estate->es_output_cid = GetCurrentCommandId(true);
+        }
+    }
+
     /* Project the new tuple version */
     ExecProject(resultRelInfo->ri_onConflict->oc_ProjInfo);
 
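The visibility comment in the hunk above relies on PostgreSQL's command-counter rule: rows written under command ID n are invisible to snapshots taken at command ID n, so after the decompression path runs CommandCounterIncrement() the executor must adopt the new command ID. A minimal illustration of the rule (values are made up; estate stands in for the executor state):

    CommandId cid = GetCurrentCommandId(true);          /* say, 5 */
    /* ... decompressed rows are inserted under command ID 5 ... */
    CommandCounterIncrement();                          /* current ID becomes 6 */
    estate->es_output_cid = GetCurrentCommandId(true);  /* rows now visible */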
103 changes: 40 additions & 63 deletions tsl/src/hypercore/arrow_cache_explain.c
@@ -61,43 +61,12 @@ standard_ExplainOneQuery(Query *query, int cursorOptions, IntoClause *into, Expl
 }
 #endif
 
-static struct
-{
-    const char *hits_text;             /* Number of cache hits */
-    const char *miss_text;             /* Number of cache misses */
-    const char *evict_text;            /* Number of cache evictions */
-    const char *decompress_text;       /* Number of arrays decompressed */
-    const char *decompress_calls_text; /* Number of calls to decompress an array */
-} format_texts[] = {
-    [EXPLAIN_FORMAT_TEXT] = {
-        .hits_text = "Array Cache Hits",
-        .miss_text = "Array Cache Misses",
-        .evict_text = "Array Cache Evictions",
-        .decompress_text = "Array Decompressions",
-        .decompress_calls_text = "Array Decompression Calls",
-    },
-    [EXPLAIN_FORMAT_XML] = {
-        .hits_text = "hits",
-        .miss_text = "misses",
-        .evict_text = "evictions",
-        .decompress_text = "decompressions",
-        .decompress_calls_text = "decompression calls",
-    },
-    [EXPLAIN_FORMAT_JSON] = {
-        .hits_text = "hits",
-        .miss_text = "misses",
-        .evict_text = "evictions",
-        .decompress_text = "decompressions",
-        .decompress_calls_text = "decompression calls",
-    },
-    [EXPLAIN_FORMAT_YAML] = {
-        .hits_text = "hits",
-        .miss_text = "misses",
-        .evict_text = "evictions",
-        .decompress_text = "decompressions",
-        .decompress_calls_text = "decompression calls",
-    },
-};
+static inline void
+append_if_positive(StringInfo info, const char *key, long long val)
+{
+    if (val > 0)
+        appendStringInfo(info, " %s=%lld", key, val);
+}
 
 static void
 explain_decompression(Query *query, int cursorOptions, IntoClause *into, ExplainState *es,
@@ -106,33 +75,41 @@ explain_decompression(Query *query, int cursorOptions, IntoClause *into, Explain
     standard_ExplainOneQuery(query, cursorOptions, into, es, queryString, params, queryEnv);
     if (decompress_cache_print)
     {
-        Assert(es->format < sizeof(format_texts) / sizeof(*format_texts));
-
-        ExplainOpenGroup("Array cache", "Arrow Array Cache", true, es);
-        ExplainPropertyInteger(format_texts[es->format].hits_text,
-                               NULL,
-                               decompress_cache_stats.hits,
-                               es);
-        ExplainPropertyInteger(format_texts[es->format].miss_text,
-                               NULL,
-                               decompress_cache_stats.misses,
-                               es);
-        ExplainPropertyInteger(format_texts[es->format].evict_text,
-                               NULL,
-                               decompress_cache_stats.evictions,
-                               es);
-        ExplainPropertyInteger(format_texts[es->format].decompress_text,
-                               NULL,
-                               decompress_cache_stats.decompressions,
-                               es);
-
-        if (es->verbose)
-            ExplainPropertyInteger(format_texts[es->format].decompress_calls_text,
-                                   NULL,
-                                   decompress_cache_stats.decompress_calls,
-                                   es);
-
-        ExplainCloseGroup("Array cache", "Arrow Array Cache", true, es);
+        const bool has_decompress_data = decompress_cache_stats.decompressions > 0 ||
+                                         decompress_cache_stats.decompress_calls > 0;
+        const bool has_cache_data = decompress_cache_stats.hits > 0 ||
+                                    decompress_cache_stats.misses > 0 ||
+                                    decompress_cache_stats.evictions > 0;
+        if (has_decompress_data || has_cache_data)
+        {
+            if (es->format == EXPLAIN_FORMAT_TEXT)
+            {
+                appendStringInfoString(es->str, "Array:");
+                if (has_cache_data)
+                    appendStringInfoString(es->str, " cache");
+                append_if_positive(es->str, "hits", decompress_cache_stats.hits);
+                append_if_positive(es->str, "misses", decompress_cache_stats.misses);
+                append_if_positive(es->str, "evictions", decompress_cache_stats.evictions);
+                if (has_decompress_data)
+                    appendStringInfoString(es->str, ", decompress");
+                append_if_positive(es->str, "count", decompress_cache_stats.decompressions);
+                append_if_positive(es->str, "calls", decompress_cache_stats.decompress_calls);
+                appendStringInfoChar(es->str, '\n');
+            }
+            else
+            {
+                ExplainOpenGroup("Array Cache", "Arrow Array Cache", true, es);
+                ExplainPropertyInteger("hits", NULL, decompress_cache_stats.hits, es);
+                ExplainPropertyInteger("misses", NULL, decompress_cache_stats.misses, es);
+                ExplainPropertyInteger("evictions", NULL, decompress_cache_stats.evictions, es);
+                ExplainCloseGroup("Array Cache", "Arrow Array Cache", true, es);
+
+                ExplainOpenGroup("Array Decompress", "Arrow Array Decompress", true, es);
+                ExplainPropertyInteger("count", NULL, decompress_cache_stats.decompressions, es);
+                ExplainPropertyInteger("calls", NULL, decompress_cache_stats.decompress_calls, es);
+                ExplainCloseGroup("Array Decompress", "Arrow Array Decompress", true, es);
+            }
+        }
 
         decompress_cache_print = false;
         memset(&decompress_cache_stats, 0, sizeof(struct DecompressCacheStats));
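With the TEXT format, the rewritten code emits a single compact line and omits any counter that is zero. For example, with made-up values of three hits, one miss, two decompressions, and five decompression calls, the line would read:

    Array: cache hits=3 misses=1, decompress count=2 calls=5

Structured formats (XML, JSON, YAML) instead get two groups, "Arrow Array Cache" and "Arrow Array Decompress", emitted through the ExplainProperty API.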
78 changes: 78 additions & 0 deletions tsl/src/hypercore/hypercore_handler.c
@@ -7,6 +7,7 @@
 #include <access/attnum.h>
 #include <access/heapam.h>
 #include <access/hio.h>
+#include <access/htup_details.h>
 #include <access/rewriteheap.h>
 #include <access/sdir.h>
 #include <access/skey.h>
@@ -49,6 +50,7 @@
 #include <utils/palloc.h>
 #include <utils/rel.h>
 #include <utils/sampling.h>
+#include <utils/snapmgr.h>
 #include <utils/syscache.h>
 #include <utils/tuplesort.h>
 #include <utils/typcache.h>
@@ -1787,6 +1789,82 @@ hypercore_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, Snapsh
     return result;
 }
 
+/*
+ * Decompress a segment that contains the row given by ctid.
+ *
+ * This function is called during an upsert (ON CONFLICT DO UPDATE), where the
+ * conflicting row points to a compressed segment that needs to be
+ * decompressed before the update can take place. This function is used to
+ * decompress that segment into a set of individual rows and insert them into
+ * the non-compressed region.
+ *
+ * Returns the number of rows in the segment that were decompressed, or 0 if
+ * the TID pointed to a regular (non-compressed) tuple. If any rows are
+ * decompressed, the TID of the de-compressed conflicting row is returned via
+ * "new_ctid". If no rows were decompressed, the value of "new_ctid" is
+ * undefined.
+ */
+int
+hypercore_decompress_update_segment(Relation relation, const ItemPointer ctid, TupleTableSlot *slot,
+                                    Snapshot snapshot, ItemPointer new_ctid)
+{
+    HypercoreInfo *hcinfo;
+    Relation crel;
+    TupleTableSlot *cslot;
+    ItemPointerData decoded_tid;
+    TM_Result result;
+    TM_FailureData tmfd;
+    int n_batch_rows = 0;
+    uint16 tuple_index;
+    bool should_free;
+
+    /* Nothing to do if this is not a compressed segment */
+    if (!is_compressed_tid(ctid))
+        return 0;
+
+    Assert(TTS_IS_ARROWTUPLE(slot));
+    Assert(!TTS_EMPTY(slot));
+    Assert(ItemPointerEquals(ctid, &slot->tts_tid));
+
+    hcinfo = RelationGetHypercoreInfo(relation);
+    crel = table_open(hcinfo->compressed_relid, RowExclusiveLock);
+    tuple_index = hypercore_tid_decode(&decoded_tid, ctid);
+    cslot = arrow_slot_get_compressed_slot(slot, NULL);
+    HeapTuple tuple = ExecFetchSlotHeapTuple(cslot, false, &should_free);
+
+    RowDecompressor decompressor = build_decompressor(crel, relation);
+    heap_deform_tuple(tuple,
+                      RelationGetDescr(crel),
+                      decompressor.compressed_datums,
+                      decompressor.compressed_is_nulls);
+
+    /* Must delete the segment before calling the decompression function below
+     * or otherwise index updates will lead to conflicts */
+    result = table_tuple_delete(decompressor.in_rel,
+                                &cslot->tts_tid,
+                                decompressor.mycid,
+                                snapshot,
+                                InvalidSnapshot,
+                                true,
+                                &tmfd,
+                                false);
+
+    Ensure(result == TM_Ok, "could not delete compressed segment, result: %u", result);
+
+    n_batch_rows = row_decompressor_decompress_row_to_table(&decompressor);
+    /* Return the TID of the decompressed conflicting tuple. Tuple index is
+     * 1-indexed, so subtract 1. */
+    slot = decompressor.decompressed_slots[tuple_index - 1];
+    ItemPointerCopy(&slot->tts_tid, new_ctid);
+
+    /* Need to make decompressed (and deleted segment) visible */
+    CommandCounterIncrement();
+    row_decompressor_close(&decompressor);
+    table_close(crel, NoLock);
+
+    return n_batch_rows;
+}
+
 #if PG16_LT
 typedef bool TU_UpdateIndexes;
 #endif
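A hypothetical caller sketch of the return contract documented above (names are stand-ins, not from the source): zero means the TID referenced a regular tuple and new_ctid must not be read; a positive count means the segment was decompressed and the conflicting row now lives at new_ctid.

    ItemPointerData new_tid;
    int nrows = hypercore_decompress_update_segment(rel, tid, slot, snapshot, &new_tid);
    if (nrows > 0)
        ItemPointerCopy(&new_tid, tid); /* retarget at the decompressed row */
    /* nrows == 0: tid already pointed at a plain tuple; new_tid is undefined */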
3 changes: 3 additions & 0 deletions tsl/src/hypercore/hypercore_handler.h
@@ -30,6 +30,9 @@ extern void hypercore_xact_event(XactEvent event, void *arg);
 extern bool hypercore_set_truncate_compressed(bool onoff);
 extern void hypercore_scan_set_skip_compressed(TableScanDesc scan, bool skip);
 extern void hypercore_skip_compressed_data_for_relation(Oid relid);
+extern int hypercore_decompress_update_segment(Relation relation, const ItemPointer ctid,
+                                               TupleTableSlot *slot, Snapshot snapshot,
+                                               ItemPointer new_ctid);
 
 typedef struct ColumnCompressionSettings
 {
1 change: 1 addition & 0 deletions tsl/src/init.c
@@ -175,6 +175,7 @@ CrossModuleFunctions tsl_cm_functions = {
     .decompress_target_segments = decompress_target_segments,
     .hypercore_handler = hypercore_handler,
     .hypercore_proxy_handler = hypercore_proxy_handler,
+    .hypercore_decompress_update_segment = hypercore_decompress_update_segment,
     .is_compressed_tid = tsl_is_compressed_tid,
     .ddl_command_start = tsl_ddl_command_start,
     .ddl_command_end = tsl_ddl_command_end,