Skip to content

Commit

Permalink
[BACKPORT pg15-cherrypicks][#20549] YSQL: Fix Read Committed txn read…
Browse files Browse the repository at this point in the history
… time (snapshots) management

Summary:
Merge changes:
- snapmgr.c
  RestoreSnapshot function. New field was added in Pg15 into SnapshotData structure. Preserve both.
- snapshot.h
  - SnapshotData struct. New field was added in Pg15 into snapshot structure. Preserve both.
  - struct YbReadTimePointHandle: adjacent conflict between upstream PG 63746189b23815415cacc715fdc4f6b991f1a5e7 removing
    SnapshotSatisfiesFunc code and this YB master commit adding YbReadTimePointHandle code.
- procarray.c:
  - GetSnapshotDataReuse: ... upstream PG 623a9ba79bbdd11c5eccb30b8bd5c446130e521c

Original commit: edbd06b / D32313
In read committed transaction each statement resets read time at the beginning to read the latest data from DB. This read time must be used by the whole statement.
It is possible that statement calls another statement during execution which may selects its own read time. Simplest example is the function call

```
CREATE FUNCTION foo(n INT) RETURNS INT AS
DECLARE
  vvv INT;
BEGIN
  SELECT v FROM aux_t WHERE k = n INTO vvv;       -- selects own read time before execution
  RETURN nnn;
END;

UPDATE t SET v = v + 10 WHERE foo(k) = v  -- selects own read time before execution
```

`PgClientSession` class preserves read time after applying latest operations. As a result when `statement_1` continue execution after calling of the `statement_2` read time selected by the `statement_2` will be used (it is preserved in `PgClientSession` object).
To handle this scenario YSQL should restore read time for `statement_1` prior to continue its execution.
Postgres code uses stack of `Snapshot` structures for this purpose. Each statement uses its own snapshot data. And prior to continue execution of `statement_1` postgres sets `statement_1`'s snapshot as active.
This diff reuses postgres' infrastructure of snapshots and store `read_time_serial_no` in the newly created field `yb_read_time_point_handle` of `SnapshotData` structure. When particular snapshot become active appropriate `read_time_serial_no` is propagated to the `PgTxnManager` object. Further RW operations will be sent to `PgClientSession` object with this `read_time_serial_no`.
As a result all the `statement_1`'s operations will use same `read_time_serial_no` (i.e. same read time) even in case other statements are called in the middle.
Jira: DB-9557

**Note:** The merge diff contains extra change corresponding to original diff.

```
--- a/src/postgres/src/backend/storage/ipc/procarray.c
+++ b/src/postgres/src/backend/storage/ipc/procarray.c
@@ -2216,6 +2216,7 @@ GetSnapshotDataReuse(Snapshot snapshot)

    GetSnapshotDataInitOldSnapshot(snapshot);

+   snapshot->yb_read_time_point_handle = YbBuildCurrentReadTimePointHandle();
    return true;
 }
```

Test Plan:
Jenkins

New unit tests are introduced

```
 ./yb_build.sh --gtest_filter PgTxnTest.ReadAtMultipleTimestamps
 ./yb_build.sh --gtest_filter PgTxnTest.ReadAtMultipleTimestampsWithConflict
```

Reviewers: jason, tfoucher, pjain

Reviewed By: jason

Subscribers: yql, bogdan, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36616
  • Loading branch information
d-uspenskiy committed Jul 17, 2024
1 parent 9bffce3 commit 259a71e
Show file tree
Hide file tree
Showing 17 changed files with 459 additions and 119 deletions.
5 changes: 3 additions & 2 deletions src/postgres/src/backend/access/transam/parallel.c
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,9 @@ ParallelWorkerMain(Datum main_arg)
shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
AttachSession(*(dsm_handle *) session_dsm_handle_space);

if (fps->parallel_master_is_yb_session)
YBCRestorePgSessionParallelData(&fps->parallel_master_yb_session_data);

/*
* If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
* the leader has serialized the transaction snapshot and we must restore
Expand Down Expand Up @@ -1512,8 +1515,6 @@ ParallelWorkerMain(Datum main_arg)
{
YbUpdateCatalogCacheVersion(YbGetMasterCatalogVersion());
YBCPgResetCatalogReadTime();
if (fps->parallel_master_is_yb_session)
YBCRestorePgSessionParallelData(&fps->parallel_master_yb_session_data);
}

/*
Expand Down
2 changes: 2 additions & 0 deletions src/postgres/src/backend/storage/ipc/procarray.c
Original file line number Diff line number Diff line change
Expand Up @@ -2216,6 +2216,7 @@ GetSnapshotDataReuse(Snapshot snapshot)

GetSnapshotDataInitOldSnapshot(snapshot);

snapshot->yb_read_time_point_handle = YbBuildCurrentReadTimePointHandle();
return true;
}

Expand Down Expand Up @@ -2602,6 +2603,7 @@ GetSnapshotData(Snapshot snapshot)

GetSnapshotDataInitOldSnapshot(snapshot);

snapshot->yb_read_time_point_handle = YbBuildCurrentReadTimePointHandle();
return snapshot;
}

Expand Down
19 changes: 17 additions & 2 deletions src/postgres/src/backend/utils/misc/pg_yb_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
#include "utils/lsyscache.h"
#include "utils/pg_locale.h"
#include "utils/rel.h"
#include "utils/snapshot.h"
#include "utils/spccache.h"
#include "utils/syscache.h"
#include "utils/uuid.h"
Expand Down Expand Up @@ -1744,7 +1745,7 @@ YBResetEnableNonBreakingDDLMode()
/*
* Reset yb_make_next_ddl_statement_nonbreaking to avoid its further side
* effect that may not be intended.
*
*
* Also, reset Connection Manager cache if the value was cached to begin
* with.
*/
Expand Down Expand Up @@ -3437,7 +3438,7 @@ yb_get_range_split_clause(PG_FUNCTION_ARGS)
*
* For YB backup, if an error is thrown from a PG backend, ysql_dump will
* exit, generate an empty YSQLDUMP file, and block YB backup workflow.
* Currently, we don't have the functionality to adjust options used for
* Currently, we don't have the functionality to adjust options used for
* ysql_dump on YBA and YBM, so we don't have a way to to turn on/off
* a backup-related feature used in ysql_dump.
* There are known cases which caused decoding of split points to fail in
Expand Down Expand Up @@ -5204,3 +5205,17 @@ YbCalculateTimeDifferenceInMicros(TimestampTz yb_start_time)
&microsecs);
return secs * USECS_PER_SEC + microsecs;
}

