Skip to content

Kubernetes Operators II: Using Operator-Framework SDK on top of Kubebuilder

The second in a two-part series on building a Kubernetes Operator implements a use case with real value for our Kubernetes cluster security.

In the first instalment of this two-post series , we described how to install and configure our environment to implement a simple Kubernetes operator skeleton using the Operator-Framework SDK. We continue our work today by implementing a real use-case in which we use Clair as a backend for indexing and scanning our deployed Pod container images. Newly discovered Common Vulnerabilities and Exposures (CVEs) will trigger a notifier app via webhooks and inform us on Slack about it.

Requirements

As described above, our environment needs to be installed and configured. This includes kinD as our local Kubernetes cluster, a local container registry and the Operator-Framework itself. We expect to have an up-and-running kinD cluster and container registry.

Slack

For our use case, we need to prepare a Slack-based endpoint to send notifications to, a Slack-Channel to send our notifications to and as a registered Slack-App to receive webhook messages. We won't go into how you create a Slack-Channel and a registered webhook Slack-App here: Just keep the Slack webhook URL in mind for later use.

Install Clair

For the sake of simplicity, we will run Clair and its dependencies in our cluster in combo mode. Make sure the KUBECONFIG environment variable points to the right Kubernetes config file.

text
$ export KUBECONFIG=./kinD/kind.kubeconf
$ kubectl create namespace clair

Clair needs a config file for all its components: the notifier, matcher and indexer:

text
$ cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: clair-config
  namespace: clair
data:
  config.yml: |
    log_level: debug-color
    http_listen_addr: ":6000"
    updaters: {}
    indexer:
      connstring: host=clair-db port=5432 user=clair dbname=clair sslmode=disable
      scanlock_retry: 10
      layer_scan_concurrency: 5
      migrations: true
    matcher:
      indexer_addr: http://clair:6000/
      connstring: host=clair-db port=5432 user=clair dbname=clair sslmode=disable
      max_conn_pool: 100
      migrations: true
    notifier:
      indexer_addr: http://clair:6000/
      matcher_addr: http://clair:6000/
      connstring: host=clair-db port=5432 user=clair dbname=clair sslmode=disable
      migrations: true
      delivery_interval: 5s
      poll_interval: 15s
      webhook:
        target: "http://clair-notifier.operator-system:8080/notify"
        callback: "http://clair.clair:6000/notifier/api/v1/notification"
EOF

Manifests, vulnerabilities and notifications are stored in a database by Clair:

text
$ cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clair-db
  namespace: clair
  labels:
    app: clair-db
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clair-db
  template:
    metadata:
      labels:
        app: clair-db
    spec:
      containers:
      - name: clair
        image: postgres:12.1
        ports:
        - containerPort: 5432
        env:
        - name: POSTGRES_USER
          value: clair
        - name: POSTGRES_DB
          value: clair
EOF

We will run Clair in combo mode for now to have all components in one place. Feel free to deploy all services in separate pods.

text
$ cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clair
  namespace: clair
  labels:
    app: clair
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clair
  template:
    metadata:
      labels:
        app: clair
    spec:
      initContainers:
      - name: wait
        image: postgres:12.1
        command: ["pg_isready", "-h", "clair-db", "-p", "5432"]
      containers:
      - name: clair
        image: quay.io/coreos/clair:v4.1.1
        ports:
        - containerPort: 6000
        args: ["-conf", "/etc/config/config.yml"]
        volumeMounts:
          - name: config-volume
            mountPath: /etc/config
        env:
        - name: CLAIR_MODE
          value: combo
      volumes:
        - name: config-volume
          configMap:
            name: clair-config
            items:
            - key: config.yml
              path: config.yml
EOF

Clair Operator

Unlike our part 1 Operator, our Clair-Operator will not define a scheduling resource. We will instead create a controller that watches for Pod change events — such as CREATE or UPDATE — and trigger our reconciler function related to that Pod. Handling DELETE events is more difficult because the Pod reference is lost. Our Operator needs to register container images to Clair using container manifests and let Clair create vulnerability reports for it.

