Apache Camel – developing application from the scratch (part 2 / 2)

This is the second part of the tutorial in which we build an invoice-processing application with Apache Camel. In case you missed it, be sure to look at the first part. So far we have defined the functional requirements for the system and created the gateway, splitter, filter and content-based router components. Let’s continue by creating a transformer.

5. Transforming invoices to the payments

We have now filtered the “too expensive” invoices out of the system (they might need manual inspection or similar). The important thing is that we can now take an invoice and generate a payment from it. First, let’s add the Payment class to the banking package:

package com.vrtoonjava.banking;

import com.google.common.base.Objects;

import java.math.BigDecimal;

public class Payment {

    private final String senderAccount;
    private final String receiverAccount;
    private final BigDecimal dollars;

    public Payment(String senderAccount, String receiverAccount, BigDecimal dollars) {
        this.senderAccount = senderAccount;
        this.receiverAccount = receiverAccount;
        this.dollars = dollars;
    }

    public String getSenderAccount() {
        return senderAccount;
    }

    public String getReceiverAccount() {
        return receiverAccount;
    }

    public BigDecimal getDollars() {
        return dollars;
    }

    @Override
    public String toString() {
        return Objects.toStringHelper(this)
                .add("senderAccount", senderAccount)
                .add("receiverAccount", receiverAccount)
                .add("dollars", dollars)
                .toString();
    }

}
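A side note on the toString() method: Objects.toStringHelper comes from Guava, and later Guava releases deprecated it in favour of MoreObjects.toStringHelper before removing it. If the method is missing on your classpath, a dependency-free variant is one way out. The following is only a sketch under assumptions: the class name PlainPayment is hypothetical, and the equals()/hashCode() pair is an addition that the tutorial's Payment does not have.

```java
import java.math.BigDecimal;
import java.util.Objects;

// Hypothetical Guava-free variant of Payment; the name PlainPayment is illustrative only.
final class PlainPayment {

    private final String senderAccount;
    private final String receiverAccount;
    private final BigDecimal dollars;

    PlainPayment(String senderAccount, String receiverAccount, BigDecimal dollars) {
        this.senderAccount = senderAccount;
        this.receiverAccount = receiverAccount;
        this.dollars = dollars;
    }

    // Value equality; note that BigDecimal.equals also compares scale (10 vs 10.0 differ).
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PlainPayment)) {
            return false;
        }
        PlainPayment other = (PlainPayment) o;
        return Objects.equals(senderAccount, other.senderAccount)
                && Objects.equals(receiverAccount, other.receiverAccount)
                && Objects.equals(dollars, other.dollars);
    }

    @Override
    public int hashCode() {
        return Objects.hash(senderAccount, receiverAccount, dollars);
    }

    // Hand-written equivalent of the ClassName{field=value, ...} format.
    @Override
    public String toString() {
        return "PlainPayment{senderAccount=" + senderAccount
                + ", receiverAccount=" + receiverAccount
                + ", dollars=" + dollars + "}";
    }
}
```

The rest of the tutorial works the same with either variant, because the routes only rely on the constructor, the getters and toString().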

Because there will be two ways to create a payment (from local and from foreign invoices), let’s define a common contract (interface) for creating payments. Put the PaymentCreator interface into the banking package:

package com.vrtoonjava.banking;

import com.vrtoonjava.invoices.Invoice;

/**
 * Creates payment for bank from the invoice.
 * Real world implementation might do some I/O expensive stuff.
 */
public interface PaymentCreator {

    Payment createPayment(Invoice invoice) throws PaymentException;

}

Technically, this is a simple parameterized factory. Note that it throws PaymentException. We’ll get to exception handling later, but here is the code for the simple PaymentException:

package com.vrtoonjava.banking;

public class PaymentException extends Exception {

    public PaymentException(String message) {
        super(message);
    }

}

Now we can add two implementations to the invoices package. First, let’s create the LocalPaymentCreator class:

package com.vrtoonjava.invoices;

import com.vrtoonjava.banking.Payment;
import com.vrtoonjava.banking.PaymentCreator;
import com.vrtoonjava.banking.PaymentException;
import org.springframework.stereotype.Component;

@Component
public class LocalPaymentCreator implements PaymentCreator {

    // hard coded account value for demo purposes
    private static final String CURRENT_LOCAL_ACC = "current-local-acc";

    @Override
    public Payment createPayment(Invoice invoice) throws PaymentException {
        if (null == invoice.getAccount()) {
            throw new PaymentException("Account can not be empty when creating local payment!");
        }

        return new Payment(CURRENT_LOCAL_ACC, invoice.getAccount(), invoice.getDollars());
    }

}

The other creator is ForeignPaymentCreator, with an equally straightforward implementation:

package com.vrtoonjava.invoices;

import com.vrtoonjava.banking.Payment;
import com.vrtoonjava.banking.PaymentCreator;
import com.vrtoonjava.banking.PaymentException;
import org.springframework.stereotype.Component;

@Component
public class ForeignPaymentCreator implements PaymentCreator {

    // hard coded account value for demo purposes
    private static final String CURRENT_IBAN_ACC = "current-iban-acc";

    @Override
    public Payment createPayment(Invoice invoice) throws PaymentException {
        if (null == invoice.getIban()) {
            throw new PaymentException("IBAN mustn't be null when creating foreign payment!");
        }

        return new Payment(CURRENT_IBAN_ACC, invoice.getIban(), invoice.getDollars());
    }

}
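Because createPayment() declares a checked PaymentException, every caller has to decide what to do with a rejected invoice. The sketch below exercises that contract without Spring or Camel. It is an illustration under assumptions: Invoice and Payment are reduced stand-ins for the tutorial's classes (public fields instead of getters), and CreatorContractSketch, LOCAL and describe() are hypothetical names.

```java
import java.math.BigDecimal;

// Self-contained sketch: Invoice and Payment are reduced stand-ins for the tutorial's classes.
class CreatorContractSketch {

    static class PaymentException extends Exception {
        PaymentException(String message) {
            super(message);
        }
    }

    // Stand-in for Invoice from part 1; only the fields used here are modelled.
    static class Invoice {
        final String account;
        final BigDecimal dollars;

        Invoice(String account, BigDecimal dollars) {
            this.account = account;
            this.dollars = dollars;
        }
    }

    static class Payment {
        final String senderAccount;
        final String receiverAccount;
        final BigDecimal dollars;

        Payment(String senderAccount, String receiverAccount, BigDecimal dollars) {
            this.senderAccount = senderAccount;
            this.receiverAccount = receiverAccount;
            this.dollars = dollars;
        }
    }

    interface PaymentCreator {
        Payment createPayment(Invoice invoice) throws PaymentException;
    }

    // Same rule as LocalPaymentCreator: a missing account is reported as a PaymentException.
    static final PaymentCreator LOCAL = invoice -> {
        if (invoice.account == null) {
            throw new PaymentException("Account can not be empty when creating local payment!");
        }
        return new Payment("current-local-acc", invoice.account, invoice.dollars);
    };

    // Callers must handle the checked exception; here it is turned into a short status text.
    static String describe(PaymentCreator creator, Invoice invoice) {
        try {
            return "paid to " + creator.createPayment(invoice).receiverAccount;
        } catch (PaymentException e) {
            return "rejected: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(LOCAL, new Invoice("test-account-1", new BigDecimal("100"))));
        System.out.println(describe(LOCAL, new Invoice(null, new BigDecimal("100"))));
    }
}
```

In the tutorial itself nobody catches the exception by hand; it is left to the route's error handling, which we get to later.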

These two creators are simple Spring beans, and Apache Camel provides a really nice way of connecting them to the route. We will create two transformers using the transform() method of Camel’s Java DSL. We will plug the correct transformer into each of seda:foreignInvoicesChannel and seda:localInvoicesChannel and have both forward their results to seda:bankingChannel. The bean names used below, foreignPaymentCreator and localPaymentCreator, are the default names Spring derives from the class names. Add the following code to your configure method:

from("seda:foreignInvoicesChannel")
        .transform().method("foreignPaymentCreator", "createPayment")
        .to("seda:bankingChannel");

from("seda:localInvoicesChannel")
        .transform().method("localPaymentCreator", "createPayment")
        .to("seda:bankingChannel");
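Conceptually, each of these routes takes a message from one channel, replaces its body with the return value of the bean method, and puts the result on the next channel. The plain-Java sketch below mirrors that idea with two in-memory queues. It is not how Camel is implemented; TransformerSketch and drainAndTransform() are hypothetical names, and strings stand in for invoices and payments.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Plain-Java illustration of a "transformer" between two in-memory channels.
class TransformerSketch {

    // Takes every message currently waiting in 'in', applies 'mapper' to it,
    // and puts the result on 'out'. Returns the number of messages moved.
    static <A, B> int drainAndTransform(BlockingQueue<A> in, Function<A, B> mapper, BlockingQueue<B> out) {
        int moved = 0;
        A message;
        while ((message = in.poll()) != null) {
            out.add(mapper.apply(message));
            moved++;
        }
        return moved;
    }

    public static void main(String[] args) {
        BlockingQueue<String> localInvoices = new LinkedBlockingQueue<>();
        BlockingQueue<String> banking = new LinkedBlockingQueue<>();
        localInvoices.add("invoice-1");
        localInvoices.add("invoice-2");

        // The lambda plays the role of the createPayment bean method.
        int moved = drainAndTransform(localInvoices, invoice -> "payment for " + invoice, banking);
        System.out.println(moved + " messages now waiting in the banking channel: " + banking);
    }
}
```

With Camel we get the same effect declaratively, and the consumer threads behind each seda endpoint are managed for us.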

6. Passing payments to the banking service (Service Activator)

The payments are ready, and the messages containing them are waiting in seda:bankingChannel. The last step of the flow is to use a Service Activator component. The way it works is simple: when a new message appears in a channel, Apache Camel invokes the logic specified in the Service Activator. In other words, we are connecting an external service to our existing messaging infrastructure.

To do that, we first need a contract for the banking service. Put the BankingService interface into the banking package (in the real world it would probably reside in some external module):

package com.vrtoonjava.banking;

/**
 * Contract for communication with bank.
 */
public interface BankingService {

    void pay(Payment payment) throws PaymentException;

}

Now we need an actual implementation of BankingService. Again, it is highly unlikely that the implementation would reside in our project (it would probably be a remotely exposed service), but let’s at least create a mock implementation for the purposes of this tutorial. Add the MockBankingService class to the banking package:

package com.vrtoonjava.banking;

import org.springframework.stereotype.Service;

import java.util.Random;

/**
 * Mock service that simulates some banking behavior.
 * In real world, we might use some web service or a proxy of real service.
 */
@Service
public class MockBankingService implements BankingService {

    private final Random rand = new Random();

    @Override
    public void pay(Payment payment) throws PaymentException {
        if (rand.nextDouble() > 0.9) {
            throw new PaymentException("Banking services are offline, try again later!");
        }

        System.out.println("Processing payment " + payment);
    }

}
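Random.nextDouble() returns a value that is uniformly distributed between 0.0 (inclusive) and 1.0 (exclusive), so the check rand.nextDouble() > 0.9 above is true for roughly one call in ten. If you want to convince yourself, the small sketch below counts how often the condition fires. FailureRateSketch and failureRate() are hypothetical names, and the fixed seed is only there to make the run repeatable.

```java
import java.util.Random;

// Counts how often the condition used by the mock service fires.
class FailureRateSketch {

    static double failureRate(Random rand, int trials) {
        int failures = 0;
        for (int i = 0; i < trials; i++) {
            if (rand.nextDouble() > 0.9) { // same condition as in MockBankingService.pay
                failures++;
            }
        }
        return (double) failures / trials;
    }

    public static void main(String[] args) {
        // A fixed seed is used only to make the run repeatable.
        System.out.println("Observed failure rate: " + failureRate(new Random(42), 100_000));
    }
}
```

Changing the 0.9 threshold in the mock is an easy way to make failures more or less frequent while you experiment with error handling.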

The mock implementation fails at random, in roughly 10% of calls. For better decoupling we are not going to use it directly; instead, our custom component will depend on the contract (interface). Let’s add the PaymentProcessor class to the invoices package now:

package com.vrtoonjava.invoices;

import com.vrtoonjava.banking.BankingService;
import com.vrtoonjava.banking.Payment;
import com.vrtoonjava.banking.PaymentException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Endpoint that picks Payments from the system and dispatches them to the
 * service provided by bank.
 */
@Component
public class PaymentProcessor {

    @Autowired
    BankingService bankingService;

    public void processPayment(Payment payment) throws PaymentException {
        bankingService.pay(payment);
    }

}

Apache Camel provides an easy way to invoke a method on an arbitrary bean when a message arrives at a certain endpoint (EIP describes this as a Service Activator): the bean() method of Camel’s Java DSL:

from("seda:bankingChannel")
        .bean(PaymentProcessor.class, "processPayment");
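To make the Service Activator idea more tangible, here is roughly what such a component has to do when written by hand: block on a channel and hand every incoming message to a service. This is a plain-Java sketch under assumptions, not Camel's implementation. ServiceActivatorSketch, activate(), deliver() and the STOP marker are hypothetical, and strings stand in for Payment objects.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class ServiceActivatorSketch {

    // Marker message that tells the consumer loop to stop (an illustration device only).
    static final String STOP = "__stop__";

    // Starts a thread that blocks on the channel and hands every message to the service.
    static Thread activate(BlockingQueue<String> channel, Consumer<String> service) {
        Thread consumer = new Thread(() -> {
            try {
                String message;
                while (!(message = channel.take()).equals(STOP)) {
                    service.accept(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        return consumer;
    }

    // Sends the payments through a channel and returns what the service received.
    static List<String> deliver(List<String> payments) {
        BlockingQueue<String> bankingChannel = new LinkedBlockingQueue<>();
        List<String> processed = new CopyOnWriteArrayList<>();
        Thread consumer = activate(bankingChannel, processed::add);
        bankingChannel.addAll(payments);
        bankingChannel.add(STOP);
        try {
            consumer.join(); // wait until the consumer has seen the STOP marker
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("Processed: " + deliver(List.of("payment-1", "payment-2")));
    }
}
```

The single line of Camel DSL above replaces all of this plumbing, including the thread management.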

Error handling

One of the biggest challenges of messaging systems is to properly identify and handle error situations. EAI describes plenty of approaches, and we will use Camel’s implementation of the Dead Letter Channel EIP. A Dead Letter Channel is just another channel, and we can take appropriate action when a failed message appears in it. In a real-world application we would probably go for some retry logic or proper reporting; in our sample tutorial we will simply log the message that failed. Let’s modify the previously defined Service Activator and plug in an errorHandler(). When PaymentProcessor throws an exception, this error handler forwards the original message that caused the error to the Dead Letter Channel:

from("seda:bankingChannel")
        .errorHandler(deadLetterChannel("log:failedPayments"))
        .bean(PaymentProcessor.class, "processPayment");
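The retry logic mentioned above can be pictured as follows: try the handler a limited number of times and, only if every attempt fails, move the original message to a separate place for failed messages. The sketch below shows that idea in plain Java. It is an illustration under assumptions, not Camel's implementation; DeadLetterSketch, Handler and process() are hypothetical names. Camel's error handler builders have their own redelivery settings (for example maximumRedeliveries), so check the documentation of your Camel version before relying on specific behaviour.

```java
import java.util.ArrayList;
import java.util.List;

class DeadLetterSketch {

    // Stand-in for a service call that may fail with a checked exception.
    interface Handler<T> {
        void handle(T message) throws Exception;
    }

    // Tries the handler up to maxAttempts times; if every attempt fails,
    // the ORIGINAL message is appended to deadLetters. Returns true on success.
    static <T> boolean process(T message, Handler<T> handler, int maxAttempts, List<T> deadLetters) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.handle(message);
                return true;
            } catch (Exception e) {
                System.out.println("Attempt " + attempt + " failed for " + message + ": " + e.getMessage());
            }
        }
        deadLetters.add(message);
        return false;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        int[] calls = {0};

        // Fails twice, then succeeds: never reaches the dead letter list.
        process("payment-1", m -> {
            if (++calls[0] < 3) {
                throw new Exception("bank offline");
            }
        }, 3, deadLetters);

        // Always fails: ends up in the dead letter list after 3 attempts.
        process("payment-2", m -> {
            throw new Exception("bank offline");
        }, 3, deadLetters);

        System.out.println("Dead letters: " + deadLetters);
    }
}
```

Keeping the original message, rather than only the exception, is what makes later inspection or a manual replay possible.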

Finally, here is the complete route:

package com.vrtoonjava.routes;

import com.vrtoonjava.invoices.LowEnoughAmountPredicate;
import com.vrtoonjava.invoices.PaymentProcessor;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class InvoicesRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("seda:newInvoicesChannel")
                .log(LoggingLevel.INFO, "Invoices processing STARTED")
                .split(body())
                .to("seda:singleInvoicesChannel");

        from("seda:singleInvoicesChannel")
                .filter(new LowEnoughAmountPredicate())
                .to("seda:filteredInvoicesChannel");

        from("seda:filteredInvoicesChannel")
                .choice()
                    .when().simple("${body.isForeign}")
                        .to("seda:foreignInvoicesChannel")
                    .otherwise()
                        .to("seda:localInvoicesChannel");

        from("seda:foreignInvoicesChannel")
                .transform().method("foreignPaymentCreator", "createPayment")
                .to("seda:bankingChannel");

        from("seda:localInvoicesChannel")
                .transform().method("localPaymentCreator", "createPayment")
                .to("seda:bankingChannel");

        from("seda:bankingChannel")
                .errorHandler(deadLetterChannel("log:failedPayments"))
                .bean(PaymentProcessor.class, "processPayment");
    }

}
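If the chain of channels is hard to follow, it can help to read the route as one sequential data flow: split the batch, drop expensive invoices, decide between foreign and local, create the payment. The sketch below does exactly that in plain Java. It rests on several assumptions: the 10,000 limit and the strict "less than" comparison are guesses at what LowEnoughAmountPredicate from part 1 does, a non-null IBAN is assumed to mean "foreign", Invoice is a reduced stand-in, and payments are rendered as strings. The real route also differs in that every seda endpoint is consumed on its own thread, so messages are processed asynchronously.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

class RouteFlowSketch {

    // Assumed threshold; the real rule lives in LowEnoughAmountPredicate (part 1).
    static final BigDecimal LIMIT = new BigDecimal("10000");

    // Reduced stand-in for Invoice; a non-null IBAN is assumed to mean "foreign".
    static class Invoice {
        final String iban;
        final String account;
        final BigDecimal dollars;

        Invoice(String iban, String account, BigDecimal dollars) {
            this.iban = iban;
            this.account = account;
            this.dollars = dollars;
        }

        boolean isForeign() {
            return iban != null;
        }
    }

    // Returns one "sender -> receiver: amount" line per payment that would reach the bank.
    static List<String> run(List<Invoice> batch) {
        List<String> payments = new ArrayList<>();
        for (Invoice invoice : batch) {                      // splitter: one invoice at a time
            if (invoice.dollars.compareTo(LIMIT) >= 0) {     // filter: too expensive, dropped
                continue;
            }
            if (invoice.isForeign()) {                       // router + foreign transformer
                payments.add("current-iban-acc -> " + invoice.iban + ": " + invoice.dollars);
            } else {                                         // router + local transformer
                payments.add("current-local-acc -> " + invoice.account + ": " + invoice.dollars);
            }
        }
        return payments;
    }

    public static void main(String[] args) {
        List<Invoice> batch = new ArrayList<>();
        batch.add(new Invoice("test-iban-1", null, new BigDecimal("500")));
        batch.add(new Invoice(null, "test-account-1", new BigDecimal("700")));
        batch.add(new Invoice(null, "test-account-2", new BigDecimal("15000")));
        run(batch).forEach(System.out::println);
    }
}
```

What the Camel version adds on top of this is decoupling: each step can be replaced, scaled or monitored on its own without touching the others.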

Running the whole thing

Now we’ll create a job that sends new invoices to the system at a fixed rate. It is just a standard Spring bean that uses Spring’s @Scheduled annotation. Let’s add a new class, InvoicesJob, to the project:

package com.vrtoonjava.invoices;

import org.apache.camel.Produce;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

@Component
public class InvoicesJob {

    private int limit = 10; // default value, configurable

    @Autowired
    InvoiceCollectorGateway invoiceCollector;

    @Autowired
    InvoiceGenerator invoiceGenerator;

    @Scheduled(fixedRate = 4000)
    public void scheduleInvoicesHandling() {
        Collection<Invoice> invoices = generateInvoices(limit);
        System.out.println("\n===========> Sending " + invoices.size() + " invoices to the system");
        invoiceCollector.collectInvoices(invoices);
    }

    // configurable from Injector
    public void setLimit(int limit) {
        this.limit = limit;
    }

    private Collection<Invoice> generateInvoices(int limit) {
        List<Invoice> invoices = new ArrayList<>();
        for (int i = 0; i < limit; i++) {
            invoices.add(invoiceGenerator.nextInvoice());
        }

        return invoices;
    }
}
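The fixedRate = 4000 attribute asks Spring to invoke the method every 4000 milliseconds. A comparable idea in the plain JDK is ScheduledExecutorService.scheduleAtFixedRate, which the sketch below uses to run a task a few times and then stop. This is only an analogy and says nothing about how Spring schedules the job internally; FixedRateSketch and runFixedRate() are hypothetical names, and the short period is chosen just to keep the demo quick.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateSketch {

    // Runs 'task' at a fixed rate until it has executed 'times' times, then stops the scheduler.
    // Returns the number of executions. The task must not throw, otherwise further runs are suppressed.
    static int runFixedRate(Runnable task, long periodMillis, int times) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(times);
        AtomicInteger runs = new AtomicInteger();

        scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) { // ignore ticks that arrive after the last wanted run
                task.run();
                runs.incrementAndGet();
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        int runs = runFixedRate(() -> System.out.println("Sending invoices..."), 100, 3);
        System.out.println("Executed " + runs + " times");
    }
}
```

With @Scheduled we do not have to create or shut down the scheduler ourselves; the task namespace in the Spring configuration takes care of that.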

The job invokes InvoiceGenerator every 4 seconds and forwards the invoices to the Gateway (the first component we read about in part 1). To make it work we also need the InvoiceGenerator class:

package com.vrtoonjava.invoices;

import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.util.Random;

/**
 * Utility class for generating invoices.
 */
@Component
public class InvoiceGenerator {

    private Random rand = new Random();

    public Invoice nextInvoice() {
        return new Invoice(rand.nextBoolean() ? iban() : null, address(), account(), dollars());
    }

    private BigDecimal dollars() {
        return new BigDecimal(1 + rand.nextInt(20_000));
    }

    private String account() {
        return "test-account " + rand.nextInt(1000) + 1000;
    }

    private String address() {
        return "Test Street " + rand.nextInt(100) + 1;
    }

    private String iban() {
        return "test-iban-" + rand.nextInt(1000) + 1000;
    }

}

This is only a simple mock facility that lets us see the system working. In the real world we wouldn’t use a generator but probably some exposed service instead.
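One detail of the generator is worth knowing when you read the program output: in an expression like "test-account " + rand.nextInt(1000) + 1000, Java evaluates the + operators from left to right, so the 1000 is appended as text instead of being added to the random number. That is why account values such as "test-account 1901000" show up. For mock data this is harmless, but if a number between 1000 and 1999 was the intent, parentheses are needed. The class and method names in this small demonstration are hypothetical.

```java
class ConcatenationSketch {

    // Evaluated left to right: ("test-account " + n) + 1000, so 1000 is appended as text.
    static String withoutParentheses(int n) {
        return "test-account " + n + 1000;
    }

    // The parentheses force the integer addition to happen first.
    static String withParentheses(int n) {
        return "test-account " + (n + 1000);
    }

    public static void main(String[] args) {
        System.out.println(withoutParentheses(190)); // test-account 1901000
        System.out.println(withParentheses(190));    // test-account 1190
    }
}
```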

Now, under the resources folder, create a new Spring config file, invoices-context.xml, and declare component scanning and task scheduling support:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns = "http://www.springframework.org/schema/beans"
       xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xmlns:task = "http://www.springframework.org/schema/task"
       xmlns:context = "http://www.springframework.org/schema/context"
       xsi:schemaLocation = "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <import resource = "camel-config.xml" />

    <context:component-scan base-package = "com.vrtoonjava" />

    <task:executor id = "executor" pool-size="10" />
    <task:scheduler id = "scheduler" pool-size="10" />
    <task:annotation-driven executor="executor" scheduler="scheduler" />

</beans>

To see the whole thing running we need one last piece: a standard Java main application in which we create Spring’s ApplicationContext.

package com.vrtoonjava.invoices;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Entry point of the application.
 * Creates the Spring context and lets Spring schedule the job and use the schema.
 */
public class InvoicesApplication {

    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("/invoices-context.xml");
    }

}

Simply run mvn clean install from the command line and launch the main method of the InvoicesApplication class. You should see output similar to this:

===========> Sending 10 invoices to the system
13:48:54.347 INFO  [Camel (camel-1) thread #0 - seda://newInvoicesChannel][route1] Invoices processing STARTED
Amount of $4201 can be automatically processed by system
Amount of $15110 can not be automatically processed by system
Amount of $17165 can not be automatically processed by system
Amount of $1193 can be automatically processed by system
Amount of $6077 can be automatically processed by system
Amount of $17164 can not be automatically processed by system
Amount of $11272 can not be automatically processed by system
Processing payment Payment{senderAccount=current-local-acc, receiverAccount=test-account 1901000, dollars=4201}
Amount of $3598 can be automatically processed by system
Amount of $14449 can not be automatically processed by system
Processing payment Payment{senderAccount=current-local-acc, receiverAccount=test-account 8911000, dollars=1193}
Amount of $12486 can not be automatically processed by system
13:48:54.365 INFO  [Camel (camel-1) thread #5 - seda://bankingChannel][failedPayments] Exchange[ExchangePattern: InOnly, BodyType: com.vrtoonjava.banking.Payment, Body: Payment{senderAccount=current-iban-acc, receiverAccount=test-iban-7451000, dollars=6077}]
Processing payment Payment{senderAccount=current-iban-acc, receiverAccount=test-iban-6201000, dollars=3598}

3 thoughts on “Apache Camel – developing application from the scratch (part 2 / 2)”

  1. Pingback: Apache Camel – developing application from the scratch (part 1 / 2) | vrtoonjava

  2. I tried, but I get the output below. What could I possibly have done wrong?

    11:29:08.119 INFO [main][org.springframework.context.support.ClassPathXmlApplicationContext] Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@1d1acd3: startup date [Mon Sep 15 11:29:08 ICT 2014]; root of context hierarchy
    11:29:08.163 INFO [main][org.springframework.beans.factory.xml.XmlBeanDefinitionReader] Loading XML bean definitions from class path resource [invoices-context.xml]
    11:29:08.243 INFO [main][org.springframework.beans.factory.xml.XmlBeanDefinitionReader] Loading XML bean definitions from class path resource [camel-config.xml]
    11:29:08.995 INFO [main][org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor] Initializing ExecutorService
    11:29:08.997 INFO [main][org.springframework.context.support.ClassPathXmlApplicationContext] Bean ‘executor’ of type [class org.springframework.scheduling.config.TaskExecutorFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
    11:29:08.997 INFO [main][org.springframework.context.support.ClassPathXmlApplicationContext] Bean ‘executor’ of type [class org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
    11:29:09.017 INFO [main][org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler] Initializing ExecutorService ‘scheduler’
    11:29:09.019 INFO [main][org.springframework.context.support.ClassPathXmlApplicationContext] Bean ‘scheduler’ of type [class org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
    11:29:09.030 INFO [main][org.springframework.beans.factory.support.DefaultListableBeanFactory] Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@1dc64a5: defining beans [template,consumerTemplate,camel-1:beanPostProcessor,camel-1,invoicesRouteBuilder,mockBankingService,camelInvoiceCollectorGateway,foreignPaymentCreator,invoiceGenerator,invoicesJob,localPaymentCreator,paymentProcessor,org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,executor,scheduler,org.springframework.context.annotation.internalAsyncAnnotationProcessor,org.springframework.context.annotation.internalScheduledAnnotationProcessor,org.springframework.context.annotation.ConfigurationClassPostProcessor.importAwareProcessor]; root of factory hierarchy
    11:29:09.314 INFO [main][org.apache.camel.impl.converter.DefaultTypeConverter] Loaded 176 type converters
    11:29:09.320 INFO [main][org.apache.camel.component.seda.SedaEndpoint] Endpoint Endpoint[seda://newInvoicesChannel] is using shared queue: seda://newInvoicesChannel with size: 2147483647
    11:29:09.353 INFO [main][org.apache.camel.spring.SpringCamelContext] Apache Camel 2.12.0 (CamelContext: camel-1) is starting
    11:29:09.353 INFO [main][org.apache.camel.management.ManagedManagementStrategy] JMX is enabled
    11:29:09.485 INFO [main][org.apache.camel.component.seda.SedaEndpoint] Endpoint Endpoint[seda://singleInvoicesChannel] is using shared queue: seda://singleInvoicesChannel with size: 2147483647
    11:29:09.495 INFO [main][org.apache.camel.component.seda.SedaEndpoint] Endpoint Endpoint[seda://filteredInvoicesChannel] is using shared queue: seda://filteredInvoicesChannel with size: 2147483647
    11:29:09.500 INFO [main][org.apache.camel.component.seda.SedaEndpoint] Endpoint Endpoint[seda://foreignInvoicesChannel] is using shared queue: seda://foreignInvoicesChannel with size: 2147483647
    11:29:09.503 INFO [main][org.apache.camel.component.seda.SedaEndpoint] Endpoint Endpoint[seda://localInvoicesChannel] is using shared queue: seda://localInvoicesChannel with size: 2147483647
    11:29:09.528 INFO [main][org.apache.camel.component.seda.SedaEndpoint] Endpoint Endpoint[seda://bankingChannel] is using shared queue: seda://bankingChannel with size: 2147483647
    11:29:09.537 INFO [main][org.apache.camel.spring.SpringCamelContext] StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
    11:29:09.606 INFO [main][org.apache.camel.spring.SpringCamelContext] Route: route1 started and consuming from: Endpoint[seda://newInvoicesChannel]
    11:29:09.608 INFO [main][org.apache.camel.spring.SpringCamelContext] Route: route2 started and consuming from: Endpoint[seda://singleInvoicesChannel]
    11:29:09.610 INFO [main][org.apache.camel.spring.SpringCamelContext] Route: route3 started and consuming from: Endpoint[seda://filteredInvoicesChannel]
    11:29:09.612 INFO [main][org.apache.camel.spring.SpringCamelContext] Route: route4 started and consuming from: Endpoint[seda://foreignInvoicesChannel]
    11:29:09.613 INFO [main][org.apache.camel.spring.SpringCamelContext] Route: route5 started and consuming from: Endpoint[seda://localInvoicesChannel]
    11:29:09.615 INFO [main][org.apache.camel.spring.SpringCamelContext] Route: route6 started and consuming from: Endpoint[seda://bankingChannel]
    11:29:09.615 INFO [main][org.apache.camel.spring.SpringCamelContext] Apache Camel 2.12.0 (CamelContext: camel-1) is shutting down
    11:29:09.616 INFO [main][org.apache.camel.impl.DefaultShutdownStrategy] Starting to graceful shutdown 6 routes (timeout 300 seconds)