bool YbIsReadCommittedTxn()
{
return IsYBReadCommitted() &&
!(YBCPgIsDdlMode() || YBCIsInitDbModeEnvVarSet());
}

YbReadTimePointHandle YbBuildCurrentReadTimePointHandle()
{
return YbIsReadCommittedTxn()
? (YbReadTimePointHandle){
.has_value = true, .value = YBCPgGetCurrentReadTimePoint()}
: (YbReadTimePointHandle){};
}
22 changes: 17 additions & 5 deletions src/postgres/src/backend/utils/time/snapmgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,15 @@ SnapMgrInit(void)
}
}

static void
YBCOnActiveSnapshotChange()
{
const SnapshotData *snap = ActiveSnapshot ? ActiveSnapshot->as_snap : NULL;
if (snap && snap->yb_read_time_point_handle.has_value)
HandleYBStatus(YBCRestoreReadTimePoint(
snap->yb_read_time_point_handle.value));
}

/*
* GetTransactionSnapshot
* Get the appropriate snapshot for a new query in a transaction.
Expand Down Expand Up @@ -313,18 +322,14 @@ GetTransactionSnapshot(void)
/* Don't allow catalog snapshot to be older than xact snapshot. */
InvalidateCatalogSnapshot();

CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);

/*
* YB: We have to RESET read point in YSQL for READ COMMITTED isolation level.
* A read point is analogous to the snapshot in PostgreSQL.
*
* We also need to flush all earlier operations so that they complete on the
* previous snapshot.
*
* READ COMMITTED semantics don't apply to DDLs.
*/
if (IsYBReadCommitted() && !YBCPgIsDdlMode())
if (YbIsReadCommittedTxn())
{
HandleYBStatus(YBCPgFlushBufferedOperations());
/* If this is a retry for a kReadRestart error, avoid resetting the read point */
Expand All @@ -335,6 +340,8 @@ GetTransactionSnapshot(void)
}
}

CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);

return CurrentSnapshot;
}

Expand Down Expand Up @@ -741,6 +748,8 @@ PushActiveSnapshotWithLevel(Snapshot snap, int snap_level)
ActiveSnapshot = newactive;
if (OldestActiveSnapshot == NULL)
OldestActiveSnapshot = ActiveSnapshot;

YBCOnActiveSnapshotChange();
}

/*
Expand Down Expand Up @@ -815,6 +824,7 @@ PopActiveSnapshot(void)
OldestActiveSnapshot = NULL;

SnapshotResetXmin();
YBCOnActiveSnapshotChange();
}

void
Expand Down Expand Up @@ -1041,6 +1051,7 @@ AtSubAbort_Snapshot(int level)
}

SnapshotResetXmin();
YBCOnActiveSnapshotChange();
}

/*
Expand Down Expand Up @@ -2261,6 +2272,7 @@ RestoreSnapshot(char *start_address)
snapshot->whenTaken = serialized_snapshot.whenTaken;
snapshot->lsn = serialized_snapshot.lsn;
snapshot->snapXactCompletionCount = 0;
snapshot->yb_read_time_point_handle = YbBuildCurrentReadTimePointHandle();

/* Copy XIDs, if present. */
if (serialized_snapshot.xcnt > 0)
Expand Down
4 changes: 4 additions & 0 deletions src/postgres/src/include/pg_yb_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -1176,4 +1176,8 @@ static inline bool YbIsNormalDbOidReserved(Oid db_oid) {

extern Oid YbGetSQLIncrementCatalogVersionsFunctionOid();

extern bool YbIsReadCommittedTxn();

extern YbReadTimePointHandle YbBuildCurrentReadTimePointHandle();

#endif /* PG_YB_UTILS_H */
7 changes: 7 additions & 0 deletions src/postgres/src/include/utils/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ typedef struct SnapshotData *Snapshot;

#define InvalidSnapshot ((Snapshot) NULL)

typedef struct YbReadTimePointHandle
{
bool has_value;
uint64 value;
} YbReadTimePointHandle;

/*
* Struct representing all kind of possible snapshots.
*
Expand Down Expand Up @@ -214,6 +220,7 @@ typedef struct SnapshotData
* transactions completed since the last GetSnapshotData().
*/
uint64 snapXactCompletionCount;
YbReadTimePointHandle yb_read_time_point_handle;
} SnapshotData;

#endif /* SNAPSHOT_H */
10 changes: 5 additions & 5 deletions src/yb/common/consistent_read_point.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,12 @@ ConsistentReadPoint::Momento ConsistentReadPoint::GetMomento() const {
return {read_time_, restart_read_ht_, local_limits_, restarts_};
}

void ConsistentReadPoint::SetMomento(ConsistentReadPoint::Momento&& momento) {
void ConsistentReadPoint::SetMomento(const ConsistentReadPoint::Momento& momento) {
std::lock_guard lock(mutex_);
read_time_ = std::move(momento.read_time_);
restart_read_ht_ = std::move(momento.restart_read_ht_);
local_limits_ = std::move(momento.local_limits_);
restarts_ = std::move(momento.restarts_);
read_time_ = momento.read_time_;
restart_read_ht_ = momento.restart_read_ht_;
local_limits_ = momento.local_limits_;
restarts_ = momento.restarts_;
}

} // namespace yb
6 changes: 5 additions & 1 deletion src/yb/common/consistent_read_point.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class ConsistentReadPoint {
void SetInTxnLimit(HybridTime value) EXCLUDES(mutex_);

class Momento {
public:
const ReadHybridTime& read_time() const { return read_time_; }

private:
Momento(const ReadHybridTime& read_time,
HybridTime restart_read_ht,
const HybridTimeMap& local_limits,
Expand All @@ -109,7 +113,7 @@ class ConsistentReadPoint {
};

[[nodiscard]] Momento GetMomento() const EXCLUDES(mutex_);
void SetMomento(ConsistentReadPoint::Momento&& momento) EXCLUDES(mutex_);
void SetMomento(const ConsistentReadPoint::Momento& momento) EXCLUDES(mutex_);

private:
inline void SetReadTimeUnlocked(
Expand Down
Loading

0 comments on commit 259a71e

Please sign in to comment.