
# Batch Mode

DuyHai DOAN edited this page May 5, 2013 · 17 revisions

## Batch Mutations

Sometimes it is more convenient to accumulate all column family modifications and send them to Cassandra in a single request. These are called batch mutations.

To support batching, Achilles provides a ThriftBatchingEntityManager. This entity manager exposes the same methods as the regular ThriftEntityManager, plus 4 new batch-related operations:

```java
public void startBatch();

public void startBatch(ConsistencyLevel readLevel, ConsistencyLevel writeLevel);

public void endBatch();

public void cleanBatch();
```

  • startBatch(): start a new batch session
  • startBatch(ConsistencyLevel readLevel, ConsistencyLevel writeLevel): start a new batch session with custom read and write consistency levels
  • endBatch(): end the current batch session and flush all pending mutations to Cassandra
  • cleanBatch(): discard all pending mutations of the current batch session and reset its state

## Implementation

The batch session is managed by a batch flushing context in Achilles. Unlike the immediate flushing context, which flushes each operation right away, the batch flushing context accumulates mutations and only flushes them to Cassandra when endBatch() is invoked.

Insert and update operations are added to a mutator by Achilles and saved in a temporary map inside the batch flushing context. There is one mutator object per column family.

At flush time, all the mutators are sent to Cassandra and the mutator map is cleared.
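To make this flow concrete, here is a toy Java model of a batch flushing context. It is a minimal sketch under stated assumptions, not Achilles' implementation. `ToyMutator` and `ToyBatchFlushingContext` are hypothetical names, and "sending to Cassandra" is simulated by appending to a list. The model keeps one mutator per column family in a map and sends them all at flush time, then clears the map. `clean()` plays the role of cleanBatch().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Thrift mutator: it only records pending insertions.
class ToyMutator {
    final List<String> pending = new ArrayList<>();

    void addInsertion(String rowKey, String column, String value) {
        pending.add(rowKey + ":" + column + "=" + value);
    }
}

// Hypothetical model of a batch flushing context, not Achilles' actual class.
class ToyBatchFlushingContext {
    // One mutator per column family, kept in insertion order.
    private final Map<String, ToyMutator> mutators = new LinkedHashMap<>();
    // Simulates what reached Cassandra: one entry per flushed column family.
    final List<String> sentToCassandra = new ArrayList<>();

    // Insert/update operations only go into the column family's mutator.
    void insert(String columnFamily, String rowKey, String column, String value) {
        mutators.computeIfAbsent(columnFamily, cf -> new ToyMutator())
                .addInsertion(rowKey, column, value);
    }

    int pendingColumnFamilies() {
        return mutators.size();
    }

    // endBatch(): send every mutator, then clear the mutator map.
    void flush() {
        for (Map.Entry<String, ToyMutator> entry : mutators.entrySet()) {
            sentToCassandra.add(entry.getKey() + " -> " + entry.getValue().pending);
        }
        mutators.clear();
    }

    // cleanBatch(): drop all pending mutations without sending them.
    void clean() {
        mutators.clear();
    }
}
```

In this model, two timeline insertions for different users end up in the same `timeline` mutator. The flush therefore sends one entry per column family, not one per operation.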

The ThriftBatchingEntityManager is stateful and, because of the batch flushing context, not thread-safe by design.

A ThriftBatchingEntityManager instance can be obtained by invoking batchingEntityManager() on any ThriftEntityManager.

A ThriftBatchingEntityManager instance should be discarded right after the end of the batch.

All managed entities and WideMap/Counter proxies created by a ThriftBatchingEntityManager are bound to its batch flushing context and should also be discarded at the end of the batch.


## Usage

Let's consider the following UserEntity:

```java
@Entity
public class UserEntity implements Serializable
{
    private static final long serialVersionUID = 1L;

    @Id
    private Long id;

    @Column
    private String firstname;

    @Column
    private String lastname;

    @Column
    private Counter tweetsCount;

    @Column(table="timeline")
    WideMap<UUID,String> timeline;

    @JoinColumn(table="followers")
    WideMap<String,UserEntity> followers;

    // Getters and setters omitted for brevity
}
```

When a user creates a new tweet, we need to spread it to all of his followers.

```java
// Start batch (em is an existing ThriftEntityManager)
ThriftBatchingEntityManager batchEm = em.batchingEntityManager();
batchEm.startBatch();

UserEntity user = batchEm.find(UserEntity.class, 10L);

user.setFirstname("new firstname");
user.setLastname("new lastname");

// Save name change. No flushing yet
batchEm.merge(user);

// Add new tweet messages (uuid1 and uuid2 are UUIDs created beforehand)
user.getTimeline().insert(uuid1, "tweet 1");
user.getTimeline().insert(uuid2, "tweet 2");

// Iterate over all followers
KeyValueIterator<String, UserEntity> followersIter = user.getFollowers().iterator(null, null, Integer.MAX_VALUE);

// Spread the tweets to all followers
while (followersIter.hasNext())
{
    // The joined follower entity is loaded
    UserEntity follower = followersIter.nextValue();

    // No flushing even though insert() is called
    follower.getTimeline().insert(uuid1, "tweet 1");
    follower.getTimeline().insert(uuid2, "tweet 2");
}

// Counter increment, flushed immediately even in batch mode
user.getTweetsCount().incr(2L);

// Flush the name change and the tweet insertions into the timelines of the user and his followers
batchEm.endBatch();
```

The above example illustrates how batch mode works. The dirty checking and state changes on the user entity are not flushed when batchEm.merge() is called.

In the same manner, the tweet insertions into the timeline WideMap structures of the user and his followers are not flushed until endBatch() is invoked.

However, the counter increment is flushed immediately to Cassandra, as per the design of counters in Achilles.
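The ordering consequence can be shown with a toy model. `ToyBatchSession` is a hypothetical class, not part of Achilles. Merged changes wait in a buffer until endBatch(), whereas a counter increment is recorded as flushed right away. In the example above, this means the tweetsCount increment reaches Cassandra before the name change and the timeline insertions. It is also already applied if the later flush fails.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: regular changes are buffered, counter increments are not.
class ToyBatchSession {
    // Changes waiting for endBatch()
    final List<String> buffered = new ArrayList<>();
    // Changes that have (conceptually) reached Cassandra, in arrival order
    final List<String> flushed = new ArrayList<>();

    // merge() and WideMap insertions: deferred until endBatch()
    void merge(String change) {
        buffered.add(change);
    }

    // Counter increments bypass the batch and are flushed immediately.
    void incrementCounter(String counter, long delta) {
        flushed.add(counter + "+=" + delta);
    }

    // endBatch(): flush every buffered change, then reset the buffer.
    void endBatch() {
        flushed.addAll(buffered);
        buffered.clear();
    }
}
```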


## Exception handling and recovery

As already mentioned, the ThriftBatchingEntityManager is stateful. If any exception occurs at flush time, Achilles tries to recover by clearing the mutator map and cleaning up the batch flushing context. In theory, you can re-use the same ThriftBatchingEntityManager instance after the exception is caught.

However, it is strongly recommended to create a new ThriftBatchingEntityManager instance rather than re-use the previous one, since creating a new instance is a very cheap operation.

All managed entities created by the old ThriftBatchingEntityManager instance should also be discarded, because they keep a reference to the (potentially) stale batch flushing context.
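The recommended recovery pattern, with a fresh instance for each attempt, can be sketched with another toy model. `ToyBatchingManager` and `ToyBatchRunner.runBatch` are hypothetical, and the flush failure is simulated. Retrying at all is an assumption of this sketch, since the page only recommends not re-using the old instance. The `finally` block mirrors the cleanup Achilles performs on failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical batching manager whose flush may fail (simulated).
class ToyBatchingManager {
    final List<String> pending = new ArrayList<>();
    private final boolean failOnFlush;

    ToyBatchingManager(boolean failOnFlush) {
        this.failOnFlush = failOnFlush;
    }

    void write(String mutation) {
        pending.add(mutation);
    }

    void endBatch() {
        try {
            if (failOnFlush) {
                throw new IllegalStateException("simulated failure while flushing");
            }
            // A real implementation would send the pending mutations here.
        } finally {
            // Recovery: pending mutations are cleared whether or not the flush succeeded.
            pending.clear();
        }
    }
}

class ToyBatchRunner {
    // Runs the batch on a brand-new manager for every attempt; returns the attempt number that succeeded.
    static int runBatch(Supplier<ToyBatchingManager> newManager, List<String> mutations, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Never re-use the failed instance (or entities it created): start from scratch.
            ToyBatchingManager manager = newManager.get();
            mutations.forEach(manager::write);
            try {
                manager.endBatch();
                return attempt;
            } catch (RuntimeException e) {
                lastFailure = e;
            }
        }
        throw lastFailure;
    }
}
```

Passing a `Supplier` keeps instance creation in one place. A failed attempt therefore can never leak its state into the next one.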


## Batch consistency level

It is possible to start a batch session with custom consistency levels:

```java
// Spawn a new batchEm instance
ThriftBatchingEntityManager batchEm = em.batchingEntityManager();

// Start batch with custom consistency levels
batchEm.startBatch(ConsistencyLevel.ONE, ConsistencyLevel.QUORUM);
```

In the above example, all read operations are performed with consistency level ONE and all write operations with consistency level QUORUM.

If a batch session is started with custom consistency levels:

  • Invoking JPA operations with custom consistency levels on the batchEm instance will raise an AchillesException
  • Invoking WideMap operations with custom consistency levels on proxies created by the batchEm instance will raise an AchillesException
  • Invoking Counter operations with custom consistency levels on proxies created by the batchEm instance will raise an AchillesException

In short, the custom consistency levels of the batch take precedence over any static or runtime level. Static levels are ignored and runtime levels are rejected.
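These rules can be summarized as a small resolution function. This is a hypothetical model, and `ToyConsistencyLevel`, `ToyAchillesException` and `ToyBatchConsistency` are illustrative names rather than Achilles types. The behavior when no custom batch levels are set (a runtime level wins over the static one) is an assumption of this sketch, because the page only describes the custom-level case.

```java
// Hypothetical subset of Cassandra consistency levels.
enum ToyConsistencyLevel { ONE, QUORUM, ALL }

// Hypothetical stand-in for AchillesException.
class ToyAchillesException extends RuntimeException {
    ToyAchillesException(String message) {
        super(message);
    }
}

// Hypothetical model of the consistency rules of a batch session.
class ToyBatchConsistency {
    // Both null when the batch was started with startBatch() (no custom levels).
    private final ToyConsistencyLevel batchRead;
    private final ToyConsistencyLevel batchWrite;

    ToyBatchConsistency(ToyConsistencyLevel batchRead, ToyConsistencyLevel batchWrite) {
        this.batchRead = batchRead;
        this.batchWrite = batchWrite;
    }

    // runtimeLevel is null when the caller did not pass a level to the operation.
    ToyConsistencyLevel readLevel(ToyConsistencyLevel staticLevel, ToyConsistencyLevel runtimeLevel) {
        return resolve(batchRead, staticLevel, runtimeLevel);
    }

    ToyConsistencyLevel writeLevel(ToyConsistencyLevel staticLevel, ToyConsistencyLevel runtimeLevel) {
        return resolve(batchWrite, staticLevel, runtimeLevel);
    }

    private static ToyConsistencyLevel resolve(ToyConsistencyLevel batchLevel,
                                               ToyConsistencyLevel staticLevel,
                                               ToyConsistencyLevel runtimeLevel) {
        if (batchLevel != null) {
            if (runtimeLevel != null) {
                // Runtime levels are rejected inside a batch started with custom levels.
                throw new ToyAchillesException("Runtime consistency level not allowed in this batch");
            }
            // The batch level overrides the static one.
            return batchLevel;
        }
        // Assumption of this sketch: without custom batch levels, a runtime level beats the static one.
        return runtimeLevel != null ? runtimeLevel : staticLevel;
    }
}
```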
