We use analytics and cookies to understand site traffic. Information about your use of our site is shared with Google for that purpose. Learn more.
Apache Kafka Source Example
Tutorial on how to build and deploy a KafkaSource
Eventing source using a Knative Serving Service
.
Prerequisites
You must ensure that you meet the prerequisites listed in the Apache Kafka overview.
Creating a KafkaSource
source CRD
- Install the
KafkaSource
sub-component to your Knative cluster:kubectl apply -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml
- Check that the
kafka-controller-manager-0
pod is running.kubectl get pods --namespace knative-sources NAME READY STATUS RESTARTS AGE kafka-controller-manager-0 1/1 Running 0 42m
- Check the
kafka-controller-manager-0
pod logs.$ kubectl logs kafka-controller-manager-0 -n knative-sources 2019/03/19 22:25:54 Registering Components. 2019/03/19 22:25:54 Setting up Controller. 2019/03/19 22:25:54 Adding the Apache Kafka Source controller. 2019/03/19 22:25:54 Starting Apache Kafka controller.
Apache Kafka Topic (Optional)
- If using Strimzi, you can set a topic modifying
source/kafka-topic.yaml
with your desired:
-
Topic
-
Cluster Name
-
Partitions
-
Replicas
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaTopic metadata: name: knative-demo-topic namespace: kafka labels: strimzi.io/cluster: my-cluster spec: partitions: 3 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824
-
Deploy the
KafkaTopic
$ kubectl apply -f strimzi-topic.yaml kafkatopic.kafka.strimzi.io/knative-demo-topic created
-
Ensure the
KafkaTopic
is running.$ kubectl -n kafka get kafkatopics.kafka.strimzi.io NAME AGE knative-demo-topic 16s
Create the Event Display service
-
Download a copy of the code:
git clone -b "release-0.14" https://github.com/knative/docs knative-docs cd knative-docs/docs/eventing/samples/kafka/source
-
Build the Event Display Service (
event-display.yaml
)apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: default spec: template: spec: containers: - # This corresponds to # https://github.com/knative/eventing-contrib/tree/master/cmd/event_display/main.go image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
-
Deploy the Event Display Service
$ kubectl apply --filename event-display.yaml ... service.serving.knative.dev/event-display created
-
Ensure that the Service pod is running. The pod name will be prefixed with
event-display
.$ kubectl get pods NAME READY STATUS RESTARTS AGE event-display-00001-deployment-5d5df6c7-gv2j4 2/2 Running 0 72s ...
Apache Kafka Event Source
-
Modify
source/event-source.yaml
accordingly with bootstrap servers, topics, etc…:apiVersion: sources.eventing.knative.dev/v1alpha1 kind: KafkaSource metadata: name: kafka-source spec: consumerGroup: knative-group bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092 #note the kafka namespace topics: knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
-
Deploy the event source.
$ kubectl apply -f event-source.yaml ... kafkasource.sources.eventing.knative.dev/kafka-source created
-
Check that the event source pod is running. The pod name will be prefixed with
kafka-source
.$ kubectl get pods NAME READY STATUS RESTARTS AGE kafka-source-xlnhq-5544766765-dnl5s 1/1 Running 0 40m
-
Ensure the Apache Kafka Event Source started with the necessary configuration.
$ kubectl logs --selector='knative-eventing-source-name=kafka-source' {"level":"info","ts":"2019-04-01T19:09:32.164Z","caller":"receive_adapter/main.go:97","msg":"Starting Apache Kafka Receive Adapter...","Bootstrap Server":"...","Topics":".","ConsumerGroup":"...","SinkURI":"...","TLS":false}
Verify
-
Produce a message (
{"msg": "This is a test!"}
) to the Apache Kafka topic, like shown below:kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic If you don't see a command prompt, try pressing enter. >{"msg": "This is a test!"}
-
Check that the Apache Kafka Event Source consumed the message and sent it to its sink properly.
$ kubectl logs --selector='knative-eventing-source-name=kafka-source' ... {"level":"info","ts":"2019-04-15T20:37:24.702Z","caller":"receive_adapter/main.go:99","msg":"Starting Apache Kafka Receive Adapter...","bootstrap_server":"...","Topics":"knative-demo-topic","ConsumerGroup":"knative-group","SinkURI":"...","TLS":false} {"level":"info","ts":"2019-04-15T20:37:24.702Z","caller":"adapter/adapter.go:100","msg":"Starting with config: ","bootstrap_server":"...","Topics":"knative-demo-topic","ConsumerGroup":"knative-group","SinkURI":"...","TLS":false} {"level":"info","ts":1553034726.546107,"caller":"adapter/adapter.go:154","msg":"Successfully sent event to sink"}
-
Ensure the Event Display received the message sent to it by the Event Source.
$ kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.kafka.event source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic subject: partition:0#564 id: partition:0/offset:564 time: 2020-02-10T18:10:23.861866615Z datacontenttype: application/json Extensions, key: Data, { “msg”: “This is a test!” }
## Teardown Steps
1. Remove the Apache Kafka Event Source
$ kubectl delete -f source/source.yaml kafkasource.sources.eventing.knative.dev “kafka-source” deleted
2. Remove the Event Display
$ kubectl delete -f source/event-display.yaml service.serving.knative.dev “event-display” deleted
3. Remove the Apache Kafka Event Controller
$ kubectl delete -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml serviceaccount “kafka-controller-manager” deleted clusterrole.rbac.authorization.k8s.io “eventing-sources-kafka-controller” deleted clusterrolebinding.rbac.authorization.k8s.io “eventing-sources-kafka-controller” deleted customresourcedefinition.apiextensions.k8s.io “kafkasources.sources.eventing.knative.dev” deleted service “kafka-controller” deleted statefulset.apps “kafka-controller-manager” deleted
4. (Optional) Remove the Apache Kafka Topic
```shell
$ kubectl delete -f kafka-topic.yaml
kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted
(Optional) Specify the key deserializer
When KafkaSource
receives a message from Kafka, it dumps the key in the Event extension called Key
and dumps Kafka message headers in the extensions starting with kafkaheader
.
You can specify the key deserializer among four types:
string
(default) for UTF-8 encoded stringsint
for 32-bit & 64-bit signed integersfloat
for 32-bit & 64-bit floating pointsbyte-array
for a Base64 encoded byte array
To specify it, add the label kafkasources.sources.eventing.knative.dev/key-type
to the KafkaSource
definition like:
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: KafkaSource
metadata:
name: kafka-source
labels:
kafkasources.sources.eventing.knative.dev/key-type: int
spec:
consumerGroup: knative-group
bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092 #note the kafka namespace
topics: knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.