Create Operator

First, we initialise our project and create an API in group core and version v1alpha1 as well as our sample Scanner CR, as we did in part 1.

text
$ mkdir operator
$ cd operator
$ operator-sdk init --repo github.com/nearform/clair-operator --domain nearform.com
$ operator-sdk create api --group core --version v1alpha1 --kind Scanner --resource --controller

Set up controller

As described above, we need a Pod watcher to trigger our reconciler function. As we know from part 1, the SetupWithManager() function in controllers/scanner_controller.go builds and sets up our controller type and returns it to the caller in the main() function before setting up a signal handler and starting the controller manager.

js
func (r *ScannerReconciler) SetupWithManager(mgr ctrl.Manager) error {
  return ctrl.NewControllerManagedBy(mgr).
    For(&corev1.Pod{}).
    Watches(&source.Kind{Type: &corev1.Pod{}},
      handler.EnqueueRequestsFromMapFunc(
	  func(pod client.Object) []reconcile.Request {
	    return []reconcile.Request{
	      {NamespacedName: types.NamespacedName{
	        Name:      pod.GetName(),
		  Namespace: pod.GetNamespace(),
		}},
	    }
	  },
	),
    ).
    Complete(r)
}

Custom Resource definition

Let’s have a look at our Custom Resource, our ScannerSpec Type defined in api/v1alpha1/scanner_types.go:

js
type ScannerSpec struct {
  Backend         string `json:"backend,omitempty"`
  ClairBaseUrl    string `json:"clairBaseUrl,omitempty"`
  SlackWebhookUrl string `json:"slackWebhookUrl,omitempty"`
  Notifier        string `json:"notifier,omitempty"`
}

The idea behind the Backend field is to define different scanner backends. In our sample, we will only have a Clair typed Scanner resource. As Notifier, we only provide a Slack notifier, but in order to show the flexibility of our Scanner type, it is showing here. ClairBaseUrl as well as SlackWebhookUrl are self-explanatory and point to the corresponding endpoints.

text
$ cat <<EOF | kubectl apply -f -
apiVersion: core.nearform.com/v1alpha1
kind: Scanner
metadata:
  name: clair-scanner
spec:
  slackWebhookUrl: "Your Webhook URL"
  backend: "clair"
  notifier: "slack"
  clairBaseUrl: "http://clair.clair:6000/"

Scanner Reconciler

We are now ready to implement our operator logic, which will be done in the Reconcile() function of our ScannerReconciler type.

text
func (r *ScannerReconciler) Reconcile(ctx context.Context, req ctrl.Request)

First, we need to define variables for a list of Scanners and exactly one Pod — the Pod which creates an Event our Manager was watching for.

js
var (
  scanners corev1alpha1.ScannerList
  pod      corev1.Pod
)

Because our scanner is responsible for a specific Pod and not for Scanner objects, we have to request the scanner objects for the specified namespace and store them in our scanner's variable.

js
opts := []client.ListOption{
  client.InNamespace(req.NamespacedName.Namespace),
}
if err := r.List(ctx, &scanners, opts...); err != nil {
  return ctrl.Result{}, client.IgnoreNotFound(err)
}

If no Scanner CRs are defined for the specified namespace, we can stop here and return to the Manager.

js
if len(scanners.Items) == 0 {
  log.Info("No Scanners found for namespace " + req.NamespacedName.Namespace)
  return ctrl.Result{}, nil
}

In case a Scanner was found, we need to fetch the Pod which is referenced by its name and namespace and save it into our pod variable.

js
if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
  log.Error(err, "unable to fetch Pod")
  return ctrl.Result{}, client.IgnoreNotFound(err)
}

As described before, handling DELETE events is more difficult and we need to add and remove a custom finalizer to the current Pod.

