We use analytics and cookies to understand site traffic. Information about your use of our site is shared with Google for that purpose. Learn more.
Receive Adapter Implementation and Design
Receive Adapter cmd
Similar to the controller, we'll need an injection based main.go
similar to the controller under cmd/receiver_adapter/main.go
// This Adapter generates events at a regular interval.
package main
import (
"knative.dev/eventing/pkg/adapter"
myadapter "knative.dev/sample-source/pkg/adapter"
)
func main() {
adapter.Main("sample-source", myadapter.NewEnv, myadapter.NewAdapter)
}
Defining NewAdapter implementation and Start function
The adapter's pkg
implementation consists of two main functions;
- A
NewAdapter(ctx context.Context, aEnv adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {}
call, which creates the new adapter with passed variables via theEnvConfigAccessor
. The created adapter will be passed the cloudevents client (which is where the events are forwarded to). This is sometimes referred to as a sink, orceClient
in the Knative ecosystem. The return value is a reference to the adapter as defined by the adapter's local struct.
In our sample-source
's case;
// Adapter generates events at a regular interval.
type Adapter struct {
logger *zap.Logger
interval time.Duration
nextID int
client cloudevents.Client
}
Start
function implemented as an interface to the adapter struct.
func (a *Adapter) Start(stopCh <-chan struct{}) error {
stopCh
is the signal to stop the Adapter. Otherwise the role of the function is to process the next
event. In the case of the sample-source
, it creates an event to forward to the specified cloudevent sink/client
every X interval, as specified by the loaded EnvConfigAccessor
(loaded via the resource yaml).
func (a *Adapter) Start(stopCh <-chan struct{}) error {
a.logger.Infow("Starting heartbeat", zap.String("interval", a.interval.String()))
for {
select {
case <-time.After(a.interval):
event := a.newEvent()
a.logger.Infow("Sending new event", zap.String("event", event.String()))
if result := a.client.Send(context.Background(), event); !cloudevents.IsACK(result) {
a.logger.Infow("failed to send event", zap.String("event", event.String()), zap.Error(result))
// We got an error but it could be transient, try again next interval.
continue
}
case <-stopCh:
a.logger.Info("Shutting down...")
return nil
}
}
}
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.