Reconcile sources

Now that the Source CRD defines Sink and SinkURI fields, the Source controller needs to use them: it must resolve the Sink reference in the spec and write the resulting SinkURI to the Source object's status.

Remove Deployment creation from the Reconcile function

Locate the Reconcile function generated by Kubebuilder. In the reference project, this is pkg/controller/samplesource/samplesource_controller.go.

The generated Reconcile function creates a Deployment owned by the reconciled resource. Since this tutorial doesn't need a Deployment, remove this code.

Remove the kubebuilder:rbac annotation comments giving the controller RBAC permissions to manipulate Deployments. Since the controller no longer creates Deployment objects, it won't need these permissions. These comments are above the Reconcile function declaration.

Remove the second Watch call watching Deployments from the add function. Since the source no longer owns Deployment objects, there's no need to watch for changes to them.

These edits in the reference project can be viewed at https://github.com/grantr/sample-source/pull/5.

Update Source Status in Reconcile function

The controller's Reconcile function needs to update the reconciled source object's Status. Add code that copies the object before reconciling, compares the copy's Status with the reconciled object's Status afterward, and writes the Status back only if it changed.

// Create a copy to determine whether the instance has been modified.
original := instance.DeepCopy()

// Reconcile the object. If an error occurred, don't return immediately;
// update the object Status first.
reconcileErr := r.reconcile(ctx, instance)

// Update object Status if necessary. This happens even if the reconcile
// returned an error.
if !equality.Semantic.DeepEqual(original.Status, instance.Status) {
  log.Info("Updating Status", "request", request.NamespacedName)
  // An error may occur here if the object was updated since the last Get.
  // Return the error so the request can be retried later.
  // This call uses the /status subresource to ensure that the object's spec
  // is never updated by the controller.
  if err := r.Status().Update(ctx, instance); err != nil {
    return reconcile.Result{}, err
  }
}

return reconcile.Result{}, reconcileErr

You'll also need to define the inner reconcile function. Leave it as a stub for now.

func (r *ReconcileSampleSource) reconcile(ctx context.Context, instance *sourcesv1alpha1.SampleSource) error {
  return nil
}

These edits in the reference project can be viewed at https://github.com/grantr/sample-source/pull/6.

Resolve SinkURI from the source's Sink reference

To populate the SinkURI status field, the controller needs to get the object named in the Sink reference and look for a URI that can be used as a sink address. Many Knative resources have the following fields in their Status:

status:
  address:
    hostname: example.default.svc.cluster.local

Resources with these fields are called Addressable. The source controller will use the value in the hostname field to determine a SinkURI.
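To make the Addressable shape concrete, here is a small standard-library sketch that reads status.address.hostname from a JSON-encoded object and ignores every other field. The addressable type, the hostnameOf helper, and the sample objects are hypothetical and exist only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// addressable captures only the fields that make a resource Addressable.
// Any other fields in the object are ignored during decoding.
type addressable struct {
	Status struct {
		Address *struct {
			Hostname string `json:"hostname"`
		} `json:"address"`
	} `json:"status"`
}

// hostnameOf is a hypothetical helper that returns status.address.hostname
// from a JSON-encoded object, or an error if the object is not Addressable.
func hostnameOf(raw []byte) (string, error) {
	var a addressable
	if err := json.Unmarshal(raw, &a); err != nil {
		return "", fmt.Errorf("decoding object: %v", err)
	}
	if a.Status.Address == nil || a.Status.Address.Hostname == "" {
		return "", fmt.Errorf("object is not Addressable")
	}
	return a.Status.Address.Hostname, nil
}

func main() {
	// Illustrative object; the kind and spec contents are made up.
	obj := []byte(`{
	  "kind": "Example",
	  "spec": {"replicas": 1},
	  "status": {"address": {"hostname": "example.default.svc.cluster.local"}}
	}`)
	host, err := hostnameOf(obj)
	fmt.Println(host, err)

	// An object without status.address is rejected.
	_, err = hostnameOf([]byte(`{"kind": "Other"}`))
	fmt.Println(err)
}
```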

Add code to the inner reconcile function to set the SinkURI field in the source Status.

func (r *ReconcileSampleSource) reconcile(ctx context.Context, instance *sourcesv1alpha1.SampleSource) error {

  // Resolve the Sink URI based on the sink reference.
  sinkURI, err := r.resolveSinkRef(ctx, instance.Spec.Sink)
  if err != nil {
    return fmt.Errorf("Failed to get sink URI: %v", err)
  }

  // Set the SinkURI field on the SampleSource Status.
  instance.Status.SinkURI = sinkURI

  //TODO(user): Add additional behavior.
  return nil
}

Write the resolveSinkRef function. This will take the sink reference from the Source spec, get the referenced object, and return its Addressable hostname as a string.

After completing this tutorial, consider replacing the code below with existing sink resolution helpers provided by Knative: AddressableType from [github.com/knative/pkg/apis/duck/v1alpha1](https://github.com/knative/pkg/tree/release-0.14/apis/duck/v1alpha1) and GetSinkURI from [github.com/knative/eventing-contrib/pkg/controller/sinks](https://github.com/knative/eventing-contrib/tree/release-0.14/pkg/controller/sinks).

type addressableType struct {
  Status struct {
    Address *struct {
      Hostname string
    }
  }
}

// TODO(user): A version of this function is also available in the
// github.com/knative/eventing-contrib/pkg/controller/sinks package.
func (r *ReconcileSampleSource) resolveSinkRef(ctx context.Context, sinkRef *corev1.ObjectReference) (string, error) {
  // Make sure the reference is not nil.
  if sinkRef == nil {
    return "", fmt.Errorf("sink reference is nil")
  }

  //TODO(user): Add support for corev1.Service.

  // Get the referenced Sink as an Unstructured object.
  sink := &unstructured.Unstructured{}
  sink.SetGroupVersionKind(sinkRef.GroupVersionKind())
  if err := r.Get(ctx, client.ObjectKey{Namespace: sinkRef.Namespace, Name: sinkRef.Name}, sink); err != nil {
    return "", fmt.Errorf("Failed to get sink object: %v", err)
  }

  // Round-trip the Sink through JSON into an addressableType struct to more
  // easily extract its hostname.
  addressable := &addressableType{}
  raw, err := sink.MarshalJSON()
  if err != nil {
    return "", fmt.Errorf("Failed to marshal sink: %v", err)
  }
  if err := json.Unmarshal(raw, addressable); err != nil {
    return "", fmt.Errorf("Failed to unmarshal sink into Addressable: %v", err)
  }

  // Check that the Addressable fields are present.
  if addressable.Status.Address == nil {
    return "", fmt.Errorf("Failed to resolve sink URI: sink does not contain address")
  }
  if addressable.Status.Address.Hostname == "" {
    return "", fmt.Errorf("Failed to resolve sink URI: address hostname is empty")
  }
  // Translate the Hostname into a URI.
  return fmt.Sprintf("http://%s/", addressable.Status.Address.Hostname), nil
}
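The TODO about corev1.Service in resolveSinkRef could be approached by special-casing Service references before the Addressable lookup, since a Service has no status.address field. The sketch below is one possible shape, not the reference project's implementation: sinkRef is a simplified stand-in for corev1.ObjectReference, serviceURI is a hypothetical helper, and the svc.cluster.local suffix assumes the default cluster domain.

```go
package main

import "fmt"

// sinkRef is a simplified, hypothetical stand-in for corev1.ObjectReference.
type sinkRef struct {
	APIVersion string
	Kind       string
	Namespace  string
	Name       string
}

// serviceURI is a hypothetical helper. For a core v1 Service it derives a
// URI from the Service's cluster DNS name and reports true; for any other
// kind it reports false so the caller can fall back to the Addressable
// lookup. The cluster domain is assumed to be cluster.local.
func serviceURI(ref sinkRef) (string, bool) {
	if ref.APIVersion != "v1" || ref.Kind != "Service" {
		return "", false
	}
	return fmt.Sprintf("http://%s.%s.svc.cluster.local/", ref.Name, ref.Namespace), true
}

func main() {
	uri, ok := serviceURI(sinkRef{APIVersion: "v1", Kind: "Service", Namespace: "default", Name: "example"})
	fmt.Println(uri, ok)

	// Non-Service references are left to the Addressable path.
	uri, ok = serviceURI(sinkRef{APIVersion: "sources.knative.dev/v1alpha1", Kind: "TestSink", Namespace: "default", Name: "foosink"})
	fmt.Println(uri, ok)
}
```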

Add test to verify SinkURI resolution

To verify that the inner reconcile and resolveSinkRef functions work, we'll create an Addressable object before the test, then verify that the source's SinkURI is set from that object's Addressable hostname.

Create an Addressable object to use as the sink. In the reference project, this relies on a TestSink CRD created in pkg/controller/samplesource/samplesource_controller_suite_test.go (not shown). The test itself is in pkg/controller/samplesource/samplesource_controller_test.go.

sink := &unstructured.Unstructured{}
// Set the content first: SetUnstructuredContent replaces the whole object,
// so it would discard a kind, name, or namespace set before it.
sink.SetUnstructuredContent(
  map[string]interface{}{
    "status": map[string]interface{}{
      "address": map[string]interface{}{
        "hostname": "example.com",
      },
    },
  },
)
sink.SetGroupVersionKind(schema.GroupVersionKind{
  Group:   "sources.knative.dev",
  Version: "v1alpha1",
  Kind:    "TestSink",
})
sink.SetName("foosink")
sink.SetNamespace("default")

err = c.Create(context.TODO(), sink)
g.Expect(err).NotTo(gomega.HaveOccurred())
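The sink fixture is a nested map, and resolveSinkRef reads it back by round-tripping through JSON into addressableType, which has no json tags. Go's encoding/json prefers an exact key match but also accepts a case-insensitive one, so the hostname is found whichever capitalization the map keys use. The sketch below demonstrates this with the standard library only; decodeAddressable is a hypothetical helper, and json.Marshal on a plain map stands in for the Unstructured object's MarshalJSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// addressableType matches the untagged struct used by resolveSinkRef.
type addressableType struct {
	Status struct {
		Address *struct {
			Hostname string
		}
	}
}

// decodeAddressable is a hypothetical helper that round-trips nested map
// content through JSON into addressableType.
func decodeAddressable(content map[string]interface{}) (*addressableType, error) {
	raw, err := json.Marshal(content)
	if err != nil {
		return nil, fmt.Errorf("marshaling content: %v", err)
	}
	a := &addressableType{}
	if err := json.Unmarshal(raw, a); err != nil {
		return nil, fmt.Errorf("unmarshaling content: %v", err)
	}
	return a, nil
}

// fixture builds nested content using the given key names.
func fixture(status, address, hostname string) map[string]interface{} {
	return map[string]interface{}{
		status: map[string]interface{}{
			address: map[string]interface{}{
				hostname: "example.com",
			},
		},
	}
}

func main() {
	for _, content := range []map[string]interface{}{
		fixture("status", "address", "hostname"),
		fixture("Status", "Address", "Hostname"),
	} {
		a, err := decodeAddressable(content)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(a.Status.Address.Hostname)
	}
}
```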

After the test reconcile occurs, verify that the SinkURI was correctly updated.

// Expect the SampleSource object to be updated with the SinkURI
updatedInstance := &sourcesv1alpha1.SampleSource{}
g.Eventually(func() error {
  if err := c.Get(context.TODO(), srcKey, updatedInstance); err != nil {
    return err
  }
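  if updatedInstance.Status.SinkURI != "http://example.com/" {
    // Return an error instead of failing the test so that Eventually
    // retries until the controller has written the Status.
    return fmt.Errorf("unexpected SinkURI: want %q, got %q", "http://example.com/", updatedInstance.Status.SinkURI)
  }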
  return nil
}, timeout).Should(gomega.Succeed())

These edits in the reference project can be viewed at https://github.com/grantr/sample-source/pull/9.

Next: Publish to Cluster