Skip to content

Commit

Permalink
[#20549] YSQL: Fix Read Committed txn read time (snapshots) management
Browse files Browse the repository at this point in the history
Summary:
In read committed transaction each statement resets read time at the beginning to read the latest data from DB. This read time must be used by the whole statement.
It is possible that statement calls another statement during execution which may selects its own read time. Simplest example is the function call

```
CREATE FUNCTION foo(n INT) RETURNS INT AS
DECLARE
  vvv INT;
BEGIN
  SELECT v FROM aux_t WHERE k = n INTO vvv;       -- selects own read time before execution
  RETURN nnn;
END;

UPDATE t SET v = v + 10 WHERE foo(k) = v  -- selects own read time before execution
```

`PgClientSession` class preserves read time after applying latest operations. As a result when `statement_1` continue execution after calling of the `statement_2` read time selected by the `statement_2` will be used (it is preserved in `PgClientSession` object).
To handle this scenario YSQL should restore read time for `statement_1` prior to continue its execution.
Postgres code uses stack of `Snapshot` structures for this purpose. Each statement uses its own snapshot data. And prior to continue execution of `statement_1` postgres sets `statement_1`'s snapshot as active.
This diff reuses postgres' infrastructure of snapshots and store `read_time_serial_no` in the newly created field `yb_read_time_point_handle` of `SnapshotData` structure. When particular snapshot become active appropriate `read_time_serial_no` is propagated to the `PgTxnManager` object. Further RW operations will be sent to `PgClientSession` object with this `read_time_serial_no`.
As a result all the `statement_1`'s operations will use same `read_time_serial_no` (i.e. same read time) even in case other statements are called in the middle.
Jira: DB-9557

Test Plan:
Jenkins

New unit tests are introduced

```
 ./yb_build.sh --gtest_filter PgTxnTest.ReadAtMultipleTimestamps
 ./yb_build.sh --gtest_filter PgTxnTest.ReadAtMultipleTimestampsWithConflict
```

Reviewers: pjain, amartsinchyk, patnaik.balivada

Reviewed By: pjain

Subscribers: ybase, bogdan, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D32313
  • Loading branch information
d-uspenskiy committed Jun 27, 2024
1 parent 507d287 commit edbd06b
Show file tree
Hide file tree
Showing 17 changed files with 458 additions and 119 deletions.
5 changes: 3 additions & 2 deletions src/postgres/src/backend/access/transam/parallel.c
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,9 @@ ParallelWorkerMain(Datum main_arg)
shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
AttachSession(*(dsm_handle *) session_dsm_handle_space);

if (fps->parallel_master_is_yb_session)
YBCRestorePgSessionParallelData(&fps->parallel_master_yb_session_data);

/*
* If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
* the leader has serialized the transaction snapshot and we must restore
Expand Down Expand Up @@ -1463,8 +1466,6 @@ ParallelWorkerMain(Datum main_arg)
{
YbUpdateCatalogCacheVersion(YbGetMasterCatalogVersion());
YBCPgResetCatalogReadTime();
if (fps->parallel_master_is_yb_session)
YBCRestorePgSessionParallelData(&fps->parallel_master_yb_session_data);
}

/*
Expand Down
1 change: 1 addition & 0 deletions src/postgres/src/backend/storage/ipc/procarray.c
Original file line number Diff line number Diff line change
Expand Up @@ -1827,6 +1827,7 @@ GetSnapshotData(Snapshot snapshot)
MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
}

snapshot->yb_read_time_point_handle = YbBuildCurrentReadTimePointHandle();
return snapshot;
}

Expand Down
19 changes: 17 additions & 2 deletions src/postgres/src/backend/utils/misc/pg_yb_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
#include "utils/lsyscache.h"
#include "utils/pg_locale.h"
#include "utils/rel.h"
#include "utils/snapshot.h"
#include "utils/spccache.h"
#include "utils/syscache.h"
#include "utils/uuid.h"
Expand Down Expand Up @@ -1554,7 +1555,7 @@ YBResetEnableNonBreakingDDLMode()
/*
* Reset yb_make_next_ddl_statement_nonbreaking to avoid its further side
* effect that may not be intended.
*
*
* Also, reset Connection Manager cache if the value was cached to begin
* with.
*/
Expand Down Expand Up @@ -3200,7 +3201,7 @@ yb_get_range_split_clause(PG_FUNCTION_ARGS)
*
* For YB backup, if an error is thrown from a PG backend, ysql_dump will
* exit, generate an empty YSQLDUMP file, and block YB backup workflow.
* Currently, we don't have the functionality to adjust options used for
* Currently, we don't have the functionality to adjust options used for
* ysql_dump on YBA and YBM, so we don't have a way to to turn on/off
* a backup-related feature used in ysql_dump.
* There are known cases which caused decoding of split points to fail in
Expand Down Expand Up @@ -4926,3 +4927,17 @@ YbCalculateTimeDifferenceInMicros(TimestampTz yb_start_time)
&microsecs);
return secs * USECS_PER_SEC + microsecs;
}

bool YbIsReadCommittedTxn()
{
return IsYBReadCommitted() &&
!(YBCPgIsDdlMode() || YBCIsInitDbModeEnvVarSet());
}

YbReadTimePointHandle YbBuildCurrentReadTimePointHandle()
{
return YbIsReadCommittedTxn()
? (YbReadTimePointHandle){
.has_value = true, .value = YBCPgGetCurrentReadTimePoint()}
: (YbReadTimePointHandle){};
}
22 changes: 17 additions & 5 deletions src/postgres/src/backend/utils/time/snapmgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,15 @@ SnapMgrInit(void)
}
}

