
Task scheduler

elandau edited this page Nov 28, 2012 · 9 revisions

Experimental!!!

Overview

This recipe uses Cassandra as the persistence layer for a scheduling queue. The implementation combines the distributed row lock recipe with two layers of sharding (yes, sharding) to achieve a high-throughput, durable queue. Sharding solves two problems. The first layer is a time-based rolling shard, so that at any given time most of the traffic goes to a single shard. This minimizes the impact of a growing wide row and allows compaction to clean up. The shard is rolling (i.e. the index wraps around using mod), which avoids having to go back and check old shards that may still contain data. The second layer of sharding distributes load across the cluster. This mitigates hot spots and also reduces lock contention when there are multiple clients.
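A minimal sketch of how a client might pick a row under the two sharding layers. It assumes the rolling shard index is simply the current time bucket taken mod the number of buckets, and that the concurrency shard is chosen at random. `ShardSelector` and its methods are hypothetical names, not part of the recipe's API.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper illustrating the two sharding layers; not the recipe's actual code.
final class ShardSelector {
    private ShardSelector() {}

    // Layer 1: rolling time shard. The bucket number grows with time and is wrapped
    // with mod, so after bucketCount buckets the same rows are reused.
    static int timeShard(long nowMillis, long bucketMillis, int bucketCount) {
        return (int) Math.floorMod(nowMillis / bucketMillis, (long) bucketCount);
    }

    // Layer 2: concurrency shard within the current time shard, spreading clients
    // across rows. Random selection is an assumption made for this sketch.
    static int concurrencyShard(int shardCount) {
        return ThreadLocalRandom.current().nextInt(shardCount);
    }
}
```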

Data Model

Each shard is implemented as a separate wide row with the shard name as the row key and columns representing items in the queue.

The row key format is {QueueName}:{RollingTimeShard}:{ConcurrencyShard}.
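A sketch of building and splitting row keys in this format. `RowKeys` is a hypothetical helper, and splitting from the right is an assumption in case a queue name itself contains ':' (the page does not say whether that is allowed).

```java
// Hypothetical helper for row keys of the form {QueueName}:{RollingTimeShard}:{ConcurrencyShard}.
final class RowKeys {
    private RowKeys() {}

    static String format(String queueName, int timeShard, int concurrencyShard) {
        return queueName + ":" + timeShard + ":" + concurrencyShard;
    }

    // Returns {timeShard, concurrencyShard}, parsing from the right end of the key.
    static int[] parseShards(String rowKey) {
        int last = rowKey.lastIndexOf(':');
        int prev = rowKey.lastIndexOf(':', last - 1);
        int timeShard = Integer.parseInt(rowKey.substring(prev + 1, last));
        int concurrencyShard = Integer.parseInt(rowKey.substring(last + 1));
        return new int[] {timeShard, concurrencyShard};
    }
}
```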

Each column name is a composite, and the recipe relies on the sort order of the composite comparator type to achieve several key features. The structure is {MessageType}{Priority}{TimeUUID}{State}.

MessageType

Metadata (0) - Metadata is used to store scheduler configuration information and to notify clients of configuration changes (the notification part isn't implemented yet).

Lock (1) - Row lock columns live on the same row as the tasks, to take advantage of row-level isolation and to avoid making separate calls for releasing the lock and updating the queue.

Task (2) - This is the actual task.

This structure makes it possible to read both the lock columns and the tasks in a single call to Cassandra, thereby reducing overhead. All of the lock columns will be at the top of the response, followed by the actual tasks.
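The ordering can be illustrated with a simplified in-memory model of the composite column name. This is not the recipe's serializer: `ColumnName` is hypothetical, the TimeUUID component is reduced to a plain timestamp, and a lower Priority value is assumed to mean a more urgent task (the page does not say which direction counts as "high").

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified, hypothetical model of the composite column name
// {MessageType}{Priority}{TimeUUID}{State}; TimeUUID is reduced to a timestamp.
final class ColumnName {
    static final int METADATA = 0;
    static final int LOCK = 1;
    static final int TASK = 2;

    final int type;
    final int priority;   // assumption: lower value = more urgent
    final long timestamp; // stand-in for the TimeUUID component
    final int state;
    final String label;   // only for identifying columns in the example

    ColumnName(int type, int priority, long timestamp, int state, String label) {
        this.type = type;
        this.priority = priority;
        this.timestamp = timestamp;
        this.state = state;
        this.label = label;
    }

    // Compare component by component, as a composite comparator does.
    static final Comparator<ColumnName> ORDER =
        Comparator.comparingInt((ColumnName c) -> c.type)
            .thenComparingInt(c -> c.priority)
            .thenComparingLong(c -> c.timestamp)
            .thenComparingInt(c -> c.state);

    // Labels in the order a slice of the row would return the columns.
    static List<String> sortedLabels(List<ColumnName> columns) {
        List<ColumnName> sorted = new ArrayList<>(columns);
        sorted.sort(ORDER);
        List<String> labels = new ArrayList<>();
        for (ColumnName c : sorted) {
            labels.add(c.label);
        }
        return labels;
    }
}
```

Because every lock column has type 1 and every task column has type 2, a single slice of the row returns the lock columns before any task, which is what allows the lock check and the task read to share one call.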

Priority

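Priority makes it possible to inject high-priority messages for processing. Because Priority precedes the TimeUUID in the composite, tasks are sorted by priority first and only then by time.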

TimeUUID

TimeUUID provides both uniqueness and time-based ordering of the tasks in the queue. For lock columns, it is primarily used to guarantee unique ids for clients trying to take the lock. Time ordering provides both FIFO-like semantics and future task execution.
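A sketch of producing a time-based (version 1) UUID for an arbitrary wall-clock time, which is what makes future execution possible: a task scheduled for later simply carries a TimeUUID from the future. `TimeUuids` is hypothetical, and it fills the clock-sequence and node bits with random data instead of following the full RFC 4122 generation rules.

```java
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper: version 1 UUIDs built from a chosen time, random low bits.
final class TimeUuids {
    // Offset between the UUID epoch (1582-10-15) and the Unix epoch, in 100 ns units.
    private static final long UUID_EPOCH_OFFSET = 0x01B21DD213814000L;

    private TimeUuids() {}

    static UUID forMillis(long unixMillis) {
        long ts = unixMillis * 10_000L + UUID_EPOCH_OFFSET;  // 60-bit timestamp
        long msb = ((ts & 0xFFFFFFFFL) << 32)                 // time_low
                 | (((ts >>> 32) & 0xFFFFL) << 16)            // time_mid
                 | 0x1000L                                    // version 1
                 | ((ts >>> 48) & 0x0FFFL);                   // time_hi
        long lsb = (ThreadLocalRandom.current().nextLong() & 0x3FFFFFFFFFFFFFFFL)
                 | 0x8000000000000000L;                       // IETF variant bits
        return new UUID(msb, lsb);
    }

    // Recover Unix millis from the UUID's 60-bit timestamp.
    static long toMillis(UUID uuid) {
        return (uuid.timestamp() - UUID_EPOCH_OFFSET) / 10_000L;
    }

    // A task is due once its scheduled time is not in the future.
    static boolean isDue(UUID taskId, long nowMillis) {
        return toMillis(taskId) <= nowMillis;
    }
}
```

Note that `java.util.UUID.compareTo` does not order version 1 UUIDs chronologically (it compares the raw bits, and the low-order time field comes first), so Java code should compare `timestamp()` values; Cassandra's TimeUUIDType comparator does sort by time.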

State

State is used by the locking algorithm.

Examples

Creating a Scheduler client

The first step to using the scheduler is to create a TaskScheduler instance. Multiple producers and consumers may be created from a single TaskScheduler instance.

The following example shows how to create a scheduler with 50 rolling time shards of 30 seconds and 20 concurrency shards within each time shard. That's a total of 1000 rows being used by the scheduler. Notice that the scheduler uses a keyspace client that was previously created.

CountingSchedulerStats stats = new CountingSchedulerStats();
TaskScheduler scheduler = new ShardedDistributedScheduler.Builder()
    .withConsistencyLevel(ConsistencyLevel.CL_QUORUM)
    .withColumnFamily("Scheduler")
    .withKeyspace(keyspace)
    .withSchedulerStats(stats)
    .withBuckets(50, 30, TimeUnit.SECONDS)
    .withShardCount(20)
    .withPollInterval(100L, TimeUnit.MILLISECONDS)
    .build();
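The sizing in this example works out as below. `SchedulerSizing` is a hypothetical helper that only does the arithmetic for `withBuckets(50, 30, TimeUnit.SECONDS)` and `withShardCount(20)`.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper doing the sizing arithmetic for the builder settings.
final class SchedulerSizing {
    private SchedulerSizing() {}

    // Every (time shard, concurrency shard) pair is a separate row: 50 * 20 = 1000.
    static long totalRows(int timeBuckets, int shardsPerBucket) {
        return (long) timeBuckets * shardsPerBucket;
    }

    // Time for the rolling shard to cycle through all buckets and return to the same one.
    static long rotationPeriodSeconds(int timeBuckets, long bucketDuration, TimeUnit unit) {
        return timeBuckets * unit.toSeconds(bucketDuration);
    }
}
```

Assuming producers always write to the bucket for the current time, each row is the active write target for 30 seconds, and new tasks do not land in it again until the 1500-second (25-minute) rotation comes back around.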

Create the scheduler

The TaskScheduler API provides a convenience method that creates a metadata row within the same column family as the shards. Other clients can read this row to get the scheduler configuration (mainly the number of shards).

scheduler.createScheduler();
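To illustrate why the metadata row matters: a client that did not create the scheduler needs the shard counts to know which rows to poll. The sketch below keeps that configuration as name/value pairs in a `Map` standing in for the metadata row. `SchedulerConfig` and the column names are hypothetical, not what `createScheduler()` actually writes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical round trip of scheduler configuration through a name/value "row".
final class SchedulerConfig {
    final int bucketCount;
    final long bucketDurationMillis;
    final int shardCount;

    SchedulerConfig(int bucketCount, long bucketDurationMillis, int shardCount) {
        this.bucketCount = bucketCount;
        this.bucketDurationMillis = bucketDurationMillis;
        this.shardCount = shardCount;
    }

    // Written once by the client that creates the scheduler.
    Map<String, String> toColumns() {
        Map<String, String> columns = new HashMap<>();
        columns.put("bucketCount", Integer.toString(bucketCount));
        columns.put("bucketDurationMillis", Long.toString(bucketDurationMillis));
        columns.put("shardCount", Integer.toString(shardCount));
        return columns;
    }

    // Read by any other client so it can compute the same set of row keys.
    static SchedulerConfig fromColumns(Map<String, String> columns) {
        return new SchedulerConfig(
            Integer.parseInt(columns.get("bucketCount")),
            Long.parseLong(columns.get("bucketDurationMillis")),
            Integer.parseInt(columns.get("shardCount")));
    }
}
```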

Producing events

Events may be inserted one at a time or in bulk. Note that a row doesn't need to be locked when producing events.

TaskProducer producer = scheduler.createProducer();
producer.scheduleTask(new Task().setData("The quick brown fox jumped over the lazy cow"));
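One plausible reason producers can skip the lock is that every task column name is unique (through its TimeUUID component), so concurrent writes to the same row never overwrite each other. The sketch below shows that effect in memory, with a `ConcurrentSkipListMap` standing in for the wide row and a zero-padded timestamp plus a random UUID standing in for the composite column name. `LockFreeProducers` is hypothetical.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: concurrent producers writing to one "row" with no lock.
final class LockFreeProducers {
    private LockFreeProducers() {}

    // Returns how many task columns ended up in the row.
    static int produce(int producers, int tasksPerProducer) {
        ConcurrentSkipListMap<String, String> row = new ConcurrentSkipListMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(producers);
        for (int p = 0; p < producers; p++) {
            final int producerId = p;
            pool.execute(() -> {
                for (int i = 0; i < tasksPerProducer; i++) {
                    // Zero-padded time first so columns sort by time; the random UUID
                    // keeps each name unique even when timestamps collide.
                    String column = String.format("%020d:%s", System.currentTimeMillis(), UUID.randomUUID());
                    row.put(column, "task " + i + " from producer " + producerId);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return row.size();
    }
}
```

With unique names, `produce(4, 250)` ends with all 1000 columns present; if the column name were only a millisecond timestamp, concurrent producers could silently overwrite each other.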

Consuming events

Events are consumed using the following sequence:

  1. Lock a shard
  2. Read N tasks
  3. Submit a single mutation that releases the lock, deletes the tasks, and inserts a timeout task for each task.
  4. Process the task
  5. ACK the task by deleting the timeout column
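The sequence can be walked through with an in-memory model of a single shard row. This is a sketch under several assumptions: `ShardRowSim` is hypothetical, a `TreeMap` stands in for the wide row, a `synchronized` method stands in for both the row lock and the single atomic mutation, and the timeout task is read as a copy of the task re-inserted with a due time of now plus the timeout. Message types, priorities and lock columns are not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of one shard row, used only to walk through the consume steps.
final class ShardRowSim {
    private final TreeMap<String, String> row = new TreeMap<>();        // column name -> task id
    private final Map<String, String> timeoutColumn = new HashMap<>();  // task id -> its timeout column
    private long seq;                                                   // keeps column names unique

    // Zero-padded due time first, so lexicographic order equals time order.
    private String columnName(long dueMillis) {
        return String.format("%020d-%09d", dueMillis, seq++);
    }

    private static long dueMillis(String column) {
        return Long.parseLong(column.substring(0, 20));
    }

    // Producing just adds a column (synchronized here only because TreeMap is not thread-safe).
    synchronized void schedule(String taskId, long dueMillis) {
        row.put(columnName(dueMillis), taskId);
    }

    // Steps 1-3: entering the synchronized method plays the role of taking the shard lock,
    // and leaving it plays the role of the single mutation that releases the lock.
    synchronized List<String> acquire(int maxTasks, long nowMillis, long timeoutMillis) {
        // Step 2: read up to maxTasks columns that are already due, in column order.
        List<String> due = new ArrayList<>();
        for (String column : row.keySet()) {
            if (due.size() == maxTasks || dueMillis(column) > nowMillis) {
                break;
            }
            due.add(column);
        }
        // Step 3: delete each task column and insert a timeout task in its place.
        List<String> acquired = new ArrayList<>();
        for (String column : due) {
            String taskId = row.remove(column);
            String timeout = columnName(nowMillis + timeoutMillis);
            row.put(timeout, taskId);
            timeoutColumn.put(taskId, timeout);
            acquired.add(taskId);
        }
        return acquired; // Step 4 (processing) happens in the caller.
    }

    // Step 5: ack by deleting the timeout column; an un-acked task becomes due again.
    synchronized void ack(String taskId) {
        String timeout = timeoutColumn.remove(taskId);
        if (timeout != null) {
            row.remove(timeout);
        }
    }
}
```

Under this reading, if a consumer dies between steps 3 and 5, its timeout column is never deleted, so the task is handed out again once the timeout passes.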

The following example reads a block of up to 100 tasks and acks the entire set with one call. Tasks can be ack'd individually as well.

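Collection<Task> tasks = null;
try {
    // 'consumer' is assumed to have been created from the same TaskScheduler instance
    tasks = consumer.acquireTasks(100);
    for (Task task : tasks) {
        // Do something with the task
    }
}
finally {
    // Ack the whole block with one call; tasks can also be ack'd individually
    if (tasks != null)
        consumer.ackTasks(tasks);
}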