Wire Tap

Wire Tap from the EIP patterns allows you to route messages to a separate location while they are being forwarded to the ultimate destination.

image

Options

The Wire Tap eip supports 12 options, which are listed below.

Name Description Default Type

processorRef

Reference to a Processor to use for creating a new body as the message to use for wire tapping.

String

body

Uses the expression for creating a new body as the message to use for wire tapping.

ExpressionSubElementDefinition

executorServiceRef

Uses a custom thread pool.

String

copy

Uses a copy of the original exchange.

true

Boolean

dynamicUri

Whether the uri is dynamic or static. If the uri is dynamic then the simple language is used to evaluate a dynamic uri to use as the wire-tap destination, for each incoming message. This works similar to how the toD EIP pattern works. If static then the uri is used as-is as the wire-tap destination.

true

Boolean

onPrepareRef

Uses the Processor when preparing the org.apache.camel.Exchange to be send. This can be used to deep-clone messages that should be send, or any custom logic needed before the exchange is send.

String

uri

Required The uri of the endpoint to send to. The uri can be dynamic computed using the org.apache.camel.language.simple.SimpleLanguage expression.

String

pattern

Sets the optional ExchangePattern used to invoke this endpoint.

Enum values:

  • InOnly

  • InOut

  • InOptionalOut

ExchangePattern

cacheSize

Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this recipient list, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then its best to turn of caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints are stored in memory in the caches. However if there are a high degree of dynamic endpoints that have been used before, then it can benefit to use the cache to reuse both producers and endpoints and therefore the cache size can be set accordingly or rely on the default size (1000). If there is a mix of unique and used before dynamic endpoints, then setting a reasonable cache size can help reduce memory usage to avoid storing too many non frequent used producers.

Integer

ignoreInvalidEndpoint

Ignore the invalidate endpoint exception when try to create a producer with that endpoint.

Boolean

allowOptimisedComponents

Whether to allow components to optimise toD if they are org.apache.camel.spi.SendDynamicAware .

true

Boolean

autoStartComponents

Whether to auto startup components when toD is starting up.

true

Boolean

description

Sets the description of this node.

DescriptionDefinition

WireTap Nodes

Camel’s Wire Tap node supports two modes when wire tapping an Exchange:

  • Default mode: Camel will copy the original Exchange and set its Exchange Pattern to InOnly, as we want the tapped Exchange to be sent in a fire and forget style. The tapped Exchange is then sent in a separate thread, so it can run in parallel with the original. Beware that only the Exchange is copied - Wire Tap won’t do a deep clone (unless you specify a custom processor via onPrepareRef which does that). So all copies could share objects from the original Exchange.

  • New mode: Camel also provides an option of sending a new Exchange allowing you to populate it with new values.

Using Wire Tap

In the example below the exchange is wire tapped to the direct:tap route. This route delays the message 1 second before continuing. This is because it allows you to see that the tapped message is routed independently from the original route, so that you would see log:result happens before log:tap

from("direct:start")
    .to("log:foo")
    .wireTap("direct:tap")
    .to("log:result");

from("direct:tap")
    .delay(1000).setBody().constant("Tapped")
    .to("log:tap");

And in XML:

<routes>

  <route>
    <from uri="direct:start"/>
    <wireTap uri="direct:tap"/>
    <to uri="log:result"/>
  </route>

  <route>
    <from uri="direct:tap"/>
    <to uri="log:log"/>
  </route>

</routes>

Wire tapping with dynamic URIs

For example to wire tap to a dynamic URI, then the URI uses the Simple language that allows to construct dynamic URIs.

For example to wire tap to a JMS queue where the header ID is part of the queue name:

from("direct:start")
    .wireTap("jms:queue:backup-${header.id}")
    .to("bean:doSomething");

And in XML:

<route>
  <from uri="direct:start"/>
  <wireTap uri="jms:queue:backup-${header.id}"/>
  <to uri="bean:doSomething"/>
</route>

Wire tapping a new message

In the new mode then the wire tap will create a new Exchange that gets populated with new data, to be sent out from the wire tap. The idea is to allow you to construct the tapped message with information from the original message.

The construction of the new message is done via a Processor where you populate the Exchange

from("direct:start")
  .wireTap("direct:tap").newExchange(exchange -> {
        String oldBody = exchange.getMessage().getBody(String.class);
        String newBody = "We are listening: " + oldBody;
        exchange.getMessage().setBody(newBody);
        exchange.getMessage().setHeader("tapId", "123");
    })
    .to("log:result");

from("direct:tap")
  .to("log:tap");

And in XML we are using <body> and <setHeader> (This is also possible in Java DSL) to construct the message, instead of using a Processor:

<routes>

  <route>
    <from uri="direct:start"/>
    <wireTap uri="direct:tap">
      <body><simple>We are listening: ${body}</simple></body>
      <setHeader name="tapId"><constant>123</constant></setHeader>
    </wireTap>
    <to uri="log:result"/>
  </route>

  <route>
    <from uri="direct:tap"/>
    <to uri="log:log"/>
  </route>

</routes>

WireTap Thread Pools

The WireTap uses a thread pool to process the tapped messages. This thread pool will by default use the settings detailed at Threading Model.

In particular, when the pool is exhausted (with all threads utilized), further wiretaps will be executed synchronously by the calling thread. To remedy this, you can configure an explicit thread pool on the Wire Tap having either a different rejection policy, a larger worker queue, or more worker threads.

Wire tapping Streaming based messages

If you Wire Tap a stream message body then you should consider enabling Stream caching to ensure the message body can be read at each endpoint.

See more details at Stream caching.