Higher Order Blog


Clojure event sourcing

22 Feb 2009

Event sourcing:
Event Sourcing ensures that all changes to application state are stored as a sequence of events. Not just can we query these events, we can also use the event log to reconstruct past states, and as a foundation to automatically adjust the state to cope with retroactive changes.
I recently watched an InfoQ interview with Greg Young in which he discusses an interesting architecture that uses event stream processing. The architecture sounded very interesting, but I did not go into 'research mode' to dig into it. Later I came across Jonas Bonér's blog post on "Real world Scala" and saw an example of event sourcing using actors in Scala. I thought it would be a good exercise to implement a similar example in Clojure, and that it might also be interesting to compare the solutions and see how the languages affect the implementation. As Clojure focuses on concurrency, it was natural to start thinking about what 'concurrent' event sourcing would mean and how to implement it.

Sequential Clojure implementation
For context, read Fowler's article on event sourcing, and for comparison Jonas Bonér's Scala implementation.

Modeling
The Scala version uses classes and actors to model ships and their state. A ship is an actor that receives events (messages), e.g. a departure event or an arrival event, and reacts to them by updating its internal state, i.e., its location. Clojure has agents, which are similar to actors. Agents are simpler, and there are more functions that act on agents than on actors. For example, in Clojure it is possible to read the state of an agent directly (using (deref x) or @x). This is not possible with actors, since they are distributed (or at least programmed as if they were distributed).

I've used two additional Clojure features in implementing event sourcing: watchers and metadata. Watchers are a form of callback attached to agents. The callback is called synchronously with the agent actions and "derefs of the agent in the callback will see the value set during that action." I am using watchers to record the events that are sent to each ship. This decouples the code dealing with event storage from the state-changing functions sent to agents.

The state of an agent is the location of the ship it represents, so we can get the location of a ship at any time simply by deref'ing the agent representing the ship (with actors this is more complex, as one has to send a message to the actor and receive a message with the answer). I decided to use metadata attached to the location to represent the event that caused the move to that location. For example:

user> @(first agents)
{:country "At sea", :city "At sea"}
user> ^@(first agents)
{:agent 0, :time #<Date Sun Feb 22 19:49:59 CET 2009>, :type :depart_for, 
:loc {:country "Sweden", :city "Malmö"}}
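
A state like the one above could be produced by an agent action along the following lines. This is only a sketch written against current Clojure: the names at-sea, depart-for and ship, as well as the starting port, are made up for illustration and are not taken from the original source file.

```clojure
;; Hypothetical names and starting port; not the code from the original source file.
(def at-sea {:country "At sea" :city "At sea"})

(defn depart-for
  "Agent action: returns the new location, tagged with the causing event as metadata."
  [_current-loc agent-id destination]
  (with-meta at-sea
    {:agent agent-id
     :time  (java.util.Date.)
     :type  :depart_for
     :loc   destination}))

(def ship (agent {:country "Denmark" :city "Copenhagen"}))

(send ship depart-for 0 {:country "Sweden" :city "Malmö"})
(await ship)   ; block until the action has run

@ship          ; the current location
(meta @ship)   ; the event that caused the move to that location
```

The REPL session above uses the older ^ reader shorthand for meta; (meta @ship) is the equivalent spelled out.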
In general, I think it is possible to implement sequential event sourcing reasonably elegantly in Clojure using agents, watchers and metadata. The agents store the state that can change; the watchers record the events that cause the changes by looking at the metadata of the state. The invariant should be that if an agent moves from state s1 to state s2, then (meta s2) stores the event that caused the transition from s1 to s2. Then, if one knows the initial state and all the events, it is possible to reconstruct the entire sequence of states. Our watchers are quite simple (notice the cool syntax #(meta @%). A shame I couldn't write #(^@%), that would be cool: it looks like I'm swearing! ;-)):

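;; Record event history, seeded with the event stored on each agent's initial state.
;; events holds a seq, so conj puts the newest event at the front.
(def events (atom (map #(meta @%) agents)))

(defn event_logger
  "watcher callback: takes the watch key, the agent, and a flag that is
   true when the action may have changed the agent's state"
  [idx a changed]
  (when changed
    (swap! events conj (meta @a))))

(defn add-watchers
  "associate a watcher with each agent
   the watcher logs the events for that agent"
  []
  (dotimes [i NUM_SHIPS]
    (add-watch (agents i) i event_logger)))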
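
In current Clojure releases a watch function takes four arguments (key, reference, old state, new state) instead of a changed flag. Below is a minimal sketch of the same logging idea under that signature, together with a replay of the log from a known initial state. All names (event-log, log-event, move, apply-event, start) and the starting port are assumptions for illustration, not the code from the source file.

```clojure
;; Hypothetical names throughout; a sketch, not the original implementation.
(def event-log (atom []))   ; a vector, so conj appends in arrival order

(def start  {:country "Denmark" :city "Copenhagen"})   ; made-up initial state
(def at-sea {:country "At sea" :city "At sea"})

(defn apply-event
  "Pure transition function: the location that results from applying event to loc."
  [loc event]
  (case (:type event)
    :depart_for at-sea
    :arrive_at  (:loc event)
    loc))

(defn move
  "Agent action: computes the new location and tags it with the causing event."
  [loc event]
  (with-meta (apply-event loc event) event))

(defn log-event
  "Watch fn: records the event carried as metadata on the new state."
  [_key _ref old-state new-state]
  ;; identical? rather than =, because metadata does not take part in equality
  (when-not (identical? old-state new-state)
    (swap! event-log conj (meta new-state))))

(def ship (agent start))
(add-watch ship :event-logger log-event)

(def malmo {:country "Sweden" :city "Malmö"})
(send ship move {:agent 0 :type :depart_for :loc malmo})
(send ship move {:agent 0 :type :arrive_at  :loc malmo})
(await ship)

;; Replaying the log from the initial state reconstructs every intermediate state.
(def replayed (reductions apply-event start @event-log))
```

Keeping apply-event pure and separate from the agent action means the same function drives both the live agent and the replay, which is what the invariant on (meta s2) relies on.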

Concurrent event sourcing
A natural question to ask for Clojure is: what if there are multiple concurrent event sources? Is it still possible to do event sourcing? I don't think so; at least not in a manner as powerful as with sequential events. Fowler defines event sourcing as ensuring "(…) that all changes to application state are stored as a sequence of events", so we already have a problem: when events can occur concurrently, one must serialize them in order to store them as a sequence. One could try to timestamp all events and store them as a set, but this is sensitive to timing and scheduler issues. Even if they are stored as a set of timestamped events, how would one replay these events in a way that ensures that the global program states are the same in the replayed program? This would depend on thread scheduler timings. It is of course still possible to do the less advanced use cases of event sourcing, e.g. event querying and analysis (I guess that is concurrent event stream processing).

Anyway, if anyone is still reading ;-) the Source file link.
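
A small sketch of the serialization problem discussed above, assuming two hypothetical event sources (:ship-a and :ship-b) that write to one shared atom. The atom does force the events into a single sequence, and each source's own events stay in order, but how the two sources interleave depends on the thread scheduler and can differ from run to run.

```clojure
;; Hypothetical names; illustrates the ordering issue only, not a solution to it.
(def shared-log (atom []))

(defn emit!
  "Appends n numbered events from the given source to the shared log."
  [source n]
  (dotimes [i n]
    (swap! shared-log conj {:source source :seq i})))

;; Two concurrent event sources, each running on its own thread.
(let [workers (doall (for [s [:ship-a :ship-b]]
                       (future (emit! s 100))))]
  (doseq [w workers] @w))   ; wait for both to finish

(defn seqs-of
  "Sequence numbers of one source, in the order they appear in the log."
  [source]
  (map :seq (filter #(= source (:source %)) @shared-log)))

(count @shared-log)   ; every event was recorded
(seqs-of :ship-a)     ; per-source order is preserved
(map :source @shared-log)   ; the interleaving is scheduler-dependent
```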