js
isPodMarkedToBeDeleted := pod.GetDeletionTimestamp() != nil
if isPodMarkedToBeDeleted {
  if controllerutil.ContainsFinalizer(&pod, podFinalizer) {
    if err := r.finalizePod(ctx, &pod, scanners.Items[0].Spec.ClairBaseUrl, scanners.Items[0].Spec.SlackWebhookUrl); err != nil {
	return ctrl.Result{}, err
    } 
    controllerutil.RemoveFinalizer(&pod, podFinalizer)
    _ = r.Update(ctx, &pod)
  }
}
if !controllerutil.ContainsFinalizer(&pod, podFinalizer) {
  controllerutil.AddFinalizer(&pod, podFinalizer)
  _ = r.Update(ctx, &pod)
}

The finalizePod() function implements a cleanup logic for Clair index database and informs us on Slack. Currently, Clair API does not provide a delete endpoint.

js
func (r *ScannerReconciler) finalizePod(ctx context.Context, pod *corev1.Pod, clairBaseUrl string, slackWebhookUrl string) error {
  [...]
}

We don't need the Pod itself — only its referenced container images. So let’s iterate through all the containers. Iteration through InitContainers is not implemented here but can be done in the same way.

js
for _, container := range pod.Status.ContainerStatuses{...}

First, let’s extract the manifest definition to pass it to Clair’s indexing endpoint:

js
manifest, err := docker.Inspect(ctx, container.Image)
if err != nil {
  log.Error(err, "Error while inspecting container Manifest")
  continue
}

The docker.Inspect() function of module docker returns a struct of type claircore.Manifest needs to be implemented. You can find out how to do this in github.com/quay/clair . Inspect() just extracts the image name and repository to connect to the registry storing that image and gets the Digests and Layers to build the claircore.Manifest type.

js
func Inspect(ctx context.Context, image string) (*claircore.Manifest, error) {
 [...]
}

In order to avoid reindexing already processed Pods we can define Annotations for better filtering. We use Patch() here instead of Update() to prevent the Pod from being fetched again and continue working with the initialised Pod instance.

js
if pod.Annotations["com.nearform."+container.Name+".hash"] == manifest.Hash.String() {
  continue
} else {
  patch := []byte(`{"metadata":{"annotations":{"com.nearform.` + container.Name + `.hash": "` + manifest.Hash.String() + `"}}}`)
  if err := r.Patch(ctx, &pod, client.RawPatch(types.StrategicMergePatchType, patch)); err != nil {
    continue
  }
}

Now we can request Clair index API endpoint by using ClairBaseUrl from the current Scanner CR.

js
func ManifestToJson(ctx context.Context, manifest *claircore.Manifest) ([]byte, error) {
  payload, err := json.Marshal(manifest)
  if err != nil {
    return []byte{}, err
  }
  return payload, nil
}

With an already indexed container image, we can request the vulnerability report endpoint.

js
func VulnerabilityReport(ctx context.Context, url string, hash string) ([]byte, error) {
  resp, err := http.Get(url + hash)
  if err != nil {
    return []byte{}, err
  }
  defer resp.Body.Close()
  body, err := ioutil.ReadAll(resp.Body)
  if err != nil {
    return []byte{}, err
  }
  return body, nil
}

Slack notifications

We have three types of Slack notifications to handle. Each time a new Container image is indexed (pod created or updated), we will notify Slack using Webhooks.

js
func CreateIndexReportSlackMessage(ctx context.Context, r []byte, pod *corev1.Pod, image string, clairBaseUrl string) (slack.WebhookMessage, error) {
  report := claircore.IndexReport{}
  // Build your Slack Message here
  [...]
}

We will also send notifications about vulnerability reports for the newly indexed images.

js
func CreateVulnerabilityReportSlackMessage(ctx context.Context, r []byte, pod *corev1.Pod, image string, clairBaseUrl string) (slack.WebhookMessage, error) {
  report := claircore.VulnerabilityReport{}
  // Build your Slack Message here
  [...]
}

