Reconcile sources
Now that we have a Source CRD defined with Sink and SinkURI fields, we'll need to use those fields in the Source controller to resolve Sink references and set the SinkURI in the Source object's status.
Remove Deployment creation from the Reconcile function
Locate the Reconcile function generated by Kubebuilder. In the reference project, this is pkg/controller/samplesource/samplesource_controller.go.
The generated Reconcile function creates a Deployment owned by the reconciled resource. Since we don't need to create a deployment for this tutorial, remove this code.
Remove the kubebuilder:rbac annotation comments giving the controller RBAC permissions to manipulate Deployments. Since the controller no longer creates Deployment objects, it won't need these permissions. These comments are above the Reconcile function declaration.
Remove the second Watch call, which watches Deployments, from the add function. Since the source no longer owns Deployment objects, there's no need to watch for changes to them.
These edits in the reference project can be viewed at https://github.com/grantr/sample-source/pull/5.
Update Source Status in Reconcile function
The controller's Reconcile function needs to update the reconciled source object's Status. Add code to the function to preserve the original object, then compare it with the reconciled object and update the status if a change was made.
// Create a copy to determine whether the instance has been modified.
original := instance.DeepCopy()
// Reconcile the object. If an error occurred, don't return immediately;
// update the object Status first.
reconcileErr := r.reconcile(ctx, instance)
// Update object Status if necessary. This happens even if the reconcile
// returned an error.
if !equality.Semantic.DeepEqual(original.Status, instance.Status) {
	log.Info("Updating Status", "request", request.NamespacedName)
	// An error may occur here if the object was updated since the last Get.
	// Return the error so the request can be retried later.
	// This call uses the /status subresource to ensure that the object's spec
	// is never updated by the controller.
	if err := r.Status().Update(ctx, instance); err != nil {
		return reconcile.Result{}, err
	}
}
return reconcile.Result{}, reconcileErr
You'll also need to define the inner reconcile function. Leave it as a stub for now.
func (r *ReconcileSampleSource) reconcile(ctx context.Context, instance *sourcesv1alpha1.SampleSource) error {
	return nil
}
These edits in the reference project can be viewed at https://github.com/grantr/sample-source/pull/6.
Resolve SinkURI from the source's Sink reference
To populate the SinkURI status field, the controller needs to get the object named in the Sink reference and look for a URI that can be used as a sink address. Many Knative resources have the following fields in their Status:
status:
  address:
    hostname: example.default.svc.cluster.local
Resources with these fields are called Addressable. The source controller will use the value in the hostname field to determine a SinkURI.
Add code to the inner reconcile function to set the SinkURI field in the source Status.
func (r *ReconcileSampleSource) reconcile(ctx context.Context, instance *sourcesv1alpha1.SampleSource) error {
	// Resolve the Sink URI based on the sink reference.
	sinkURI, err := r.resolveSinkRef(ctx, instance.Spec.Sink)
	if err != nil {
		return fmt.Errorf("Failed to get sink URI: %v", err)
	}
	// Set the SinkURI field on the SampleSource Status.
	instance.Status.SinkURI = sinkURI
	// TODO(user): Add additional behavior.
	return nil
}
Write the resolveSinkRef function. This will take the sink reference from the Source spec, get the referenced object, and return a URI string built from its Addressable hostname.
After completing this tutorial, consider replacing the code below with existing sink resolution helpers provided by Knative: AddressableType from [github.com/knative/pkg/apis/duck/v1alpha1](https://github.com/knative/pkg/tree/release-0.13/apis/duck/v1alpha1) and GetSinkURI from [github.com/knative/eventing-contrib/pkg/controller/sinks](https://github.com/knative/eventing-contrib/tree/release-0.13/pkg/controller/sinks).
type addressableType struct {
	Status struct {
		Address *struct {
			Hostname string
		}
	}
}
// TODO(user): A version of this function is also available in the
// github.com/knative/eventing-contrib/pkg/controller/sinks package.
func (r *ReconcileSampleSource) resolveSinkRef(ctx context.Context, sinkRef *corev1.ObjectReference) (string, error) {
	// Make sure the reference is not nil.
	if sinkRef == nil {
		return "", fmt.Errorf("sink reference is nil")
	}
	// TODO(user): Add support for corev1.Service.
	// Get the referenced Sink as an Unstructured object.
	sink := &unstructured.Unstructured{}
	sink.SetGroupVersionKind(sinkRef.GroupVersionKind())
	if err := r.Get(ctx, client.ObjectKey{Namespace: sinkRef.Namespace, Name: sinkRef.Name}, sink); err != nil {
		return "", fmt.Errorf("Failed to get sink object: %v", err)
	}
	// Round-trip the Sink through JSON into an Addressable struct to more
	// easily extract its hostname.
	addressable := &addressableType{}
	raw, err := sink.MarshalJSON()
	if err != nil {
		return "", fmt.Errorf("Failed to marshal sink: %v", err)
	}
	if err := json.Unmarshal(raw, addressable); err != nil {
		return "", fmt.Errorf("Failed to unmarshal sink into Addressable: %v", err)
	}
	// Check that the Addressable fields are present.
	if addressable.Status.Address == nil {
		return "", fmt.Errorf("Failed to resolve sink URI: sink does not contain address")
	}
	if addressable.Status.Address.Hostname == "" {
		return "", fmt.Errorf("Failed to resolve sink URI: address hostname is empty")
	}
	// Translate the Hostname into a URI.
	return fmt.Sprintf("http://%s/", addressable.Status.Address.Hostname), nil
}
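The JSON decoding step in resolveSinkRef can be exercised on its own with only the standard library. The sketch below is illustrative rather than part of the reference project: sinkURIFromJSON is a hypothetical helper that takes raw JSON in place of an Unstructured object fetched from the cluster, and the addressable type mirrors the addressableType struct above. It relies on encoding/json matching object keys to struct fields case-insensitively, which is why lowercase keys such as status.address.hostname populate the untagged Go fields.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// addressable mirrors the addressableType struct from the tutorial.
type addressable struct {
	Status struct {
		Address *struct {
			Hostname string
		}
	}
}

// sinkURIFromJSON is a hypothetical helper that extracts a sink URI from the
// JSON form of an object. It performs the same checks as resolveSinkRef but
// skips the cluster lookup.
func sinkURIFromJSON(raw []byte) (string, error) {
	a := &addressable{}
	if err := json.Unmarshal(raw, a); err != nil {
		return "", fmt.Errorf("failed to unmarshal sink: %v", err)
	}
	// A nil Address means the object has no status.address at all.
	if a.Status.Address == nil {
		return "", errors.New("sink does not contain address")
	}
	if a.Status.Address.Hostname == "" {
		return "", errors.New("address hostname is empty")
	}
	// Translate the hostname into a URI.
	return fmt.Sprintf("http://%s/", a.Status.Address.Hostname), nil
}

func main() {
	// Lowercase keys, as they appear in an Addressable object's status.
	uri, err := sinkURIFromJSON([]byte(
		`{"status":{"address":{"hostname":"example.default.svc.cluster.local"}}}`))
	fmt.Println(uri, err)

	// An object without status.address is not Addressable.
	_, err = sinkURIFromJSON([]byte(`{"status":{}}`))
	fmt.Println(err)
}
```

Declaring Address as a pointer is what makes the first check possible: a missing address decodes to nil, which can be told apart from an address that is present but has an empty hostname.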
Add test to verify SinkURI resolution
To verify that the inner reconcile and resolveSinkRef functions work, we'll create an Addressable object before the test, then verify that the source is updated with its Addressable hostname.
Create an Addressable object to use as sink. In the reference project, this relies on a TestSink CRD created in pkg/controller/samplesource/samplesource_controller_suite_test.go (not shown).
The test itself is in pkg/controller/samplesource/samplesource_controller_test.go.
sink := &unstructured.Unstructured{}
// Set the content first: SetUnstructuredContent replaces the whole object,
// so the kind, name, and namespace must be set after it.
sink.SetUnstructuredContent(
	map[string]interface{}{
		"Status": map[string]interface{}{
			"Address": map[string]interface{}{
				"Hostname": "example.com",
			},
		},
	},
)
sink.SetGroupVersionKind(schema.GroupVersionKind{
	Group:   "sources.knative.dev",
	Version: "v1alpha1",
	Kind:    "TestSink",
})
sink.SetName("foosink")
sink.SetNamespace("default")
err = c.Create(context.TODO(), sink)
After the test reconcile occurs, verify that the SinkURI was correctly updated.
// Expect the SampleSource object to be updated with the SinkURI.
updatedInstance := &sourcesv1alpha1.SampleSource{}
g.Eventually(func() error {
	if err := c.Get(context.TODO(), srcKey, updatedInstance); err != nil {
		return err
	}
	// Return an error instead of failing the test so Eventually can retry.
	if updatedInstance.Status.SinkURI != "http://example.com/" {
		return fmt.Errorf("Unexpected SinkURI: want %q, got %q", "http://example.com/", updatedInstance.Status.SinkURI)
	}
	return nil
}, timeout).Should(gomega.Succeed())
These edits in the reference project can be viewed at https://github.com/grantr/sample-source/pull/9.
Next: Publish to Cluster