static void
YBCOnActiveSnapshotChange()
{
const SnapshotData *snap = ActiveSnapshot ? ActiveSnapshot->as_snap : NULL;
if (snap && snap->yb_read_time_point_handle.has_value)
HandleYBStatus(YBCRestoreReadTimePoint(
snap->yb_read_time_point_handle.value));
}

/*
* GetTransactionSnapshot
* Get the appropriate snapshot for a new query in a transaction.
Expand Down Expand Up @@ -366,18 +375,14 @@ GetTransactionSnapshot(void)
/* Don't allow catalog snapshot to be older than xact snapshot. */
InvalidateCatalogSnapshot();

CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);

/*
* YB: We have to RESET read point in YSQL for READ COMMITTED isolation level.
* A read point is analogous to the snapshot in PostgreSQL.
*
* We also need to flush all earlier operations so that they complete on the
* previous snapshot.
*
* READ COMMITTED semantics don't apply to DDLs.
*/
if (IsYBReadCommitted() && !YBCPgIsDdlMode())
if (YbIsReadCommittedTxn())
{
HandleYBStatus(YBCPgFlushBufferedOperations());
/* If this is a retry for a kReadRestart error, avoid resetting the read point */
Expand All @@ -388,6 +393,8 @@ GetTransactionSnapshot(void)
}
}

CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);

return CurrentSnapshot;
}

Expand Down Expand Up @@ -788,6 +795,8 @@ PushActiveSnapshot(Snapshot snap)
ActiveSnapshot = newactive;
if (OldestActiveSnapshot == NULL)
OldestActiveSnapshot = ActiveSnapshot;

YBCOnActiveSnapshotChange();
}

/*
Expand Down Expand Up @@ -862,6 +871,7 @@ PopActiveSnapshot(void)
OldestActiveSnapshot = NULL;

SnapshotResetXmin();
YBCOnActiveSnapshotChange();
}

void
Expand Down Expand Up @@ -1088,6 +1098,7 @@ AtSubAbort_Snapshot(int level)
}

SnapshotResetXmin();
YBCOnActiveSnapshotChange();
}

/*
Expand Down Expand Up @@ -2194,6 +2205,7 @@ RestoreSnapshot(char *start_address)
snapshot->curcid = serialized_snapshot.curcid;
snapshot->whenTaken = serialized_snapshot.whenTaken;
snapshot->lsn = serialized_snapshot.lsn;
snapshot->yb_read_time_point_handle = YbBuildCurrentReadTimePointHandle();

/* Copy XIDs, if present. */
if (serialized_snapshot.xcnt > 0)
Expand Down
4 changes: 4 additions & 0 deletions src/postgres/src/include/pg_yb_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -1148,4 +1148,8 @@ static inline bool YbIsNormalDbOidReserved(Oid db_oid) {

extern Oid YbGetSQLIncrementCatalogVersionsFunctionOid();

extern bool YbIsReadCommittedTxn();

extern YbReadTimePointHandle YbBuildCurrentReadTimePointHandle();

#endif /* PG_YB_UTILS_H */
7 changes: 7 additions & 0 deletions src/postgres/src/include/utils/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ typedef struct SnapshotData *Snapshot;
typedef bool (*SnapshotSatisfiesFunc) (HeapTuple htup,
Snapshot snapshot, Buffer buffer);

typedef struct YbReadTimePointHandle
{
bool has_value;
uint64 value;
} YbReadTimePointHandle;

/*
* Struct representing all kind of possible snapshots.
*
Expand Down Expand Up @@ -112,6 +118,7 @@ typedef struct SnapshotData

TimestampTz whenTaken; /* timestamp when snapshot was taken */
XLogRecPtr lsn; /* position in the WAL stream when taken */
YbReadTimePointHandle yb_read_time_point_handle;
} SnapshotData;

/*
Expand Down
10 changes: 5 additions & 5 deletions src/yb/common/consistent_read_point.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,12 @@ ConsistentReadPoint::Momento ConsistentReadPoint::GetMomento() const {
return {read_time_, restart_read_ht_, local_limits_, restarts_};
}

void ConsistentReadPoint::SetMomento(ConsistentReadPoint::Momento&& momento) {
void ConsistentReadPoint::SetMomento(const ConsistentReadPoint::Momento& momento) {
std::lock_guard lock(mutex_);
read_time_ = std::move(momento.read_time_);
restart_read_ht_ = std::move(momento.restart_read_ht_);
local_limits_ = std::move(momento.local_limits_);
restarts_ = std::move(momento.restarts_);
read_time_ = momento.read_time_;
restart_read_ht_ = momento.restart_read_ht_;
local_limits_ = momento.local_limits_;
restarts_ = momento.restarts_;
}

} // namespace yb
6 changes: 5 additions & 1 deletion src/yb/common/consistent_read_point.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class ConsistentReadPoint {
void SetInTxnLimit(HybridTime value) EXCLUDES(mutex_);

class Momento {
public:
const ReadHybridTime& read_time() const { return read_time_; }

private:
Momento(const ReadHybridTime& read_time,
HybridTime restart_read_ht,
const HybridTimeMap& local_limits,
Expand All @@ -109,7 +113,7 @@ class ConsistentReadPoint {
};

[[nodiscard]] Momento GetMomento() const EXCLUDES(mutex_);
void SetMomento(ConsistentReadPoint::Momento&& momento) EXCLUDES(mutex_);
void SetMomento(const ConsistentReadPoint::Momento& momento) EXCLUDES(mutex_);

private:
inline void SetReadTimeUnlocked(
Expand Down
Loading

0 comments on commit edbd06b

Please sign in to comment.