The third option to notify is not a direct part of the Operator. Clair itself can send notification webhooks for newly discovered vulnerabilities for already indexed container images. The payload of that webhook is fixed and cannot be customised via config parameters. So for simplicity let’s add a simple small web server in go and run it in our Kubernetes Cluster to listen for these webhooks and transform them into Slack compliant webhooks.

js
package main
import (
  "net/http"
  "github.com/slack-go/slack"
  "log"
  "io/ioutil"
  "encoding/json"
  "github.com/quay/claircore"
  "os"
  "time"
)

type Notification struct {
  Id string `json:"id"`
  Manifest string `json:"manifest"`
  Reason string `json:"reason"`
  Vulnerability *VulnSummary `json:"vulnerability"`
}

type VulnSummary struct {
  Name string `json:"name"`
  FixedInVersion string `json:"fixed_in_version"`
  Links string `json:"links"`
  Description string `json:"description"`
  NormalizedSeverity string `json:"normalized_severity"`
  Package *claircore.Package `json:"package"`
  Distribution *claircore.Distribution `json:"distribution"`
  Repository *claircore.Repository `json:"repository"`
}

type Page struct {
  Size int32 `json:"size"`
  Next string `json:"next"`
}

type PageNotification struct {
  Page *Page `json:"page"`
  Notifications []*Notification `json:"notifications"`
}

type Webhook struct {
  Id string `json:"notification_id"`
  Callback string `json:"callback"`
}

func CreateNotificationSlackMessage(w []byte) (slack.WebhookMessage, error) {
  webhook := Webhook{}
  if err := json.Unmarshal(w, &webhook); err != nil {
    return slack.WebhookMessage{}, err
  }
  resp, err := http.Get(webhook.Callback)
  if err != nil {
    return slack.WebhookMessage{}, err
  }
  defer resp.Body.Close()
  body, err := ioutil.ReadAll(resp.Body)
  if err != nil {
    return slack.WebhookMessage{}, err
  }
  pageNotification := PageNotification{}
  if err := json.Unmarshal(body, &pageNotification); err != nil {
    return slack.WebhookMessage{}, err
  }
  // Build your Slack Message here
  [...]
}

func NotifyHandler(w http.ResponseWriter, req *http.Request) {
  body, _ := ioutil.ReadAll(req.Body)
  msg, _ := CreateNotificationSlackMessage(body)
  err := slack.PostWebhook(os.Getenv("SLACK_WEBHOOK_URL"), &msg)
}

func main() {
  http.HandleFunc("/notify", NotifyHandler)
}

We need to build a container Image using docker CLI

text
$ docker build -t localhost:5000/clair-notifier:0.0.1
$ docker push localhost:5000/clair-notifier:0.0.1

Let’s deploy our notifier app into our Kubernetes cluster. We use a Secret for our Slack Webhook URL:

text
$ cat >>EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
  name: clair-notifier
  namespace: operator-system
spec:
  selector:
    app: clair-notifier
  ports:
    - protocol: TCP
      port: 8080
      targetPort: 8080
EOF

What we've achieved

We have a running Clair scanner and notifier for Slack. Our scanner CR is Namespace scoped and will be triggered every time a Pod is created, updated or deleted within that Namespace. Furthermore, we defined Annotations for already existing Pods to prevent reprocessing without container image changes.

A container manifest is created using a docker registry and requests Clair to index it and create a first vulnerability report. To inform users about newly created reports we created a webhook for Slack. Newly discovered vulnerabilities for running and indexed container images create events we receive with our notifier to transform them into Slack webhooks. We did not handle multiple Scanners for the same Namespace nor cover a fine granulated and structured CRD for different Scanner types and backends.

Insight, imagination and expertly engineered solutions to accelerate and sustain progress.

Contact