Resume Strategies
The resume strategies allow users to point the consumer part of a route back to the last point of consumption. This allows Camel to skip reading and processing data that has already been consumed.
Resume strategies enable quicker stop and resume operations when consuming large data sources. For instance, imagine a scenario where the file consumer is reading a large file. Without a resume strategy, stopping and restarting Camel would cause the consumer in the File component to read the file again from the initial offset (offset 0). With a resume strategy, the integration can point the consumer to the exact offset at which to resume the operation.
Support for resume varies according to the component; initially, it is available only for a subset of components.
The resume strategies come in three parts:
- A DSL method that marks the route as supporting resume operations and points to an instance of a strategy implementation.
- A set of core infrastructure that allows integrations to implement different types of strategies.
- Basic strategy implementations that can be extended to implement the specific resume strategies required by the integrations.
The DSL method
The route needs to use the resumable() method, pointing it to the instance of the resume strategy in use (either directly or by its registry name).
This instance can be bound in the Context registry as follows:
getCamelContext().getRegistry().bind("testResumeStrategy", new MyTestResumeStrategy(new MyAdapter()));
from("some:component")
.resumable("testResumeStrategy")
.process(this::process);
Or the instance can be constructed as follows:
from("some:component")
.resumable(new MyTestResumeStrategy(new MyAdapter()))
.process(this::process)
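As an alternative to calling bind() on the registry directly, the strategy instance can also be registered declaratively. The following is only a minimal sketch, assuming the same hypothetical MyTestResumeStrategy and MyAdapter classes as above and a runtime that post-processes the route builder (for example Camel Main, Spring Boot or Quarkus):
import org.apache.camel.BindToRegistry;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;

public class MyResumableRoute extends RouteBuilder {

    // Registers the strategy in the Camel registry under the name used by resumable() below
    @BindToRegistry("testResumeStrategy")
    private MyTestResumeStrategy resumeStrategy = new MyTestResumeStrategy(new MyAdapter());

    @Override
    public void configure() {
        from("some:component")
            .resumable("testResumeStrategy")
            .process(this::process);
    }

    private void process(Exchange exchange) {
        // application-specific handling of the resumed data
    }
}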
The Resume Adapter
The adapter class is responsible for binding the component-specific part of the logic to the more generic handling of the resume strategy. The adapter is always component specific, and some components may have more than one. Integrations with more complex resume processes may implement their own adapters, although the built-in ones should be sufficient in most cases. Currently, the following adapters are available:
- camel-atom: org.apache.camel.component.feed.EntryFilter
- camel-aws2-kinesis: org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter
- camel-cassandraql: org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter
- camel-couchbase: org.apache.camel.component.couchbase.CouchbaseResumeAdapter
- camel-couchdb: org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter
- camel-file: org.apache.camel.component.file.consumer.adapters.FileSetResumeAdapter for directories
- camel-file: org.apache.camel.component.file.consumer.adapters.GenericFileResumeAdapter for files
- camel-kafka: org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter
- camel-rss: org.apache.camel.component.feed.EntryFilter
- generic: org.apache.camel.processor.resume.DelegatingResumeAdapter
Note: in the future, these adapters will be resolved automatically by Camel.
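If none of these adapters fit, an integration can supply its own. The sketch below is illustrative only: it assumes the ResumeAdapter contract is the single resume() callback, and the offset field and seek logic are hypothetical, component-specific details.
import org.apache.camel.resume.ResumeAdapter;

public class MyCustomResumeAdapter implements ResumeAdapter {

    private volatile long lastOffset;

    // Called by the integration (or a cache) to record the last known offset
    public void setLastOffset(long lastOffset) {
        this.lastOffset = lastOffset;
    }

    @Override
    public void resume() {
        // Component-specific logic goes here: reposition the consumer at lastOffset,
        // for example by seeking a file channel or a Kafka consumer to that position.
    }
}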
The Resume API Interfaces
These are the core interfaces:
- org.apache.camel.resume.ResumeStrategy - the resume strategy service
- org.apache.camel.resume.ResumeAdapter - an adapter that binds the generic parts of the resume strategy to the component
- org.apache.camel.resume.UpdatableConsumerResumeStrategy - an extension to the resume strategy that allows strategies to be updated
- org.apache.camel.resume.cache.ResumeCache - the base interface for a local cache of resumable information
- org.apache.camel.resume.cache.SingleEntryCache - an interface for a local cache of resumable information where there is a one-to-one relationship between a cache key and its entry (e.g., a file and its offset)
- org.apache.camel.resume.cache.MultiEntryCache - an interface for a local cache of resumable information where there is a one-to-many relationship between a cache key and its entries (e.g., a path and its file entries)
These are the core classes supporting the strategies:
- org.apache.camel.resume.Resumable - an interface to allow users to work with abstract resumable entities (files, offsets, etc.)
- org.apache.camel.resume.ResumableSet - an interface for resumables with a one-to-many relationship
- org.apache.camel.resume.Offset - a generic offset without a concrete type (it may represent a long, a file name, etc.)
These are the supporting classes:
- org.apache.camel.support.Resumables - resumables handling support
- org.apache.camel.support.Offsets - offset handling support
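The following is a brief, hedged sketch of these support classes in use; it assumes the Resumables.of(key, offset) and Offsets.of(value) factory methods, and the file name and offset values are purely illustrative.
import org.apache.camel.support.Offsets;
import org.apache.camel.support.Resumables;

public class SupportClassesExample {
    public static void main(String[] args) {
        // A resumable entity: the key identifies the source (a file) and the value is its last known offset
        var resumable = Resumables.of("data/input.log", 1024L);

        // A standalone offset value, without an associated key
        var offset = Offsets.of(1024L);

        System.out.println(resumable);
        System.out.println(offset);
    }
}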
Builtin Resume Strategies
Camel comes with a few builtin strategies that can be used to store, retrieve and update the offsets. The following strategies are available:
- SingleNodeKafkaResumeStrategy - a resume strategy from the camel-kafka component that uses Kafka as the store for the offsets and is suitable for single-node integrations.
- MultiNodeKafkaResumeStrategy - a resume strategy from the camel-kafka component that uses Kafka as the store for the offsets and is suitable for multi-node integrations (i.e., integrations running on clusters using the camel-master component).
Local Cache Support
A sample local cache is implemented using Caffeine:
- org.apache.camel.component.caffeine.resume.single.CaffeineCache: for data where one key points to exactly one entry (one-to-one relationship)
- org.apache.camel.component.caffeine.resume.multi.CaffeineCache: for data where one key can point to one or more entries (one-to-many relationship)
Known Limitations
When using the converters with the file component, beware of the differences in behavior between Reader and InputStream:
For instance, the behavior of:
from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
.resumable("testResumeStrategy")
.convertBodyTo(Reader.class)
.process(this::process);
is different from the behavior of:
from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
.resumable("testResumeStrategy")
.convertBodyTo(InputStream.class)
.process(this::process);
Reason: the skip method in the Reader will skip characters, whereas the same method on the InputStream will skip bytes.
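The difference can be seen with plain JDK streams, outside Camel. In the snippet below (a self-contained illustration; the sample text is arbitrary), skipping two units leaves the Reader positioned after two characters but the InputStream positioned in the middle of a multi-byte character:
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

public class SkipDemo {
    public static void main(String[] args) throws Exception {
        byte[] utf8 = "héllo".getBytes(StandardCharsets.UTF_8); // 'é' takes 2 bytes in UTF-8

        try (Reader reader = new InputStreamReader(new ByteArrayInputStream(utf8), StandardCharsets.UTF_8)) {
            reader.skip(2);                            // skips 2 characters: 'h' and 'é'
            System.out.println((char) reader.read());  // prints 'l'
        }

        try (InputStream in = new ByteArrayInputStream(utf8)) {
            in.skip(2);                                // skips 2 bytes: 'h' and only half of 'é'
            System.out.println(in.read());             // prints the numeric value of the second byte of 'é'
        }
    }
}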
Pausable Consumers API
The Pausable consumers API is a subset of the resume API that provides pause and resume features for supported components. With this API it is possible to implement logic that controls the behavior of the consumer based on conditions that are external to the component. For instance, it makes it possible to pause the consumer if an external system becomes unavailable.
Currently, support for pausable consumers is available only for a subset of components.
To use the API, you need an instance of a consumer listener along with a predicate that tests whether consumption should continue:
- org.apache.camel.resume.ConsumerListener - the consumer listener interface. Camel already comes with pre-built consumer listeners, but users in need of more complex behaviors can create their own listeners.
- a predicate that returns true if data consumption should resume, or false if consumption should be paused
Usage example:
from(from)
    .pausable(new KafkaConsumerListener(), o -> canContinue())
    .process(exchange -> LOG.info("Received an exchange: {}", exchange.getMessage().getBody()))
    .to(destination);
You can also integrate the pausable API and the consumer listener with the Circuit Breaker EIP. For instance, it is possible to configure the circuit breaker so that it manipulates the state of the listener based on success or error conditions on the circuit.
One example would be to create an event watcher that checks downstream system availability: it watches for error events and, when they happen, triggers a scheduled check; on success, it shuts down the scheduled check.
An example implementation of this approach would be similar to this:
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("pausable");
circuitBreaker.getEventPublisher()
    .onSuccess(event -> {
        LOG.info("Downstream call succeeded");
        if (executorService != null) {
            executorService.shutdownNow();
            executorService = null;
        }
    })
    .onError(event -> {
        LOG.info("Downstream call error. Starting a thread to simulate checking for the downstream availability");
        if (executorService == null) {
            executorService = Executors.newSingleThreadScheduledExecutor();
            // In a real-world scenario, instead of incrementing a counter, this check could ping the remote
            // system (or run a similar probe) to determine whether it is available again.
            executorService.scheduleAtFixedRate(() -> someCheckMethod(), 1, 1, TimeUnit.SECONDS);
        }
    });
// Binds the configuration to the registry
getCamelContext().getRegistry().bind("pausableCircuit", circuitBreaker);
from(from)
    .pausable(new KafkaConsumerListener(), o -> canContinue())
    .routeId("pausable-it")
    .process(exchange -> LOG.info("Got record from Kafka: {}", exchange.getMessage().getBody()))
    .circuitBreaker()
        .resilience4jConfiguration().circuitBreaker("pausableCircuit").end()
        .to(to)
    .end();
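The example above references two helpers, canContinue() and someCheckMethod(), which are not part of the Camel or Resilience4j APIs. A minimal sketch of what they could look like, assuming the circuitBreaker instance and a simple AtomicInteger counter are fields of the same class:
private boolean canContinue() {
    // Consume only while the circuit is closed, i.e. while the downstream system is considered healthy
    return circuitBreaker.getState() == CircuitBreaker.State.CLOSED;
}

private void someCheckMethod() {
    // Here the check only counts attempts; a real implementation would probe the downstream system and,
    // once it is reachable again, close the circuit (for example via circuitBreaker.transitionToClosedState())
    // so that canContinue() starts returning true again
    checkAttempts.incrementAndGet();
}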