Spring Integration DSL and Event Emitters pt.2

In this post, we’ll learn about the Spring Integration Java DSL for creating application integrations.

As you might be aware, many applications need to be connected to external systems in order to perform their work, such as read or send emails, react to new entries in a database, or a new file created in a directory or filesystem.

By employing common integration patterns with Spring Integration, you’re able to do just that!

And by virtue of it being Spring, you get many of the common integration patterns right out of the box.

Let’s get started:

In this section we’ll use Spring Integration’s JPA (Java Persistence API) module in order to poll and retrieve data from the database whenever there’s a new entry.

In this case it will be a notification table. Whenever a notification is active, or there is a new entry with the active field as true, then we’ll retrieve data about it.

I’ll be using an H2 in-memory database. Here is what my gradle.build file looks like:

plugins {
    id 'org.springframework.boot' version '2.3.0.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-integration'
    implementation 'org.springframework.integration:spring-integration-jpa'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'

    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'

    runtimeOnly 'com.h2database:h2'

    testCompile "org.springframework.integration:spring-integration-test:5.3.0.RELEASE"
    testImplementation 'junit:junit:4.12'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

test {
    doFirst {
        clean {}
    }
    useJUnit()
}

Make sure to connect your datasource by adding these to your application.properties file:

# datasource
spring.datasource.url= jdbc:h2:mem:testdb
spring.datasource.username=sa
spring.datasource.password=
spring.datasource.driver-class-name=org.h2.Driver

spring.h2.console.enabled=true
spring.h2.console.path=/h2

spring.jpa.hibernate.ddl-auto = create-drop
logging.level.org.hibernate.SQL=DEBUG
logging.level.org.hibernate.type.descriptor.sql.BasicBinder=TRACE

We’ll need to initialize our database. Here is what our data.sql looks like (just put this under resources):

INSERT INTO notification (id, note_id, employee_id, active) VALUES
  (1, '123', '345', 'FALSE'),
  (2, '567', '789', 'TRUE'),
  (3, '456', '567', 'TRUE');

Here is what the main class looks like:

@Slf4j
@SpringBootApplication
@RestController
@EntityScan(basePackageClasses = Notification.class)
public class DslDemoApplication {

  public static void main(String[] args) {
    ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(DslDemoApplication.class)
        .run(args);
  }

Now we’re going to define a domain class:

@Data
@NoArgsConstructor
@Entity
@Table(name = "notification")
public class Notification {

  @Id
  @GeneratedValue(strategy = GenerationType.AUTO)
  private Long id;

  private String noteId;
  private String employeeId;
  private boolean active = false;
}

And the complete code for our rest controller:

@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@RestController
public class NotificationController {

private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

  @GetMapping("/client/{id}")
  public SseEmitter files(@PathVariable String id) {
    SseEmitter sseEmitter = new SseEmitter(60 * 1000L);
    emitters.put(id, sseEmitter);
    return sseEmitter;
  }

  @Autowired
  private EntityManagerFactory entityManagerFactory;

  @Bean
  public IntegrationFlow pollingAdapterFlow() {
    return IntegrationFlows
        .from(Jpa.inboundAdapter(this.entityManagerFactory)
                .entityClass(Notification.class)
                .jpaQuery("from Notification n where n.active = true order by n.id desc")
                .maxResults(1)
                .expectSingleResult(true),
            c -> c.poller(spec->spec.fixedDelay(5000L)))
        .transform(Transformers.toJson())
        .handle(String.class, (payload, headers) -> {
          broadcast(payload);
          return null;
        })
        .channel(c -> c.queue("pollingResults"))
        .get();
  }

  private void broadcast(Object payload) {
    log.info("PAYLOAD: {}", payload);
    emitters.forEach((k, emitter) -> {
      try {
        log.info("PAYLOAD: {}", payload);
        emitter.send(payload);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    });
  }
}

So what’s going on here?

Let’s look at the pollingAdapterFlow method, which contains our integration flow. This is an example of how to configure the inbound adapter with the Java DSL.

You provide it an inbound adapter (which is the beginning of every messaging flow– change this if you want to talk to some other system such as a file directory or email). In our case we provide it a reference to the JPA entity manager factory that the adapter will use to create the EntityManager:

@Bean
  public IntegrationFlow pollingAdapterFlow() {
    return IntegrationFlows
        .from(Jpa.inboundAdapter(this.entityManagerFactory)

Then you specify the class type which is being used for retrieving entities from the database and a JPA query to perform. In our case it is a select statement where a notification has the active value set as true. And we expect a single result:

      .entityClass(Notification.class)
      .jpaQuery("from Notification n where n.active = true order by n.id desc")
      .maxResults(1)
      .expectSingleResult(true)

Then we create a poller. It’s going to have a spec, which dictates that requests get processed every 5 seconds. The message is going to have a payload, and once the message comes in, we’re going to transform the payload to json.

Then we can handle that request using a handle method. For every message that comes in, we’re going to broadcast a server sent event. We finally give it the channel to which the adapter sends the message with the payload. Finally, a call to get() builds the IntegrationFlow to be returned.

    c -> c.poller(spec -> spec.fixedDelay(5000L)))
        .transform(Transformers.toJson())
        .handle(String.class, (payload, headers) -> {
          broadcast(payload);
          return null;
        })
        .channel(c -> c.queue("pollingResults"))
        .get();

In the broadcast method, we simply visit every single record in the emitter map, and send the payload.

private void broadcast(Object payload) {
    log.info("PAYLOAD: {}", payload);
    emitters.forEach((k, emitter) -> {
      try {
        log.info("PAYLOAD: {}", payload);
        emitter.send(payload);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    });
  }

Our rest controller endpoint is going to return an SseEmitter whenever there’s a request for a different client. If it’s already there it can just replace it. We’re going to publish a message into each one of these emitters, whenever a notification arrives via the inbound adapter.

@GetMapping("/client/{id}")
  public SseEmitter files(@PathVariable String id) {
    SseEmitter sseEmitter = new SseEmitter(60 * 1000L);
    emitters.put(id, sseEmitter);
    return sseEmitter;
  }

Let’s start up a client! Now open up a new terminal console. Curl the client endpoint with a random id.

curl http://localhost:8080/client/1

Watch the stream you’ve opened and you’ll see the most recent notification. You can add a new entry into the database and you’ll see that notification appear as an event.

Summary:

You can use Spring Integration for processing data in real time or integrate your application to emails, filesystems, and other external systems.

You can have many integrations flows. For even longer, more complex flows, you may even consider extracting portions of the flow into separate methods or subflows for better readability.

Spring Integration’s many components are channel adapters for reading and writing files and JPA inbound channel adapter that let you poll and retrieve ( select ) data from the database by using JPA. There are also JPA outbound channel adapter lets you create, update, and delete entities.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: