As part of a recent project which needs to automatically issue new TLS certificates for hosts
defined in Kubernetes Ingress objects, I ended up having to create a library that would let me detect such events in a simplified manner for part of a larger Python program which needs to react to such events.
My goal was to be able to define a simple specification that lets the calling code define what type of Kubernbetes object to get events for, as well as any label/field filters to further refine the result.
The native Kubernetes API provides a low-level mechanism for this via the “watch” capability for different k8s resource types (see “efficient detection of changes”) and the Python k8s library reflects this same capability. That said, the functionality is low level, and I wanted a simplified abstraction over this that lets the calling code just react to events it cares about and not having to worry about the details.
k8s-watcher
k8s-watcher is a Python module that simplifies watching anything on a kubernetes cluster. You can utilize this module in your own python application to fulfill the typical “list then watch” functionality as described in the “Efficient detection of changes” section of the kubernetes API documentation. (without having to write that code yourself!).
The library provides a few different ways to invoke it.
First off you can define one or more K8sWatchConfig
instances, which simply define what
you want to watch
. Such as:
watch_config = K8sWatchConfig(**{ \
"namespace": "my-namespace", \
"kind": "Pod", \
"sdk_client_class_name": "CoreV1Api", \
"sdk_list_function_name": "list_namespaced_pod", \
"field_selector": None, \
"label_selector": "mylabel=x,myotherlabel=y",
"include_k8s_objects": True
})
basic direct usage
Next you can start watching directly inline by creating a K8sWatcher
for each K8sWatchConfig
and invoke its watcher()
method that returns a generator you can iterate over as events become available that match your criteria.
Note that iterating over the results as they become available is blocking… so this may not be ideal depending on your use-case…
k8s_watcher = K8sWatcher(watch_config).watcher()
for event in k8s_watcher:
print(json.dumps(event.dict(),default=str,indent=2))
Queues usage
A more realistic approach would be to let k8s-watcher work in the background, and write events to a queue
that your code can consume and react to on its own terms… well k8s-watcher
supports this scenario as well.
If you are not interested in writing your own consumer Thread
code and would like each K8sWatchEvent
to be delivered via python Queues
you can use the K8sWatcherService.queuing_watch(K8sWatchConfig, unified_queue=True|False)
method. Each time you call queuing_watch
, K8sWatcherService
creates a new Thread
bound to a unique K8sWatcher
instance to automatically capture all events emitted from it. Each event will be placed on a Queue
that is returned to you. If you pass unified_queue=True
, the same Queue
instance will be returned for every call to queuing_watch()
so you only have to monitor a single Queue
that will contains different K8sWatchEvents
across all the different K8sWatchConfigs
you define.
import json
from k8swatcher import K8sWatchConfig, K8sWatchService, K8sWatchEvent
from threading import Thread
class MyConsumerThread(Thread):
def __init__(self, event_queue_to_monitor, *args, **kwargs):
kwargs.setdefault('daemon', True)
super().__init__(*args, **kwargs)
def run(self):
while True:
watch_event:K8sWatchEvent = self.watch_event_queue.get()
print(json.dumps(watch_event.dict(),default=str,indent=2))
pod_watch_config = K8sWatchConfig(**{ \
"id": k8s_kind,
"namespace": k8s_namespace, \
"kind": "Pod", \
...})
ingress_watch_config = K8sWatchConfig(**{ \
"id": k8s_kind,
"namespace": k8s_namespace, \
"kind": "Ingress", \
...})
watch_service = K8sWatcherService()
"""
With `unified_queue=False` (the default):
... each distinct call to queuing_watch() returns
a dedicated Queue per K8sWatchConfig
"""
pod_event_queue:Queue = watch_service.queuing_watch(pod_watch_config)
ingress_event_queue:Queue = watch_service.queuing_watch(ingress_watch_config)
pod_consumer = MyConsumerThread(pod_event_queue)
pod_consumer.start()
ingress_consumer = MyConsumerThread(ingress_event_queue)
ingress_consumer.start()
pod_consumer.join()
ingress_consuner.join()
watch_service.join()
"""
However with `unified_queue=True`:
... each distinct call to queuing_watch() returns
a the same Queue that will get events for all K8sWatchConfigs
"""
global_event_queue:Queue = watch_service.queuing_watch(pod_watch_config,unified_queue=True)
watch_service.queuing_watch(ingress_watch_config,unified_queue=True)
all_events_consumer = MyConsumerThread(global_event_queue)
all_events_consumer.start()
all_events_consumer.join()
watch_service.join()
...
Asyncio USAGE
If you don’t want to manage any Threads
at all, you can utilize the K8sEventHandler
method. In this usage pattern you simply provide a class instance that implements the K8sEventHandler
method async def handle_k8s_watch_event(self, k8s_watch_event:K8sWatchEvent)
and your handler class will be called every time a new K8sWatchEvent
is created. Internally K8sWatcherService
manages a consumer thread automatically for you that captures all events and calls your async
handler and then awaits
it’s finish.
import json
from k8swatcher import K8sWatchConfig, K8sWatchService, K8sWatchEvent
class MyCustomHandler(K8sEventHandler):
async def handle_k8s_watch_event(self, k8s_watch_event:K8sWatchEvent):
watch_event:K8sWatchEvent = self.watch_event_queue.get()
print(json.dumps(watch_event.dict(),default=str,indent=2))
await doMyCustomStuff(k8s_watch_event)
pod_watch_config = K8sWatchConfig(**{ \
"id": k8s_kind,
"namespace": k8s_namespace, \
"kind": "Pod", \
...})
ingress_watch_config = K8sWatchConfig(**{ \
"id": k8s_kind,
"namespace": k8s_namespace, \
"kind": "Ingress", \
...})
watch_service = K8sWatcherService()
watch_service.asyncio_watch([pod_watch_config,ingress_watch_config],MyCustomHandler())
watch_service.join()
run locally w/ the built in CLI
In addition to being able to utilize this module inline in your code, the module also includes a simple CLI you can use for testing out the functionality. The CLI is not intended for any production use. You can see more details here at github.com/bitsofinfo/k8swatcher