Azure CosmosDB
Since Camel 3.10
Both producer and consumer are supported
Azure Cosmos DB is Microsoft’s globally distributed, multi-model database service for operational and analytics workloads. It offers multi-mastering feature by automatically scaling throughput, compute, and storage. This component interacts with Azure CosmosDB through Azure SQL API.
Prerequisites
You must have a valid Windows Azure Storage account. More information is available at Azure Documentation Portal.
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-azure-cosmosdb</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
URI Format
azure-cosmosdb://[databaseName][/containerName][?options]
In case of the consumer, databaseName
, containerName
are required, In case of the producer, it depends on the operation that being
requested, for example if operation is on a database level, e.b: deleteDatabase, only databaseName
is required, but in case
of operation being requested in container level, e.g: readItem, then databaseName
and containerName
are required.
You can append query options to the URI in the following format, ?options=value&option2=value&
…
For example in order to consume records from a specific container in a specific database to a file, use the following snippet:
from("azure-cosmosdb://camelDb/myContainer?accountKey=MyaccountKey&databaseEndpoint=https//myazure.com:443&leaseDatabaseName=myLeaseDB&createLeaseDatabaseIfNotExists=true&createLeaseContainerIfNotExists=true").
to("file://directory");
URI Options
The Azure CosmosDB endpoint is configured using URI syntax:
azure-cosmosdb:databaseName/containerName
with the following path and query parameters:
Path Parameters (2 parameters):
Name | Description | Default | Type |
---|---|---|---|
databaseName |
The name of the Cosmos database that component should connect to. In case you are producing data and have createDatabaseIfNotExists=true, the component will automatically auto create a Cosmos database. |
String |
|
containerName |
The name of the Cosmos container that component should connect to. In case you are producing data and have createContainerIfNotExists=true, the component will automatically auto create a Cosmos container. |
String |
Query Parameters (29 parameters):
Name | Description | Default | Type |
---|---|---|---|
clientTelemetryEnabled (common) |
Sets the flag to enable client telemetry which will periodically collect database operations aggregation statistics, system information like cpu/memory and send it to cosmos monitoring service, which will be helpful during debugging. DEFAULT value is false indicating this is opt in feature, by default no telemetry collection. |
false |
boolean |
connectionSharingAcrossClientsEnabled (common) |
Enables connections sharing across multiple Cosmos Clients. The default is false. When you have multiple instances of Cosmos Client in the same JVM interacting to multiple Cosmos accounts, enabling this allows connection sharing in Direct mode if possible between instances of Cosmos Client. Please note, when setting this option, the connection configuration (e.g., socket timeout config, idle timeout config) of the first instantiated client will be used for all other client instances. |
false |
boolean |
consistencyLevel (common) |
Sets the consistency levels supported for Azure Cosmos DB client operations in the Azure Cosmos DB service. The requested ConsistencyLevel must match or be weaker than that provisioned for the database account. Consistency levels by order of strength are STRONG, BOUNDED_STALENESS, SESSION and EVENTUAL. Refer to consistency level documentation for additional details: https://docs.microsoft.com/en-us/azure/cosmos-db/consistency-levels. There are 5 enums and the value can be one of: Strong, BoundedStaleness, Session, Eventual, ConsistentPrefix |
SESSION |
ConsistencyLevel |
containerPartitionKeyPath (common) |
Sets the container partition key path. |
String |
|
contentResponseOnWriteEnabled (common) |
Sets the boolean to only return the headers and status code in Cosmos DB response in case of Create, Update and Delete operations on CosmosItem. In Consumer, it is enabled by default because of the ChangeFeed in the consumer that needs this flag to be enabled and thus is shouldn’t be overridden. In Producer, it advised to disable it since it reduces the network overhead |
true |
boolean |
cosmosAsyncClient (common) |
Autowired Inject an external CosmosAsyncClient into the component which provides a client-side logical representation of the Azure Cosmos DB service. This asynchronous client is used to configure and execute requests against the service. |
CosmosAsyncClient |
|
createContainerIfNotExists (common) |
Sets if the component should create Cosmos container automatically in case it doesn’t exist in Cosmos database |
false |
boolean |
createDatabaseIfNotExists (common) |
Sets if the component should create Cosmos database automatically in case it doesn’t exist in Cosmos account |
false |
boolean |
databaseEndpoint (common) |
Required Sets the Azure Cosmos database endpoint the component will connect to. |
String |
|
multipleWriteRegionsEnabled (common) |
Sets the flag to enable writes on any regions for geo-replicated database accounts in the Azure Cosmos DB service. When the value of this property is true, the SDK will direct write operations to available writable regions of geo-replicated database account. Writable regions are ordered by PreferredRegions property. Setting the property value to true has no effect until EnableMultipleWriteRegions in DatabaseAccount is also set to true. DEFAULT value is true indicating that writes are directed to available writable regions of geo-replicated database account. |
true |
boolean |
preferredRegions (common) |
Sets the preferred regions for geo-replicated database accounts. For example, East US as the preferred region. When EnableEndpointDiscovery is true and PreferredRegions is non-empty, the SDK will prefer to use the regions in the container in the order they are specified to perform operations. |
List |
|
readRequestsFallbackEnabled (common) |
Sets whether to allow for reads to go to multiple regions configured on an account of Azure Cosmos DB service. DEFAULT value is true. If this property is not set, the default is true for all Consistency Levels other than Bounded Staleness, The default is false for Bounded Staleness. 1. endpointDiscoveryEnabled is true 2. the Azure Cosmos DB account has more than one region |
true |
boolean |
throughputProperties (common) |
Sets throughput of the resources in the Azure Cosmos DB service. |
ThroughputProperties |
|
bridgeErrorHandler (consumer) |
Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. |
false |
boolean |
changeFeedProcessorOptions (consumer) |
Sets the ChangeFeedProcessorOptions to be used. Unless specifically set the default values that will be used are: maximum items per page or FeedResponse: 100 lease renew interval: 17 seconds lease acquire interval: 13 seconds lease expiration interval: 60 seconds feed poll delay: 5 seconds maximum scale count: unlimited |
ChangeFeedProcessorOptions |
|
createLeaseContainerIfNotExists (consumer) |
Sets if the component should create Cosmos lease container for the consumer automatically in case it doesn’t exist in Cosmos database |
false |
boolean |
createLeaseDatabaseIfNotExists (consumer) |
Sets if the component should create Cosmos lease database for the consumer automatically in case it doesn’t exist in Cosmos account |
false |
boolean |
hostName (consumer) |
Sets the hostname. The host: a host is an application instance that uses the change feed processor to listen for changes. Multiple instances with the same lease configuration can run in parallel, but each instance should have a different instance name. If not specified, this will be a generated random hostname. |
String |
|
leaseContainerName (consumer) |
Sets the lease container which acts as a state storage and coordinates processing the change feed across multiple workers. The lease container can be stored in the same account as the monitored container or in a separate account. It will be auto created if createLeaseContainerIfNotExists is set to true. |
camel-lease |
String |
leaseDatabaseName (consumer) |
Sets the lease database where the leaseContainerName will be stored. If it is not specified, this component will store the lease container in the same database that is specified in databaseName. It will be auto created if createLeaseDatabaseIfNotExists is set to true. |
String |
|
exceptionHandler (consumer) |
To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |
ExceptionHandler |
|
exchangePattern (consumer) |
Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |
ExchangePattern |
|
itemId (producer) |
Sets the itemId in case needed for operation on item like delete, replace |
String |
|
itemPartitionKey (producer) |
Sets partition key. Represents a partition key value in the Azure Cosmos DB database service. A partition key identifies the partition where the item is stored in. |
PartitionKey |
|
lazyStartProducer (producer) |
Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. |
false |
boolean |
operation (producer) |
The CosmosDB operation that can be used with this component on the producer. There are 17 enums and the value can be one of: listDatabases, createDatabase, queryDatabases, deleteDatabase, createContainer, replaceDatabaseThroughput, listContainers, queryContainers, deleteContainer, replaceContainerThroughput, createItem, upsertItem, deleteItem, replaceItem, readItem, readAllItems, queryItems |
listDatabases |
CosmosDbOperationsDefinition |
query (producer) |
An SQL query to execute on a given resources. To learn more about Cosmos SQL API, check this link {link https://docs.microsoft.com/en-us/azure/cosmos-db/sql-query-getting-started} |
String |
|
queryRequestOptions (producer) |
Set additional QueryRequestOptions that can be used with queryItems, queryContainers, queryDatabases, listDatabases, listItems, listContainers operations |
CosmosQueryRequestOptions |
|
accountKey (security) |
Required Sets either a master or readonly key used to perform authentication for accessing resource. |
String |
Component Options
The Azure CosmosDB component supports 29 options, which are listed below.
Name | Description | Default | Type |
---|---|---|---|
clientTelemetryEnabled (common) |
Sets the flag to enable client telemetry which will periodically collect database operations aggregation statistics, system information like cpu/memory and send it to cosmos monitoring service, which will be helpful during debugging. DEFAULT value is false indicating this is opt in feature, by default no telemetry collection. |
false |
boolean |
configuration (common) |
The component configurations |
CosmosDbConfiguration |
|
connectionSharingAcrossClientsEnabled (common) |
Enables connections sharing across multiple Cosmos Clients. The default is false. When you have multiple instances of Cosmos Client in the same JVM interacting to multiple Cosmos accounts, enabling this allows connection sharing in Direct mode if possible between instances of Cosmos Client. Please note, when setting this option, the connection configuration (e.g., socket timeout config, idle timeout config) of the first instantiated client will be used for all other client instances. |
false |
boolean |
consistencyLevel (common) |
Sets the consistency levels supported for Azure Cosmos DB client operations in the Azure Cosmos DB service. The requested ConsistencyLevel must match or be weaker than that provisioned for the database account. Consistency levels by order of strength are STRONG, BOUNDED_STALENESS, SESSION and EVENTUAL. Refer to consistency level documentation for additional details: https://docs.microsoft.com/en-us/azure/cosmos-db/consistency-levels. There are 5 enums and the value can be one of: Strong, BoundedStaleness, Session, Eventual, ConsistentPrefix |
SESSION |
ConsistencyLevel |
containerPartitionKeyPath (common) |
Sets the container partition key path. |
String |
|
contentResponseOnWriteEnabled (common) |
Sets the boolean to only return the headers and status code in Cosmos DB response in case of Create, Update and Delete operations on CosmosItem. In Consumer, it is enabled by default because of the ChangeFeed in the consumer that needs this flag to be enabled and thus is shouldn’t be overridden. In Producer, it advised to disable it since it reduces the network overhead |
true |
boolean |
cosmosAsyncClient (common) |
Autowired Inject an external CosmosAsyncClient into the component which provides a client-side logical representation of the Azure Cosmos DB service. This asynchronous client is used to configure and execute requests against the service. |
CosmosAsyncClient |
|
createContainerIfNotExists (common) |
Sets if the component should create Cosmos container automatically in case it doesn’t exist in Cosmos database |
false |
boolean |
createDatabaseIfNotExists (common) |
Sets if the component should create Cosmos database automatically in case it doesn’t exist in Cosmos account |
false |
boolean |
databaseEndpoint (common) |
Required Sets the Azure Cosmos database endpoint the component will connect to. |
String |
|
multipleWriteRegionsEnabled (common) |
Sets the flag to enable writes on any regions for geo-replicated database accounts in the Azure Cosmos DB service. When the value of this property is true, the SDK will direct write operations to available writable regions of geo-replicated database account. Writable regions are ordered by PreferredRegions property. Setting the property value to true has no effect until EnableMultipleWriteRegions in DatabaseAccount is also set to true. DEFAULT value is true indicating that writes are directed to available writable regions of geo-replicated database account. |
true |
boolean |
preferredRegions (common) |
Sets the preferred regions for geo-replicated database accounts. For example, East US as the preferred region. When EnableEndpointDiscovery is true and PreferredRegions is non-empty, the SDK will prefer to use the regions in the container in the order they are specified to perform operations. |
List |
|
readRequestsFallbackEnabled (common) |
Sets whether to allow for reads to go to multiple regions configured on an account of Azure Cosmos DB service. DEFAULT value is true. If this property is not set, the default is true for all Consistency Levels other than Bounded Staleness, The default is false for Bounded Staleness. 1. endpointDiscoveryEnabled is true 2. the Azure Cosmos DB account has more than one region |
true |
boolean |
throughputProperties (common) |
Sets throughput of the resources in the Azure Cosmos DB service. |
ThroughputProperties |
|
bridgeErrorHandler (consumer) |
Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. |
false |
boolean |
changeFeedProcessorOptions (consumer) |
Sets the ChangeFeedProcessorOptions to be used. Unless specifically set the default values that will be used are: maximum items per page or FeedResponse: 100 lease renew interval: 17 seconds lease acquire interval: 13 seconds lease expiration interval: 60 seconds feed poll delay: 5 seconds maximum scale count: unlimited |
ChangeFeedProcessorOptions |
|
createLeaseContainerIfNotExists (consumer) |
Sets if the component should create Cosmos lease container for the consumer automatically in case it doesn’t exist in Cosmos database |
false |
boolean |
createLeaseDatabaseIfNotExists (consumer) |
Sets if the component should create Cosmos lease database for the consumer automatically in case it doesn’t exist in Cosmos account |
false |
boolean |
hostName (consumer) |
Sets the hostname. The host: a host is an application instance that uses the change feed processor to listen for changes. Multiple instances with the same lease configuration can run in parallel, but each instance should have a different instance name. If not specified, this will be a generated random hostname. |
String |
|
leaseContainerName (consumer) |
Sets the lease container which acts as a state storage and coordinates processing the change feed across multiple workers. The lease container can be stored in the same account as the monitored container or in a separate account. It will be auto created if createLeaseContainerIfNotExists is set to true. |
camel-lease |
String |
leaseDatabaseName (consumer) |
Sets the lease database where the leaseContainerName will be stored. If it is not specified, this component will store the lease container in the same database that is specified in databaseName. It will be auto created if createLeaseDatabaseIfNotExists is set to true. |
String |
|
itemId (producer) |
Sets the itemId in case needed for operation on item like delete, replace |
String |
|
itemPartitionKey (producer) |
Sets partition key. Represents a partition key value in the Azure Cosmos DB database service. A partition key identifies the partition where the item is stored in. |
PartitionKey |
|
lazyStartProducer (producer) |
Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. |
false |
boolean |
operation (producer) |
The CosmosDB operation that can be used with this component on the producer. There are 17 enums and the value can be one of: listDatabases, createDatabase, queryDatabases, deleteDatabase, createContainer, replaceDatabaseThroughput, listContainers, queryContainers, deleteContainer, replaceContainerThroughput, createItem, upsertItem, deleteItem, replaceItem, readItem, readAllItems, queryItems |
listDatabases |
CosmosDbOperationsDefinition |
query (producer) |
An SQL query to execute on a given resources. To learn more about Cosmos SQL API, check this link {link https://docs.microsoft.com/en-us/azure/cosmos-db/sql-query-getting-started} |
String |
|
queryRequestOptions (producer) |
Set additional QueryRequestOptions that can be used with queryItems, queryContainers, queryDatabases, listDatabases, listItems, listContainers operations |
CosmosQueryRequestOptions |
|
autowiredEnabled (advanced) |
Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. |
true |
boolean |
accountKey (security) |
Required Sets either a master or readonly key used to perform authentication for accessing resource. |
String |
Authentication Information
To use this component, you have 2 options in order to provide the required Azure authentication information:
-
Provide
accountKey
anddatabaseEndpoint
for your Azure CosmosDB account. The account key can be generated through your CosmosDB Azure portal. -
Provide a CosmosAsyncClient instance which can be provided into
cosmosAsyncClient
.
Async Consumer and Producer
This component implements the async Consumer and producer.
This allows camel route to consume and produce events asynchronously without blocking any threads.
Usage
Message headers evaluated by the component producer
Header | Variable Name | Type | Description |
---|---|---|---|
|
|
|
Overrides the database name which is the name of the Cosmos database that component should connect to. In case you are producing data and have createDatabaseIfNotExists=true, the component will automatically auto create a Cosmos database. |
|
|
|
Overrides the container name which is the name of the Cosmos container that component should connect to. In case you are producing data and have createContainerIfNotExists=true, the component will automatically auto create a Cosmos container. |
|
|
|
Sets the producer operation which can be used to execute a specific operation on the producer. |
|
|
|
Sets the SQL query to execute on a given producer query operations. |
|
|
|
Set additional QueryRequestOptions that can be used with queryItems, queryContainers, queryDatabases, listDatabases, listItems, listContainers operations. |
|
|
|
Sets if the component should create Cosmos database automatically in case it doesn’t exist in Cosmos account. |
|
|
|
Sets if the component should create Cosmos container automatically in case it doesn’t exist in Cosmos account. |
|
|
|
Sets throughput of the resources in the Azure Cosmos DB service. |
|
|
|
Sets additional options to execute on database operations. |
|
|
|
Sets the container partition key path. |
|
|
|
Sets additional options to execute on container operations. |
|
|
|
Sets partition key. Represents a partition key value in the Azure Cosmos DB database service. A partition key identifies the partition where the item is stored in. |
|
|
|
Sets additional options to execute on item operations. |
|
|
|
Sets the itemId in case needed for operation on item like delete, replace. |
Message headers set by the component producer
Header | Variable Name | Type | Description |
---|---|---|---|
|
|
|
The resource ID of the requested resource. |
|
|
|
The Etag ID of the requested resource. |
|
|
|
The timestamp of the requested resource. |
|
|
|
The response headers of the requested resource. |
|
|
|
The status code of the requested resource. |
|
|
|
The TTL of the requested resource. |
|
|
|
The manual throughput of the requested resource. |
|
|
|
The AutoscaleMaxThroughput of the requested resource. |
Azure CosmosDB Producer operations
Camel Azure CosmosDB component provides wide range of operations on the producer side:
Operations on the service level
For these operations, databaseName
is required except for queryDatabases
and listDatabases
operations.
Operation | Description |
---|---|
|
Gets a list of all databases as |
|
Creates a database in the specified Azure CosmosDB account. |
|
|
Operations on the database level
For these operations, databaseName
is required for all operations here and containerName
only for createContainer
and queryContainers
.
Operation | Description |
---|---|
|
Deletes a database from the Azure CosmosDB account. |
|
Creates a container in the specified Azure CosmosDB database. |
|
Replaces the throughput for the specified Azure CosmosDB database. |
|
Gets a list of all containers in the specified database as |
|
|
Operations on the container level
For these operations, databaseName
and containerName
is required for all operations here.
Operation | Description |
---|---|
|
Deletes a container from the specified Azure CosmosDB database. |
|
Replaces the throughput for the specified Azure CosmosDB container. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Refer to the example section in this page to learn how to use these operations into your camel application.
Examples
-
listDatabases
:
from("direct:start")
.to("azure-cosmosdb://?operation=listDatabases")
.to("mock:result");
-
createDatabase
:
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "myDb");
})
.to("azure-cosmosdb://?operation=createDatabase")
.to("mock:result");
-
deleteDatabase
:
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "myDb");
})
.to("azure-cosmosdb://?operation=deleteDatabase")
.to("mock:result");
-
createContainer
:
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_NAME, "containerName");
exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_PARTITION_KEY_PATH, "path");
exchange.getIn().setHeader(CosmosDbConstants.CREATE_DATABASE_IF_NOT_EXIST, true);
})
.to("azure-cosmosdb://?operation=createContainer")
.to("mock:result");
-
deleteContainer
:
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_NAME, "containerName");
})
.to("azure-cosmosdb://?operation=deleteContainer")
.to("mock:result");
-
replaceDatabaseThroughput
:
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
exchange.getIn().setHeader(CosmosDbConstants.THROUGHPUT_PROPERTIES,
ThroughputProperties.createManualThroughput(700));
})
.to("azure-cosmosdb://?operation=replaceDatabaseThroughput")
.to("mock:result");
-
queryContainers
:
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
exchange.getIn().setHeader(CosmosDbConstants.QUERY, "SELECT * from c where c.id = 'myAwersomeContainer'");
})
.to("azure-cosmosdb://?operation=queryContainers")
.to("mock:result");
-
createItem
:
from("direct:start")
.process(exchange -> {
// create item to send
final Map<String, Object> item = new HashMap<>();
item1.put("id", "test-id-1");
item1.put("partition", "test-1");
item1.put("field1", "awesome!");
exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_NAME, "containerName");
exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_PARTITION_KEY_PATH, "partition");
exchange.getIn().setHeader(CosmosDbConstants.ITEM_PARTITION_KEY, "test-1");
exchange.getIn().setBody(item);
})
.to("azure-cosmosdb://?operation=createItem")
.to("mock:result");
-
replaceItem
:
from("direct:start")
.process(exchange -> {
// create item to send
final Map<String, Object> item = new HashMap<>();
item1.put("id", "test-id-1");
item1.put("partition", "test-1");
item1.put("field1", "awesome!");
exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_NAME, "containerName");
exchange.getIn().setHeader(CosmosDbConstants.ITEM_PARTITION_KEY, "test-1");
exchange.getIn().setHeader(CosmosDbConstants.ITEM_ID, "test-id-1");
exchange.getIn().setBody(item);
})
.to("azure-cosmosdb://?operation=replaceItem")
.to("mock:result");
-
deleteItem
:
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_NAME, "containerName");
exchange.getIn().setHeader(CosmosDbConstants.ITEM_PARTITION_KEY, "test-1");
exchange.getIn().setHeader(CosmosDbConstants.ITEM_ID, "test-id-1");
exchange.getIn().setBody(item);
})
.to("azure-cosmosdb://?operation=deleteItem")
.to("mock:result");
-
queryItems
:
from("direct:start")
.process(exchange -> {
exchange.getIn().setHeader(CosmosDbConstants.DATABASE_NAME, "databaseName");
exchange.getIn().setHeader(CosmosDbConstants.CONTAINER_NAME, "containerName");
exchange.getIn().setHeader(CosmosDbConstants.QUERY, "SELECT c.id,c.field2,c.field1 from c where c.id = 'test-id-1'");
})
.to("azure-cosmosdb://?operation=queryItems")
.to("mock:result");
Azure CosmosDB Consumer
Camel Azure CosmosDB uses ChangeFeed pattern to capture a feed of events and feed them into the Camel in an Async manner, something similar to Change Data Capture (CDC) design pattern. However, it doesn’t capture deletes as these are removed from the feed as well.
To use the Camel Azure CosmosDB, containerName
and databaseName
are required. However, there are more options that need to be set in order to use this feature:
-
leaseDatabaseName
: Sets the lease database where theleaseContainerName
will be stored. If it is not specified, this component will store the lease container in the same database that is specified in databaseName. It will be auto created ifcreateLeaseDatabaseIfNotExists
is set to true. -
leaseContainerName
: Sets the lease container which acts as a state storage and coordinates processing the change feed across multiple workers. The lease container can be stored in the same account as the monitored container or in a separate account. It will be auto created ifcreateLeaseContainerIfNotExists
is set to true. If not specified, this component will create container calledcamel-lease
. -
hostName
: Sets the hostname. The host: a host is an application instance that uses the change feed processor to listen for changes. Multiple instances with the same lease configuration can run in parallel, but each instance should have a different instance name. If not specified, this will be a generated random hostname. -
changeFeedProcessorOptions
: Sets additional options for the change feed processor.
The consumer will set List<Map<String,>>
in exchange message body which reflect list of items in a single feed.
Development Notes (Important)
When developing on this component, you will need to obtain your Azure accessKey in order to run the integration tests. In addition to the mocked unit tests you will need to run the integration tests with every change you make or even client upgrade as the Azure client can break things even on minor versions upgrade. To run the integration tests, on this component directory, run the following maven command:
mvn clean install -Dendpoint={{dbaddress}} -DaccessKey={{accessKey}}
Whereby endpoint
is your Azure CosmosDB endpoint name and accessKey
is the access key being generated from Azure CosmosDB portal.
Spring Boot Auto-Configuration
When using azure-cosmosdb with Spring Boot make sure to use the following Maven dependency to have support for auto configuration:
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-azure-cosmosdb-starter</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
The component supports 30 options, which are listed below.
Name | Description | Default | Type |
---|---|---|---|
Sets either a master or readonly key used to perform authentication for accessing resource. |
String |
||
Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. |
true |
Boolean |
|
Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. |
false |
Boolean |
|
camel.component.azure-cosmosdb.change-feed-processor-options |
Sets the ChangeFeedProcessorOptions to be used. Unless specifically set the default values that will be used are: maximum items per page or FeedResponse: 100 lease renew interval: 17 seconds lease acquire interval: 13 seconds lease expiration interval: 60 seconds feed poll delay: 5 seconds maximum scale count: unlimited. The option is a com.azure.cosmos.models.ChangeFeedProcessorOptions type. |
ChangeFeedProcessorOptions |
|
Sets the flag to enable client telemetry which will periodically collect database operations aggregation statistics, system information like cpu/memory and send it to cosmos monitoring service, which will be helpful during debugging. DEFAULT value is false indicating this is opt in feature, by default no telemetry collection. |
false |
Boolean |
|
The component configurations. The option is a org.apache.camel.component.azure.cosmosdb.CosmosDbConfiguration type. |
CosmosDbConfiguration |
||
camel.component.azure-cosmosdb.connection-sharing-across-clients-enabled |
Enables connections sharing across multiple Cosmos Clients. The default is false. When you have multiple instances of Cosmos Client in the same JVM interacting to multiple Cosmos accounts, enabling this allows connection sharing in Direct mode if possible between instances of Cosmos Client. Please note, when setting this option, the connection configuration (e.g., socket timeout config, idle timeout config) of the first instantiated client will be used for all other client instances. |
false |
Boolean |
Sets the consistency levels supported for Azure Cosmos DB client operations in the Azure Cosmos DB service. The requested ConsistencyLevel must match or be weaker than that provisioned for the database account. Consistency levels by order of strength are STRONG, BOUNDED_STALENESS, SESSION and EVENTUAL. Refer to consistency level documentation for additional details: https://docs.microsoft.com/en-us/azure/cosmos-db/consistency-levels. |
ConsistencyLevel |
||
Sets the container partition key path. |
String |
||
camel.component.azure-cosmosdb.content-response-on-write-enabled |
Sets the boolean to only return the headers and status code in Cosmos DB response in case of Create, Update and Delete operations on CosmosItem. In Consumer, it is enabled by default because of the ChangeFeed in the consumer that needs this flag to be enabled and thus is shouldn’t be overridden. In Producer, it advised to disable it since it reduces the network overhead. |
true |
Boolean |
Inject an external CosmosAsyncClient into the component which provides a client-side logical representation of the Azure Cosmos DB service. This asynchronous client is used to configure and execute requests against the service. The option is a com.azure.cosmos.CosmosAsyncClient type. |
CosmosAsyncClient |
||
camel.component.azure-cosmosdb.create-container-if-not-exists |
Sets if the component should create Cosmos container automatically in case it doesn’t exist in Cosmos database. |
false |
Boolean |
camel.component.azure-cosmosdb.create-database-if-not-exists |
Sets if the component should create Cosmos database automatically in case it doesn’t exist in Cosmos account. |
false |
Boolean |
camel.component.azure-cosmosdb.create-lease-container-if-not-exists |
Sets if the component should create Cosmos lease container for the consumer automatically in case it doesn’t exist in Cosmos database. |
false |
Boolean |
camel.component.azure-cosmosdb.create-lease-database-if-not-exists |
Sets if the component should create Cosmos lease database for the consumer automatically in case it doesn’t exist in Cosmos account. |
false |
Boolean |
Sets the Azure Cosmos database endpoint the component will connect to. |
String |
||
Whether to enable auto configuration of the azure-cosmosdb component. This is enabled by default. |
Boolean |
||
Sets the hostname. The host: a host is an application instance that uses the change feed processor to listen for changes. Multiple instances with the same lease configuration can run in parallel, but each instance should have a different instance name. If not specified, this will be a generated random hostname. |
String |
||
Sets the itemId in case needed for operation on item like delete, replace. |
String |
||
Sets partition key. Represents a partition key value in the Azure Cosmos DB database service. A partition key identifies the partition where the item is stored in. The option is a com.azure.cosmos.models.PartitionKey type. |
PartitionKey |
||
Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. |
false |
Boolean |
|
Sets the lease container which acts as a state storage and coordinates processing the change feed across multiple workers. The lease container can be stored in the same account as the monitored container or in a separate account. It will be auto created if createLeaseContainerIfNotExists is set to true. |
camel-lease |
String |
|
Sets the lease database where the leaseContainerName will be stored. If it is not specified, this component will store the lease container in the same database that is specified in databaseName. It will be auto created if createLeaseDatabaseIfNotExists is set to true. |
String |
||
camel.component.azure-cosmosdb.multiple-write-regions-enabled |
Sets the flag to enable writes on any regions for geo-replicated database accounts in the Azure Cosmos DB service. When the value of this property is true, the SDK will direct write operations to available writable regions of geo-replicated database account. Writable regions are ordered by PreferredRegions property. Setting the property value to true has no effect until EnableMultipleWriteRegions in DatabaseAccount is also set to true. DEFAULT value is true indicating that writes are directed to available writable regions of geo-replicated database account. |
true |
Boolean |
The CosmosDB operation that can be used with this component on the producer. |
CosmosDbOperationsDefinition |
||
Sets the preferred regions for geo-replicated database accounts. For example, East US as the preferred region. When EnableEndpointDiscovery is true and PreferredRegions is non-empty, the SDK will prefer to use the regions in the container in the order they are specified to perform operations. |
List |
||
An SQL query to execute on a given resources. To learn more about Cosmos SQL API, check this link \{link https://docs.microsoft.com/en-us/azure/cosmos-db/sql-query-getting-started}. |
String |
||
Set additional QueryRequestOptions that can be used with queryItems, queryContainers, queryDatabases, listDatabases, listItems, listContainers operations. The option is a com.azure.cosmos.models.CosmosQueryRequestOptions type. |
CosmosQueryRequestOptions |
||
camel.component.azure-cosmosdb.read-requests-fallback-enabled |
Sets whether to allow for reads to go to multiple regions configured on an account of Azure Cosmos DB service. DEFAULT value is true. If this property is not set, the default is true for all Consistency Levels other than Bounded Staleness, The default is false for Bounded Staleness. 1. endpointDiscoveryEnabled is true 2. the Azure Cosmos DB account has more than one region. |
true |
Boolean |
Sets throughput of the resources in the Azure Cosmos DB service. The option is a com.azure.cosmos.models.ThroughputProperties type. |
ThroughputProperties |