
Batch Mode

DuyHai DOAN edited this page Nov 9, 2014 · 17 revisions

Batch Mutations

Sometimes it is more convenient to stack up all modifications and send them to Cassandra in a single request. This is called a batch mutation.

To support batching, Achilles provides a Batch. The Batch exposes the same modification methods (insert(), update(), delete(), ...) as the normal PersistenceManager, plus six operations specific to batching:

public void startBatch();

public void startBatch(ConsistencyLevel consistencyLevel);

public void batchNativeStatement(RegularStatement regularStatement, Object ... boundValues);

public void batchNativeStatementWithCASListener(RegularStatement regularStatement, CASResultListener casResultListener, Object... boundValues);

public void endBatch();

public void cleanBatch();
  • startBatch(): start a new batch session with the default application-wide consistency level
  • startBatch(ConsistencyLevel consistencyLevel): start a new batch session with a custom write consistency level
  • batchNativeStatement(RegularStatement regularStatement, Object ... boundValues): add a CQL3 statement to this batch, optionally with bound values
  • batchNativeStatementWithCASListener(RegularStatement regularStatement, CASResultListener casResultListener, Object ... boundValues): add a CQL3 statement to this batch with a CAS result listener, optionally with bound values. This method is useful when the native statement uses CAS
  • endBatch(): end an existing batch session and flush the pending mutations to Cassandra
  • cleanBatch(): discard all pending mutations for the current batch session and reset its state

## Implementation details

The batch session is managed by a batch flushing context in Achilles. Unlike an immediate flushing context, the batch flushing context stacks up modifications and only flushes them to Cassandra when endBatch() is invoked, using the atomic batch feature of Cassandra 2.0.

Insert and update operations are saved in a temporary map inside the batch flushing context.
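The stacking behaviour can be pictured with a small toy model. This is only a sketch under simplifying assumptions, not the Achilles implementation: the class BatchContextSketch, its method names, and the use of plain strings in a list (rather than the temporary map mentioned above) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy flushing context (hypothetical): stacks up statements and sends them only on endBatch(). */
final class BatchContextSketch {
    private final List<String> pending = new ArrayList<>();
    private final Consumer<List<String>> sender; // stand-in for the single request to Cassandra
    private boolean started;

    BatchContextSketch(Consumer<List<String>> sender) {
        this.sender = sender;
    }

    void startBatch() {
        if (started) throw new IllegalStateException("batch already started");
        started = true;
    }

    /** Records a statement without sending anything. */
    void add(String statement) {
        if (!started) throw new IllegalStateException("no batch started");
        pending.add(statement);
    }

    /** Sends all pending statements in one call, then resets the state even if sending fails. */
    void endBatch() {
        if (!started) throw new IllegalStateException("no batch started");
        try {
            sender.accept(new ArrayList<>(pending));
        } finally {
            cleanBatch();
        }
    }

    /** Drops pending statements and resets the session. */
    void cleanBatch() {
        pending.clear();
        started = false;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BatchContextSketch ctx = new BatchContextSketch(
                stmts -> System.out.println("flushing " + stmts.size() + " statements"));
        ctx.startBatch();
        ctx.add("UPDATE users SET firstname = 'new firstname' WHERE id = 10");
        ctx.add("INSERT INTO users(id, firstname) VALUES (11, 'John')");
        ctx.endBatch(); // the only point where the sender is invoked
    }
}
```

Because the pending list and the started flag are plain mutable fields, sharing one such object between threads would be unsafe, which mirrors the reason the real Batch is not thread-safe.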

  • The Batch is stateful and not thread-safe by design because of the batch flushing context.
  • A Batch instance can be obtained by invoking createBatch() on the PersistenceManager or PersistenceManagerFactory.
  • Any Batch instance should be discarded right after the end of the batch.
  • Any managed entity or Counter proxy created by a Batch is bound to the batch flushing context and should be discarded at the end of the batch.
  • The runtime consistency level declared when starting a new batch is applied to every WRITE statement inside the batch.

## Usage

Let's consider the following UserEntity:

@Entity 
public class UserEntity 
{
	@Id
	private Long id;

	@Column
	private String firstname;

	@Column
	private String lastname; 

	@Column
	private Counter tweetsCount; 

	public UserEntity(Long userId,String firstname,String lastname, Counter tweetsCount)
	{...}
}

When the user creates a new tweet message, we need to spread the tweet to all their followers.

// Start batch
Batch batch = manager.createBatch();
batch.startBatch();

UserEntity user = batch.find(UserEntity.class,10L);

user.setFirstname("new firstname");
user.setLastname("new lastname");

// Save name change. No flushing yet
batch.update(user);

// Create new user. No flushing yet
batch.insert(new UserEntity(10L,"John","DOO",CounterBuilder.incr(10)));


// Counter value increment, immediately read from Cassandra. No flushing yet
user.getTweetsCount().incr(2L);

// Flush first user name change, new user creation and the counter increment to Cassandra
batch.endBatch();

The above example illustrates how batching mode works. Dirty checking and state changes on the user entity are not flushed when batch.update() is called. Similarly, new entity insertions are not flushed until endBatch() is called.


## Exception and recovery

As already mentioned, the Batch is stateful so if any exception occurs at flush time, Achilles will try to recover by clearing the statement list and cleaning up the batch flushing context. Theoretically you can re-use the same instance of Batch after the exception is caught.

However, it is strongly recommended to create a new Batch instance rather than re-use the previous one, because creating a new instance is a very cheap operation.

All managed entities created by the old Batch instance should also be discarded because they keep a reference to the (potentially) stale batch flushing context.
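The "new instance after a failure" recommendation can be sketched as follows. This is a minimal illustration, not Achilles code: ToyBatch is a hypothetical stand-in for the real Batch, the Supplier plays the role of manager.createBatch(), and re-running the statements is an assumption that only makes sense when they are safe to replay (counter increments, for example, are not).

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a batch; not the Achilles Batch class. */
interface ToyBatch {
    void startBatch();
    void add(String statement);
    void endBatch(); // may throw at flush time
}

final class BatchRetrySketch {

    /**
     * Runs the statements in a batch. After a failure, the failed instance is dropped
     * and the next attempt uses a brand new one. Returns the number of attempts used.
     */
    static int runWithFreshBatch(Supplier<ToyBatch> factory, List<String> statements, int maxAttempts) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            ToyBatch batch = factory.get(); // never re-use the instance that failed
            try {
                batch.startBatch();
                statements.forEach(batch::add);
                batch.endBatch();
                return attempt;
            } catch (RuntimeException e) {
                lastFailure = e; // the failed batch goes out of scope here
            }
        }
        throw lastFailure;
    }
}
```

In the same spirit, any entity obtained from the failed batch would have to be looked up again through the new instance instead of being carried over.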


## Batch consistency level

It is possible to start a batch session with a custom consistency level for WRITE:

// Spawn new batch instance
Batch batch = manager.createBatch();

// Start batch with consistency level QUORUM for WRITE
batch.startBatch(ConsistencyLevel.QUORUM);

In the above example, all WRITE operations will be executed with consistency level QUORUM.

If a batch session is started with custom consistency levels:

  • Invoking common operations like insert(), update() ... with custom consistency levels on the batch instance will raise an AchillesException
  • Invoking Counter operations with custom consistency levels on proxies created by the batch instance is allowed though and will override the consistency level defined by the batch

## Eventual atomicity

All batch operations are eventually atomic, in the sense that either all upsert statements succeed or they fail after some time.

## Statement ordering

Currently, statement ordering is not supported by CQL3 batches (see CASSANDRA-6426 for more details). If you issue several updates on the same column with different values, Cassandra uses the following algorithm to resolve the conflict:

  1. if timestamps are different, pick the column with the largest timestamp (the value being a regular column or a tombstone)
  2. if timestamps are the same, and one of the columns is a tombstone ('null') - pick the tombstone
  3. if timestamps are the same, and none of the columns are tombstones, pick the column with the largest value
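The three rules can be expressed as a small function. This is a toy model written for illustration, not Cassandra code: the Cell class is hypothetical, a null value stands for a tombstone, and String.compareTo is an assumption standing in for Cassandra's own comparison of column values.

```java
/** Toy model of a column written by one statement. A null value stands for a tombstone. */
final class Cell {
    final long timestampMicros;
    final String value; // null = tombstone (deleted column)

    Cell(long timestampMicros, String value) {
        this.timestampMicros = timestampMicros;
        this.value = value;
    }

    boolean isTombstone() {
        return value == null;
    }
}

final class ConflictResolutionSketch {

    /** Returns the cell that wins according to the three rules listed above. */
    static Cell resolve(Cell a, Cell b) {
        // Rule 1: different timestamps, the largest timestamp wins
        if (a.timestampMicros != b.timestampMicros) {
            return a.timestampMicros > b.timestampMicros ? a : b;
        }
        // Rule 2: same timestamp, a tombstone wins over a regular value
        if (a.isTombstone()) return a;
        if (b.isTombstone()) return b;
        // Rule 3: same timestamp, no tombstone, the largest value wins
        // (String.compareTo is an assumption, see the lead-in)
        return a.value.compareTo(b.value) >= 0 ? a : b;
    }

    public static void main(String[] args) {
        Cell first = new Cell(1_000L, "zebra");
        Cell second = new Cell(1_000L, "apple");
        // Same timestamp on both writes: the argument order does not change the winner
        System.out.println(resolve(first, second).value);
        System.out.println(resolve(second, first).value);
    }
}
```

Both calls in main pick the same cell, which shows why the order in which statements were added to the batch has no influence when their timestamps are equal.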

As you can see, the behavior is quite unexpected. Therefore, to support statement ordering, Achilles can assign a runtime-generated timestamp to each of your batch statements. The timestamp has microsecond resolution.

The downside of this approach is that the clocks of all your clients must be synchronized if they access the same set of data. Otherwise you risk interleaved updates between clients whose clocks have drifted.
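One way to picture runtime-generated timestamps is a generator that never returns the same microsecond value twice within a process. This is a sketch under assumptions, not the generator Achilles uses: the class name MicrosTimestampSketch is hypothetical, and the clock source is System.currentTimeMillis() scaled to microseconds.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical generator of strictly increasing timestamps, in microseconds since the epoch. */
final class MicrosTimestampSketch {
    private final AtomicLong last = new AtomicLong();

    /** Returns a value strictly greater than any value previously returned by this instance. */
    long next() {
        long nowMicros = System.currentTimeMillis() * 1_000L;
        // If the clock did not advance (or went backwards), bump the previous value by one microsecond
        return last.updateAndGet(prev -> Math.max(prev + 1, nowMicros));
    }
}
```

Statements stamped with successive next() values from one generator keep their relative order under the "largest timestamp wins" rule. The guarantee is local to one generator instance, so it does nothing about drift between the clocks of different clients.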

By default, Achilles does not attach a runtime-generated microsecond timestamp to update statements, so your statements are subject to the default Cassandra conflict resolution algorithm above.

To enable statement ordering, create an ordered batch as follows:

// Create new batch ORDERED instance
Batch batch = manager.createOrderedBatch();
  
