Skip to content

Latest commit



378 lines (278 loc) · 14.2 KB

File metadata and controls

378 lines (278 loc) · 14.2 KB

My First Database

Hopefully you completed part one of the introduction. This is part two. My aim in this episode is to start making a basic Key->CRDT store. But before that, a quick recap of what we did so far.

What Did You Do?

So you followed part one and it worked. I haven't seen a single mail to the list so assume either no-one did it, or no-one failed. Pretending it is the latter is the only way I can go on writing this.

So what do you have on your machine? When you ran that command:

make devrel

and subsequently joined the nodes into a cluster, what happened? What is a node? A node of what?

What Even Is Riak Core?

You made the most basic riak_core application possible. Riak Core is the distributed systems goop that Riak and other Basho products are built on. It provides the abstractions, and basic services needed to run a clustered application.

The Ring

Riak Core applications are built around something called the ring. At its simplest the ring is a zero to 2^160-1 integer hashspace. This space is split into partitions, each an equal range of the hash space. A Partition has an ID or index that is the maximum hashed key it is responsible for. We use a hash function to assign work (or storage). We hash a key to an integer on the ring, and the next partition greater than that integer owns that key.

The Claim Algorithm assigns partitions to Nodes. When you add, or remove nodes, claim causes a number of partitions to move to their new homes on a different node automatically. When you ran ping in the last tutorial:

