Higher Order Blog

Circuit Breaker: a small but real-life example of Clojure protocols and datatypes

05 May 2010

(Update July 2nd 2010: I've cleaned up the code and the git repo. The protocol function definitions are now inlined in the state records for native-platform speed. The policy can now specify which exceptions should be considered errors; this is really useful in real life when you don't want to trip the circuit breaker on, say, security exceptions. The project now builds with lein 1.1.0:

...]$ lein jar
...
...]$ javac -cp lib/clojure-1.2.0-master-20100623.220259-87.jar:circuit-breaker.jar src/C.java
...]$ java -cp lib/clojure-1.2.0-master-20100623.220259-87.jar:circuit-breaker.jar:src C
)

Michael Nygard's stability pattern "Circuit Breaker" is useful for failing fast when calling integration points that are unstable (which is every integration point I've ever dealt with!). It works by detecting when an integration point fails and subsequently cutting off access to it for a period of time. Use the circuit breaker to
... preserve request handling threads in the calling system. Very often, when you make a call to an external integration point that's broken, it will tie up a thread in a blocking synchronous call for an indefinite period of time. [Michael Nygard, QCon interview]
I've written a fast, non-blocking, functional implementation of the Circuit Breaker on the 1.2 branch of Clojure, which will be released shortly. It uses the new Clojure constructs, protocols and datatypes, for modeling states, for interop, and to obtain platform-speed polymorphic calls. The implementation exposes a simple Java interface, which makes it usable from Java, Scala, JRuby et al.

Why?

Design Patterns are a disease, and Clojure is the cure. :-) (the smiley is mine!)
[http://www.nofluffjuststuff.com/blog/stuart_halloway/2009/10/the_case_for_clojure]
I believe this implementation has a couple of advantages compared to existing Java and Scala implementations that use the GoF "State Machine" pattern. First, I find the functional version simpler (see examples below). Second, this version guarantees that only a single call is made to the integration point when the circuit breaker decides to re-test whether it is working again (the other versions seem to allow an unbounded number of calls if several threads concurrently try to access the integration point by calling "invoke"). Finally, this version encapsulates the (immutable) state in a single Clojure atom (corresponding to a Java AtomicReference), whereas the GoF implementations use at least three atomics for various counters.

Why does that matter? Well, it gives you the ability to obtain a consistent (immutable) snapshot of the state of the circuit breaker at any given time, which can be used for, e.g., logging and analysis. This isn't possible when you have several atomics in play. These benefits come naturally from following Clojure's programming model and concurrency constructs. Let me illustrate the Clojure features that I've found useful for this problem.

Protocols and records

I'm using pure polymorphic functions on-success, on-error and on-before-call as transition functions that map a state to the next state for an event (a successful call, a failed call, and the moment before a call is initiated). A pure function proceed is a predicate on states that decides whether or not the state allows calls to go through to the integration point. Together these functions form a Clojure protocol (which is similar to a Java interface, but has additional benefits). (See http://gist.github.com/390485)

Apart from the definition of the protocol, we define a default implementation of the protocol functions that our states can use. The default transition functions are simply the identity function, and proceed defaults to false.
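As a rough illustration of the protocol and its defaults described above, here is a minimal sketch. The protocol and function names come from the text; the name default-transitions, the docstrings and the DemoState record are my own assumptions, and the code in the gist may well differ.

```clojure
;; Sketch only: names follow the prose, details are assumed.
(defprotocol CircuitBreakerTransitions
  (proceed [s] "True if state s lets a call through to the integration point.")
  (on-success [s] "The state that follows s after a successful call.")
  (on-error [s] "The state that follows s after a failed call.")
  (on-before-call [s] "The state that follows s just before a call is made."))

;; Defaults: every transition is the identity function, proceed is false.
;; default-transitions is a hypothetical name for this map.
(def default-transitions
  {:proceed        (constantly false)
   :on-success     identity
   :on-error       identity
   :on-before-call identity})

;; A hypothetical record that takes every default unchanged.
(defrecord DemoState [label])
(extend DemoState CircuitBreakerTransitions default-transitions)

(proceed (DemoState. "demo")) ;=> false
```

Because the defaults are an ordinary map, a state can later override individual entries without touching the rest.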
We now define datatypes corresponding to each type of state: closed (calls go through, failures are counted), open (calls fail fast; stores a time-stamp of when the integration point failed), initial-half-open (a single call goes through), and pending-half-open (waiting for a probing call to return). The datatypes are parameterized by a "transition policy" defining how many failures are needed to transition to the open state, and how long to wait in the open state. (See http://gist.github.com/390492)

This simply defines the states as datatypes. Note that we use defrecord, not deftype. This makes our datatypes work like persistent Clojure maps, which is extremely useful: for example, our states can be destructured (see, e.g., defrecord at http://clojure.org/datatypes). We can now make our new types participate in our CircuitBreakerTransitions protocol. (See http://gist.github.com/390494)

A couple of notes:
- We use merge to take the default implementations and "override" them with the implementations given (this would correspond to an abstract super-class in Java, but is more flexible).
- We use destructuring in the function definitions for easy access to the "ClosedState" data, e.g., in the body of (fn [{f :fail-count p :policy, :as s}] ... the state's fail-count is available as f, and similarly the policy is available as p. The :as s clause makes the state itself available as s.
- Finally, we can construct new instances of the states, since they are simple dynamically compiled classes, e.g., (ClosedState. p 0) creates the initial state with a policy p.

Pure functions

A clear advantage of the functional approach is the ease of testing. Consider this simple test of some states and transition functions. (See http://gist.github.com/390505)

This is the core of the circuit breaker itself: (http://gist.github.com/390529) A circuit breaker is simply an atomic reference to a state.
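Before going into the core, here is a hedged sketch of how state records, merge-based overrides and a pure test could fit together. It covers only the closed and open states. TransitionPolicy, its fields max-fails and timeout-ms, and default-transitions are names I made up for illustration; they are not taken from the gists. It follows the extend-with-merge style of the prose, whereas the update note at the top says the repo now inlines the functions in the records.

```clojure
;; Protocol and defaults restated so that this snippet runs on its own.
(defprotocol CircuitBreakerTransitions
  (proceed [s])
  (on-success [s])
  (on-error [s])
  (on-before-call [s]))

(def default-transitions
  {:proceed (constantly false)
   :on-success identity
   :on-error identity
   :on-before-call identity})

;; Hypothetical policy: failures needed to open, and time to stay open.
(defrecord TransitionPolicy [max-fails timeout-ms])

(defrecord ClosedState [policy fail-count])
(defrecord OpenState [policy time-stamp])

(extend ClosedState
  CircuitBreakerTransitions
  (merge default-transitions
         {:proceed    (constantly true)
          :on-success (fn [{p :policy}] (ClosedState. p 0))
          ;; Destructuring: f is the fail-count, p the policy, s the state.
          :on-error   (fn [{f :fail-count p :policy, :as s}]
                        (if (< (inc f) (:max-fails p))
                          (assoc s :fail-count (inc f))
                          (OpenState. p (System/currentTimeMillis))))}))

;; In this sketch OpenState keeps every default: no calls go through.
(extend OpenState CircuitBreakerTransitions default-transitions)

;; A pure test: no atom, no threads, just states in and states out.
(let [p  (TransitionPolicy. 2 1000)
      s0 (ClosedState. p 0)
      s1 (on-error s0)
      s2 (on-error s1)]
  (assert (proceed s0))
  (assert (= 1 (:fail-count s1)))
  (assert (instance? OpenState s2))
  (assert (not (proceed s2))))
```

Since records compare by value, (on-success s1) above is equal to s0, which keeps such tests short.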
The function wrap-with takes a function f to wrap, which represents a function that will call an integration point, and a circuit breaker, named state. It then returns a "wrapped" function which is guarded by the circuit breaker. It uses the "transition-by!" function, which makes a state transition from the current state. An example usage: (http://gist.github.com/390535) Notice that a snapshot of the state is available simply with a deref, e.g., as @cb.
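To make the idea concrete, here is a deliberately simplified sketch of a circuit breaker held in a single atom. It is not the code from the gists: the state is a plain map instead of the records, the time-out and half-open states are left out, and max-fails, make-circuit-breaker and the argument order of transition-by! are assumptions of mine.

```clojure
;; Hypothetical, simplified stand-ins for the protocol functions: the state
;; is a plain map here, and the time-out and half-open states are left out.
(def max-fails 2) ; assumed threshold, for illustration only

(defn proceed [s] (= :closed (:status s)))
(defn on-before-call [s] s)
(defn on-success [s] (assoc s :fail-count 0))
(defn on-error [s]
  (let [n (inc (:fail-count s))]
    (assoc s :fail-count n
             :status (if (< n max-fails) :closed :open))))

(defn make-circuit-breaker []
  (atom {:status :closed :fail-count 0}))

(defn transition-by!
  "Atomically replaces the state in cb with (f state); returns the new state."
  [f cb]
  (swap! cb f))

(defn wrap-with
  "Returns a function that calls f only while the circuit breaker cb allows it."
  [f cb]
  (fn [& args]
    (if (proceed (transition-by! on-before-call cb))
      (try
        (let [result (apply f args)]
          (transition-by! on-success cb)
          result)
        (catch Exception e
          (transition-by! on-error cb)
          (throw e)))
      (throw (IllegalStateException. "circuit breaker is open")))))

(def cb (make-circuit-breaker))
(def guarded
  (wrap-with (fn [x]
               (if (nil? x)
                 (throw (IllegalArgumentException. "null arg"))
                 x))
             cb))

(guarded 42) ;=> 42
@cb          ;=> {:status :closed, :fail-count 0}
```

The transition functions must stay pure, because swap! may call them more than once when several threads update the atom at the same time.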

A Java interface

To expose the functionality to Java, I have used Clojure's gen-class to create a class that exposes two methods given by the following protocol, which lets you wrap a function and look at the state:

(defprotocol CircuitBreaker
  (#^clojure.lang.IFn wrap [this #^clojure.lang.IFn fn])
  (#^net.higher_order.integration.circuit_breaker.states.CircuitBreakerTransitions current-state [this]))

Then, using gen-class, I generate a class that implements the interface corresponding to that protocol. This makes it possible to use the circuit breaker from Java:
import clojure.lang.IFn;
import clojure.lang.RT;
import net.higher_order.integration.circuit_breaker.AtomicCircuitBreaker;
import net.higher_order.integration.circuit_breaker.CircuitBreaker;

public class C {
	public static void main(String[] args) {
		CircuitBreaker atomicCircuitBreaker = new AtomicCircuitBreaker();
		IFn wrap = (IFn) atomicCircuitBreaker.wrap(new clojure.lang.AFn() {
			public Object invoke(Object arg0) throws Exception {
				if (arg0 == null) throw new IllegalArgumentException("null arg");
				System.out.println("Invoked with: "+arg0);
				return arg0;
			}
		});
		succeed(atomicCircuitBreaker, wrap);
		fail(atomicCircuitBreaker, wrap);
		fail(atomicCircuitBreaker, wrap);
		fail(atomicCircuitBreaker, wrap);
		fail(atomicCircuitBreaker, wrap);
		fail(atomicCircuitBreaker, wrap);
		fail(atomicCircuitBreaker, wrap);
		sleep(1000);
		status(atomicCircuitBreaker);
		fail(atomicCircuitBreaker, wrap);
		sleep(5000);
		succeed(atomicCircuitBreaker, wrap);
	}

	
	private static void sleep(long howlong) {
		try {
			Thread.sleep(howlong);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers can still observe the interruption
			Thread.currentThread().interrupt();
		}
	}

	private static void succeed(CircuitBreaker atomicCircuitBreaker, IFn wrap) {
		try {
			System.out.println(wrap.invoke("KARL"));
			System.out.println(wrap.invoke(42));
		} catch (Exception e) {
			System.out.println(e.getMessage());
		} finally {
			status(atomicCircuitBreaker);
		}
	}


	private static void status(CircuitBreaker atomicCircuitBreaker) {
		System.out.println(RT.printString(atomicCircuitBreaker.current_state()));
	}

	private static void fail(CircuitBreaker atomicCircuitBreaker, IFn wrap) {
		try {
			System.out.println(wrap.invoke(null));
			System.out.println(wrap.invoke(42));
		} catch (Exception e) {
			System.out.println(e.getMessage());
		} finally {
			status(atomicCircuitBreaker);
		}
	}
}
This is getting long. I'll save the comparison for the next post :-)

GitHub: http://github.com/krukow/clojure-circuit-breaker