Kamelets Developer Guide
Introduction
This document guides you through the process of developing a new Kamelet that can be used by any Apache Camel subproject supporting the Kamelet technology stack and shared with others via Kamelet catalogs, such as the official Apache Camel Kamelet Catalog.
We assume that the reader is familiar with the content of the Kamelets User Guide and with Camel K installation and general usage.
Basics
If you have started learning about Kamelets, you have seen that they can be used to create two kinds of connectors:
- Sources: they produce data that can be injected into a destination
- Sinks: they consume data and optionally produce a response
When creating a new Kamelet, you should first decide which kind of Kamelet you’re going to create, which depends on the use case you have in mind. A Kamelet does a single thing, so if you want to provide both a source and a sink for your system, you need two Kamelets.
In its essence, a Kamelet consists of a single YAML file that contains information on two distinct aspects of the connector:
- User view: this part contains general documentation about the Kamelet, including the parameters that need to be configured in order to use it
- Runtime aspects: this part tells the Camel runtime how to implement what the Kamelet promises to do. Most of the time it contains a Camel route template in YAML DSL
We’re ignoring here the part around data types of a Kamelet, which is not fundamental for the Kamelet to work and is still subject to change.
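As a rough orientation, the two aspects map onto two sections of the Kamelet resource. The skeleton below is only a sketch: the name example-source and the message property are hypothetical placeholders, and the timer:tick route merely stands in for real logic.

```yaml
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: example-source            # hypothetical name, for illustration only
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:                     # user view: documentation and parameters
    title: "Example Source"
    description: "Emits a fixed message periodically"
    required:
    - message
    properties:
      message:                    # hypothetical parameter
        title: Message
        description: The message to generate
        type: string
  template:                       # runtime aspects: a Camel route template
    from:
      uri: "timer:tick"
      steps:
      - set-body:
          constant: "{{message}}" # placeholder filled from the parameter above
      - to: "kamelet:sink"
```

Everything under definition is what a user reads and configures; everything under template is what the Camel runtime executes.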
We’ll guide you through the process of creating a simple Kamelet by remapping a Camel component, then we’ll go through a much more complicated real-world example.
Creating a simple Kamelet
Since Apache Camel provides more than 300 components out of the box, it’s easy to create a Kamelet starting from one of the components already available. Most of the Kamelets available in the official catalog, in fact, are simple ones that contain only a remapping of the Kamelet properties into Camel endpoint parameters. We’re going to show an example shortly.
Suppose that you want to provide a Kamelet that allows users to search data on Twitter, providing a stream of information about a given keyword. Creating such a Kamelet is a fairly easy task: we can use options of the "camel-twitter" component without adding much processing logic.
So the procedure of writing a simple Kamelet starts with scaffolding a new Kamelet resource, which can be done with the kamel CLI:
kamel init twitter-search-source.kamelet.yaml
This produces a YAML file like the following one:
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: twitter-search-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:
    title: "Timer"
    description: "Produces periodic events with a custom payload"
    required:
    - message
    properties:
      period:
        title: Period
        description: The time interval between two events
        type: integer
        default: 1000
      message:
        title: Message
        description: The message to generate
        type: string
  types:
    out:
      mediaType: text/plain
  template:
    from:
      uri: timer:tick
      parameters:
        period: "{{period}}"
      steps:
      - set-body:
          constant: "{{message}}"
      - to: "kamelet:sink"
We need to change the file to do what we want to achieve, that is, creating a route that searches a given keyword on Twitter.
The route provided in the initial scaffold (timer-to-log) is not what we need, so we change it to the following:
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
# ...
spec:
  # ...
  template:
    from:
      uri: "twitter-search:{{keywords}}" # (1)
      parameters:
        accessToken: "{{accessToken}}" # (2)
        accessTokenSecret: "{{accessTokenSecret}}"
        consumerKey: "{{apiKey}}" # (3)
        consumerSecret: "{{apiKeySecret}}"
      steps:
      - marshal: # (4)
          json: {}
      - to: "kamelet:sink" # (5)
(1) keywords is a path parameter of the Camel twitter-search component
(2) Some endpoint parameters are just mapped 1-1
(3) The consumerKey option of the Camel component is exposed as apiKey to reflect the actual name in the Twitter developer portal
(4) The Camel Twitter component generates Java objects, so we marshal them to JSON
(5) A source Kamelet sends data to the special endpoint "kamelet:sink", which is replaced at runtime by a different target
The YAML route template above just uses the twitter-search component to do searches on Twitter. We added a marshalling step to JSON because the output of a Kamelet must always be something that can be transferred over the wire.
The Kamelet is almost complete: we just need to document the parameters in JSON Schema format. We specify it in the spec → definition part:
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: twitter-search-source
# ...
spec:
  definition:
    title: "Twitter Search Source" # (1)
    description: |-
      Allows to get all tweets on particular keywords from Twitter.
      It requires tokens that can be obtained by creating an application
      in the Twitter developer portal: https://developer.twitter.com/.
    required: # (2)
    - keywords
    - apiKey
    - apiKeySecret
    - accessToken
    - accessTokenSecret
    properties:
      keywords: # (3)
        title: Keywords
        description: The keywords to use in the Twitter search (Supports Twitter standard operators)
        type: string
        example: "Apache Camel"
      apiKey:
        title: API Key
        description: The API Key from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password # (4)
      apiKeySecret:
        title: API Key Secret
        description: The API Key Secret from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
      accessToken:
        title: Access Token
        description: The Access Token from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
      accessTokenSecret:
        title: Access Token Secret
        description: The Access Token Secret from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
  # ...
(1) General information about the Kamelet itself in textual format
(2) List of required parameters
(3) A specification for each one of the parameters (flat structure, no nested options allowed)
(4) Optional graphical customization for a specific UI (OpenShift Console)
This is all you need to create a Kamelet so that other users can leverage it. A few things remain, like setting information about the generated objects and other metadata (such as the icon and the provider), and you’re done. The final Kamelet can look like the following:
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: twitter-search-source
  annotations:
    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,..." # Truncated (1)
    camel.apache.org/provider: "Apache Software Foundation"
  labels:
    camel.apache.org/kamelet.type: "source"
    camel.apache.org/kamelet.group: "Twitter"
spec:
  definition:
    title: "Twitter Search Source"
    description: |-
      Allows to get all tweets on particular keywords from Twitter.
      It requires tokens that can be obtained by creating an application
      in the Twitter developer portal: https://developer.twitter.com/.
    required:
    - keywords
    - apiKey
    - apiKeySecret
    - accessToken
    - accessTokenSecret
    properties:
      keywords:
        title: Keywords
        description: The keywords to use in the Twitter search (Supports Twitter standard operators)
        type: string
        example: "Apache Camel"
      apiKey:
        title: API Key
        description: The API Key from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
      apiKeySecret:
        title: API Key Secret
        description: The API Key Secret from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
      accessToken:
        title: Access Token
        description: The Access Token from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
      accessTokenSecret:
        title: Access Token Secret
        description: The Access Token Secret from the Twitter application in the developer portal
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
  types: # (2)
    out:
      mediaType: application/json
  template: # (3)
    from:
      uri: "twitter-search:{{keywords}}"
      parameters:
        accessToken: "{{accessToken}}"
        accessTokenSecret: "{{accessTokenSecret}}"
        consumerKey: "{{apiKey}}"
        consumerSecret: "{{apiKeySecret}}"
      steps:
      - marshal:
          json: {}
      - to: "kamelet:sink"
(1) An icon with an appropriate license, preferably an SVG encoded as a base64 data URL. Online encoding services can produce it for you
(2) The types section indicates that the Kamelet is going to produce JSON data. This part of the Kamelet spec is likely to change in the future but is not fundamental for the Kamelet mechanics to work
(3) The route template defined previously
The Kamelet can be shared on the Catalog and/or created on a Kubernetes cluster to let users use it.
Trying it out
A simple way to try it out is to apply it on a cluster, together with a simple binding. Assuming that you have a Kubernetes cluster and you’re connected to a namespace where the Camel K operator can act, just create the Kamelet:
kubectl apply -f twitter-search-source.kamelet.yaml
Then you can create a binding like the following one to try it out:
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: twitter-search-source-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: twitter-search-source
    properties:
      keywords: "Apache Camel"
      apiKey: "your own"
      apiKeySecret: "your own"
      accessToken: "your own"
      accessTokenSecret: "your own"
  sink:
    uri: "log:info"
This can be created using:
kubectl apply -f twitter-search-source-binding.yaml
Once created, you can see the logs of the binding using:
kamel logs twitter-search-source-binding
If everything goes right, you should get some tweets in the logs after the integration is created.
Refer to the Kamelets User Guide for more information on how to use it in different contexts (like Knative, Kafka, etc.).
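For instance, the same source could feed a Knative channel instead of the log. This is only a sketch: it assumes that Knative Eventing is installed on the cluster and that an InMemoryChannel named tweets (a hypothetical name) already exists in the namespace. Check the Kamelets User Guide for the exact reference format supported by your Camel K version.

```yaml
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: twitter-search-to-channel   # hypothetical binding name
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: twitter-search-source
    properties:
      keywords: "Apache Camel"
      apiKey: "your own"
      apiKeySecret: "your own"
      accessToken: "your own"
      accessTokenSecret: "your own"
  sink:
    ref:                            # assumption: a Knative channel is available
      kind: InMemoryChannel
      apiVersion: messaging.knative.dev/v1
      name: tweets                  # hypothetical channel name
```

The Kamelet itself is unchanged: only the sink part of the binding differs from the "log:info" example.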
Creating a complex Kamelet
We’re now going to create a Kamelet with a high degree of complexity, to show how the Kamelet model can also be used to go beyond the functionality provided by a single Camel component.
This example is complicated on purpose and uses several components and EIPs from Apache Camel: luckily your Kamelets will be much simpler than this one.
It will be a Kamelet of type "source", but most of the principles explained here also apply when developing a Kamelet of type "sink". The technical differences between the two scenarios will be highlighted in the "Creating a sink Kamelet" section.
We’re going to take a real-world use case of moderate complexity: we want to create a source of earthquake events around the world, taking data from the USGS APIs.
Step 1: write an end-to-end integration
Contrary to what one might expect, the first thing you need to do is to forget about Kamelets and just try to write a Camel K integration that consumes the earthquake data.
You may choose the language that you prefer to write the first integration, even writing it directly in YAML. We write it using the Java DSL because that is the language that most Apache Camel users are familiar with and it’s also supported by the tooling.
For a great developer experience, we suggest using Visual Studio Code with the Camel Extension Pack.
We start from scratch by creating an integration file:
kamel init Earthquake.java
This will scaffold a Java source file with a timer-to-log integration, which we’ll edit according to our needs. A first version of the integration might look like the following:
// camel-k: language=java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.Exchange;

public class Earthquake extends RouteBuilder {
  @Override
  public void configure() throws Exception {
    from("timer:earthquake?period=10000") // (1)
      .setHeader(Exchange.HTTP_METHOD).constant("GET")
      .to("https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson") // (2)
      .convertBodyTo(String.class)
      .to("log:info"); // (3)
  }
}
(1) We do a timed poll from the API because there’s no way to consume it directly
(2) Look at https://earthquake.usgs.gov/fdsnws/event/1/ for more information about the API. We’re using the GeoJSON format
(3) The integration ends in a "log:info" endpoint, because we just want to see if we can contact the API and get some results back
In order to run the integration above, if you have a Kubernetes cluster with Camel K installed, you can rely on that using kamel run Earthquake.java, but there’s a simpler solution that just requires your own machine:
kamel local run Earthquake.java
The local keyword tells Camel K to use your own machine (you need Maven 3.6+ and Java 11+ in order for this to work).
The integration will start and begin printing out earthquake data every 10 seconds.
Here is an excerpt of what the integration prints:
{
"type":"FeatureCollection",
"metadata":{
"generated":1614860715000,
"url":"https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson",
"title":"USGS Earthquakes",
"status":200,
"api":"1.10.3",
"count":10762
},
"features":[
{
"type":"Feature",
"properties":{
"mag":2.17,
"place":"27km ENE of Pine Valley, CA",
"time":1614859396200,
"updated":1614860064420,
"url":"https://earthquake.usgs.gov/earthquakes/eventpage/ci39808832",
"detail":"https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=ci39808832&format=geojson",
"status":"automatic",
"tsunami":0,
"sig":72,
"net":"ci",
"code":"39808832",
"ids":",ci39808832,",
"sources":",ci,",
"types":",focal-mechanism,nearby-cities,origin,phase-data,scitech-link,",
"nst":57,
"dmin":0.04475,
"rms":0.22,
"gap":60,
"magType":"ml",
"type":"earthquake",
"title":"M 2.2 - 27km ENE of Pine Valley, CA"
},
"geometry":{
"type":"Point",
"coordinates":[
-116.2648333,
32.9236667,
3.54
]
},
"id":"ci39808832"
}
]
}
We’ve truncated the list of "features" to just the first one, but it contains a lot more data.
Step 2 (optional): iterate on the integration
Since the integration above produces useful data, its route could be technically used to build a source Kamelet, but there are a few problems we may want to address before publishing it:
- It produces a lot of data (10762 events, last 30 days by default). For this use case we may want to start by emitting only recent events, e.g. those of the last 2 hours: we can add a filter on the query to accomplish this.
- It produces a collection of features (earthquake events), while you may want to push the individual features to the destination. We can use Camel’s built-in split and jsonpath support to split the collection into separate entries.
- It continuously produces the same data: just wait another 10 seconds and you’ll get the same data again and again (with a shift of 10 seconds over the last 30 days). A good approach here is to filter out duplicates at the source as much as possible. We can store the time when the last update was generated by the server and use it in subsequent queries to obtain only new events. This does not guarantee "exactly once" semantics, because, for example, if the integration is restarted it loses the in-memory state and starts from the beginning, but it prevents sending a high amount of redundant data while the integration is kept alive. To store the time when the last result was generated by the API, we can use one of the in-memory caches that Camel provides, such as camel-caffeine-cache.
We’re going to use an in-memory cache because we need to store a single value. When using stateful data repositories, such as caches, it’s always good practice to limit their size to a low value and prevent them from growing over time.
If end-to-end "exactly once" semantics is needed, you could later add a stateful idempotent repository in the global integration, but these aspects should be external to the Kamelet definition.
Let’s try sorting out these issues in the route (we publish here the final version):
// camel-k: language=java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.ClaimCheckOperation;
import org.apache.camel.Exchange;

public class Earthquake extends RouteBuilder {
  @Override
  public void configure() throws Exception {
    from("timer:earthquake?period=10000")
      .setHeader("CamelCaffeineAction").constant("GET")
      .toD("caffeine-cache:cache-${routeId}?key=lastUpdate") // (1)
      .choice()
        .when().simple("${header.CamelCaffeineActionHasResult}")
          .setProperty("lastUpdate", body())
        .otherwise()
          .setProperty("lastUpdate", simple("${date-with-timezone:now-120m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}")) // (2)
      .end()
      .setHeader(Exchange.HTTP_METHOD).constant("GET")
      .toD("https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&updatedafter=${exchangeProperty.lastUpdate}&orderby=time-asc") // (3)
      .unmarshal().json()
      .setProperty("generated", simple("${body[metadata][generated]}")) // (4)
      .setProperty("lastUpdate", simple("${date-with-timezone:exchangeProperty.generated:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"))
      .claimCheck(ClaimCheckOperation.Push) // (5)
      .setBody().exchangeProperty("lastUpdate")
      .setHeader("CamelCaffeineAction").constant("PUT")
      .toD("caffeine-cache:cache-${routeId}?key=lastUpdate")
      .claimCheck(ClaimCheckOperation.Pop)
      .split().jsonpath("$.features[*]") // (6)
        .marshal().json()
        .to("log:info") // (7)
      .end();
  }
}
(1) We start each poll by checking if there has been a previous run (and get the corresponding time)
(2) If it’s the first run of the integration, we set the clock back 120 minutes from the current time, to get the events of the last 2 hours
(3) We always include the time from which we want to receive updates in the query to the service
(4) The service returns a "generated" field containing the timestamp at which the response was generated: we’ll use it in the following requests
(5) We push the current body onto the claim check stack, use the body to store the "lastUpdate" value in the cache, then pop the stack to restore the previous body
(6), (7) Individual records of the response are sent to the destination (which is "log:info" in this phase). If an exception is thrown while processing a single entry, that error is sent to the route error handler and the processing continues
Don’t be scared by the complexity of the route, as this is a complicated example by choice: most of the Kamelets in the Kamelet Catalog don’t use any processing logic or EIP.
When writing a route like this, you should always think about the errors that might happen in the various phases of the execution. Here the "lastUpdate" value in the cache is updated after a successful invocation of the API but before the individual exchanges are sent to the destination. As a result, a failure on a single feature (which is sent to the route error handler) does not block the source: it keeps processing new data even if one feature can’t be processed.
This integration (which seems complex at first sight, but should still be readable) solves the issues identified above by using multiple features available in Apache Camel (caches, "Simple" language, HTTP component, JSON data format, splitter EIP, claim check, JSONPath). Even if it’s not recommended to write overly complicated integrations in a Kamelet (consider writing a plain component if it becomes too complicated and unreadable), you can see here how powerful the Kamelet model is.
We might have written the integration above in multiple routes connected using "direct:" endpoints, but a Kamelet contains a single route template and the mapping will be easier if the integration is composed of a single route (it’s also possible to define multiple supporting routes in a Kamelet, but we’re not going to show how to do it here).
Step 3: externalize parameters
The next step in the development is answering the following question: if I were a user instantiating this source, which aspects would I like to configure?
For the example above, there are 2 things that a user may want to configure:
- period: the time interval between polls to the earthquake API. This may seem a technical issue, but it becomes a business issue when contacting APIs that do rate limiting
- lookAhead: the number of minutes before the current time from which I would like to receive events (it affects the events received when the source is first started or restarted)
Those two will become Kamelet parameters as you might expect, but for the time being, let’s refactor the integration to externalize them as standard Camel K properties:
// camel-k: language=java property=period=20000 property=lookAhead=120 (1)
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.ClaimCheckOperation;
import org.apache.camel.Exchange;

public class Earthquake extends RouteBuilder {
  @Override
  public void configure() throws Exception {
    from("timer:earthquake?period={{period}}") // (2)
      // ...
      .choice()
        .when().simple("${header.CamelCaffeineActionHasResult}")
          .setProperty("lastUpdate", body())
        .otherwise()
          .setProperty("lastUpdate", simple("${date-with-timezone:now-{{lookAhead}}m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}")) // (3)
      .end()
      // ...
      .end();
  }
}
(1) The modeline header defines the two parameters with a "development" value
(2) Placeholder {{period}} is used
(3) Placeholder {{lookAhead}} is used
This looks the same as before, but notice that the period and lookAhead parameters are set in the modeline, while the route uses the {{period}} and {{lookAhead}} placeholders instead of the actual values.
As before, this integration can be tested with kamel local run Earthquake.java (the modeline parameters will be automatically added by the kamel CLI).
Step 4 (optional): translate into YAML DSL
The integration is now ready to be turned into a Kamelet, but if you haven’t written it directly in YAML DSL, you need to convert it before proceeding. YAML is the default DSL for Kamelets because it provides multiple advantages over the other DSLs, the most important being the ability to easily compile YAML integrations into Quarkus-based binary executables in the future, with the resulting gains in performance and resource utilization.
If we managed to reduce our integration to contain only a Camel route, converting it to YAML is straightforward:
# camel-k: language=yaml property=period=20000 property=lookAhead=120 dependency=camel-quarkus:caffeine dependency=camel-quarkus:http
- from:
    uri: "timer:earthquake"
    parameters:
      period: "{{period}}"
    steps:
    - set-header:
        name: CamelCaffeineAction
        constant: GET
    - tod: "caffeine-cache:cache-${routeId}?key=lastUpdate"
    - choice:
        when:
        - simple: "${header.CamelCaffeineActionHasResult}"
          steps:
          - set-property:
              name: lastUpdate
              simple: "${body}"
        otherwise:
          steps:
          - set-property:
              name: lastUpdate
              simple: "${date-with-timezone:now-{{lookAhead}}m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"
    - set-header:
        name: CamelHttpMethod
        constant: GET
    - tod: "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&updatedafter=${exchangeProperty.lastUpdate}&orderby=time-asc"
    - unmarshal:
        json: {}
    - set-property:
        name: generated
        simple: "${body[metadata][generated]}"
    - set-property:
        name: lastUpdate
        simple: "${date-with-timezone:exchangeProperty.generated:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"
    - claim-check:
        operation: Push
    - set-body:
        exchange-property: lastUpdate
    - set-header:
        name: CamelCaffeineAction
        constant: PUT
    - tod: "caffeine-cache:cache-${routeId}?key=lastUpdate"
    - claim-check:
        operation: Pop
    - split:
        jsonpath: "$.features[*]"
        steps:
        - marshal:
            json: {}
        - to: "log:info"
If you compare the YAML version of the route to the Java one, you see that they map 1-1.
The Camel Extension Pack for Visual Studio Code helps you write the YAML route by providing auto-completion and error highlighting.
Since the YAML DSL is quite new in the Camel ecosystem, it may miss some features available in the Java one: for example, Camel K is not able to detect some dependencies automatically, so we’ve specified them in the modeline header.
This route can be run like the previous one using the kamel CLI:
kamel local run earthquake.yaml
Step 5: wrap it into a Kamelet
We’re about to write down an "Earthquake Source Kamelet" from the route we’ve built. As a starting point, we may just wrap the previous YAML route into the Kamelet envelope. The result looks like:
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: earthquake-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  template: # (1)
    from:
      uri: "timer:earthquake"
      parameters:
        period: "{{period}}"
      steps:
      - set-header:
          name: CamelCaffeineAction
          constant: GET
      - tod: "caffeine-cache:cache-${routeId}?key=lastUpdate"
      - choice:
          when:
          - simple: "${header.CamelCaffeineActionHasResult}"
            steps:
            - set-property:
                name: lastUpdate
                simple: "${body}"
          otherwise:
            steps:
            - set-property:
                name: lastUpdate
                simple: "${date-with-timezone:now-{{lookAhead}}m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"
      - set-header:
          name: CamelHttpMethod
          constant: GET
      - tod: "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&updatedafter=${exchangeProperty.lastUpdate}&orderby=time-asc"
      - unmarshal:
          json: {}
      - set-property:
          name: generated
          simple: "${body[metadata][generated]}"
      - set-property:
          name: lastUpdate
          simple: "${date-with-timezone:exchangeProperty.generated:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"
      - claim-check:
          operation: Push
      - set-body:
          exchange-property: lastUpdate
      - set-header:
          name: CamelCaffeineAction
          constant: PUT
      - tod: "caffeine-cache:cache-${routeId}?key=lastUpdate"
      - claim-check:
          operation: Pop
      - split:
          jsonpath: "$.features[*]"
          steps:
          - marshal:
              json: {}
          - to: "kamelet:sink" # (2)
(1) The template section contains the (single) route template we identified before
(2) The old reference to "log:info" has been replaced with "kamelet:sink" here
The only difference between the YAML route embedded in the Kamelet and the one identified before is the final sink, which was "log:info" and now is "kamelet:sink", i.e. a placeholder that will be replaced with something else when the Kamelet is actually used (the user decides what is the destination of the earthquake events).
Step 6: describe the parameters
The Kamelet above is incomplete: we need to define the two parameters we’ve identified in the template and also give a description to the Kamelet itself. The way to express all this information is via a JSON Schema specification in the Kamelet YAML.
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: earthquake-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition: # (1)
    title: Earthquake Source
    description: |-
      Get data about current earthquake events happening in the world using the USGS API
    properties:
      period: # (2)
        title: Period between polls
        description: The interval between fetches to the earthquake API in milliseconds
        type: integer
        default: 60000
      lookAhead: # (3)
        title: Look-ahead minutes
        description: The amount of minutes to look ahead when starting the integration afresh
        type: integer
        default: 120
  template:
    from:
      uri: "timer:earthquake"
      # ...
(1) The definition part starts with general information about the Kamelet
(2) Definition of the period parameter (used with the {{period}} placeholder in the route)
(3) Definition of the lookAhead parameter
Step 7: add metadata and sugar
We should complete the Kamelet with all the mandatory options (and any useful optional ones) described in the guidelines for contributing Kamelets.
The final result should look like:
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: earthquake-source
  annotations:
    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64..." # truncated (1)
    camel.apache.org/provider: "Apache Software Foundation"
  labels:
    camel.apache.org/kamelet.type: "source"
    camel.apache.org/requires.runtime: "camel-quarkus" # (2)
spec:
  definition:
    title: Earthquake Source
    description: |-
      Get data about current earthquake events happening in the world using the USGS API
    properties:
      period:
        title: Period between polls
        description: The interval between fetches to the earthquake API in milliseconds
        type: integer
        default: 60000
      lookAhead:
        title: Look-ahead minutes
        description: The amount of minutes to look ahead when starting the integration afresh
        type: integer
        default: 120
  types: # (3)
    out:
      mediaType: application/json
  dependencies: # (4)
  - camel-quarkus:caffeine
  - camel-quarkus:http
  template:
    from:
      uri: "timer:earthquake"
      parameters:
        period: "{{period}}"
      steps:
      - set-header:
          name: CamelCaffeineAction
          constant: GET
      - tod: "caffeine-cache:cache-${routeId}?key=lastUpdate"
      - choice:
          when:
          - simple: "${header.CamelCaffeineActionHasResult}"
            steps:
            - set-property:
                name: lastUpdate
                simple: "${body}"
          otherwise:
            steps:
            - set-property:
                name: lastUpdate
                simple: "${date-with-timezone:now-{{lookAhead}}m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"
      - set-header:
          name: CamelHttpMethod
          constant: GET
      - tod: "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&updatedafter=${exchangeProperty.lastUpdate}&orderby=time-asc"
      - unmarshal:
          json: {}
      - set-property:
          name: generated
          simple: "${body[metadata][generated]}"
      - set-property:
          name: lastUpdate
          simple: "${date-with-timezone:exchangeProperty.generated:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}"
      - claim-check:
          operation: Push
      - set-body:
          exchange-property: lastUpdate
      - set-header:
          name: CamelCaffeineAction
          constant: PUT
      - tod: "caffeine-cache:cache-${routeId}?key=lastUpdate"
      - claim-check:
          operation: Pop
      - split:
          jsonpath: "$.features[*]"
          steps:
          - marshal:
              json: {}
          - to: "kamelet:sink"
(1) Add an icon with an appropriate license, preferably an SVG encoded as a base64 data URL. Online encoding services can produce it for you
(2) This marks the Kamelet as dependent on Quarkus, since we’re specifying explicit dependencies on Quarkus artifacts in the spec → dependencies section
(3) The types section indicates that the Kamelet is going to produce JSON data. This part of the Kamelet spec is subject to revision but is not fundamental for the Kamelet mechanics to work
(4) Dependencies that we previously specified in the modeline options should now be expressed in the Kamelet spec
The Kamelet is now ready to be used!
Trying it out
You can install the Kamelet on your Kubernetes instance to see if it can be picked up and used by the Camel K runtime.
We assume that you’re connected to a Kubernetes cluster and working on a namespace where the Camel K operator is allowed to materialize integrations.
To create the Kamelet, you can execute:
kubectl apply -f earthquake-source.kamelet.yaml
If the Kamelet is valid, this will result in the Kamelet resource being created in the current namespace.
To check if it works, you can create a simple binding:
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: earthquake-source-binding
spec:
  source:
    ref: # (1)
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: earthquake-source
    properties:
      period: 10000 # (2)
  sink:
    uri: "log:info" # (3)
(1) Kubernetes reference to the previously created Kamelet
(2) We redefine the period to speed it up, otherwise the default is used (60000)
(3) We just sink into "log:info", but we’re free to change it to anything else
The developer writes Camel DSL to make a Kamelet work, but the end user uses it declaratively, without needing any idea of the complexity of the development process behind it.
Creating this resource will tell the operator to materialize the binding using an integration:
kubectl apply -f earthquake-source-binding.yaml
We can check the logs of the integration using:
kamel logs earthquake-source-binding
If everything went well, you should see the events in the log.
Refer to the Kamelets User Guide for more information on how to use it in different contexts (like Knative, Kafka, etc.).
Creating a sink Kamelet
So far we’ve focused on the steps needed to create Kamelets of type "source", but the same steps can be used for type "sink" Kamelets with some minor changes.
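The most visible of those changes can be sketched as follows: the type label becomes "sink", and the route template starts from the special "kamelet:source" endpoint rather than ending in "kamelet:sink". The name example-sink is a hypothetical placeholder and "log:info" stands in for a real target, so treat this as an outline under those assumptions, not a complete Kamelet.

```yaml
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: example-sink               # hypothetical name, for illustration only
  labels:
    camel.apache.org/kamelet.type: "sink"
spec:
  definition:
    title: "Example Sink"
    description: "Logs every message it receives"
  template:
    from:
      uri: "kamelet:source"        # replaced at runtime by the actual data source
      steps:
      - to: "log:info"             # stand-in for the real target system
```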
We’re now going to create a "sink" Kamelet and look at the differences. For this part, we’ll write a Telegram sink Kamelet.
Analyze the use cases
Unlike sources, where you usually generate a single type of data, or even multiple ones depending on some static user parameter, a sink should always take into account that it can be fed dynamically with different types of data.
For example, in the case of a Telegram sink, a user may want to send textual data, but also images with (or without) a caption.
In order to implement sending different kinds of data, the Kamelet should adapt according to the content that is received as input.
We’ll start by writing an end-to-end integration, then we’ll convert it into a Kamelet. This time, we’ll write routes directly in YAML DSL.
For this particular use case, I’ve created a simple integration before to get the Chat ID corresponding to my phone from the bot: more info here. |
Let’s start with a simple integration:
# camel-k: language=yaml property=chatId=158584902 (1)
- from: # (2)
    uri: "direct:endpoint"
    steps:
      - to:
          uri: "telegram:bots"
          parameters:
            authorizationToken: "{{authorizationToken}}"
            chatId: "{{chatId}}"
      - marshal: # (3)
          json: {}
- from: # (4)
    uri: timer:tick
    parameters:
      period: 5000
    steps:
      - set-body:
          constant: Hello
      - to: direct:endpoint
1 | Setting the chatId property directly in modeline, the authorizationToken will be passed from command line |
2 | The route that will become the Kamelet route template |
3 | We marshal the output as JSON because it may be required to be transferred over the wire |
4 | A testing route to check if the integration works |
The end-to-end integration above should be good as initial scaffolding for the integration. We can run it using the following command:
kamel run telegram.yaml -p authorizationToken=the-token-you-got-from-bot-father
If everything went well, you should get a "Hello" message on your phone every 5 seconds.
Now, let’s check if we can also send an image, by changing the second route:
# first route as before
# ...
- from:
    uri: timer:tick
    parameters:
      period: 5000
    steps:
      - set-header:
          name: CamelHttpMethod
          constant: GET
      - to: https://github.com/apache/camel/raw/7204aa132662ab6cb8e3c5afea8b9b0859eff0e8/docs/img/logo.png
      - to: direct:endpoint
The intended behavior is that we get the image on our phone via Telegram, but it’s throwing an error instead. This is something that often happens because standard Camel components are not suited to be used out-of-the-box as connectors.
In this case, the Telegram component requires that a CamelTelegramMediaType header is set to PHOTO_PNG in the exchange in order to accept the image, and that the body is converted to byte[].
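To make this requirement concrete, the test route could in principle be patched as sketched below, so that the sender prepares the exchange itself. This is only an illustration of what the component expects, not a recommended design: it shows exactly the kind of Camel-specific knowledge a sender would need to have.

```yaml
# first route as before
# ...
- from:
    uri: timer:tick
    parameters:
      period: 5000
    steps:
      - set-header:
          name: CamelHttpMethod
          constant: GET
      - to: https://github.com/apache/camel/raw/7204aa132662ab6cb8e3c5afea8b9b0859eff0e8/docs/img/logo.png
      # Sender-side workaround (illustrative only): convert the body to
      # byte[] and set the Camel-specific header the Telegram component expects
      - convert-body-to:
          type: "byte[]"
      - set-header:
          name: CamelTelegramMediaType
          constant: PHOTO_PNG
      - to: direct:endpoint
```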
But we cannot require whoever sends the message to the Kamelet to obey all Camel rules. In general, we should follow these guidelines:
-
We SHOULD NOT require that the sender sets Camel-specific bits in the message over the wire (e.g. a CamelTelegramMediaType header): we should hide Camel under the covers as much as possible
-
We CAN use the "Content-Type" header to distinguish the type of incoming data
-
We CAN define new headers and allow the users to set them on the incoming message (e.g. when the incoming message is a picture, we can let the sender specify a caption for it in the "text" header)
-
When defining a header, it MUST be documented in the Kamelet definition
-
When defining a header, say "text", we should also account for an additional header named "ce-text": in some contexts, like Knative, only headers allowed by the CloudEvents specification are accepted in the brokers/channels (i.e. a ce- prefix is mandatory)
When applied to the current use case, the main route can be changed into something like this:
- from:
    uri: "direct:endpoint"
    steps:
      - choice: # (1)
          when:
            - simple: "${header[Content-Type]} == 'image/png'"
              steps:
                - convert-body-to:
                    type: "byte[]"
                - set-header:
                    name: CamelTelegramMediaType
                    constant: PHOTO_PNG
            - simple: "${header[Content-Type]} == 'image/jpeg'"
              steps:
                - convert-body-to:
                    type: "byte[]"
                - set-header:
                    name: CamelTelegramMediaType
                    constant: PHOTO_JPG
          otherwise:
            steps:
              - convert-body-to:
                  type: "java.lang.String"
      - choice: # (2)
          when:
            - simple: "${header[text]}"
              steps:
                - set-header:
                    name: CamelTelegramMediaTitleCaption
                    simple: "${header[text]}"
            - simple: "${header[ce-text]}"
              steps:
                - set-header:
                    name: CamelTelegramMediaTitleCaption
                    simple: "${header[ce-text]}"
      - choice: # (3)
          when:
            - simple: "${header[chat-id]}"
              steps:
                - set-header:
                    name: CamelTelegramChatId
                    simple: "${header[chat-id]}"
            - simple: "${header[ce-chat-id]}"
              steps:
                - set-header:
                    name: CamelTelegramChatId
                    simple: "${header[ce-chat-id]}"
      - to:
          uri: "telegram:bots"
          parameters:
            authorizationToken: "{{authorizationToken}}"
            chatId: "{{chatId}}"
      - marshal:
          json: {}
1 | We do content-type based conversion into appropriate objects for the component |
2 | We allow specifying a text or ce-text header to set the image caption |
3 | We allow overriding the chat ID using a chat-id or ce-chat-id header |
It’s not always obvious whether it is the responsibility of the Kamelet to prepare the exchange to be fed into the Camel producer endpoint, or whether the Camel component should be changed to be more flexible. In this case, it seems appropriate to implement things like content-type based conversion and support for streaming content at component level. The Kamelet above is acceptable for the time being, but it should be simplified if such changes land in the component. |
Having defined the main route template, we need to document the Kamelet and the parameters. We show here the final Kamelet:
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: telegram-sink
  annotations:
    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,..." # truncated
    camel.apache.org/provider: "Apache Software Foundation"
  labels:
    camel.apache.org/kamelet.type: "sink"
    camel.apache.org/kamelet.group: "Telegram"
spec:
  definition: # (1)
    title: "Telegram Sink"
    description: |-
      Send a message to a Telegram chat using your Telegram bot as sender.
      To create a bot, contact the @botfather account using the Telegram app.
      This sink supports the following message types:
      - Standard text messages
      - PNG images (`Content-Type` must be set to `image/png`)
      - JPEG images (`Content-Type` must be set to `image/jpeg`)
      The following message headers are also supported:
      - `text` / `ce-text`: when sending an image, the image caption
      - `chat-id` / `ce-chat-id`: to override the default chat where messages are sent to
    required:
      - authorizationToken
    properties:
      authorizationToken:
        title: Token
        description: The token to access your bot on Telegram. You can obtain it from the Telegram @botfather.
        type: string
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
      chatId:
        title: Chat ID
        description: The Chat ID where messages should be sent by default
        type: string
  types: # (2)
    out:
      mediaType: application/json
  template: # (3)
    from:
      uri: "kamelet:source"
      steps:
      - choice:
          when:
          - simple: "${header[Content-Type]} == 'image/png'"
            steps:
            - log: h1
            - convert-body-to:
                type: "byte[]"
            - set-header:
                name: CamelTelegramMediaType
                constant: PHOTO_PNG
          - simple: "${header[Content-Type]} == 'image/jpeg'"
            steps:
            - convert-body-to:
                type: "byte[]"
            - set-header:
                name: CamelTelegramMediaType
                constant: PHOTO_JPG
          otherwise:
            steps:
            - convert-body-to:
                type: "java.lang.String"
      - choice:
          when:
          - simple: "${header[text]}"
            steps:
            - set-header:
                name: CamelTelegramMediaTitleCaption
                simple: "${header[text]}"
          - simple: "${header[ce-text]}"
            steps:
            - set-header:
                name: CamelTelegramMediaTitleCaption
                simple: "${header[ce-text]}"
      - choice:
          when:
          - simple: "${header[chat-id]}"
            steps:
            - set-header:
                name: CamelTelegramChatId
                simple: "${header[chat-id]}"
          - simple: "${header[ce-chat-id]}"
            steps:
            - set-header:
                name: CamelTelegramChatId
                simple: "${header[ce-chat-id]}"
      - to:
          uri: "telegram:bots"
          parameters:
            authorizationToken: "{{authorizationToken}}"
            chatId: "{{chatId}}"
      - marshal:
          json: {}
1 | JSON schema definition of the Kamelet configuration |
2 | The Kamelet has a single possible output of type JSON |
3 | The flow identified above as Kamelet route template |
Try it out
To try a sink Kamelet, we should feed it with some data. The best way to do so is to feed it directly from another Kamelet.
So, for example, to send a text message to a chat, we may create a binding like the following:
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: telegram-text-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: timer-source
    properties:
      period: 10000
      message: Hello first Kamelet!
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: telegram-sink
    properties:
      authorizationToken: "put-your-own"
      chatId: "your-chat-id"
You can create the Kamelet with:
kubectl apply -f telegram-sink.kamelet.yaml
Then apply the binding with:
kubectl apply -f telegram-text-binding.yaml
If everything goes well, you should get a "Hello first Kamelet!" message on your phone every 10 seconds.
To check if we can also receive pictures using the above Kamelet, we can create the following binding:
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: telegram-image-binding
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: http-source
    properties:
      url: "https://github.com/apache/camel/raw/7204aa132662ab6cb8e3c5afea8b9b0859eff0e8/docs/img/logo.png"
      contentType: "image/png"
      period: 10000
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: telegram-sink
    properties:
      authorizationToken: "put-your-own"
      chatId: "your-chat-id"
This will create a new integration that forwards the Apache Camel logo to your phone every 10 seconds.
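The caption support documented in the sink (the `text` header) can be exercised in a similar way. The following is a minimal sketch, not a tested configuration: it assumes that an `insert-header-action` Kamelet with `name` and `value` properties is installed in the namespace (for example from the Apache Camel Kamelet Catalog), and the binding name and caption text are hypothetical.

```yaml
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: telegram-image-caption-binding # hypothetical name
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: http-source
    properties:
      url: "https://github.com/apache/camel/raw/7204aa132662ab6cb8e3c5afea8b9b0859eff0e8/docs/img/logo.png"
      contentType: "image/png"
      period: 10000
  steps:
    # Assumption: the insert-header-action Kamelet is available in the namespace
    - ref:
        kind: Kamelet
        apiVersion: camel.apache.org/v1alpha1
        name: insert-header-action
      properties:
        name: "text" # header read by the sink to set the image caption
        value: "The Apache Camel logo" # hypothetical caption
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: telegram-sink
    properties:
      authorizationToken: "put-your-own"
      chatId: "your-chat-id"
```

If the assumptions hold, the picture should arrive with the caption attached, since the sink copies the `text` header into the caption header used by the Telegram component.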
Testing
The most obvious way to test a Kamelet is via an e2e test that verifies that the Kamelet respects its specification.
YAKS is the framework of choice for such e2e tests. You can find more information and documentation starting from the YAKS GitHub repository. Here we’ll provide examples for the Kamelets above.
Testing a source
YAKS allows writing a declarative Gherkin file to specify the behavior of the Kamelet.
Let’s try to test the earthquake Kamelet above. A Gherkin file for it should look like:
Feature: Kamelet earthquake-source works
  Background:
    Given Disable auto removal of Kamelet resources
    Given Disable auto removal of Kubernetes resources
    Given Camel K resource polling configuration
      | maxAttempts          | 60   |
      | delayBetweenAttempts | 3000 |
  Scenario: Bind Kamelet to service
    Given create Kubernetes service test-service with target port 8080
    And bind Kamelet earthquake-source to uri http://test-service.${YAKS_NAMESPACE}.svc.cluster.local/test
    When create KameletBinding earthquake-source-uri
    Then KameletBinding earthquake-source-uri should be available
    And Camel K integration earthquake-source-uri should be running
  Scenario: Verify binding
    Given HTTP server "test-service"
    And HTTP server timeout is 120000 ms
    Then expect HTTP request header: Content-Type="application/json;charset=UTF-8"
    And receive POST /test
    And delete KameletBinding earthquake-source-uri
As you can see, this is a declarative test that is materialized into something that actually checks that the service generates some data. Checks can also be more detailed than this one, but checking that it generates some JSON data is enough for a "smoke test" that verifies that the Kamelet can actually be used.
The test requires that you’re connected to a Kubernetes cluster and have also YAKS installed (refer to the YAKS documentation for more information). We’re also going to use the CLI:
# We assume the Kamelet is already installed in the namespace
yaks test earthquake-source.feature
When testing a source, the backbone of the Gherkin file that you’ll write is similar to the one above. Depending on the source under test, you may need to stimulate the production of some data using additional Gherkin steps before verifying that the data has been produced (in our case, it’s better not to try to stimulate an earthquake :D).
Testing a sink
A test for a sink is similar to the one for the source, except that we’re going to generate data to feed it.
To send data to the Kamelet, we can bind it to another Kamelet of type webhook-source, which allows us to send data to it via HTTP. Let’s create a parameterized binding like the following one:
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: webhook-to-telegram
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: webhook-source
    properties:
      subpath: message
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: telegram-sink
    properties:
      authorizationToken: "${telegram.authorization.token}"
      chatId: "${telegram.chat.id}"
This will expose an HTTP endpoint that we can use to forward a message to Telegram. It requires that two parameters are set in the YAKS configuration before creation. Those can be set in a simple property file:
telegram.authorization.token=your-own-token
telegram.chat.id=your-own-chat
Then we’re ready to define the feature we want to test, i.e. the ability to send a message via the Telegram API.
An example of "smoke test" can be the following one:
Feature: Kamelet telegram-sink works
  Background:
    Given Disable auto removal of Kamelet resources
    Given Disable auto removal of Kubernetes resources
    Given Camel K resource polling configuration
      | maxAttempts          | 60   |
      | delayBetweenAttempts | 3000 |
  Scenario: Bind webhook to Kamelet sink
    Given load variables telegram-credentials.properties
    And load KameletBinding webhook-to-telegram.yaml
    Then KameletBinding webhook-to-telegram should be available
    And Camel K integration webhook-to-telegram should be running
  Scenario: Send a message to the Telegram Chat
    Given URL: http://webhook-to-telegram.${YAKS_NAMESPACE}.svc.cluster.local
    And HTTP request timeout is 60000 milliseconds
    And wait for GET on path / to return 404
    Given HTTP request headers
      | Content-Type | text/plain |
    And HTTP request body
    """
    Hello from YAKS!
    """
    When send POST /message
    Then receive HTTP 200 OK
    And delete KameletBinding webhook-to-telegram
This test will only check that the Telegram API accepts the message created by the test.
This can be run with the following command:
# We assume that both the webhook-source and the telegram-sink kamelet are already present in the namespace
yaks test telegram-sink.feature --resource webhook-to-telegram.yaml --resource telegram-credentials.properties
If everything goes well, you should receive a message during the test execution.
For a more specific test that also checks the content sent to Telegram, you should add additional Gherkin steps to get and verify the actual message via other Telegram APIs. We’re not going into that much detail for this example, but the Gherkin file highlighted above is a good approximation of the backbone you’ll find in tests for Kamelets of type "sink".
KEDA Integration
Kamelets of type source can be augmented with KEDA metadata to automatically configure autoscalers.
The additional KEDA metadata is needed for the following purposes:
-
Map Kamelet properties to corresponding KEDA parameters
-
Distinguish which KEDA parameters are needed for authentication (and need to be placed in a Secret)
-
Mark KEDA parameters as required to signal an error during reconciliation
Basic properties to KEDA parameter mapping
Any Kamelet property can be mapped to a KEDA parameter by simply declaring the mapping in the x-descriptors list.
For example:
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: aws-sqs-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:
    # ...
    properties:
      queueNameOrArn:
        title: Queue Name
        description: The SQS Queue Name or ARN
        type: string
        x-descriptors:
        - urn:keda:metadata:queueURL # (1)
        - urn:keda:required # (2)
  # ...
1 | The Kamelet property queueNameOrArn corresponds to a KEDA metadata parameter named queueURL |
2 | The queueURL parameter is required by KEDA |
In the example above, the queueNameOrArn Kamelet property is declared to correspond to a KEDA metadata parameter named queueURL, using the urn:keda:metadata: prefix.
The queueURL parameter is documented in the KEDA AWS SQS Queue scaler together with other options required by KEDA to configure an autoscaler (it can be a full queue URL or a simple queue name).
By using the marker descriptor urn:keda:required, it is also marked as required by KEDA.
The queueURL is a metadata parameter for the autoscaler. In order to configure authentication parameters, the syntax is slightly different:
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: aws-sqs-source
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:
    # ...
    properties:
      # ...
      accessKey:
        title: Access Key
        description: The access key obtained from AWS
        type: string
        format: password
        x-descriptors:
        - urn:alm:descriptor:com.tectonic.ui:password
        - urn:camel:group:credentials
        - urn:keda:authentication:awsAccessKeyID # (1)
        - urn:keda:required
  # ...
1 | The Kamelet property accessKey corresponds to a KEDA authentication parameter named awsAccessKeyID |
This time the property mapping uses the urn:keda:authentication: prefix, declaring it as a KEDA authentication parameter.
The difference between the two approaches is that authentication parameters will be injected into a secret by the Camel K operator and linked to the KEDA ScaledObject using a TriggerAuthentication (refer to the KEDA documentation for more info).
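To give a rough idea of what this linkage looks like on the KEDA side, the sketch below shows a Secret, a TriggerAuthentication and a ScaledObject wired together by hand. It is an illustration based on the KEDA resource model only: all resource names and values are hypothetical, other scaler parameters are omitted, and the resources actually generated by the Camel K operator may be named and laid out differently.

```yaml
# Hypothetical Secret holding the authentication parameter
apiVersion: v1
kind: Secret
metadata:
  name: my-binding-keda-auth # hypothetical name
stringData:
  awsAccessKeyID: "the-access-key" # value of the accessKey Kamelet property
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: my-binding-keda-auth # hypothetical name
spec:
  secretTargetRef:
    - parameter: awsAccessKeyID # KEDA authentication parameter
      name: my-binding-keda-auth # Secret to read from
      key: awsAccessKeyID # key inside the Secret
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: my-binding # hypothetical name
spec:
  scaleTargetRef:
    name: my-binding # hypothetical; the workload to scale
  triggers:
    - type: aws-sqs-queue
      metadata:
        queueURL: "my-queue" # mapped from queueNameOrArn; other parameters omitted
      authenticationRef:
        name: my-binding-keda-auth
```

Metadata parameters end up inline in the trigger, while authentication parameters are only referenced through the TriggerAuthentication, so their values never appear in the ScaledObject itself.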
Advanced KEDA property mapping
There are cases where KEDA requires some static values to be set in a ScaledObject or also values computed from multiple Kamelet properties.
To deal with these cases it’s possible to use annotations on the Kamelet prefixed with camel.apache.org/keda.metadata. (for metadata parameters) or camel.apache.org/keda.authentication. (for authentication parameters). Those annotations can contain plain fixed values or also templates (using the Go syntax).
For example:
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: my-source
  labels:
    camel.apache.org/kamelet.type: "source"
  annotations:
    camel.apache.org/keda.authentication.sasl: "plaintext" # (1)
    camel.apache.org/keda.metadata.queueLength: "5" # (2)
    camel.apache.org/keda.metadata.queueAddress: "https://myhost.com/queues/{{.queueName}}" # (3)
spec:
  definition:
    # ...
    properties:
      queueName:
        title: Queue Name
        description: The Queue Name
        type: string
  # ...
1 | An authentication parameter with a fixed value |
2 | A metadata parameter with a fixed value |
3 | A metadata parameter with a value computed from a template |
When using the template syntax, all Kamelet properties are available as fields. The default values are used in case they are missing from the user configuration.
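As a small sketch of how defaults interact with templates, assume the queueName property declared a default value (the default "orders" below is hypothetical and not part of the original example):

```yaml
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: my-source
  labels:
    camel.apache.org/kamelet.type: "source"
  annotations:
    # Go template: .queueName refers to the Kamelet property of the same name
    camel.apache.org/keda.metadata.queueAddress: "https://myhost.com/queues/{{.queueName}}"
spec:
  definition:
    # ...
    properties:
      queueName:
        title: Queue Name
        description: The Queue Name
        type: string
        default: "orders" # hypothetical default, added for illustration
  # ...
```

Under this assumption, a binding that does not set queueName would be expected to yield a queueAddress of https://myhost.com/queues/orders, while a binding that sets queueName to invoices would yield https://myhost.com/queues/invoices.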
For information on how to use Kamelets with KEDA, see the KEDA section in the user guide.