Reacting to K8s Events with k8s-watcher

As part of a recent project that needs to automatically issue new TLS certificates for hosts defined in Kubernetes Ingress objects, I ended up writing a library that detects such events in a simplified manner, so that a larger Python program can react to them.

My goal was a simple specification that lets the calling code declare which type of Kubernetes object to receive events for, along with any label or field filters to further refine the results.

The native Kubernetes API provides a low-level mechanism for this via the “watch” capability for different k8s resource types (see “efficient detection of changes”), and the Python k8s library exposes the same capability. That said, the functionality is low level, and I wanted a simpler abstraction over it that lets the calling code react to the events it cares about without having to worry about the details.

k8s-watcher

k8s-watcher is a Python module that simplifies watching anything on a Kubernetes cluster. You can use this module in your own Python application to get the typical “list then watch” functionality described in the “Efficient detection of changes” section of the Kubernetes API documentation, without having to write that code yourself.
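
To make the “list then watch” idea concrete, here is a minimal sketch that uses only the standard library. FakeCluster is a hypothetical in-memory stand-in that I made up for illustration; it is not the Kubernetes client and not how k8s-watcher is implemented. It only shows the shape of the pattern: a list call returns a snapshot plus the version it reflects, and a watch started from that version reports only later changes.

```python
class FakeCluster:
    """Hypothetical in-memory stand-in for an API server (not a real client)."""

    def __init__(self):
        self.version = 0      # plays the role of a resourceVersion
        self.objects = set()  # names of the objects that currently exist
        self.log = []         # (version, event_type, name) change history

    def apply(self, event_type, name):
        # Record a change and bump the version, as an API server would.
        self.version += 1
        if event_type == "DELETED":
            self.objects.discard(name)
        else:
            self.objects.add(name)
        self.log.append((self.version, event_type, name))

    def list(self):
        # Step 1: a snapshot of current state plus the version it reflects.
        return sorted(self.objects), self.version

    def watch(self, resource_version):
        # Step 2: only the changes recorded after that version.
        for version, event_type, name in self.log:
            if version > resource_version:
                yield event_type, name


cluster = FakeCluster()
cluster.apply("ADDED", "pod-a")
cluster.apply("ADDED", "pod-b")

names, version = cluster.list()
cluster.apply("DELETED", "pod-a")  # happens after the snapshot was taken
changes = list(cluster.watch(version))

print(names, version, changes)
```

Because the watch starts from the version returned by the list call, nothing that changed between the two calls is missed, and nothing in the snapshot is reported twice.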

The library provides a few different ways to invoke it.

First off, you can define one or more K8sWatchConfig instances, which simply define what you want to watch. For example:

watch_config = K8sWatchConfig(**{
                    "namespace": "my-namespace",
                    "kind": "Pod",
                    "sdk_client_class_name": "CoreV1Api",
                    "sdk_list_function_name": "list_namespaced_pod",
                    "field_selector": None,
                    "label_selector": "mylabel=x,myotherlabel=y",
                    "include_k8s_objects": True
                })
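
The sdk_client_class_name and sdk_list_function_name values are plain strings that match a class and a method in the Python Kubernetes client (CoreV1Api and its list_namespaced_pod method). As a rough illustration of how string names like these could be turned into a callable, here is a sketch using getattr. This is an assumption about the general technique, not a description of k8s-watcher's internals; FakeCoreV1Api, fake_sdk and resolve_list_function are hypothetical names used so the example runs without a cluster.

```python
from types import SimpleNamespace


class FakeCoreV1Api:
    """Hypothetical stand-in for the real client class of the same name."""

    def list_namespaced_pod(self, namespace, label_selector=None, field_selector=None):
        return f"pods in {namespace} matching {label_selector}"


# Stand-in for the SDK module that would normally hold the client classes.
fake_sdk = SimpleNamespace(CoreV1Api=FakeCoreV1Api)


def resolve_list_function(sdk, client_class_name, list_function_name):
    # Look up the class by name, instantiate it, then look up the bound method.
    client_class = getattr(sdk, client_class_name)
    return getattr(client_class(), list_function_name)


list_pods = resolve_list_function(fake_sdk, "CoreV1Api", "list_namespaced_pod")
result = list_pods("my-namespace", label_selector="mylabel=x,myotherlabel=y")
print(result)
```

Keeping these as strings means one config format can describe any resource type without the calling code importing client classes itself.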

Basic direct usage

Next, you can start watching directly inline by creating a K8sWatcher for each K8sWatchConfig and invoking its watcher() method, which returns a generator that yields events matching your criteria as they become available.

Note that iterating over the results as they become available is blocking, so this may not be ideal depending on your use case.

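import json

# watcher() returns a generator of events
k8s_watcher = K8sWatcher(watch_config).watcher()

# This loop blocks while it waits for the next matching event
for event in k8s_watcher:
    print(json.dumps(event.dict(),default=str,indent=2))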

Queues usage

A more realistic approach is to let k8s-watcher work in the background and write events to a queue that your code can consume and react to on its own terms. k8s-watcher supports this scenario as well.

If you are not interested in writing your own consumer Thread code and would like each K8sWatchEvent to be delivered via Python Queues, you can use the K8sWatcherService.queuing_watch(K8sWatchConfig, unified_queue=True|False) method. Each time you call queuing_watch(), K8sWatcherService creates a new Thread bound to a unique K8sWatcher instance to automatically capture all events emitted from it. Each event is placed on a Queue that is returned to you. If you pass unified_queue=True, the same Queue instance is returned for every call to queuing_watch(), so you only have to monitor a single Queue that will contain K8sWatchEvents from all the K8sWatchConfigs you define.

import json
from queue import Queue
from threading import Thread

from k8swatcher import K8sWatchConfig, K8sWatcherService, K8sWatchEvent

class MyConsumerThread(Thread):

    def __init__(self, event_queue_to_monitor, *args, **kwargs):
        kwargs.setdefault('daemon', True)
        super().__init__(*args, **kwargs)
        # Keep a reference to the queue that run() reads from
        self.watch_event_queue = event_queue_to_monitor

    def run(self):
        while True:
            # Blocks until the next event is available
            watch_event:K8sWatchEvent = self.watch_event_queue.get()
            print(json.dumps(watch_event.dict(),default=str,indent=2))
pod_watch_config = K8sWatchConfig(**{ \
                    "id": k8s_kind,
                    "namespace": k8s_namespace, \
                    "kind": "Pod", \
                  ...})

ingress_watch_config = K8sWatchConfig(**{ \
                    "id": k8s_kind,
                    "namespace": k8s_namespace, \
                    "kind": "Ingress", \
                  ...})

watch_service = K8sWatcherService()

"""
With `unified_queue=False` (the default):

... each distinct call to queuing_watch() returns
a dedicated Queue per K8sWatchConfig
"""
pod_event_queue:Queue = watch_service.queuing_watch(pod_watch_config)
ingress_event_queue:Queue = watch_service.queuing_watch(ingress_watch_config)

pod_consumer = MyConsumerThread(pod_event_queue)
pod_consumer.start()

ingress_consumer = MyConsumerThread(ingress_event_queue)
ingress_consumer.start()

pod_consumer.join()
ingress_consumer.join()
watch_service.join()


"""
However with `unified_queue=True`:

... each distinct call to queuing_watch() returns
the same Queue that will get events for all K8sWatchConfigs
"""
global_event_queue:Queue = watch_service.queuing_watch(pod_watch_config,unified_queue=True)
watch_service.queuing_watch(ingress_watch_config,unified_queue=True)

all_events_consumer = MyConsumerThread(global_event_queue)
all_events_consumer.start()
all_events_consumer.join()

watch_service.join()
...
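
To show the difference between dedicated and unified queues without a cluster, here is a small standard-library sketch of the same pattern. QueuingService and fake_watch are hypothetical names invented for this illustration; the code is not the k8s-watcher implementation, and unlike a real watcher the fake generators finish after a few events so that the example terminates.

```python
import threading
from queue import Queue


def fake_watch(kind, count):
    """Hypothetical stand-in for a watcher generator; yields a few events."""
    for i in range(count):
        yield {"kind": kind, "name": f"{kind.lower()}-{i}"}


class QueuingService:
    """Illustrative only: one pump thread per watch, optional shared queue."""

    def __init__(self):
        self._unified = Queue()
        self._threads = []

    def queuing_watch(self, events, unified_queue=False):
        # Either reuse the single shared queue or create a dedicated one.
        target = self._unified if unified_queue else Queue()
        thread = threading.Thread(target=self._pump, args=(events, target), daemon=True)
        thread.start()
        self._threads.append(thread)
        return target

    @staticmethod
    def _pump(events, target):
        # Drain the (normally blocking) generator into the queue.
        for event in events:
            target.put(event)

    def join(self):
        for thread in self._threads:
            thread.join()


service = QueuingService()
pod_queue = service.queuing_watch(fake_watch("Pod", 2))
shared_a = service.queuing_watch(fake_watch("Pod", 2), unified_queue=True)
shared_b = service.queuing_watch(fake_watch("Ingress", 1), unified_queue=True)
service.join()

print(pod_queue.qsize(), shared_a is shared_b, shared_a.qsize())
```

With a dedicated queue, each consumer sees events for one config only. With the shared queue, a single consumer sees everything and would need to inspect each event to tell the sources apart.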

Asyncio usage

If you don’t want to manage any Threads at all, you can use the K8sEventHandler approach. In this usage pattern you simply provide an instance of a class that implements the K8sEventHandler method async def handle_k8s_watch_event(self, k8s_watch_event:K8sWatchEvent), and your handler will be called every time a new K8sWatchEvent is created. Internally, K8sWatcherService manages a consumer thread for you that captures all events, calls your async handler, and awaits its completion.

import json
# Assumes K8sEventHandler is exported from the same package as the other classes
from k8swatcher import K8sWatchConfig, K8sWatcherService, K8sWatchEvent, K8sEventHandler

class MyCustomHandler(K8sEventHandler):
    async def handle_k8s_watch_event(self, k8s_watch_event:K8sWatchEvent):
        # The event is passed in directly; there is no queue to read from here
        print(json.dumps(k8s_watch_event.dict(),default=str,indent=2))
        await doMyCustomStuff(k8s_watch_event)  # your own coroutine


pod_watch_config = K8sWatchConfig(**{ \
                    "id": k8s_kind,
                    "namespace": k8s_namespace, \
                    "kind": "Pod", \
                  ...})

ingress_watch_config = K8sWatchConfig(**{ \
                    "id": k8s_kind,
                    "namespace": k8s_namespace, \
                    "kind": "Ingress", \
                  ...})

watch_service = K8sWatcherService()

watch_service.asyncio_watch([pod_watch_config,ingress_watch_config],MyCustomHandler())

watch_service.join()
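
For a feel of what “a consumer thread that calls your async handler and awaits it” can look like, here is a standard-library sketch. It is an assumption about the general pattern, not the actual k8s-watcher code: RecordingHandler, consume and the _STOP sentinel are hypothetical names, and the events are plain dicts rather than K8sWatchEvent objects.

```python
import asyncio
import threading
from queue import Queue

_STOP = object()  # hypothetical sentinel so the example can shut down cleanly


class RecordingHandler:
    """Hypothetical handler with the same method name as in the text."""

    def __init__(self):
        self.seen = []

    async def handle_k8s_watch_event(self, k8s_watch_event):
        await asyncio.sleep(0)  # stands in for real asynchronous work
        self.seen.append(k8s_watch_event)


def consume(event_queue, handler):
    # The consumer thread owns its own event loop and runs each handler
    # call to completion before taking the next event from the queue.
    loop = asyncio.new_event_loop()
    try:
        while True:
            event = event_queue.get()
            if event is _STOP:
                break
            loop.run_until_complete(handler.handle_k8s_watch_event(event))
    finally:
        loop.close()


events = Queue()
handler = RecordingHandler()
consumer = threading.Thread(target=consume, args=(events, handler), daemon=True)
consumer.start()

for name in ("pod-a", "ingress-b"):
    events.put({"name": name})
events.put(_STOP)
consumer.join()

print(handler.seen)
```

Running each handler call to completion keeps events in order, at the cost of a slow handler delaying the events queued behind it.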

Run locally with the built-in CLI

In addition to using this module inline in your code, you can use the simple CLI it includes to test out the functionality. The CLI is not intended for production use. You can find more details at github.com/bitsofinfo/k8swatcher.
