Spring Cloud Stream for Event Driven Architectures

March 26, 2017

The emergence of microservices and cloud native architectures has triggered a resurgence of interest in Domain Driven Design,  CQRS, and Event Sourcing.  Central to all of these approaches is the Domain Event, one of the building blocks of DDD and the primary mechanism for enabling eventual consistency in a distributed system.

Domain Events communicate changes to the state of a domain, such as Customer Created, Account Credited,  and the like.  Always expressed in past tense, a domain event represents something of interest to a particular domain that has already happened.

DDD is a commonly recommended approach to decomposing a system into microservices. The specifics of how to do this are a topic for another day, but it is safe to assume that a financial system following DDD will define things like Customer and Account as Aggregates. At a high level, an aggregate is a graph of domain objects, consisting of entities and value objects, that defines a transactional boundary. An aggregate encapsulates business logic to perform validation and ensure it is kept internally consistent.

An aggregate communicates state changes to other aggregates by emitting domain events. Thus CustomerCreated is a domain event emitted by the Customer aggregate, and AccountCreated is emitted by the Account aggregate. Customer may keep a reference to the account numbers it owns, or Account may keep a reference to the customers who may access it. In either case, each aggregate may be required to handle events emitted by the other. In CQRS, command side services use aggregates to update state and emit domain events, which query side components handle to keep materialized views eventually consistent.

The domain event is an object conveying what happened (the event type) along with the data necessary to reflect exactly what changed, a timestamp, the aggregate ID, and possibly additional metadata. In a distributed system, domain events are published to a messaging system using the publish subscribe pattern. This allows any number of remote processes (microservices) to subscribe to receive and handle domain events asynchronously. A microservice listening for domain events in a CQRS + ES system may need to handle many types of events, so some mechanism is needed to dispatch each event to the method or function that handles its type.
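As a rough illustration of the description above, a domain event can be modeled as a small immutable envelope. This is only a sketch: the class name DomainEvent, its field names, and the choice of maps for data and metadata are assumptions made for this example, not something prescribed by DDD or by any framework.

```java
import java.time.Instant;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical envelope for a domain event; all names here are illustrative.
final class DomainEvent {
    private final String eventType;              // what happened, e.g. "CustomerCreatedEvent"
    private final String aggregateId;            // which aggregate emitted the event
    private final Instant timestamp;             // when it happened
    private final Map<String, Object> data;      // the data describing what changed
    private final Map<String, String> metadata;  // optional additional metadata

    DomainEvent(String eventType, String aggregateId, Instant timestamp,
                Map<String, Object> data, Map<String, String> metadata) {
        this.eventType = eventType;
        this.aggregateId = aggregateId;
        this.timestamp = timestamp;
        // Defensive copies keep the event immutable once it has been created.
        this.data = Collections.unmodifiableMap(new LinkedHashMap<>(data));
        this.metadata = Collections.unmodifiableMap(new LinkedHashMap<>(metadata));
    }

    String getEventType() { return eventType; }
    String getAggregateId() { return aggregateId; }
    Instant getTimestamp() { return timestamp; }
    Map<String, Object> getData() { return data; }
    Map<String, String> getMetadata() { return metadata; }
}

final class DomainEventDemo {
    public static void main(String[] args) {
        DomainEvent event = new DomainEvent(
                "CustomerCreatedEvent", "customer-42", Instant.now(),
                Collections.singletonMap("name", "Ada"),
                Collections.emptyMap());
        System.out.println(event.getEventType() + " for " + event.getAggregateId());
    }
}
```

Because the event type travels as a plain string, a consumer can decide how to route the event before it knows anything else about the payload.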

In the Java world, we can always provide a message listener for the particular messaging middleware we are using, along with a switch statement to handle each event type. Just kidding, we can actually do much better. In fact, current open source CQRS + ES frameworks such as Axon provide an @EventHandler method annotation which enables the event bus to dispatch events based on the method argument’s type. With Axon, CustomerCreatedEvent and AccountCreatedEvent are distinct Java types (classes), so you have something like this:


public class MyEventHandler {
   @EventHandler
   public void handle(CustomerCreatedEvent event) {
   ...
   }

   @EventHandler
   public void handle(AccountCreatedEvent event) {
   ...
   }
}

Eventuate, another open source CQRS + ES framework, provides a similar mechanism as shown in this example.

The above approaches are sound with respect to object oriented programming. However, they create a dependence on Java types across distributed applications. In effect, domain events become shared types in a distributed system, which strongly couples any services that rely on them. This type of coupling should be avoided in distributed systems and is generally considered an anti-pattern in a microservice architecture. For example, if you package domain events in a common jar that is a shared dependency among microservices, every microservice must be redeployed whenever an event type is added or modified, whether the service cares about that particular event type or not. Alternatively, each microservice can unmarshal each message payload to a different locally defined event type, but this approach results in a lot of duplicated effort.

Handling Events with Spring Cloud Stream

The recent Chelsea release of Spring Cloud Stream introduces a native dispatching feature that supports event driven architectures while avoiding the reliance on shared domain types. Spring Cloud Stream has always provided a @StreamListener method annotation used to coerce a serialized payload to the type of the method argument and invoke the method. For example:

@StreamListener(Sink.INPUT)
public void handle(Foo foo){
...
}

will automatically convert a serialized JSON (for example) payload transported via Kafka or RabbitMQ (or any supported messaging middleware) to a Foo object and invoke the handle method. Normally, a Spring Cloud Stream application would declare one stream listener per channel, where the channel is bound to a topic to which other applications publish data.

The new dispatching feature adds a condition attribute to @StreamListener which enables routing of messages to multiple listeners,  evaluating a boolean expression defined as a Spring Expression Language (SpEL) expression. The condition is applied to incoming messages and can evaluate any field in the message payload or specific header, or combination thereof.  This provides an extremely flexible routing mechanism that does not require different payload types.  For example, we can define a single Event class with a String eventType field. Spring Cloud Stream will support this out of the box:

@EnableBinding(Sink.class)
class MyEventHandler {
    @StreamListener(target=Sink.INPUT, condition="payload.eventType=='CustomerCreatedEvent'")
    public void handleCustomerEvent(@Payload Event event) {
      // handle the message
    }

    @StreamListener(target=Sink.INPUT, condition="payload.eventType=='AccountCreatedEvent'")
    public void handleAccountEvent(@Payload Event event) {
      // handle the message
    }
}
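To make the routing idea concrete without any messaging middleware, here is a minimal plain-Java sketch. It does not use Spring Cloud Stream or SpEL: ordinary predicates stand in for the condition expressions, and the Event and ConditionalDispatcher classes are hypothetical names invented for this illustration. The only point is that a single payload type can be routed to different handlers purely by the value of its eventType field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical single payload type shared by all events.
final class Event {
    private final String eventType;
    private final String body;

    Event(String eventType, String body) {
        this.eventType = eventType;
        this.body = body;
    }

    String getEventType() { return eventType; }
    String getBody() { return body; }
}

// Toy dispatcher: each listener is registered together with a condition,
// loosely mirroring the role of the condition attribute described above.
final class ConditionalDispatcher {

    private static final class Registration {
        final Predicate<Event> condition;
        final Consumer<Event> listener;

        Registration(Predicate<Event> condition, Consumer<Event> listener) {
            this.condition = condition;
            this.listener = listener;
        }
    }

    private final List<Registration> registrations = new ArrayList<>();

    void register(Predicate<Event> condition, Consumer<Event> listener) {
        registrations.add(new Registration(condition, listener));
    }

    // Invokes every listener whose condition matches; returns how many were invoked.
    int dispatch(Event event) {
        int invoked = 0;
        for (Registration registration : registrations) {
            if (registration.condition.test(event)) {
                registration.listener.accept(event);
                invoked++;
            }
        }
        return invoked;
    }
}

final class ConditionalDispatchDemo {
    public static void main(String[] args) {
        ConditionalDispatcher dispatcher = new ConditionalDispatcher();
        dispatcher.register(e -> "CustomerCreatedEvent".equals(e.getEventType()),
                e -> System.out.println("customer handler got " + e.getBody()));
        dispatcher.register(e -> "AccountCreatedEvent".equals(e.getEventType()),
                e -> System.out.println("account handler got " + e.getBody()));

        dispatcher.dispatch(new Event("AccountCreatedEvent", "acct-1"));
    }
}
```

Neither handler needs its own payload class, which is the property that removes the shared-type coupling discussed earlier.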

Custom Annotations

This is already an improvement,  but semantically it’s not as clean as we would like. Ideally, we would like to see something comparable to other CQRS + ES frameworks:

@EnableEventHandling
class MyEventHandler {
    @EventHandler(eventType="CustomerCreatedEvent")
    public void handleCustomerEvent(@Payload Event event) {
      // handle the message
    }

    @EventHandler(eventType="AccountCreatedEvent")
    public void handleAccountEvent(@Payload Event event) {
      // handle the message
    }
}

Fortunately, Spring can get us there with a small bit of customization. Core Spring Framework already has excellent support for custom annotations, so we can easily define an @EventHandler annotation to use in place of @StreamListener. We can even default the target channel to Sink.INPUT without too much trouble:

@StreamListener
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface EventHandler {
    // value and target are both aliases for StreamListener's target,
    // so Spring requires them to declare the same default value.
    @AliasFor(annotation=StreamListener.class, attribute="target")
    String value() default Sink.INPUT;

    @AliasFor(annotation=StreamListener.class, attribute="target")
    String target() default Sink.INPUT;

    @AliasFor(annotation=StreamListener.class, attribute="condition")
    String condition() default "";
}

Now we’re a little closer, but not quite there:

@EnableBinding(Sink.class)
class MyEventHandler {
    @EventHandler(condition="payload.eventType=='CustomerCreatedEvent'")
    public void handleCustomerEvent(@Payload Event event) {
      // handle the message
    }

    @EventHandler(condition="payload.eventType=='AccountCreatedEvent'")
    public void handleAccountEvent(@Payload Event event) {
      // handle the message
    }
}

The final step requires a bit of additional magic, and it assumes that we universally adopt the convention of identifying the event type by the value of the payload's eventType field. We could just as easily adopt a different convention, such as providing an eventType header on every message. Once we have defined such a convention, the conditional expression becomes a template for which we need only provide the required value as an annotation attribute. This will require overriding a method in the bean post processor used by Spring Cloud Stream to process the StreamListener annotation. Fortunately, the hooks are in place:

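import static org.springframework.cloud.stream.config.BindingServiceConfiguration.STREAM_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_NAME;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.StringUtils;

@Configuration
public class EventHandlerConfig {

	/*
	 * The SpEL expression used to allow the Spring Cloud Stream Binder to dispatch to methods
	 * annotated with @EventHandler. The comparison must be '=='; a single '=' is assignment in SpEL.
	 */
	private String eventHandlerSpelPattern = "payload.eventType=='%s'";

	/**
	 * Override the default {@link StreamListenerAnnotationBeanPostProcessor} to inject value of
	 * 'eventType' attribute into 'condition' expression.
	 * @return the customized bean post processor
	 */
	@Bean(name = STREAM_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_NAME)
	public BeanPostProcessor streamListenerAnnotationBeanPostProcessor() {
		return new StreamListenerAnnotationBeanPostProcessor() {
			@Override
			protected StreamListener postProcessAnnotation(StreamListener originalAnnotation, Method annotatedMethod) {
				Map<String, Object> attributes = new HashMap<>(
						AnnotationUtils.getAnnotationAttributes(originalAnnotation));
				// The 'condition' attribute carries the bare event type; expand it into the full expression.
				if (StringUtils.hasText(originalAnnotation.condition())) {
					String spelExpression = String.format(eventHandlerSpelPattern, originalAnnotation.condition());
					attributes.put("condition", spelExpression);
				}
				return AnnotationUtils.synthesizeAnnotation(attributes, StreamListener.class, annotatedMethod);
			}
		};
	}
}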
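The template step is easy to check in isolation. The sketch below is a standalone illustration, not part of Spring Cloud Stream: the class name ConditionTemplate and its two methods are made up for this example. It expands an event type into a condition string for the payload field convention and for the alternative header convention mentioned above. Note that the comparison is written with `==`, since a single `=` is the assignment operator in SpEL.

```java
// Hypothetical helper showing how a bare event type becomes a full condition expression.
final class ConditionTemplate {

    // Convention 1: the event type is a field of the payload.
    static final String PAYLOAD_PATTERN = "payload.eventType=='%s'";

    // Convention 2: the event type is carried in a message header.
    static final String HEADER_PATTERN = "headers['eventType']=='%s'";

    static String forPayload(String eventType) {
        return String.format(PAYLOAD_PATTERN, eventType);
    }

    static String forHeader(String eventType) {
        return String.format(HEADER_PATTERN, eventType);
    }

    public static void main(String[] args) {
        System.out.println(forPayload("CustomerCreatedEvent"));
        // prints: payload.eventType=='CustomerCreatedEvent'
        System.out.println(forHeader("AccountCreatedEvent"));
        // prints: headers['eventType']=='AccountCreatedEvent'
    }
}
```

This simple substitution assumes event type names contain no single quotes; a name with a quote in it would need escaping before being placed in the expression.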

Next, we can import this configuration using a custom EnableEventHandling annotation:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({EventHandlerConfig.class})
public @interface EnableEventHandling {

}

Finally, we modify our EventHandler annotation to define eventType as an alias for condition:

@StreamListener
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface EventHandler {
	/**
	 * Shorthand for {@link #eventType()}.
	 * @return the event type handled by the annotated method.
	 */
	@AliasFor(annotation=StreamListener.class, attribute="condition")
	String value() default "";

	/**
	 * The name of the binding target (e.g. channel) that the method subscribes to.
	 * @return the name of the binding target.
	 */
	@AliasFor(annotation=StreamListener.class, attribute="target")
	String target() default Sink.INPUT;

	/**
	 * The event type handled by the annotated method. The customized bean post processor
	 * inserts this value into the SpEL condition template.
	 * @return the event type, e.g. {@code CustomerCreatedEvent}.
	 */
	@AliasFor(annotation=StreamListener.class, attribute="condition")
	String eventType() default "";
}
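For readers who want to experiment with the programming model outside of Spring, the following plain-Java sketch imitates it with reflection. It is a stand-in only: the LocalEventHandler annotation, SimpleEvent, LedgerHandlers, and ReflectiveDispatcher are hypothetical names for this example, and the dispatcher compares strings directly instead of evaluating a SpEL condition the way Spring Cloud Stream does.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @EventHandler, with no Spring involved.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface LocalEventHandler {
    String eventType();
}

// Single payload type; the event type is just a string field.
final class SimpleEvent {
    final String eventType;
    final String body;

    SimpleEvent(String eventType, String body) {
        this.eventType = eventType;
        this.body = body;
    }
}

class LedgerHandlers {
    final List<String> log = new ArrayList<>();

    @LocalEventHandler(eventType = "CustomerCreatedEvent")
    public void handleCustomerEvent(SimpleEvent event) {
        log.add("customer " + event.body);
    }

    @LocalEventHandler(eventType = "AccountCreatedEvent")
    public void handleAccountEvent(SimpleEvent event) {
        log.add("account " + event.body);
    }
}

final class ReflectiveDispatcher {
    // Invokes each annotated method whose eventType equals the event's eventType
    // and returns the number of methods invoked.
    static int dispatch(Object target, SimpleEvent event) {
        int invoked = 0;
        for (Method method : target.getClass().getDeclaredMethods()) {
            LocalEventHandler handler = method.getAnnotation(LocalEventHandler.class);
            if (handler != null && handler.eventType().equals(event.eventType)) {
                try {
                    method.setAccessible(true);
                    method.invoke(target, event);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Handler invocation failed: " + method, e);
                }
                invoked++;
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        LedgerHandlers handlers = new LedgerHandlers();
        dispatch(handlers, new SimpleEvent("AccountCreatedEvent", "acct-1"));
        System.out.println(handlers.log);
    }
}
```

The annotation attribute plays the same role as eventType in the listing above: the handler declares which event it cares about, and the payload class stays the same for every event.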

Summary

With Spring Cloud Stream and a small amount of Spring magic we have implemented an annotation driven framework for handling domain events in any event driven architecture such as CQRS and Event Sourcing. We have implemented event oriented annotations, comparable to what you see in existing CQRS + ES frameworks, namely:

@EnableEventHandling
class MyEventHandler {
    @EventHandler(eventType="CustomerCreatedEvent")
    public void handleCustomerEvent(@Payload Event event) {
      // handle the message
    }

    @EventHandler(eventType="AccountCreatedEvent")
    public void handleAccountEvent(@Payload Event event) {
      // handle the message
    }
}

Unlike the existing CQRS + ES frameworks, we do not rely on payload type routing. This means we can avoid the common pitfall of shared data types in a microservice architecture. Of course, if you really want to route messages by payload type, we can easily support that as well.