Writing an Event Source using the easy way

Introduction

As stated in tutorial on writing a Source with a Receive Adapter, there are multiple ways to create event sources. The way in that tutorial is to create an independent event source that has its own CRD.

This tutorial provides a simpler mechanism to build an event source in Javascript and use it with ContainerSource and / or the SinkBinding.

ContainerSource is an easy way to turn any dispatcher container into an Event Source. Similarly, another option is using SinkBinding which provides a framework for injecting environment variables into any Kubernetes resource which has a spec.template that looks like a Pod (aka PodSpecable).

SinkBinding is a newer concept and it should be preferred over ContainerSource.

Code for this tutorial is available here.

Bootstrapping

Create the project and add the dependencies:

npm init
npm install cloudevents-sdk@2.0.1 --save

Please note that because of a bug, you will need at least 2.0.1 version of the Javascript SDK.

Making use of ContainerSource

ContainerSource and SinkBinding both work by injecting environment variables to an application. Injected environment variables at minimum contain the URL of a sink that will receive events.

Following example emits an event to the sink every 1000 milliseconds. The sink URL to post the events will be made available to the application via the K_SINK environment variable by ContainerSource.

// File - index.js

const { CloudEvent, HTTPEmitter } = require("cloudevents-sdk");

let sinkUrl = process.env['K_SINK'];

console.log("Sink URL is " + sinkUrl);

let emitter = new HTTPEmitter({
    url: sinkUrl
});

let eventIndex = 0;
setInterval(function () {
    console.log("Emitting event #" + ++eventIndex);

    let myevent = new CloudEvent({
        source: "urn:event:from:my-api/resource/123",
        type: "your.event.source.type",
        id: "your-event-id",
        dataContentType: "application/json",
        data: {"hello": "World " + eventIndex},
    });

    // Emit the event
    emitter.send(myevent)
        .then(response => {
            // Treat the response
            console.log("Event posted successfully");
            console.log(response.data);
        })
        .catch(err => {
            // Deal with errors
            console.log("Error during event post");
            console.error(err);
        });
}, 1000);
# File - Dockerfile

FROM node:10
WORKDIR /usr/src/app
COPY package*.json ./
RUN npm install
COPY . .
EXPOSE 8080
CMD [ "node", "index.js" ]

Build and push the image:

docker build . -t path/to/image/registry/node-knative-heartbeat-source:v1
docker push path/to/image/registry/node-knative-heartbeat-source:v1

Create the event display service which simply logs any cloudevents posted to it.

cat <<EOS |kubectl apply -f -
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
spec:
  template:
    spec:
      containers:
        - image: docker.io/aliok/event_display-864884f202126ec3150c5fcef437d90c@sha256:93cb4dcda8fee80a1f68662ae6bf20301471b046ede628f3c3f94f39752fbe08
EOS

Create the ContainerSource:

cat <<EOS |kubectl apply -f -
---
apiVersion: sources.knative.dev/v1alpha2
kind: ContainerSource
metadata:
  name: test-heartbeats
spec:
  template:
    spec:
      containers:
        - image: path/to/image/registry/node-knative-heartbeat-source:v1
          name: heartbeats
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
EOS

Check the logs of the event display service. You will see a new message is pushed every second:

$ kubectl logs -l serving.knative.dev/service=event-display -c user-container

☁️  cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 1.0
  type: your.event.source.type
  source: urn:event:from:your-api/resource/123
  id: your-event-id
  datacontenttype: application/json
Data,
  {
    "hello": "World 1"
  }

If you are interested in seeing what is injected into the event source as a K_SINK, you can check the logs:

$ kubectl logs test-heartbeats-deployment-7575c888c7-85w5t

Sink URL is http://event-display.default.svc.cluster.local
Emitting event #1
Emitting event #2
Event posted successfully
Event posted successfully

Please note that the example code above is using Binary mode for CloudEvents. Simply change

let binding = new v1.BinaryHTTPEmitter(config);

with

let binding = new v1.StructuredHTTPEmitter(config);

to employ structured mode.

However, binary mode should be used in most of the cases as:

  • It is faster in terms of serialization and deserialization
  • It works better with cloudevents-aware proxies (like Knative Channels) can simply check the header instead of parsing the payload

Making use of SinkBinding

SinkBinding is a more powerful way of making any Kubernetes resource an event source.

ContainerSource will create the container for your event source's image and it will be ContainerSource responsibility to manage the container.

SinkBinding though, will not create any containers. It will inject the sink information to the already existing Kubernetes resources. This is a more flexible approach as you can use any Kubernetes PodSpecable as an event source, such as Deployment, Job, Knative Service, DaemonSet etc.

We don't need any code changes in our source for making it work with SinkBinding.

Create the event display as in the section before:

cat <<EOS |kubectl apply -f -
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
spec:
  template:
    spec:
      containers:
        - image: docker.io/aliok/event_display-864884f202126ec3150c5fcef437d90c@sha256:93cb4dcda8fee80a1f68662ae6bf20301471b046ede628f3c3f94f39752fbe08
EOS

Create a Kubernetes deployment that runs the event source:

cat <<EOS |kubectl apply -f -
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: node-heartbeats-deployment
  labels:
    app: node-heartbeats
spec:
  replicas: 2
  selector:
    matchLabels:
      app: node-heartbeats
  template:
    metadata:
      labels:
        app: node-heartbeats
    spec:
      containers:
      - name: node-heartbeats
        image: path/to/image/registry/node-knative-heartbeat-source:v1
        ports:
        - containerPort: 8080
EOS

As the SinkBinding is not created yet, K_SINK environment variable is not yet injected and the event source will complain about that.

$ kubectl logs node-heartbeats-deployment-9ffbb644b-llkzk

Sink URL is undefined
Emitting event #1
Error during event post
TypeError [ERR_INVALID_ARG_TYPE]: The "url" argument must be of type string. Received type undefined

Create the SinkBinding:

cat <<EOS |kubectl apply -f -
---
apiVersion: sources.knative.dev/v1alpha1
kind: SinkBinding
metadata:
  name: bind-node-heartbeat
spec:
  subject:
    apiVersion: apps/v1
    kind: Deployment
    selector:
      matchLabels:
        app: node-heartbeats
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
EOS

You will see the pods are recreated and this time the K_SINK environment variable is injected.

Also note that since the replicas is set to 2, there will be 2 pods that are posting events to the sink.

$ kubectl logs event-display-dpplv-deployment-67c9949cf9-bvjvk -c user-container

☁️  cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 1.0
  type: your.event.source.type
  source: urn:event:from:your-api/resource/123
  id: your-event-id
  datacontenttype: application/json
Data,
  {
    "hello": "World 1"
  }

☁️  cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 1.0
  type: your.event.source.type
  source: urn:event:from:your-api/resource/123
  id: your-event-id
  datacontenttype: application/json
Data,
  {
    "hello": "World 1"
  }