Poll Enrich
Camel supports the Content Enricher from the EIP patterns.
In Camel the Content Enricher can be done in several ways:
-
Using Enrich EIP or Poll Enrich EIP
-
Using a Message Translator
-
Using a Processor with the enrichment programmed in Java
-
Using a Bean EIP with the enrichment programmed in Java
The most natural Camel approach is using Enrich EIP, which comes as two kinds:
-
Enrich EIP - This is the most common content enricher that uses a
Producer
to obtain the data. It is usually used for Request Reply messaging, for instance to invoke an external web service. -
Poll Enrich EIP - Uses a Polling Consumer to obtain the additional data. It is usually used for Event Message messaging, for instance to read a file or download a FTP file.
This page documents the Poll Enrich EIP. |
Options
The Poll Enrich eip supports 7 options, which are listed below.
Name | Description | Default | Type |
---|---|---|---|
expression |
Required Expression that computes the endpoint uri to use as the resource endpoint to enrich from. |
ExpressionDefinition |
|
timeout |
Timeout in millis when polling from the external service. The timeout has influence about the poll enrich behavior. It basically operations in three different modes: negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available. 0 - Attempts to receive a message exchange immediately without waiting and returning null if a message exchange is not available yet. positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns null if timed out The default value is -1 and therefore the method could block indefinitely, and therefore its recommended to use a timeout value. |
-1 |
String |
strategyRef |
Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. By default Camel will use the reply from the external service as outgoing message. The value can either refer to a bean to lookup, or to lookup a singleton bean by its type, or to create a new bean: Lookup bean - This is the default behavior to lookup an existing bean by the bean id (value) reference by type - Values can refer to singleton beans by their type in the registry by prefixing with #type: syntax, eg #type:com.foo.MyClassType reference new class - Values can refer to creating new beans by their class name by prefixing with #class, eg #class:com.foo.MyClassType. The class is created using a default no-arg constructor, however if you need to create the instance via a factory method then you specify the method as shown: #class:com.foo.MyClassType#myFactoryMethod. And if the factory method requires parameters they can be specified as follows: #class:com.foo.MyClassType#myFactoryMethod('Hello World', 5, true). Or if you need to create the instance via constructor parameters then you can specify the parameters as shown: #class:com.foo.MyClass('Hello World', 5, true). |
String |
|
strategyMethodName |
This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. |
String |
|
strategyMethodAllowNull |
If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy. |
false |
Boolean |
aggregateOnException |
If this option is false then the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what to do if there was an exception in the aggregate method. For example to suppress the exception or set a custom message body etc. |
false |
Boolean |
cacheSize |
Sets the maximum size used by the org.apache.camel.spi.ConsumerCache which is used to cache and reuse consumers when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints are stored in memory in the caches. However if there are a high degree of dynamic endpoints that have been used before, then it can benefit to use the cache to reuse both producers and endpoints and therefore the cache size can be set accordingly or rely on the default size (1000). If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size can help reduce memory usage to avoid storing too many non frequent used producers. |
Integer |
|
ignoreInvalidEndpoint |
Ignore the invalidate endpoint exception when try to create a producer with that endpoint. |
Integer |
|
description |
Sets the description of this node. |
DescriptionDefinition |
Content enrichment using Poll Enrich EIP
pollEnrich
uses a Polling Consumer to obtain the additional data.
It is usually used for Event Message messaging, for instance to read a file or download a FTP file.
The pollEnrich
works just the same as enrich
,
however as it uses a Polling Consumer we have three methods when polling:
-
receive
- Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available. -
receiveNoWait
- Attempts to receive a message exchange immediately without waiting and returningnull
if a message exchange is not available yet. -
receive(timeout)
- Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns the message ornull
if the timeout expired.
Poll Enrich with timeout
It is good practice to use timeout value.
By default, Camel will use the receive
which may block until there is a message available.
It is therefore recommended to always provide a timeout value, to make this clear that we may wait for a message, until the timeout is hit.
You can pass in a timeout value that determines which method to use:
-
if timeout is
-1
or other negative number thenreceive
is selected (Important: thereceive
method may block if there is no message) -
if timeout is
0
thenreceiveNoWait
is selected -
otherwise,
receive(timeout)
is selected
The timeout values is in millis.
Using Poll Enrich
The content enricher (pollEnrich
) retrieves additional data from a resource endpoint in order to enrich an incoming message (contained in the original exchange).
An AggregationStrategy
is used to combine the original exchange and the resource exchange. The first parameter of the AggregationStrategy.aggregate(Exchange, Exchange)
method corresponds to the original exchange, the second parameter the resource exchange.
Here’s an example for implementing an AggregationStrategy
, which merges the two data together
as a String
with colon separator:
public class ExampleAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange original, Exchange resource) {
// this is just an example, for real-world use-cases the
// aggregation strategy would be specific to the use-case
if (newExchange == null) {
return oldExchange;
}
Object oldBody = oldExchange.getIn().getBody();
Object newBody = newExchange.getIn().getBody();
oldExchange.getIn().setBody(oldBody + ":" + newBody);
return oldExchange;
}
}
You then use the AggregationStrategy
with the pollEnrich
in the Java DSL as shown:
AggregationStrategy aggregationStrategy = ...
from("direct:start")
.pollEnrich("file:inbox?fileName=data.txt", 10000, aggregationStrategy)
.to("mock:result");
In the example Camel will poll a file (timeout 10 seconds).
The AggregationStrategy
is then used to merge the file with the existing Exchange
.
In XML DSL you use pollEnrich
as follows:
<bean id="myStrategy" class="com.foo.ExampleAggregationStrategy"/>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<pollEnrich timeout="10000" strategyRef="myStrategy">
<constant>"file:inbox?fileName=data.txt"</constant>
</pollEnrich>
<to uri="mock:result"/>
</route>
</camelContext>
Using Poll Enrich with Rest DSL
You can also use pollEnrich
with Rest DSL to
for example download a file from AWS S3 as the response of an API call.
<rest path="/report">
<description>Report REST API</description>
<get uri="/{id}/payload">
<route id="report-payload-download">
<pollEnrich>
<constant>aws-s3:xavier-dev?amazonS3Client=#s3client&deleteAfterRead=false&fileName=report-file.pdf</constant>
</pollEnrich>
</route>
</get>
</rest>
Notice that the enriched endpoint is a constant, however Camel also supports dynamic endpoints which is covered next.
Poll Enrich with Dynamic Endpoints
Both enrich
and pollEnrich
supports using dynamic uris computed based on information from the current Exchange
.
For example to pollEnrich
from an endpoint that uses a header to indicate a SEDA queue name:
from("direct:start")
.pollEnrich().simple("seda:${header.queueName}")
.to("direct:result");
And in XML DSL:
<route>
<from uri="direct:start"/>
<pollEnrich>
<simple>seda:${header.queueName}</simple>
</pollEnrich>
<to uri="direct:result"/>
</route>
See the cacheSize option for more details on how much cache to use depending on how many or few unique endpoints are used.
|
See More
See Enrich EIP