rel/crdtdb/bin/crdtdb console
crdtdb@> crdtdb:ping().
(crdtdb@> crdtdb:ping().
(crdtdb@> crdtdb:ping().
(crdtdb@> crdtdb:ping().
(crdtdb@> crdtdb:ping().

That integer is the partition that answered the ping with a pong. Riak Core is responsible for keeping a balanced number of partitions on each node.

Ring For Routing

As well as being a conceptual thing, the ring is a data structure that is gossiped between all Riak Core application nodes. It contains information about what node is responsible for an index or partition of the ring, a routing table in DHT speak. This means a request can hit any node and find its way to the right node in one hop.


A Node is usually a server, virtual or physical, but as you've seen from make devrel many application nodes can be run on a single host. A Node is just an instance of an Erlang release. Nodes host partitions. And partitions manifest as vnodes.

Vnodes For The Workers!

A Virtual Node, or vnode is the actual Erlang process that is responsible for a partition of the ring.

vnodes are the workers. Each vnode is an erlang process. Riak Core manages processes for the partitions hosted by a node but in order for our Riak Core application to do anything useful, we need to provide it with vnode code.

A vnode is also an Erlang Behaviour. If you're used to an OO language, a Behaviour is a little like an interface, or even an abstract-class, or both. A behaviour tells us what function callbacks we need to code in order to fit into the Riak Core framework and have our vnode be part of a Riak Core application.

If you want lots of details about all the callbacks we need to implement to make our vnode, you should have a look at Ryan Zezeski's Try-Try-Try blog post. I've cribbed from it liberally here, but plan to go into less detail.


CRDTDB is a database for CRDTs. It's a Key/Value store. Our vnode is going to manage the storage and retrieval of CRDTs on disk.

A Database In My Database

Luckily Erlang comes with a really simple way to store Erlang Terms on disk, it's called dets and we'll use that for now. It has a 2GB per file limit, but we're not making a real database, they're hard™. Later maybe we'll refactor to use leveldb or bitcask.

And since we're going to store CRDTs, for now we can use riak_dt.

Let's Go

OK. In case you don't like typing I've pushed this code to the SyncFree github account. You can clone the project and make rel if you want just read/run the code.


First we need riak_dt so edit rebar.config to include:

{deps, [
    {lager, "2.0", {git, "git://", {tag, "2.0.0"}}},
    {riak_core, ".*", {git, "git://", {tag, "develop"}}},
    {riak_dt, ".*", {git, "git://", {tag, "develop"}}}

And run

./rebar get-deps

To pull the library down from github. Also add


To rel/reltool.config in the first list section, right before crdtdb.

The Vnode Code

Really quick sidebar: setting up Emacs for erlang is easy. With that done, let's open up src/crdtb_vnode.erl and get started. I don't want to make this an erlang tutorial so I may skip over things in a confusing way, if you get that sensation please mail the list.

Init / Terminate

init/1 is called by riak core when a vnode process starts up. It is passed the index or partition number as an argument.

In our vnode init we need to open our dets file and store the handle to it in the vnode's state.

Add a dets field to the processes state record:

-record(state, {partition, dets}).

And open the dets file in the init/1 fun:

init([Partition]) ->
    File = filename:join(app_helper:get_env(riak_core, platform_data_dir),
   {ok, Dets} = dets:open_file(Partition, [{file, File}, {type, set}]),
   {ok, #state { partition=Partition, dets=Dets }}.

When the vnode finishes Riak Core will call the terminate/2 function. We should close our dets table here.

terminate(_Reason, #state{dets=Dets}) ->


vnodes handle commands. Think of vnode commands as the API of the vnode. What does our vnode want to do? Get CRDTs and Store CRDTs. Lets add these functions. First export them.

-export([get/2, put/3]).

Add a handy macro:

-define(SYNC(PrefList, Command),
    riak_core_vnode_master:sync_command(PrefList, Command, crdtdb_vnode_master)).

The Vnode Master is responsible for dispatching commands to vnodes. sync_command is as it sounds, send the command in a synchronous, blocking call, and wait for the result.

Then the function bodies:

%% @doc get the CRDT stored under Key
get(Preflist, Key) ->
    ?SYNC(Preflist, {get, Key}).

%% @doc put the given CRDT as the value under Key
put(Preflist, Key, CRDT) ->
    ?SYNC(Preflist, {put, Key, CRDT}).

These are just API sugar, they simply package the arguments up for the vnode master to dispatch. Don't worry about what the Preflist argument is for now, we'll get to it.

Handle Command

For every command our vnode handles we need to add a handle_command/3 function call. The vnode master will call this callback function with the argument we gave it above. And it will call it at each vnode in the Preflist it is given. Let's do that now.


Get is simple, just read the CRDT off the disk, and return it.

%% Get the CRDT stored at Key
handle_command({get, Key}, _Sender, State=#state{dets=Dets}) ->
    Reply = case dets:lookup(Dets, Key) of
            [] -> {error, notfound};
            [{Key, CRDT}] -> {ok, CRDT};
            Error -> Error
    {reply, Reply, State};


Put might be a bit more complex. Since we're using CRDTs, we don't want to worry about conflicts, interleaving writes, and things like that. So let's make the most of CRDT-magic and merge state with what is on disk already when we put. This basically means:

Read what is on disk, merge with it, write the result.

What if the type of CRDT we want to write is different to that on disk? Bad luck! We can fix that later. We do need to know the type of thing we are storing though, so that we can merge it. To that end I've decided that when we say CRDT in this app, we mean the record:

#crdt{mod, value}

Which I've added to the file crdt.hrl.

-record(crdt, {mod, value}).

Ok, the put command handling code:

handle_command({put, Key, #crdt{mod=Mod, value=Val}=CRDT}, _Sender, State=#state{dets=Dets}) ->
    Reply = case dets:lookup(Dets, Key) of
            [] -> dets:insert(Dets, {Key, CRDT});
            [{Key, #crdt{mod=Mod, value=LocalValue}}] ->
                Merged = Mod:merge(Val, LocalValue),
                dets:insert(Dets, {Key, #crdt{mod=Mod, value=Merged}});
            [{Key, #crdt{mod=Other}}] ->
                {error, {type_conflict, Mod, Other}};
            Error ->
    {reply, Reply, State};

Lots of opportunity to refactor the common code between get and put but meh. Later.

What we have is the absolute bare minimum. No replication to multiple nodes for fault-tolerance / low-latency. No put-and-return-value, just a very basic Key->CRDT store. We're going to add replication, handoff, and all that good stuff next time.

Does It Work?

We should write tests too, right? Sure.

I'm not going to now, though. But it would be nice to run this, to see if it sort of works. Let's add some code to the internal client module (the one that we used for ping last time) for these two new commands. I'm going to gloss over some important concepts as this has been long. We'll leave them until next time.

Internal Client

Now to open src/crdtdb.erl. This is an internal client. It will be used by any external API endpoints (like HTTP, Protocol Buffers, MsgPack etc) that we end up making.

Riak Core still sports some vestigial links to RiakKV. It expects all "Keys" to be a two-tuple of {Bucket::binary(), Key::binary()}. To keep things simple we'll store all our data in one "bucket".

-define(BUCKET, <<"crdtdb">>).

Let's add the get function:

get(Key) ->
    DocIdx = riak_core_util:chash_key({?BUCKET, Key}),
    PrefList = riak_core_apl:get_primary_apl(DocIdx, 1, crdtdb),
    [{IndexNode, _Type}] = PrefList,
    case crdtdb_vnode:get(IndexNode, Key) of
        {ok, #crdt{mod=M, value=V}} ->
        {ok, {M, V}};
            E ->

Tutorial number 3 will be all about what this snippet does. What a PrefList is and what we use it for. For now be content that we're hashing the key of to a value that we're using to determine where the data should be.

Put is mighty similar:

put(Key, Mod, Value) ->
    DocIdx = riak_core_util:chash_key({?BUCKET, Key}),
    PrefList = riak_core_apl:get_primary_apl(DocIdx, 1, crdtdb),
    [{IndexNode, _Type}] = PrefList,
    crdtdb_vnode:put(IndexNode, Key, #crdt{mod=Mod, value=Value}).

Try It (you might like it)

If all that is done, then run ./rebar compile. Assuming it actually compiles, you should be able to make a node with make rel and start it up:

 rel/crdtdb/bin/crdtdb console

Will start an erlang shell running the database. Now let's try it:

Eshell V5.10.3  (abort with ^G)
(crdtdb@> crdtdb:get(<<"foo">>). %% We haven't stored it yet!
%% Create an ORSWOT with a single member
(crdtdb@> {ok, S} = riak_dt_orswot:update({add, <<"bryan">>}, actor1, riak_dt_orswot:new()).
(crdtdb@> crdtdb:put(<<"foo">>, riak_dt_orswot, S). %% Store the orswot
(crdtdb@> {ok, {M, V}} = crdtdb:get(<<"foo">>). %% Fetch it
(crdtdb@> M:value(V). %% Get the "value"

Great. Now lets "concurrently" write to that same key:

(crdtdb@> {ok, S2} = riak_dt_orswot:update({add, <<"bob">>}, actor2, riak_dt_orswot:new()).
(crdtdb@> crdtdb:put(<<"foo">>, riak_dt_orswot, S2).
(crdtdb@> {ok, {M, V2}} = crdtdb:get(<<"foo">>).
(crdtdb@> M:value(V2).

Note the different actor, and the empty starting set. What happens if we write a counter to <<"foo">>?

{ok, C} = riak_dt_pncounter:new(actor1, 100).
(crdtdb@> crdtdb:put(<<"foo">>, riak_dt_pncounter, C).

Fair enough, but I'm sure we can do better.

Wrap It Up

That was a big chunk. We covered "what is riak core" to a certain extent, at least we looked at the ring, and partitions. Then we made a vnode, and we added some very basic database functionality. So we have a distributed database. We're using continuous hashing and the ring to distributed data around the database. We're storing CRDTs so concurrent writes are no problem. But we're not replicating. If we lose a node, we lose that portion of the keyspace, and that's bad.

Next time we'll cover preference lists, replication, quorums, and handoff. Until then, good luck, and get in touch if your stuck.