Kubernetes Operators II: Using Operator-Framework SDK on top of Kubebuilder
The second in a two-part series on building a Kubernetes Operator implements a use case with real value for our Kubernetes cluster security.
In the first instalment of this two-post series, we described how to install and configure our environment and implement a simple Kubernetes operator skeleton using the Operator-Framework SDK. We continue our work today by implementing a real use case in which we use Clair as a backend for indexing and scanning the container images of our deployed Pods. Newly discovered Common Vulnerabilities and Exposures (CVEs) will trigger a notifier app via webhooks and inform us about them on Slack.
Requirements
As described in part 1, our environment needs to be installed and configured. This includes kinD as our local Kubernetes cluster, a local container registry and the Operator-Framework SDK itself. We expect an up-and-running kinD cluster and container registry.
Slack
For our use case, we need a Slack channel to send our notifications to and a registered Slack app with an incoming webhook. We won't go into how to create a Slack channel and register a webhook Slack app here: just keep the Slack webhook URL at hand for later use.
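Before wiring anything up, it is worth verifying the webhook works. A quick check using Slack's standard incoming-webhook payload (the URL below is a placeholder for your own):

```shell
curl -X POST -H 'Content-type: application/json' \
  --data '{"text":"Hello from the Clair operator"}' \
  https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
```

If the message appears in your channel, the webhook URL is good to use in the Scanner CR later.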
Install Clair
For the sake of simplicity, we will run Clair and its dependencies in our cluster in combo mode. Make sure the KUBECONFIG environment variable points to the right Kubernetes config file.
$ export KUBECONFIG=./kinD/kind.kubeconf
$ kubectl create namespace clair
Clair needs a config file for all its components: the notifier, matcher and indexer:
$ cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: clair-config
  namespace: clair
data:
  config.yml: |
    log_level: debug-color
    http_listen_addr: ":6000"
    updaters: {}
    indexer:
      connstring: host=clair-db port=5432 user=clair dbname=clair sslmode=disable
      scanlock_retry: 10
      layer_scan_concurrency: 5
      migrations: true
    matcher:
      indexer_addr: http://clair:6000/
      connstring: host=clair-db port=5432 user=clair dbname=clair sslmode=disable
      max_conn_pool: 100
      migrations: true
    notifier:
      indexer_addr: http://clair:6000/
      matcher_addr: http://clair:6000/
      connstring: host=clair-db port=5432 user=clair dbname=clair sslmode=disable
      migrations: true
      delivery_interval: 5s
      poll_interval: 15s
      webhook:
        target: "http://clair-notifier.operator-system:8080/notify"
        callback: "http://clair.clair:6000/notifier/api/v1/notification"
EOF
Manifests, vulnerabilities and notifications are stored in a database by Clair:
$ cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clair-db
  namespace: clair
  labels:
    app: clair-db
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clair-db
  template:
    metadata:
      labels:
        app: clair-db
    spec:
      containers:
        - name: clair
          image: postgres:12.1
          ports:
            - containerPort: 5432
          env:
            - name: POSTGRES_USER
              value: clair
            - name: POSTGRES_DB
              value: clair
EOF
We will run Clair in combo mode for now to have all components in one place. Feel free to deploy all services in separate pods.
$ cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clair
  namespace: clair
  labels:
    app: clair
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clair
  template:
    metadata:
      labels:
        app: clair
    spec:
      initContainers:
        - name: wait
          image: postgres:12.1
          command: ["pg_isready", "-h", "clair-db", "-p", "5432"]
      containers:
        - name: clair
          image: quay.io/coreos/clair:v4.1.1
          ports:
            - containerPort: 6000
          args: ["-conf", "/etc/config/config.yml"]
          volumeMounts:
            - name: config-volume
              mountPath: /etc/config
          env:
            - name: CLAIR_MODE
              value: combo
      volumes:
        - name: config-volume
          configMap:
            name: clair-config
            items:
              - key: config.yml
                path: config.yml
EOF
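The connstring above references host=clair-db, and the config URLs reference http://clair.clair:6000/, both of which assume Services in the clair namespace. These are not shown in the manifests above, so here is a minimal sketch:

```shell
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
  name: clair-db
  namespace: clair
spec:
  selector:
    app: clair-db
  ports:
    - protocol: TCP
      port: 5432
      targetPort: 5432
---
apiVersion: v1
kind: Service
metadata:
  name: clair
  namespace: clair
spec:
  selector:
    app: clair
  ports:
    - protocol: TCP
      port: 6000
      targetPort: 6000
EOF
```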
Clair Operator
Unlike our part 1 Operator, our Clair operator will not define a scheduling resource. We will instead create a controller that watches for Pod change events, such as CREATE or UPDATE, and triggers our reconciler function for the affected Pod. Handling DELETE events is more difficult because the Pod reference is lost. Our operator needs to register container images with Clair using container manifests and let Clair create vulnerability reports for them.
Create Operator
First, we initialise our project and create an API in group core and version v1alpha1 as well as our sample Scanner CR, as we did in part 1.
$ mkdir operator
$ cd operator
$ operator-sdk init --repo github.com/nearform/clair-operator --domain nearform.com
$ operator-sdk create api --group core --version v1alpha1 --kind Scanner --resource --controller
Set up controller
As described above, we need a Pod watcher to trigger our reconciler function. As we know from part 1, the SetupWithManager()
function in controllers/scanner_controller.go builds and sets up our controller type and returns it to the caller in the main()
function before setting up a signal handler and starting the controller manager.
func (r *ScannerReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&corev1.Pod{}).
        Watches(&source.Kind{Type: &corev1.Pod{}},
            handler.EnqueueRequestsFromMapFunc(
                func(pod client.Object) []reconcile.Request {
                    return []reconcile.Request{
                        {NamespacedName: types.NamespacedName{
                            Name:      pod.GetName(),
                            Namespace: pod.GetNamespace(),
                        }},
                    }
                },
            ),
        ).
        Complete(r)
}
Custom Resource definition
Let’s have a look at our Custom Resource, our ScannerSpec Type defined in api/v1alpha1/scanner_types.go:
type ScannerSpec struct {
    Backend         string `json:"backend,omitempty"`
    ClairBaseUrl    string `json:"clairBaseUrl,omitempty"`
    SlackWebhookUrl string `json:"slackWebhookUrl,omitempty"`
    Notifier        string `json:"notifier,omitempty"`
}
The idea behind the Backend field is to allow different scanner backends. In our sample, we will only have a Clair-typed Scanner resource. As Notifier, we only provide a Slack notifier, but it is included here to show the flexibility of our Scanner type. ClairBaseUrl and SlackWebhookUrl are self-explanatory and point to the corresponding endpoints.
$ cat <<EOF | kubectl apply -f -
apiVersion: core.nearform.com/v1alpha1
kind: Scanner
metadata:
  name: clair-scanner
spec:
  slackWebhookUrl: "Your Webhook URL"
  backend: "clair"
  notifier: "slack"
  clairBaseUrl: "http://clair.clair:6000/"
EOF
Scanner Reconciler
We are now ready to implement our operator logic, which will be done in the Reconcile()
function of our ScannerReconciler
type.
func (r *ScannerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
First, we need to define variables for a list of Scanners and exactly one Pod — the Pod which creates an Event our Manager was watching for.
var (
    scanners corev1alpha1.ScannerList
    pod      corev1.Pod
)
Because our controller is triggered for a specific Pod and not for Scanner objects, we have to request the Scanner objects for the specified namespace and store them in our scanners variable.
opts := []client.ListOption{
    client.InNamespace(req.NamespacedName.Namespace),
}
if err := r.List(ctx, &scanners, opts...); err != nil {
    return ctrl.Result{}, client.IgnoreNotFound(err)
}
If no Scanner CRs are defined for the specified namespace, we can stop here and return to the Manager.
if len(scanners.Items) == 0 {
    log.Info("No Scanners found for namespace " + req.NamespacedName.Namespace)
    return ctrl.Result{}, nil
}
In case a Scanner was found, we need to fetch the Pod which is referenced by its name and namespace and save it into our pod variable.
if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
    log.Error(err, "unable to fetch Pod")
    return ctrl.Result{}, client.IgnoreNotFound(err)
}
As described before, handling DELETE events is more difficult, so we add a custom finalizer to the current Pod and remove it again once our cleanup is done.
isPodMarkedToBeDeleted := pod.GetDeletionTimestamp() != nil
if isPodMarkedToBeDeleted {
    if controllerutil.ContainsFinalizer(&pod, podFinalizer) {
        if err := r.finalizePod(ctx, &pod, scanners.Items[0].Spec.ClairBaseUrl, scanners.Items[0].Spec.SlackWebhookUrl); err != nil {
            return ctrl.Result{}, err
        }
        controllerutil.RemoveFinalizer(&pod, podFinalizer)
        if err := r.Update(ctx, &pod); err != nil {
            return ctrl.Result{}, err
        }
    }
    // The Pod is going away; don't re-add the finalizer below.
    return ctrl.Result{}, nil
}
if !controllerutil.ContainsFinalizer(&pod, podFinalizer) {
    controllerutil.AddFinalizer(&pod, podFinalizer)
    if err := r.Update(ctx, &pod); err != nil {
        return ctrl.Result{}, err
    }
}
The finalizePod() function implements cleanup logic for the Clair index database and informs us on Slack. Currently, the Clair API does not provide a delete endpoint.
func (r *ScannerReconciler) finalizePod(ctx context.Context, pod *corev1.Pod, clairBaseUrl string, slackWebhookUrl string) error {
    [...]
}
We don't need the Pod itself — only its referenced container images. So let’s iterate through all the containers. Iteration through InitContainers is not implemented here but can be done in the same way.
for _, container := range pod.Status.ContainerStatuses {
    [...]
}
First, let’s extract the manifest definition to pass it to Clair’s indexing endpoint:
manifest, err := docker.Inspect(ctx, container.Image)
if err != nil {
    log.Error(err, "Error while inspecting container Manifest")
    continue
}
The docker.Inspect() function of our docker module returns a struct of type claircore.Manifest and needs to be implemented by us; github.com/quay/clair shows how this can be done. Inspect() extracts the image name and repository, connects to the registry storing that image, and fetches the digests and layers needed to build the claircore.Manifest type.
func Inspect(ctx context.Context, image string) (*claircore.Manifest, error) {
    [...]
}
To avoid reindexing already processed Pods, we can define Annotations for better filtering. We use Patch() here instead of Update() to prevent the Pod from being fetched again, so we can continue working with the initialised Pod instance.
if pod.Annotations["com.nearform."+container.Name+".hash"] == manifest.Hash.String() {
    continue
} else {
    patch := []byte(`{"metadata":{"annotations":{"com.nearform.` + container.Name + `.hash": "` + manifest.Hash.String() + `"}}}`)
    if err := r.Patch(ctx, &pod, client.RawPatch(types.StrategicMergePatchType, patch)); err != nil {
        continue
    }
}
Now we can request Clair's index API endpoint using the ClairBaseUrl from the current Scanner CR. First, the manifest is serialised to JSON:
func ManifestToJson(ctx context.Context, manifest *claircore.Manifest) ([]byte, error) {
    payload, err := json.Marshal(manifest)
    if err != nil {
        return []byte{}, err
    }
    return payload, nil
}
With an already indexed container image, we can request the vulnerability report endpoint.
func VulnerabilityReport(ctx context.Context, url string, hash string) ([]byte, error) {
    resp, err := http.Get(url + hash)
    if err != nil {
        return []byte{}, err
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return []byte{}, err
    }
    return body, nil
}
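Before posting to Slack, it can be useful to skip reports that contain no findings. A small, hypothetical helper is sketched below; it decodes only the vulnerabilities field, whose key name follows claircore's VulnerabilityReport JSON output (an assumption, verify against your Clair version):

```go
package main

import "encoding/json"

// HasVulnerabilities reports whether a Clair vulnerability report body
// contains at least one vulnerability entry. Only the "vulnerabilities"
// map is decoded; everything else in the report is ignored.
func HasVulnerabilities(report []byte) (bool, error) {
	var partial struct {
		Vulnerabilities map[string]json.RawMessage `json:"vulnerabilities"`
	}
	if err := json.Unmarshal(report, &partial); err != nil {
		return false, err
	}
	return len(partial.Vulnerabilities) > 0, nil
}
```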
Slack notifications
We have three types of Slack notifications to handle. Each time a new Container image is indexed (pod created or updated), we will notify Slack using Webhooks.
func CreateIndexReportSlackMessage(ctx context.Context, r []byte, pod *corev1.Pod, image string, clairBaseUrl string) (slack.WebhookMessage, error) {
    report := claircore.IndexReport{}
    // Build your Slack Message here
    [...]
}
We will also send notifications about vulnerability reports for the newly indexed images.
func CreateVulnerabilityReportSlackMessage(ctx context.Context, r []byte, pod *corev1.Pod, image string, clairBaseUrl string) (slack.WebhookMessage, error) {
    report := claircore.VulnerabilityReport{}
    // Build your Slack Message here
    [...]
}
The third option to notify is not a direct part of the Operator. Clair itself can send notification webhooks for newly discovered vulnerabilities in already indexed container images. The payload of that webhook is fixed and cannot be customised via config parameters. So for simplicity, let's add a small web server in Go and run it in our Kubernetes cluster to listen for these webhooks and transform them into Slack-compliant webhooks.
package main

import (
    "encoding/json"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "time"

    "github.com/quay/claircore"
    "github.com/slack-go/slack"
)

type Notification struct {
    Id            string       `json:"id"`
    Manifest      string       `json:"manifest"`
    Reason        string       `json:"reason"`
    Vulnerability *VulnSummary `json:"vulnerability"`
}

type VulnSummary struct {
    Name               string                  `json:"name"`
    FixedInVersion     string                  `json:"fixed_in_version"`
    Links              string                  `json:"links"`
    Description        string                  `json:"description"`
    NormalizedSeverity string                  `json:"normalized_severity"`
    Package            *claircore.Package      `json:"package"`
    Distribution       *claircore.Distribution `json:"distribution"`
    Repository         *claircore.Repository   `json:"repository"`
}

type Page struct {
    Size int32  `json:"size"`
    Next string `json:"next"`
}

type PageNotification struct {
    Page          *Page           `json:"page"`
    Notifications []*Notification `json:"notifications"`
}

type Webhook struct {
    Id       string `json:"notification_id"`
    Callback string `json:"callback"`
}

func CreateNotificationSlackMessage(w []byte) (slack.WebhookMessage, error) {
    webhook := Webhook{}
    if err := json.Unmarshal(w, &webhook); err != nil {
        return slack.WebhookMessage{}, err
    }
    // Fetch the full notification page from the callback URL Clair gave us.
    resp, err := http.Get(webhook.Callback)
    if err != nil {
        return slack.WebhookMessage{}, err
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return slack.WebhookMessage{}, err
    }
    pageNotification := PageNotification{}
    if err := json.Unmarshal(body, &pageNotification); err != nil {
        return slack.WebhookMessage{}, err
    }
    // Build your Slack Message here
    [...]
}

func NotifyHandler(w http.ResponseWriter, req *http.Request) {
    body, err := ioutil.ReadAll(req.Body)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    msg, err := CreateNotificationSlackMessage(body)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    if err := slack.PostWebhook(os.Getenv("SLACK_WEBHOOK_URL"), &msg); err != nil {
        log.Println("failed to post to Slack:", err)
    }
}

func main() {
    http.HandleFunc("/notify", NotifyHandler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}
We need to build and push a container image using the Docker CLI:
$ docker build -t localhost:5000/clair-notifier:0.0.1 .
$ docker push localhost:5000/clair-notifier:0.0.1
Let’s deploy our notifier app into our Kubernetes cluster. We use a Secret for our Slack Webhook URL:
$ cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
  name: clair-notifier
  namespace: operator-system
spec:
  selector:
    app: clair-notifier
  ports:
    - protocol: TCP
      port: 8080
      targetPort: 8080
EOF
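The Service needs a matching Deployment, and the Secret mentioned above for the Slack webhook URL is not shown in the original manifests. A minimal sketch follows; the Secret name slack-webhook and its key url are our assumptions:

```shell
kubectl -n operator-system create secret generic slack-webhook \
    --from-literal=url='Your Webhook URL'
cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clair-notifier
  namespace: operator-system
  labels:
    app: clair-notifier
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clair-notifier
  template:
    metadata:
      labels:
        app: clair-notifier
    spec:
      containers:
        - name: clair-notifier
          image: localhost:5000/clair-notifier:0.0.1
          ports:
            - containerPort: 8080
          env:
            - name: SLACK_WEBHOOK_URL
              valueFrom:
                secretKeyRef:
                  name: slack-webhook
                  key: url
EOF
```

The SLACK_WEBHOOK_URL environment variable is read by the notifier's NotifyHandler via os.Getenv.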
What we've achieved
We have a running Clair scanner and notifier for Slack. Our scanner CR is Namespace scoped and will be triggered every time a Pod is created, updated or deleted within that Namespace. Furthermore, we defined Annotations for already existing Pods to prevent reprocessing without container image changes.
A container manifest is created by querying the container registry, and Clair is asked to index it and create a first vulnerability report. To inform users about newly created reports, we created a webhook for Slack. Newly discovered vulnerabilities in running, already indexed container images create events that our notifier receives and transforms into Slack webhooks. We did not handle multiple Scanners for the same Namespace, nor did we cover a fine-grained, structured CRD for different Scanner types and backends.