;; Event sourcing
;; http://martinfowler.com/eaaDev/EventSourcing.html
;;
;; Event Sourcing ensures that all changes to application state are stored as
;; a sequence of events. Not just can we query these events, we can also use
;; the event log to reconstruct past states, and as a foundation to
;; automatically adjust the state to cope with retroactive changes.
;;
;; This is a Clojure version of Jonas Bonér's "Event sourcing using actors"
;; http://jonasboner.com/2009/02/12/event-sourcing-using-actors.html

;; A ship has a name and a home port.
(defstruct ship :name :home)

;; A port is identified by its country and city.
(defstruct port :country :city)

;; An event records the agent it applies to, a time, an event type and a
;; location. Each ship is represented by an agent.
(defstruct event :agent :time :type :loc)

(defn mkevent
  "Builds an event struct from its four fields, in order:
  a   - value stored under :agent
  ti  - value stored under :time
  ty  - value stored under :type
  loc - value stored under :loc"
  [a ti ty loc]
  (struct event a ti ty loc))

;; The fixed set of ports a ship can be sent to, as a vector so that a port
;; can be looked up by index.
(def PORTS_OF_CALL
  [(struct port "Denmark" "Aarhus")
   (struct port "Denmark" "Copenhagen")
   (struct port "Denmark" "Esbjerg")
   (struct port "Holland" "Amsterdam")
   (struct port "England" "London")
   (struct port "Portugal" "Lisboa")
   (struct port "Sweden" "Malmö")
   (struct port "Norway" "Oslo")
   (struct port "USA" "New York")])

;; Pseudo-port used as the location of a ship that is not in any port.
(def AT_SEA (struct port "At sea" "At sea"))

(def NUM_PORTS (count PORTS_OF_CALL))

;; The fleet: a vector of three ships, each given a home port picked at random
;; from PORTS_OF_CALL when this form is evaluated.
(def ships
  (let [rand-port (fn [] (get PORTS_OF_CALL (rand-int NUM_PORTS)))]
    [(struct ship "Rich Hickey" (rand-port))
     (struct ship "Doug Lea" (rand-port))
     (struct ship "Douglas Crockford" (rand-port))]))

(def NUM_SHIPS (count ships))

;; Create the agents: a vector of agents, where the agent at index i
;; represents ship i. Initialize agent state to home port.
;; Meta-data is attached to agent state: the meta data is
;; the event that led to that state (from the previous state).
;; One agent per ship, indexed like `ships`. Each agent starts in the ship's
;; home port, with a :home event attached as metadata on that state.
(def agents
  (let [now (java.util.Date.)]
    (vec (map (fn [i]
                (let [home (:home (ships i))]
                  (agent (with-meta home (mkevent i now :home home)))))
              (range NUM_SHIPS)))))

;; Simulator

(defn move
  "Take one transition according to event e.
  Sends an action to the agent at index (:agent e) that replaces its state
  with the event's :loc when the event type is :home or :arrive_at, and with
  AT_SEA for any other type. The event e is attached to the new state as
  metadata. Returns the agent (the result of send)."
  [e]
  (send (agents (:agent e))
        (fn [_]
          (let [dest (if (#{:home :arrive_at} (:type e))
                       (:loc e)
                       AT_SEA)]
            (with-meta dest e)))))

(defn run
  "Sequentially calls move on every event of the seq es, in order.
  Events are consumed with doseq, so for an unchunked lazy seq move is called
  on an event before later events in the seq are looked at."
  [es]
  (doseq [e es]
    (move e)))

;; Record event history

;; Event log, newest event first (conj on a seq prepends). It is seeded with
;; the initial event of every agent. doall forces the seed immediately:
;; left lazy, the agents would only be dereferenced on the first swap!, after
;; one of them has already changed state.
(def events (atom (doall (map #(meta @%) agents))))

(defn event_logger
  "Watch function that appends an agent's latest event to `events`.
  The 4-argument arity matches what add-watch calls: key, reference,
  old state, new state. It logs the metadata of the new state, unless the
  state object is the very same one as before.
  The 3-argument arity (key, agent, changed flag) is kept for existing
  callers; it logs the metadata of the agent's current state when `changed`
  is truthy."
  ([idx a changed]
     (when changed
       (swap! events conj (meta @a))))
  ([idx a old-state new-state]
     ;; Use the state handed to the watch rather than dereferencing the agent.
     (when-not (identical? old-state new-state)
       (swap! events conj (meta new-state)))))

(defn add-watchers
  "Associate a watcher with each agent; the watcher logs the events for that
  agent. The agent's index is used as the watch key."
  []
  (dotimes [i NUM_SHIPS]
    (add-watch (agents i) i event_logger)))

;; Infinite random event source
;; Generates only valid events according to the current state of agents.

(defn make-rand-events
  "Returns an infinite lazy seq of random events.
  Each element is computed when it is realized: a random agent and a random
  destination port are picked and the agent's current state is read.
  - at sea:                     :arrive_at the :loc of the event stored in the
                                state's metadata
  - already at the destination: retry with a fresh random pick
  - otherwise:                  :depart_for the destination"
  []
  ;; NOTE(review): move uses send, which is asynchronous, so the state read
  ;; here may not yet reflect events that were just handed to move -- confirm
  ;; whether callers need to await the agents between events.
  (lazy-seq
    (cons (let [rand-agent (rand-int (count agents))
                rand-dest (get PORTS_OF_CALL (rand-int NUM_PORTS))
                at @(agents rand-agent)]
            (cond
              (= at AT_SEA)
              (mkevent rand-agent (java.util.Date.) :arrive_at (:loc (meta at)))

              ;; unlucky dest... retry
              (= at rand-dest)
              (first (make-rand-events))

              :else
              (mkevent rand-agent (java.util.Date.) :depart_for rand-dest)))
          (make-rand-events))))

;; The not-yet-consumed part of the random event stream.
(def rand-events (atom (make-rand-events)))

(defn rand-run
  "Run n random events: takes the next n events from rand-events, stores the
  remainder back into rand-events so those n events are not replayed, and
  passes the n events to run."
  [n]
  (let [es @rand-events
        nevents (take n es)
        tail (drop n es)]
    ;; expire events
    (reset! rand-events tail)
    (run nevents)))