Timestamp Oracle

Ryan Leung edited this page Jul 31, 2020 · 5 revisions

Background

TiDB's transaction implementation is based on Google's Percolator distributed transaction protocol. Throughout the process, we need timestamps that increase strictly monotonically to ensure the linearizability of transactions. There are three mainstream ways in the industry to achieve this in a distributed system:

  • True Time
  • Hybrid Logical Clock
  • Timestamp Oracle

Google Spanner uses True Time, which relies on atomic clocks and GPS receivers to keep time synchronized globally. By exposing a few simple APIs, it helps distributed systems obtain linearly increasing timestamps. However, not all companies have the financial resources of Google. As a hardware solution, its high cost and lack of versatility put True Time out of reach for most companies.

CockroachDB uses the Hybrid Logical Clock (HLC) solution. HLC is a purely algorithmic solution that achieves linearly increasing timestamps by combining physical time and logical time. Since HLC relies on NTP (Network Time Protocol), and physical clocks may be off because of synchronization and other issues, the HLC algorithm usually has to assume a bounded clock error. When the physical time is incorrect, the logical time has to compensate to keep HLC increasing across different nodes. The cost is that, in order to achieve external consistency, an HLC-based scheme may cause high transaction latency.

TiDB adopts the Timestamp Oracle (TSO) solution, which makes PD a single, global, unified time service. Since only a single point is involved and there is no complicated algorithm, it is relatively simple to implement. Compared with the two schemes above, every transaction that TiDB starts incurs extra network communication with PD, but since TiDB and PD clusters are usually deployed in the same DC, this overhead is usually within an acceptable range. Even for deployments involving multiple DCs, we can optimize through some mechanisms (for example, a transaction that only involves a local table needs only a local TSO and does not have to synchronize with the global TSO immediately). After many considerations, we finally chose TSO as the solution for the distributed time service. The following diagram shows the architecture of TiDB. The two main functions PD provides for TiDB are TSO and the synchronization of data location metadata.

[Figure: TiDB Architecture]

Objectives

Since PD is responsible for the TSO work of the entire cluster, the implementation has to aim for low latency, high performance, and good fault tolerance. We explain the design in four parts: basic structure, calibration, allocation, and progression.

Design

Basic Structure

For PD, we need to ensure that it can quickly allocate a large number of TSOs for transactions, and we also need to ensure that the allocated TSOs are always monotonically increasing: once a timestamp t1 has been allocated, no timestamp t2 allocated afterwards will satisfy t2 <= t1.

A TSO is an int64 integer that consists of two parts: physical time and logical time. Physical time is the current Unix system timestamp (in milliseconds), while logical time is a counter in the range [0, 1 << 18). In this way, each millisecond of physical time can be further subdivided into up to 262144 TSOs, which is sufficient for most use cases.

// server/tso/tso.go
// atomicObject represents a tso
type atomicObject struct {
	physical time.Time
	logical  int64  // maxLogical = int64(1 << 18)
}
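
To make the two-part layout concrete, here is a minimal sketch of how a physical time in milliseconds and a logical counter could be packed into a single int64. The bit layout (logical counter in the low 18 bits) and the helper names composeTS, extractPhysical, and extractLogical are assumptions for illustration only; they are not taken from tso.go.

```go
package main

// logicalBits is the number of low-order bits reserved for the logical counter.
const logicalBits = 18

// maxLogical mirrors the limit noted in the excerpt above: 1 << 18 = 262144.
const maxLogical = int64(1) << logicalBits

// composeTS packs a physical time (milliseconds) and a logical counter into
// one int64. Hypothetical helper; logical is expected to be in [0, maxLogical).
func composeTS(physicalMs, logical int64) int64 {
	return physicalMs<<logicalBits | logical
}

// extractPhysical returns the millisecond part of a packed timestamp.
func extractPhysical(ts int64) int64 {
	return ts >> logicalBits
}

// extractLogical returns the counter part of a packed timestamp.
func extractLogical(ts int64) int64 {
	return ts & (maxLogical - 1)
}
```

With this layout, comparing two packed values as plain integers orders them first by physical time and then by logical counter, which is the property the monotonicity requirement relies on.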

When actually using atomicObject, we always access it through an unsafe.Pointer that points to it, and this pointer is the only way to reach the TSO. Compared with passing values directly, this restricts what can be done to the TSO as it travels along the call chain, so that a returned TSO cannot be changed at some point in a way that breaks the linearizability constraint.
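
The pointer-based access can be sketched as follows. This is a simplified illustration, not the PD code: tsObject, oracle, publish, and load are hypothetical names, and the physical time is kept as plain milliseconds to keep the example short.

```go
package main

import (
	"sync/atomic"
	"unsafe"
)

// tsObject is a simplified stand-in for atomicObject.
type tsObject struct {
	physicalMs int64
	logical    int64
}

// oracle keeps the current tsObject behind an unsafe.Pointer.
type oracle struct {
	ts unsafe.Pointer // always holds a *tsObject or nil
}

// publish replaces the whole object in one atomic step, so a reader sees
// either the old (physical, logical) pair or the new one, never a mix.
func (o *oracle) publish(physicalMs int64) {
	atomic.StorePointer(&o.ts, unsafe.Pointer(&tsObject{physicalMs: physicalMs}))
}

// load returns the current object, or nil if nothing has been published yet.
func (o *oracle) load() *tsObject {
	return (*tsObject)(atomic.LoadPointer(&o.ts))
}
```

Because each update installs a fresh object instead of mutating fields in place, a caller that loaded the old pointer keeps a consistent view of the old physical time and its counter.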

Calibration

TSO allocation is performed by the PD leader of the cluster. To make sure that a leader failure does not affect TiDB transactions, we need to persist the maximum TSO ever allocated in the PD cluster, so we store the physical timestamp of the TSO in etcd. At the same time, to respond to RPC requests quickly, we must avoid interacting with etcd too frequently, so we should not read and write etcd on every TSO update. What we store is therefore not the last allocated timestamp but the upper bound of a timestamp window, as explained later in this section.

Every time a new PD leader is elected, it performs a calibration: it loads the last saved physical timestamp from etcd and compares it with the local physical time to correct the starting point.

// server/tso/tso.go
const updateTimestampGuard = time.Millisecond

// Load last timestamp stored in etcd
last, err := t.loadTimestamp()
if err != nil {
	return err
}

next := time.Now()
if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard {
	next = last.Add(updateTimestampGuard)
}

The code subtracts the last timestamp stored in etcd (last) from the current system time (next). If the result is less than the constant updateTimestampGuard (1 millisecond by default), the current system time may not be accurate, and we use last plus updateTimestampGuard as the starting point instead. If the current system time is too close to the last saved timestamp, or even earlier than it, there is a risk of breaking linear consistency, so we start from last and force it forward to ensure that every TSO allocated by the newly elected leader is greater than all previously allocated TSOs.
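
The calibration rule can be tried out in isolation with the sketch below. It is an illustration under assumptions: calibrate and calibrateMs are hypothetical helpers, and the standard time.Time.Sub stands in for typeutil.SubTimeByWallClock, which is safe here only because the example times are built from plain millisecond values.

```go
package main

import "time"

// guard plays the role of updateTimestampGuard (1 ms in the excerpt).
const guard = time.Millisecond

// calibrate picks the starting physical time for a newly elected leader.
// now is the local clock; last is the timestamp loaded from etcd.
func calibrate(now, last time.Time) time.Time {
	if now.Sub(last) < guard {
		// Local clock is too close to, or behind, the persisted bound.
		return last.Add(guard)
	}
	return now
}

// calibrateMs wraps calibrate for experiments with millisecond values.
func calibrateMs(nowMs, lastMs int64) int64 {
	now := time.Unix(0, nowMs*int64(time.Millisecond))
	last := time.Unix(0, lastMs*int64(time.Millisecond))
	return calibrate(now, last).UnixNano() / int64(time.Millisecond)
}
```

For example, calibrateMs(5000, 1000) keeps the local time 5000, while calibrateMs(1000, 5000) models a new leader whose clock is behind and yields 5001, one guard interval past the persisted value.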

save := next.Add(t.saveInterval)
if err = t.saveTimestamp(save); err != nil {
	return err
}

current := &atomicObject{
	physical: next,
}
t.lease = lease
atomic.StorePointer(&t.ts, unsafe.Pointer(current))

After that, we add a time interval (3 seconds by default) to the selected physical time to obtain the value to persist, and save it to etcd with saveTimestamp(save). This lets PD keep using the in-memory timestamp, which starts at next, within that interval without frequent interaction with etcd. Calculating and allocating TSOs directly in memory performs very well: our own internal tests show that PD can allocate millions of TSOs per second. Note that every time this time window is about to expire, PD repeats the same action and pushes the timestamp stored in etcd forward by another 3 seconds.
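
To illustrate how much the time window reduces etcd traffic, the following sketch simulates ticks every 50 milliseconds and counts how often the persisted bound has to be extended. The window type, its methods, and simulate are hypothetical, all times are plain milliseconds, and a counter stands in for real etcd writes.

```go
package main

const (
	saveIntervalMs = int64(3000) // default 3 s interval from the text
	guardMs        = int64(1)    // plays the role of updateTimestampGuard
)

// window models the bound persisted in etcd; writes counts simulated saves.
type window struct {
	savedMs int64
	writes  int
}

// save stands in for saveTimestamp.
func (w *window) save(ms int64) {
	w.savedMs = ms
	w.writes++
}

// advance is called on every tick with the next in-memory physical time.
// It extends the persisted bound only when nextMs gets within guardMs of it.
func (w *window) advance(nextMs int64) {
	if w.savedMs-nextMs <= guardMs {
		w.save(nextMs + saveIntervalMs)
	}
}

// simulate ticks every 50 ms from time 0 to durationMs inclusive and
// returns the number of simulated writes.
func simulate(durationMs int64) int {
	w := &window{}
	w.save(saveIntervalMs) // calibration at time 0 persists the first window
	for t := int64(0); t <= durationMs; t += 50 {
		w.advance(t)
	}
	return w.writes
}
```

In this model, simulate(30000) runs 601 ticks over 30 seconds but performs only 11 writes: one during calibration and one each time the clock reaches the persisted bound.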

Allocation

Once calibration is finished, we have the timestamp data needed to perform the calculation in memory. To further improve efficiency and reduce overhead, clients often obtain TSOs from PD in batches. The client first collects a batch of transaction TSO requests, say n of them, and then sends a single request with parameter n to PD. After PD receives this request, it generates n TSOs and returns them to the client.

// server/tso/tso.go
var resp pdpb.Timestamp
for i := 0; i < maxRetryCount; i++ {
	current := (*atomicObject)(atomic.LoadPointer(&t.ts))
	if current == nil || current.physical == typeutil.ZeroTime {
		return pdpb.Timestamp{}, errors.New("can not get timestamp, may be not leader")
	}
	
	resp.Physical = current.physical.UnixNano() / int64(time.Millisecond)
	resp.Logical = atomic.AddInt64(&current.logical, int64(count))
	if resp.Logical >= maxLogical {
		time.Sleep(UpdateTimestampStep)
		continue
	}
	if t.lease == nil || t.lease.IsExpired() {
		return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired")
	}
	return resp, nil
}

When the client requests PD's TSO service, it gets back a TSO that combines physical and logical time. The physical clock in PD advances with the system time, while the logical clock only increases passively and atomically with TSO requests. Note that the logical clock has a limited range: if the limit is exceeded, the leader sleeps for UpdateTimestampStep (50 milliseconds by default) to wait for the time to advance. UpdateTimestampStep is the interval at which PD updates its physical timestamp, so after waiting at least this long, the physical time will have advanced and the logical time will have been reset to zero, and allocation can continue. During TSO calculation, PD also checks the leader's lease in real time. If the lease has expired, no further TSOs can be allocated, which ensures that at any time only one server in the PD cluster, the leader, can generate TSOs.
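
The batch behaviour and the logical overflow case can be sketched with a simplified, single-goroutine model. Everything here is an assumption for illustration: allocator, alloc, tick, and expand are hypothetical names, the real code uses an atomic add on a shared object instead of plain fields, and expanding a batch from its last logical value is our reading of the excerpt, where the response carries the counter value after adding count.

```go
package main

import "errors"

const maxLogical = int64(1 << 18)

var errLogicalExhausted = errors.New("logical range exhausted, wait for the physical time to advance")

// allocator is a simplified model of the leader's in-memory state.
// It is not safe for concurrent use.
type allocator struct {
	physicalMs int64 // advanced by tick
	logical    int64 // advanced only by allocation requests
}

// alloc reserves count consecutive logical values and returns the physical
// time together with the last logical value of the batch.
func (a *allocator) alloc(count int64) (physicalMs, lastLogical int64, err error) {
	a.logical += count
	if a.logical >= maxLogical {
		return 0, 0, errLogicalExhausted
	}
	return a.physicalMs, a.logical, nil
}

// tick models the periodic update: move to a newer millisecond and reset
// the logical counter. Older or equal times are ignored.
func (a *allocator) tick(newPhysicalMs int64) {
	if newPhysicalMs > a.physicalMs {
		a.physicalMs = newPhysicalMs
		a.logical = 0
	}
}

// expand lists the logical values covered by a batch of size count whose
// last value is lastLogical.
func expand(lastLogical, count int64) []int64 {
	out := make([]int64, 0, count)
	for l := lastLogical - count + 1; l <= lastLogical; l++ {
		out = append(out, l)
	}
	return out
}
```

A request for 3 timestamps on a fresh allocator returns last logical value 3, which expands to the values 1, 2, and 3. Once a request pushes the counter to maxLogical or beyond, alloc fails until tick moves the physical time forward, which corresponds to the sleep-and-retry branch in the excerpt.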

Progression

The update operation of TSO is carried out along with the leader's lease renewal as the system time passes.

// server/server.go
for {
	select {
	case <-leaderTicker.C:
		if lease.IsExpired() {
			log.Info("lease expired, leader step down")
			return
		}
		etcdLeader := s.member.GetEtcdLeader()
		if etcdLeader != s.member.ID() {
			log.Info("etcd leader changed, resigns leadership", zap.String("old-leader-name", s.Name()))
			return
		}
	case <-tsTicker.C:
		if err = s.tso.UpdateTimestamp(); err != nil {
			log.Error("failed to update timestamp", zap.Error(err))
			return
		}
	case <-ctx.Done():
		// Server is closed and it should return nil.
		log.Info("server is closed")
		return
	}
}

The UpdateTimestamp function mainly does three things. The first is to update the current physical time of the TSO in memory (as mentioned before, the logical time only increases passively with allocation requests and never advances on its own). The second is to check whether the current logical time exceeds a threshold. The third is to update the time window in etcd in time. Meanwhile, to ensure the linear consistency of TSO, UpdateTimestamp must maintain the following constraints throughout:

  • Physical time strictly increases monotonically
  • The timestamp stored in etcd strictly increases monotonically
  • The physical time must be less than the stored timestamp

Let’s talk about the first thing UpdateTimestamp does: update the in-memory physical time of the TSO while keeping it strictly monotonically increasing. Ideally the physical time of the TSO simply follows real-world time, and the intuitive way is to set it to the current system time on every update. However, the system time alone does not guarantee the constraints: it may be changed manually, jump backwards after a network time sync, or lag behind on a newly elected PD leader. To cover these situations, the physical time is updated only when the system time is ahead of the current TSO physical time by more than updateTimestampGuard.

// Physical time now minus prev.physical
jetLag := typeutil.SubTimeByWallClock(now, prev.physical)
// If the system time is greater, it will be synchronized with the system time.
if jetLag > updateTimestampGuard {
	next = now
}

Next is checking whether the logical time exceeds the threshold. Although the logical time has a range of [0, 1 << 18), we still have to consider the possibility of exhausting it. To avoid overflow, UpdateTimestamp checks the logical time regularly. When the value exceeds half of the maximum (this threshold is currently fixed and, in our judgment, sufficient for most scenarios), the logical time is reset to zero and 1 millisecond is added to the physical time.

if prevLogical > maxLogical/2 {
	// The reason choosing maxLogical/2 here is that it's big enough for common cases.
	// Because there is enough timestamp can be allocated before next update.
	log.Warn("the logical time may be not enough")
	next = prev.physical.Add(time.Millisecond)
}

As for the third, we mentioned earlier that, in order to allocate TSOs from the in-memory timestamp without frequent interaction with etcd, PD writes a time window to etcd and updates it in time. Note that the constant updateTimestampGuard in the code has a value of one millisecond. When the last value stored in etcd is ahead of the next physical time by one millisecond or less, the current window is about to expire or has already expired. We then slide the window forward to open up new time space by adding the default 3-second interval and writing the result to etcd.

// The time window needs to be updated and saved to etcd.
if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard {
	save := next.Add(t.saveInterval)
	if err := t.saveTimestamp(save); err != nil {
		return err
	}
}
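
Putting the three steps together, one update round might look like the sketch below. It is a simplified, single-goroutine model and not the PD implementation: state, update, ms, and toMs are hypothetical names, a counter replaces the etcd write, time.Time.Sub stands in for typeutil.SubTimeByWallClock, and the way the first two checks are chained (the logical check only applies when the clock has not moved ahead) is an assumption about how the excerpts fit together.

```go
package main

import "time"

const (
	guard        = time.Millisecond // plays the role of updateTimestampGuard
	saveInterval = 3 * time.Second  // default window length from the text
	maxLogical   = int64(1 << 18)
)

// state models the leader's TSO state for one goroutine.
type state struct {
	physical  time.Time // in-memory physical time
	logical   int64     // logical counter, bumped by allocations
	lastSaved time.Time // upper bound persisted in the store
	saves     int       // number of simulated etcd writes
}

// update runs one round and reports whether the physical time moved.
func (s *state) update(now time.Time) bool {
	var next time.Time
	switch {
	case now.Sub(s.physical) > guard:
		next = now // step 1: follow the system clock when it is ahead
	case s.logical > maxLogical/2:
		next = s.physical.Add(time.Millisecond) // step 2: counter is running out
	default:
		return false // clock not ahead and counter fine: keep the old time
	}
	// Step 3: extend the persisted window before using a time near its bound.
	if s.lastSaved.Sub(next) <= guard {
		s.lastSaved = next.Add(saveInterval)
		s.saves++
	}
	s.physical, s.logical = next, 0
	return true
}

// ms builds a time from milliseconds since the Unix epoch.
func ms(n int64) time.Time { return time.Unix(0, n*int64(time.Millisecond)) }

// toMs converts a time back to milliseconds since the Unix epoch.
func toMs(t time.Time) int64 { return t.UnixNano() / int64(time.Millisecond) }
```

In this model a system clock that jumps backwards leaves the physical time untouched, a nearly exhausted counter forces the physical time forward by one millisecond, and the persisted bound is always extended before the in-memory time can reach it, which matches the three constraints listed earlier.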

We have now reviewed the full design of TSO, starting from the election of a PD leader, and how it guarantees the linearizability constraint. To achieve good performance and keep TiDB transactions efficient, we also introduced concepts such as time windows to improve allocation performance.

For more details, please see tso.go.