Spring Integration DSL pt.1

In this article, you’ll learn the basics of how Spring Integration DSL works and how to define a Spring Integration message flow.

Spring Integration is great for processing data in real time. It allows you to assemble components into a pipeline through which data can flow; we’ll learn about such components below. An integration flow can read data from, or write data to, a filesystem, a database, or some other system. Spring makes it easy.

An integration flow is composed of one or more of the following components:

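  • Channels: Pass messages from one element to another.
  • Filters: Conditionally allow messages to pass through the flow based on some criteria.
  • Transformers: Change message values and/or convert message payloads from one type to another.
  • Routers: Direct messages to different channels based on some criteria applied to the messages.
  • Service activators: Hand a message off to some Java method for processing, and then publish the return value on an output channel.
  • Channel adapters: Connect a channel to an external system, either to bring data into the flow or to send it out.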

You’ll get the opportunity to see a few of these components in play in Spring Integration DSL pt.2 when we define the database-reading integration flow. 

Message channels

Just like the plumbing system in your home is composed of pipes, message channels are the means by which messages move through an integration pipeline. They’re the pipes that connect all the other parts of Spring Integration plumbing together.

There are several channel implementations. In part 2 of this article we’ll make use of QueueChannel. Messages published into a QueueChannel are stored in a queue until pulled by a consumer in a first in, first out (FIFO) fashion. If there are multiple consumers, only one of them receives each message.
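To make the FIFO and single-receiver behavior concrete, here is a minimal plain-Java sketch. It models the semantics with a java.util.concurrent queue and is not Spring's QueueChannel; the class name QueueSemanticsSketch, the consumer labels, and the message strings are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java model of queue-channel semantics; this is NOT Spring's QueueChannel.
final class QueueSemanticsSketch {

  // Publishes three messages, then lets two consumers take turns pulling.
  // Returns "consumer:message" entries in the order they were received.
  static List<String> run() {
    BlockingQueue<String> channel = new LinkedBlockingQueue<>();
    channel.add("order-1");
    channel.add("order-2");
    channel.add("order-3");

    List<String> log = new ArrayList<>();
    String[] consumers = {"A", "B"};
    int turn = 0;
    String message;
    // poll() removes the head of the queue, so a message pulled by one
    // consumer is no longer available to the other.
    while ((message = channel.poll()) != null) {
      log.add(consumers[turn % consumers.length] + ":" + message);
      turn++;
    }
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run()); // [A:order-1, B:order-2, A:order-3]
  }
}
```

Messages come out in the order they went in, and each one shows up in the log exactly once, no matter how many consumers are pulling.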

Lucky for us, we don’t need to define our input channels. In the Java DSL, they are created automatically, with DirectChannel as the default.

However, if you decide your flow requires a different channel implementation, you’ll need to explicitly declare the channel as a bean and reference it in the integration flow. 

@Bean
public MessageChannel orderChannel() {
  return new PublishSubscribeChannel();
}

Then you’d reference this channel by name in the integration flow definition with a call to channel():

@Bean
public IntegrationFlow orderFlow() {
  return IntegrationFlows
      ...
      .channel("orderChannel")
      ...
      .get();
}

Because a QueueChannel holds messages until a consumer pulls them, any consumer of a QueueChannel must be configured with a poller.

For example, if you’ve declared a QueueChannel bean like this:

@Bean
public MessageChannel orderChannel() {
  return new QueueChannel();
}

You’d need to make sure that the consumer is configured to poll the channel for messages.

  c -> c.poller(spec -> spec.fixedRate(5000L)))
        ...        
        .channel(c -> c.queue("orderChannel"))
        .get();
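What a fixed-rate poller does for the consumer can be modeled in plain Java. The sketch below is only an illustration of the idea, assuming a scheduled task that checks a queue at a fixed interval; it is not Spring Integration's poller, and FixedRatePollerSketch, consume(), and the 5 second safety timeout are hypothetical choices made for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-Java model of a fixed-rate poller; NOT Spring Integration's poller.
final class FixedRatePollerSketch {

  // Checks the queue every periodMillis until `expected` messages have been
  // consumed or 5 seconds have passed, then returns what was received.
  static List<String> consume(BlockingQueue<String> queue, int expected, long periodMillis) {
    List<String> received = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(expected);
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(() -> {
      String message = queue.poll(); // null when nothing is waiting
      if (message != null) {
        received.add(message);
        done.countDown();
      }
    }, 0, periodMillis, TimeUnit.MILLISECONDS);
    try {
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      scheduler.shutdownNow();
    }
    return received;
  }

  public static void main(String[] args) {
    BlockingQueue<String> orders =
        new LinkedBlockingQueue<>(Arrays.asList("order-1", "order-2"));
    System.out.println(consume(orders, 2, 50L)); // [order-1, order-2]
  }
}
```

The queue never pushes anything to the consumer. Nothing happens until the scheduled task wakes up and asks for the next message, which is why a consumer of a queue-backed channel needs a poller.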

Filters

Just like the filter() intermediate operation in Java 8 streams, the filter() method allows or disallows messages from continuing to the next step in the flow.

You could make a call to filter() like this:

@Bean
public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
  return IntegrationFlows
      ...
      .<Integer>filter((p) -> p % 2 == 0)
      ...
      .get();
}

In this example you use a lambda to implement the filter, but the filter() method accepts a GenericSelector as an argument. This means that you can implement the GenericSelector interface instead if your filtering requires something more than a simple lambda!
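The Java 8 streams analogy can be sketched in plain Java. This example uses java.util.stream and java.util.function.Predicate only, not Spring Integration; the named EvenInRange class plays the role that a GenericSelector implementation would play in a flow, and both that name and the keep() helper are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java analogy for filter(); uses java.util.stream, not Spring Integration.
final class FilterAnalogySketch {

  // A named predicate, for rules too involved for an inline lambda.
  // EvenInRange is a hypothetical name chosen for this example.
  static final class EvenInRange implements Predicate<Integer> {
    private final int min;
    private final int max;

    EvenInRange(int min, int max) {
      this.min = min;
      this.max = max;
    }

    @Override
    public boolean test(Integer value) {
      return value % 2 == 0 && value >= min && value <= max;
    }
  }

  // Keeps only the payloads that the rule accepts, preserving their order.
  static List<Integer> keep(List<Integer> payloads, Predicate<Integer> rule) {
    return payloads.stream().filter(rule).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Integer> payloads = Arrays.asList(1, 2, 3, 4, 10, 12);
    System.out.println(keep(payloads, p -> p % 2 == 0));        // [2, 4, 10, 12]
    System.out.println(keep(payloads, new EvenInRange(3, 10))); // [4, 10]
  }
}
```

The inline lambda is fine for a one-line rule. Once the rule needs its own state or several conditions, a named class keeps the flow definition readable.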

Transformers

You can perform transformations on messages. This operation is used when a different message, or even a different payload type, is needed. It can be as simple or complex as you prefer. Want to transform a payload to JSON? Sure. Or take a String value representing an ISBN, look up the corresponding book, and return its details? No problem.

Transformers morph messages as they flow through an integration. The simplest case is a call to transform(), passing in the transformer returned by the Transformers.toJson() method:

@Bean
public IntegrationFlow transformerFlow() {
  return IntegrationFlows
      ...
      .transform(Transformers.toJson())
      ...
      .get();
}

If the transformer is complex enough to warrant a separate Java class, you can inject it as a bean into the flow configuration and pass the reference to the transform() method:

@Bean
public RomanNumberTransformer romanNumberTransformer() {
  return new RomanNumberTransformer();
}

@Bean
public IntegrationFlow transformerFlow(
                    RomanNumberTransformer romanNumberTransformer) {
  return IntegrationFlows
      ...
      .transform(romanNumberTransformer)
      ...
      .get();
}

Here, you declare a bean of type RomanNumberTransformer, which itself is an implementation of Spring Integration’s Transformer or GenericTransformer interface. The bean is injected into the transformerFlow() method and passed to the transform() method when defining the integration flow.
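The body of RomanNumberTransformer isn’t shown in this article. As a rough sketch of what such a class might contain, assuming it converts an Integer payload to a Roman numeral String, the logic could look like the following. To keep the example compilable without Spring on the classpath, it implements java.util.function.Function rather than GenericTransformer; the class name RomanNumberSketch and the supported range of 1 to 3999 are assumptions.

```java
import java.util.function.Function;

// Hypothetical conversion logic for a Roman numeral transformer. It implements
// java.util.function.Function so that it compiles without Spring; the bean in
// the article would implement Transformer or GenericTransformer instead.
final class RomanNumberSketch implements Function<Integer, String> {

  private static final int[] VALUES =
      {1000, 900, 500, 400, 100, 90, 50, 40, 10, 9, 5, 4, 1};
  private static final String[] SYMBOLS =
      {"M", "CM", "D", "CD", "C", "XC", "L", "XL", "X", "IX", "V", "IV", "I"};

  @Override
  public String apply(Integer number) {
    if (number == null || number < 1 || number > 3999) {
      throw new IllegalArgumentException("Expected a value from 1 to 3999: " + number);
    }
    StringBuilder roman = new StringBuilder();
    int remaining = number;
    // Greedily append the largest symbol that still fits.
    for (int i = 0; i < VALUES.length; i++) {
      while (remaining >= VALUES[i]) {
        roman.append(SYMBOLS[i]);
        remaining -= VALUES[i];
      }
    }
    return roman.toString();
  }

  public static void main(String[] args) {
    System.out.println(new RomanNumberSketch().apply(1994)); // MCMXCIV
  }
}
```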

Routers

Routers allow for branching in an integration flow, directing messages to different channels based on some criteria applied to the messages.

@Bean
public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
  return IntegrationFlows
    ...
      .<Integer, Boolean>route(p -> p % 2 == 0, m -> m
        .subFlowMapping(true, sf -> sf.<Integer>handle((p, h) -> p * 2))
        .subFlowMapping(false, sf -> sf.<Integer>handle((p, h) -> p * 3)))
      .get();
}

@Bean
public MessageChannel evenChannel() {
  return new DirectChannel();
}

@Bean
public MessageChannel oddChannel() {
  return new DirectChannel();
}

Although it’s still possible to declare an AbstractMessageRouter and pass it into route(), this example uses a lambda to determine whether a message payload is odd or even. The lambda returns true for an even payload and false for an odd one, and that Boolean value selects the subflow mapping that handles the message: even payloads are doubled and odd payloads are tripled. Note that the evenChannel and oddChannel beans are declared here but are not referenced by these subflow mappings.
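The routing decision itself can be traced in plain Java. This is a minimal model of the key-to-branch lookup, assuming a Map from the Boolean routing key to a handler function; it is not how Spring Integration implements route(), and RouterSketch and BRANCHES are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the routing decision; NOT Spring Integration's router.
final class RouterSketch {

  // Key: result of the routing test. Value: the branch that handles it.
  private static final Map<Boolean, Function<Integer, Integer>> BRANCHES = new HashMap<>();

  static {
    BRANCHES.put(true, p -> p * 2);  // even payloads are doubled
    BRANCHES.put(false, p -> p * 3); // odd payloads are tripled
  }

  // Computes the routing key, then hands the payload to the matching branch.
  static int route(int payload) {
    boolean key = payload % 2 == 0;
    return BRANCHES.get(key).apply(payload);
  }

  public static void main(String[] args) {
    System.out.println(route(4)); // 8
    System.out.println(route(5)); // 15
  }
}
```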

Channel adapters

Every integration flow has an entry point and an exit point, and channel adapters fill both roles. Data enters an integration flow by way of an inbound channel adapter and exits an integration flow by way of an outbound channel adapter.

Inbound channel adapters can take many forms, depending on the source of the data they introduce into the flow.

Channel adapters are provided by one of Spring Integration’s many endpoint modules. Suppose, for example, that you require an inbound channel adapter that keeps an eye on a specified directory and submits any files that are written to that directory as messages to a channel.

You can also have an outbound channel adapter at the other end of the line for the integration flow, handing off the final message to the application or to some other system. For the inbound side, the Java DSL gives us the handy inboundAdapter() method from the Files class:

@Bean
public IntegrationFlow fileReaderFlow() {
  return IntegrationFlows
      .from(Files.inboundAdapter(new File(INPUT_DIR))
          .patternFilter(FILE_PATTERN))
      .get();
}

It’s worth noting that Spring Integration endpoint modules also provide useful message handlers for several common use cases.

Endpoint modules

Spring Integration allows you to create your own channel adapters, but also provides over two dozen endpoint modules containing channel adapters—both inbound and outbound—for integration with a variety of common external systems.

The possibilities are endless! Check out my implementation with Event Emitters in pt. 2:
