Camel Essential Components

By Christian Posta

The Essential Camel Components Cheat Sheet

DZone's 170th Refcard is an essential reference to Camel, an open-source, lightweight, integration library. This Refcard is authored by Christian Posta, a senior consultant and architect at Red Hat who specializes in messaging-based enterprise integrations.

What is Apache Camel?

Camel is an open-source, lightweight integration library that allows your applications to accomplish intelligent routing, message transformation, and protocol mediation using the established Enterprise Integration Patterns and out-of-the-box components with a highly expressive Domain Specific Language (Java, XML, or Scala). With Camel you can implement integration solutions as part of an overarching ESB solution, or as individual routes deployed to any container such as Apache Tomcat, Apache ServiceMix, JBoss AS, or even a stand-alone Java process.

Why use Camel?

Camel simplifies systems integrations with an easy-to-use DSL to create routes that clearly identify the integration intentions and endpoints. Camel's out-of-the-box integration components are modeled after the Enterprise Integration Patterns cataloged in Gregor Hohpe and Bobby Woolf's book (http://www.eaipatterns.com). You can use these EIPs as pre-packaged units, along with any custom processors or external adapters you may need, to easily assemble otherwise complex routing and transformation routes. For example, this route takes an XML message from a queue, does some processing, and publishes to another queue:

from( "jms:incomingQueue" )
  .convertBodyTo( String.class )
  .unmarshal( jaxb )
  .process(new Processor() {
       @Override
       public void process( Exchange exchange ) throws Exception {
           // ... custom logic if needed ...
       }
    })
  .marshal( jaxb )
  .convertBodyTo( String.class )
  .to( "jms:outgoingQueue" );

Camel allows you to integrate with quite a few protocols and systems out of the box using Camel components, and the community is constantly adding more. Each component is highly flexible and can be easily configured using Camel's consistent URI syntax. This Refcard introduces you to the following popular and widely used components and their configurations:

  • Bean
  • Log
  • JMS/ActiveMQ
  • CXF
  • File
  • SEDA/Direct
  • Mock

Configuring Camel Components

All Camel components can be configured with a familiar URI syntax. Usually components are specified in the from() clause (beginning of a route) or the to() clause (typically termination points in the route, but not always). A component "creates" specific "endpoint" instances. Think of a component as an "endpoint factory" where an endpoint is the actual object listening for a message (in the "from" scenario) or sending a message (in the "to" scenario).

An endpoint URI usually has the form:

<component:><//endpoint-local-name><?config=value&config=value> 

The component is listed first, followed by some local name specific to the endpoint, and finally some endpoint-specific configuration. For example:

jms:test-queue?preserveMessageQos=true

This URI creates a JMS endpoint that listens to the "test-queue" and sets the preserveMessageQos option to true.
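
To make the three parts of the URI concrete, here is a minimal plain-Java sketch that splits an endpoint URI into its component, endpoint-local name, and options. It only illustrates the syntax described above and is not how Camel parses URIs internally; the class EndpointUriSketch and its parse method are hypothetical names invented for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; Camel has its own URI handling.
public class EndpointUriSketch {

    final String component;            // e.g. "jms"
    final String localName;            // e.g. "test-queue"
    final Map<String, String> options; // e.g. {preserveMessageQos=true}

    EndpointUriSketch(String component, String localName, Map<String, String> options) {
        this.component = component;
        this.localName = localName;
        this.options = options;
    }

    // Splits "component:local-name?key=value&key=value" into its parts.
    static EndpointUriSketch parse(String uri) {
        int colon = uri.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Missing component prefix: " + uri);
        }
        String component = uri.substring(0, colon);
        String rest = uri.substring(colon + 1);

        // Some URIs write the local name after "//"; drop that prefix if present.
        if (rest.startsWith("//")) {
            rest = rest.substring(2);
        }

        String localName = rest;
        Map<String, String> options = new LinkedHashMap<>();
        int question = rest.indexOf('?');
        if (question >= 0) {
            localName = rest.substring(0, question);
            for (String pair : rest.substring(question + 1).split("&")) {
                if (pair.isEmpty()) {
                    continue;
                }
                int eq = pair.indexOf('=');
                String key = eq < 0 ? pair : pair.substring(0, eq);
                String value = eq < 0 ? "" : pair.substring(eq + 1);
                options.put(key, value);
            }
        }
        return new EndpointUriSketch(component, localName, options);
    }

    public static void main(String[] args) {
        EndpointUriSketch e = parse("jms:test-queue?preserveMessageQos=true");
        System.out.println(e.component + " | " + e.localName + " | " + e.options);
    }
}
```

In a real route you never parse the URI yourself; you pass the string to from() or to() and the component creates the endpoint from it.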

There are over 100 Camel components, and each one has configuration options to fine-tune its behavior. See the Camel components list to check whether a component is available for the types of integrations you might need:

http://camel.apache.org/components.html

Essential Components

Bean Component

Camel implements the Service Activator pattern (from the EIPs), which allows you to plug a service into your route through a thin layer that's responsible for mediating between the messaging system (or integration route) and the service. Camel implements the Service Activator with its "bean" component and allows you to bind to and invoke POJO beans within your route.

Basic usage

Define your bean in the Camel registry (usually the Spring context if you're instantiating from a Spring application context).

<bean id="beanName" class="com.some.class.ClassName"> ... 

Then from your route, you can invoke a method on the bean:

from("direct:incoming").beanRef("beanName", "methodName")

Instead of relying on the registry to have the bean, you could just let Camel create and manage the bean by supplying the class:

from("direct:incoming").bean(InvokeCustomLogicService.class, "methodName")

How is a method chosen?

You don't always have to supply an explicit method name, but doing so makes things easier for both Camel and anyone reading the route. If you don't supply one, Camel will do its best to resolve the correct method. Follow these rules to be safe; for a more in-depth treatment of the algorithm, see the Camel documentation at http://camel.apache.org/bean-binding.html

  • If there is only a single method, Camel will try to use that
  • If there are multiple methods, Camel will try to select one that has only one parameter
  • Camel will try to match a method's parameters by type to the incoming message body
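
The three rules above can be sketched with plain Java reflection. This is a simplified illustration under stated assumptions, not Camel's actual bean-binding algorithm (which handles annotations, type conversion, and more); the classes MethodSelectionSketch and OrderService and the method choose are hypothetical.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of the selection rules; not Camel's real algorithm.
public class MethodSelectionSketch {

    // Example POJO with more than one candidate method (hypothetical).
    public static class OrderService {
        public String handleText(String body) { return "text:" + body; }
        public String handleNumber(Integer body) { return "number:" + body; }
        public String audit(String body, String user) { return "audit:" + user; }
    }

    static Method choose(Class<?> beanType, Object body) {
        List<Method> candidates = new ArrayList<>();
        for (Method m : beanType.getDeclaredMethods()) {
            if (Modifier.isPublic(m.getModifiers()) && !m.isSynthetic()) {
                candidates.add(m);
            }
        }
        // Rule 1: a single method is the only possible choice.
        if (candidates.size() == 1) {
            return candidates.get(0);
        }
        // Rule 2: consider only methods that take exactly one parameter.
        // Rule 3: of those, keep the ones whose parameter type accepts the body.
        List<Method> matches = new ArrayList<>();
        for (Method m : candidates) {
            if (m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isInstance(body)) {
                matches.add(m);
            }
        }
        if (matches.size() != 1) {
            // Zero or several matches: this is where an explicit method name helps.
            throw new IllegalStateException("Cannot pick a single method: " + matches);
        }
        return matches.get(0);
    }

    public static void main(String[] args) throws Exception {
        OrderService bean = new OrderService();
        Method m = choose(OrderService.class, 42);
        System.out.println(m.getName() + " -> " + m.invoke(bean, 42));
    }
}
```

When the sketch cannot narrow the candidates to exactly one method it throws, which mirrors why naming the method explicitly in the route is the safer habit.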

How does Camel bind parameters?

Camel will automatically try to bind the first parameter to the body of the incoming message. Otherwise, if you have parameters of these types, they will automatically be bound, regardless of order:

  • CamelContext – use when you need access to the full context, components defined in the context, or to create objects that require the context
  • Registry – the registry is the object that holds all of the beans that might be used in a CamelContext; usually the Spring BeanFactory/ApplicationContext (but not always)
  • Exchange – contains headers, body, properties and overall state for processing a message unit including a reply
  • Message – the incoming message
  • TypeConverter – part of Camel's type conversion internals, can be used to explicitly convert types

You can also use annotations to specifically bind certain parts of the Exchange to parameters in your methods. See the Camel documentation (http://camel.apache.org/parameter-binding-annotations.html) for more.

Further Reading

For more information on the bean component, see http://camel.apache.org/bean.html

Log Component

We always advise using logging in your routes to make it clear what steps in the processing have completed successfully. One way to do that is through the Log Component. Another way to insert logging statements, especially for free-text, is through the Log EIP (see Camel documentation http://camel.apache.org/logeip.html for more on the Log EIP).

Logging

To log an exchange at debug level:

from("direct:start").to("log:com.fusesource.examples?level=DEBUG")

This will output the exchange type (InOnly/InOut) and the body of the In message. You have some control over what is logged; for example, to see the headers along with the exchange:

from("direct:start")
       .to("log:com.fusesource.examples?showHeaders=true")

To make the Exchange logging easier to read, consider enabling multiline printing:

from("direct:start")
       .to("log:com.fusesource.examples?showHeaders=true&multiline=true")

For logging streams, you must determine whether you want to cache the stream first before logging it. Streams are read-once entities, and if you don't cache them, they won't be available to processors further down the chain. To cache streams and log them:

from("direct:start").streamCaching()
       .to("log:com.fusesource.examples?showHeaders=true&showStreams=true")
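
The read-once behavior is easy to reproduce outside Camel. The following plain-Java sketch (the class StreamCachingSketch and its methods are hypothetical, and this is not Camel's stream cache implementation) reads the same stream twice, first without caching and then from a cached copy of the bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustration only: shows why a stream body must be cached before it is logged.
public class StreamCachingSketch {

    // Drains the stream completely and returns its content as text.
    static String drain(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Two readers share one stream: the second one finds it exhausted.
    static String[] readTwiceUncached(InputStream body) {
        String first = drain(body);
        String second = drain(body);
        return new String[] { first, second };
    }

    // The bytes are cached once, and each reader gets a fresh stream over the copy.
    static String[] readTwiceCached(InputStream body) {
        byte[] cached = drain(body).getBytes(StandardCharsets.UTF_8);
        String first = drain(new ByteArrayInputStream(cached));
        String second = drain(new ByteArrayInputStream(cached));
        return new String[] { first, second };
    }

    static InputStream sampleBody() {
        return new ByteArrayInputStream("<order/>".getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        String[] uncached = readTwiceUncached(sampleBody());
        String[] cached = readTwiceCached(sampleBody());
        System.out.println("uncached: [" + uncached[0] + "] then [" + uncached[1] + "]");
        System.out.println("cached:   [" + cached[0] + "] then [" + cached[1] + "]");
    }
}
```

In the uncached case the "logger" consumes the body and the next reader sees nothing, which is the situation streamCaching() is there to prevent. The trade-off is that caching holds a copy of the content, so it costs memory for large bodies.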

Formatting

You can add configuration options to fine-tune exactly what's logged. The following options can be enabled or disabled:

Option           Description
showAll          turns all options on, such as headers, body, out, stackTrace, etc.
showExchangeId   prints out the exchange id
showHeaders      shows all headers of the In message
showBodyType     shows the Java type of the body
showBody         shows the actual contents of the body
showOut          shows the Out message
showException    if there's an exception in the exchange, shows the exception (without the full stack trace)
showStackTrace   prints the stack trace from the exception
multiline        logs each part of the exchange and its details on separate lines for better readability
maxChars         the maximum number of characters logged per line

Further Reading

For more information on the log component, see http://camel.apache.org/log.html

JMS and ActiveMQ Component

The Camel JMS component allows you to produce to or consume from a JMS provider. Most commonly, JMS and associated routes are used for asynchronous InOnly-style messaging; however, camel-jms also understands request-reply (InOut-style) messaging and hides the details. Use the activemq-camel component for JMS endpoints that rely on an ActiveMQ provider, as it provides some simple optimizations.

Consume from a queue

Note: you can specify "jms:queue:incomingOrders", but "queue" is the default, so it can be left off.

from("jms:incomingOrders").process(<some processing>).to("jms:inventoryCheck");

Consume from a topic

from("jms:topic:incomingOrders").process(<some processing>).to("jms:inventoryCheck");

Set up pooled resources

JMS resources are often expensive to create. Connections, Sessions, Producers, and Consumers should be cached where it makes sense, and setting up the appropriate caching starts with a pooled connection factory. If you're using ActiveMQ, you can use org.apache.activemq.pool.PooledConnectionFactory. Alternatively, for general-purpose JMS connection pooling, you can use Spring's CachingConnectionFactory; see the Spring documentation for details on using it.

When using ActiveMQ:

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
   <property name="brokerURL" value="tcp://localhost:61616" />
</bean>

<bean id="jmsPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
   <property name="connectionFactory" ref="connectionFactory">
   </property>
</bean>

Request/Reply

By default, Camel will use a Message Exchange Pattern of InOnly when dealing with JMS. However, it will be changed to InOut if there is a JMSReplyTo header or if explicitly set:

from("direct:incoming").inOut().to("jms:outgoingOrders").process(<process the JMS response here and continue your route>)

In the previous example, the MEP was set to InOut which means before sending to the outgoingOrders queue, Camel will create a temporary destination, listen to it, and set the JMSReplyTo header to the temporary destination before it sends the JMS message. Some other consumer will need to listen to outgoingOrders queue and send a reply to the destination named in the JMSReplyTo header. The response received over the temporary destination will be used in further processing in the route. Camel will listen for a default 20s on the temporary destination before it times out. If you prefer to use named queues instead of temporary ones, you can set the replyTo configuration option:

from("direct:incoming").inOut().to("jms:outgoingOrders?replyTo=outgoingOrdersReply").process(<process the JMS response here and continue your route>)

Transactions

To use transactions with camel-jms, you should set up a pooled connection factory, a transaction manager, and configure the camel-jms configuration in a JmsConfiguration object:

<!-- JmsConfiguration object -->
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
  <property name="transacted" value="true" />
  <property name="transactionManager" ref="jmsTransactionManager" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
</bean>

<!-- Spring Transaction Manager -->
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
  <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
</bean>

<!-- Set up Pooled Connection Factory -->
<bean id="jmsPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
  <property name="connectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>
  </property>
</bean>

Make sure to set cache level to CACHE_CONSUMER so that your consumers, sessions, and connections are cached between messages.

As of Camel 2.10, you can use "local" transactions with the camel-jms consumer (you don't have to specify an external transaction manager):

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
  <property name="transacted" value="true" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
</bean>

Common configuration options:

Option                   Default           Description
concurrentConsumers      1                 specifies the number of consumers to create to listen on the destination
disableReplyTo           false             set this endpoint to not do InOut messaging
durableSubscriptionName  null              explicitly set the name of a durable subscription (per JMS, must be used with clientId)
clientId                 null              JMS client id for this endpoint; must be unique
replyTo                  null              specify the name of a destination to be included as a JMSReplyTo header on outgoing messages
selector                 null              JMS selector to specify a predicate for what messages the broker should deliver to this endpoint consumer
timeToLive               null              time to live in milliseconds for the message as it travels through a broker (possibly a network of brokers)
transacted               false             specify whether to do sends and receives in a transaction
acknowledgementModeName  AUTO_ACKNOWLEDGE  JMS acknowledgement mode
asyncConsumer            false             whether to process a message from a destination asynchronously
cacheLevelName           CACHE_AUTO        how to cache consumers, sessions, connections, e.g. CACHE_AUTO, CACHE_CONSUMER

Further Reading

For more information on the jms component, see http://camel.apache.org/jms.html

CXF Component

When you want to expose or consume a SOAP or REST style web service for use in an integration route, use the camel-cxf component. Sometimes a solution you're working on requires a web-service endpoint to kick off a larger process, or enrich data before calling another web service, or just act as a proxy to another service. Camel works nicely with CXF to allow you to configure a web-service endpoint that hooks into and leverages the power of Camel's routing and mediation engine.

You can use camel-cxf as a consumer (from() clause), which internally starts up a Jetty server to publish the web service. It can also be used as a producer, which will send SOAP or REST requests. Alternatively, you can configure your endpoint to deploy into an existing servlet container instead of relying on the built-in Jetty server.

JAX-WS (SOAP)

Using the JAX-WS front end for CXF with Camel requires you to set up your contract (WSDL) or Java classes first and follow the correct wsdl2java or java annotations to describe your web service. Once you've done this, you can use Camel to hook the endpoint into a route.

Configuration

As a bean (easier to read in the DSL)

To configure the JAX-WS component, specify a <cxf:cxfEndpoint> element with an id attribute that can later be referenced by a Camel route:

<cxf:cxfEndpoint id="helloWorld"
                     wsdlURL="wsdl/HelloWorld.wsdl"
                     serviceClass="org.apache.helloworld.HelloWorld"
                     address="http://localhost:9090/helloworld" >
</cxf:cxfEndpoint>

Note: for this configuration, you'll need this namespace in your Spring application context:

http://camel.apache.org/schema/cxf
http://camel.apache.org/schema/cxf/camel-cxf.xsd

Then in your Camel route, you can refer to the CXF endpoint with:

from("cxf:bean:helloWorld?dataFormat=PAYLOAD").log("This is what was said: ${body}").process(<some processing>);

With the URI (more convenient, not as easy to read in the DSL)

You can leave out the bean config in the registry (Spring XML) altogether and configure options directly on the endpoint:

from("cxf://http://localhost:9090/helloworld?serviceClass=org.apache.helloworld.HelloWorld&wsdlURL=wsdl/HelloWorld.wsdl&dataFormat=MESSAGE")

In this example, org.apache.helloworld.HelloWorld is the SEI (Service Endpoint Interface). Note that there is no *Impl class that implements the SEI, as Camel takes care of that.

DataFormat

Camel allows you to work with the SOAP message in three different formats:

  • POJO (default) – a list of Java objects that represent the parameters to the service being called
  • MESSAGE – Raw message, no SOAP processing
  • PAYLOAD – just deal with the SOAP body of the message

JAX-RS (REST)

Just like with JAX-WS, you'll need to set up your Java classes with the correct JAX-RS annotations and use Camel to expose the REST endpoint.

Configuration

As a bean

Note: for this configuration, you'll need this namespace in your Spring application context:

http://camel.apache.org/schema/cxf
http://camel.apache.org/schema/cxf/camel-cxf.xsd

<cxf:rsServer id="rsServer" address="http://localhost:9090/route" serviceClass="com.fusesource.samples.CustomerServiceResource" />

Then within your route, just like the bean for the JAX-WS web service, you can refer to the bean like this:

from("cxfrs:bean:rsServer").process(<some processing>)

With the URI

from("cxfrs://http://localhost:9090/route?resourceClasses=com.fusesource.samples.CustomerServiceResource")

Further Reading

For more information on the CXF component, see http://camel.apache.org/cxf.html

For more about Camel, CXF, and JAX-RS see: http://www.christianposta.com/blog/?p=229

File Component

When reading or writing files (CSV, XML, fixed-format, for example) to or from the file system, use the Camel File component. File-based integrations are fairly common with legacy applications but can be tedious to interact with. With a file consumer endpoint, you can consume files from the file system and work with them as exchanges in your Camel route. Conversely, you can also persist the content of your exchanges to disk.

Read Files

When used in a "from" DSL clause, the file component operates in "consumer" (read) mode:

from("file:/location/of/files?configs").log("${body}")

Configuration Options for Reading Files

Option           Default     Description
delay            500ms       how long (in milliseconds) to wait before polling the file system for the next file
delete           false       whether or not to delete the file after it has been processed
doneFileName     null        name of a file that must exist before Camel starts processing files from the folder
exclude          null        regex for file patterns to ignore
filter           null        a custom filter class used to filter out files/folders you don't want to process (specify as a bean name: #beanName)
idempotent       false       skip already processed files following the Idempotent Consumer pattern
include          null        regex for file patterns to include
move             .camel      default place to move files after they've been processed
noop             false       if true, the file is not moved or deleted after it has been processed
readLock         markerFile  Camel will only process files that it can get a read-lock for; specify a strategy for how to determine whether it has read-lock access
readLockTimeout  10000ms     timeout in milliseconds for how long to wait to acquire a read-lock
recursive        false       recurse through subdirectories as well

Things to Watch Out For

  • Locking of files that are being processed (on reads) until route is complete (by default)
  • Will ignore files that start with '.'
  • By default will move processed files to '.camel' directory
  • Moving/Deleting files will happen after routing

Write Files

from(<endpoint>).to("file:/location/of/files?fileName=<filename>")

Configuration Options for Writing Files

Option        Description
fileExist     what to do if a file exists: Override, Append, Ignore, Fail
fileName      name of the file to be written; this can be a dynamic value determined by an expression, e.g., using the simple expression language (see http://camel.apache.org/file-language.html for more)
tempFileName  a temporary name given to the file as it's being written; changes to the real name when writing is finished

Big Files

Sometimes you need to process files that are too large to fit into memory, or would be too large to process efficiently if loaded into memory. A common approach to dealing with such files using the camel-file component is to stream them and process them in chunks. Generally the files are structured in such a way that it makes sense to process them in chunks, and we can leverage the power of Camel's EIP processors to "split" a file into those chunks. Here's an example route:

from("file:/location/of/files").split(body().tokenize("\n")).streaming().log("${body}").end()

This route splits on newlines in the file, streaming and processing one line at a time.
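
The idea behind streaming can be shown without Camel: read one line, handle it, and move on, so the whole file is never held in memory at once. The sketch below is plain Java written for illustration only; the class LineStreamingSketch and the method processLines are hypothetical and are not part of the Camel API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

// Illustration only: line-at-a-time processing, the idea behind split(...).streaming().
public class LineStreamingSketch {

    // Reads one line at a time and hands it to the handler, so only the current
    // line (plus the reader's buffer) is held in memory. Returns the line count.
    static long processLines(Reader source, Consumer<String> handler) {
        long count = 0;
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                handler.accept(line);
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    public static void main(String[] args) {
        // A StringReader stands in for a large file here; for a real file you could
        // pass java.nio.file.Files.newBufferedReader(path) instead.
        long lines = processLines(new StringReader("a,1\nb,2\nc,3\n"),
                line -> System.out.println("chunk: " + line));
        System.out.println(lines + " lines processed");
    }
}
```

In the Camel route, the tokenizer plays the role of readLine() and the processors after split() play the role of the handler.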

For more information on splitting large files, see http://www.davsclaus.com/2011/11/splitting-big-xml-files-with-apache.html

Headers

Headers are automatically inserted (consumer) for use in your routes or can be set in your route and used by the file component (producer). Here are some of the headers that are set automatically by the file consumer endpoint:

  • CamelFileName - name of file consumed as a relative location with the offset from the directory configured in the endpoint
  • CamelFileAbsolute - a boolean that specifies whether the path is absolute or not
  • CamelFileAbsolutePath - would be the absolute path if the CamelFileAbsolute header is true
  • CamelFilePath - starting directory + relative filename
  • CamelFileRelativePath - just the relative path
  • CamelFileParent - just the parent path
  • CamelFileLength - length of the file
  • CamelFileLastModified - date of the last modified timestamp
  • CamelFileName (producer) - name of the file to write to the output directory (usually an expression)
  • CamelFileNameProduced (producer) - the actual filename produced (absolute file path)

Further Reading

For more information on the file component, see http://camel.apache.org/file2.html

SEDA and Direct Component

You can still use messaging as a means of communicating with or between your routes without having to use an external messaging broker. Camel allows you to do in-memory messaging via synchronous or asynchronous queues. Use the "direct" component for synchronous processing and use the "seda" for asynchronous "staged" event-driven processing.

Direct

Use the direct component to break up routes using synchronous messaging:

from("direct:channelName").process(<process something>).to("direct:outgoingChannel");
from("direct:outgoingChannel").transform(<transform something>).to("jms:outgoingOrders");

SEDA

SEDA endpoints and their associated routes will be run in separate threads and process exchanges asynchronously. Although the pattern of usage is similar to the "direct" component, the semantics are quite different.

from("<any component>").choice()
    .when(header("accountType").endsWith("Consumer")).to("seda:consumerAccounts")
    .when(header("accountType").endsWith("Business")).to("seda:businessAccounts")

from("seda:consumerAccounts").process(<process logic>)

from("seda:businessAccounts").process(<process logic>)

You can set up the SEDA endpoint to use multiple threads to do its processing by adding the concurrentConsumers configuration, for example:

from("seda:consumerAccounts?concurrentConsumers=20").process(<process logic>)

Keep in mind that using SEDA (or any asynchronous endpoint) will behave differently in a transaction, i.e., the consumers of the SEDA queue will not participate in the transaction as they are in different threads.

Common configuration options

The "direct" component does not have any configuration options.

SEDA commonly used options:

Option               Default    Description
size                 Unbounded  max capacity of the in-memory queue
concurrentConsumers  1          the number of concurrent threads that can process exchanges
multipleConsumers    false      determines whether multiple consumers are allowed
blockWhenFull        false      block a thread that tries to write to a full SEDA queue instead of throwing an Exception
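
To get a feel for what these options mean, here is a plain-Java analogy built on java.util.concurrent. It is not the SEDA component's implementation; the class SedaQueueSketch and its methods are hypothetical. The queue capacity stands in for size, the number of worker threads for concurrentConsumers, and the choice between put() and add() for blockWhenFull.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Analogy only: a bounded in-memory queue drained by several consumer threads.
public class SedaQueueSketch {

    // Sends `messages` through a queue of capacity `size` drained by `consumers`
    // threads and returns how many messages were processed.
    static int run(int size, int consumers, int messages) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(size);
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(messages);
        List<Thread> workers = new ArrayList<>();

        for (int i = 0; i < consumers; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        queue.take();               // waits until a message arrives
                        processed.incrementAndGet();
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // asked to stop
                }
            });
            worker.setDaemon(true);
            worker.start();
            workers.add(worker);
        }

        try {
            for (int i = 0; i < messages; i++) {
                queue.put("order-" + i); // like blockWhenFull=true: wait for free space
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            for (Thread worker : workers) {
                worker.interrupt();
            }
        }
        return processed.get();
    }

    // Like blockWhenFull=false: adding to a full queue fails immediately.
    static boolean addFailsWhenFull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        try {
            queue.add("second");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("processed=" + run(2, 3, 10));
        System.out.println("add on full queue fails: " + addFailsWhenFull());
    }
}
```

The producer thread and the consumer threads are different threads, which is also why consumers of a SEDA queue do not take part in the producer's transaction.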

Further Reading

For more information on the direct and SEDA components, see http://camel.apache.org/direct.html and http://camel.apache.org/seda.html respectively

Mock Component

Testing your routes is an important aspect of integration development, and Camel makes it easier with the mock component. You can write tests for your Camel routes in JUnit or TestNG that use mock endpoints. You declare a set of expectations on the mock, such as how many messages were processed or that certain headers were present at an endpoint, and then run your route. At the completion of the route, you verify that the intended expectations were met and fail the test if they were not.

Mocks

You start by obtaining the mock endpoint from the route:

MockEndpoint mock = context.getEndpoint("mock:outgoing", MockEndpoint.class);

Next, you declare expectations. Methods that declare expectations start with "expect", for example:

mock.expectedMessageCount( 1 )

Then you run your route.

Finally, you assert that the declared expectations were met:

mock.assertIsSatisfied()

Here are a few of the methods you can use to set up expectations and then verify your mock endpoints (see http://camel.apache.org/mock.html for more):

Expectation Methods:

  • expectedMessageCount(int)
  • expectedMinimumMessageCount(int)
  • expectedBodiesReceived()
  • expectedHeadersReceived()
  • expectsNoDuplicate(Expression)

Assertion Methods:

  • assertIsSatisfied()
  • assertIsNotSatisfied()

Mocking existing endpoints

Sometimes you'll have routes with live endpoints that you cannot change (or don't want to change) for testing. For such routes, you can insert mocks where the live endpoints are to do some testing.

RouteDefinition route = context.getRouteDefinition("advice");

route.adviceWith(context, new AdviceWithRouteBuilder() {
    @Override
    public void configure() throws Exception {
        mockEndpoints();
    }
});

mockEndpoints() accepts an optional <pattern> argument that specifies which endpoints to mock. For example, to mock out only the JMS components, you would do mockEndpoints("jms*"). To mock all endpoints, leave off the pattern completely.

NOTE: Inserting mocks at the location of the endpoints does not replace the endpoints, i.e., the live endpoints will still exist. As of Camel 2.10, you can skip sending the exchange to the live endpoint:

RouteDefinition route = context.getRouteDefinition("advice");

route.adviceWith(context, new AdviceWithRouteBuilder() {
    @Override
    public void configure() throws Exception {
        mockEndpointsAndSkip();
    }
});

Further Reading

For more information on the mock component, see http://camel.apache.org/mock.html

About The Author

Christian Posta

Based in Phoenix, AZ, I'm a Senior Consultant and Architect at Red Hat and I specialize in messaging-based enterprise integrations. I'm passionate about software development, love solving tough technical problems, and enjoy learning new languages and programming paradigms. My favorite languages include Python and Scala, but I spend a lot of time writing Java. I am a committer on the Apache ActiveMQ and Apache Apollo projects and frequently blog at http://www.christianposta.com/blog as well as tweet about interesting technology @christianposta.

Recommended Book

Camel in Action is a Camel tutorial full of small examples showing how to work with the integration patterns. It starts with core concepts like sending, receiving, routing, and transforming data. It then shows you the entire lifecycle and goes in depth on how to test, deal with errors, scale, deploy, and even monitor your app, details you can otherwise find only in the Camel code itself.
