Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions examples/informer_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright 2024 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example: use SharedInformer to watch pods in the default namespace.

The informer runs a background daemon thread that keeps a local cache
synchronised with the Kubernetes API server. The main thread is free to
query the cache at any time without worrying about connectivity or retries.
"""

import time

import kubernetes
from kubernetes import config
from kubernetes.client import CoreV1Api
from kubernetes.informer import ADDED, DELETED, MODIFIED, SharedInformer


def on_pod_added(pod):
name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"]
print("[ADDED] ", name)


def on_pod_modified(pod):
name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"]
print("[MODIFIED]", name)


def on_pod_deleted(pod):
name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"]
print("[DELETED] ", name)


def main():
config.load_kube_config()

v1 = CoreV1Api()
informer = SharedInformer(
list_func=v1.list_namespaced_pod,
namespace="default",
resync_period=60,
)

informer.add_event_handler(ADDED, on_pod_added)
informer.add_event_handler(MODIFIED, on_pod_modified)
informer.add_event_handler(DELETED, on_pod_deleted)

informer.start()
print("Informer started. Watching pods in "default" namespace ...")

try:
while True:
cached = informer.cache.list()
print("Cached pods: {}".format(len(cached)))
time.sleep(10)
except KeyboardInterrupt:
pass
finally:
informer.stop()
print("Informer stopped.")


if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
from . import stream
from . import utils
from . import leaderelection
from . import informer
177 changes: 177 additions & 0 deletions kubernetes/e2e_test/test_informer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
# Copyright 2024 The Kubernetes Authors.
# Licensed under the Apache License, Version 2.0 (the "License").
# End-to-end tests for kubernetes.informer.SharedInformer.

import threading
import time
import unittest
import uuid

from kubernetes.client import api_client
from kubernetes.client.api import core_v1_api
from kubernetes.e2e_test import base
from kubernetes.informer import ADDED, DELETED, MODIFIED, SharedInformer

_TIMEOUT = 30


def _uid():
return str(uuid.uuid4())[-12:]


def _cm(name, payload=None):
return {
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {"name": name, "labels": {"inf-e2e": "1"}},
"data": payload or {"k": "v"},
}


def _name_of(obj):
if hasattr(obj, "metadata"):
return obj.metadata.name
return (obj.get("metadata") or {}).get("name")


class TestSharedInformerE2E(unittest.TestCase):

@classmethod
def setUpClass(cls):
cls.cfg = base.get_e2e_configuration()
cls.apiclient = api_client.ApiClient(configuration=cls.cfg)
cls.api = core_v1_api.CoreV1Api(cls.apiclient)

def _drop(self, cm_name):
try:
self.api.delete_namespaced_config_map(name=cm_name, namespace="default")
except Exception:
pass

def _expect(self, ev, label):
if not ev.wait(timeout=_TIMEOUT):
self.fail("Timeout waiting for: " + label)

def _wait_in_cache(self, inf, key):
stop = time.monotonic() + _TIMEOUT
while time.monotonic() < stop:
if inf.cache.get_by_key(key) is not None:
return
time.sleep(0.25)
self.fail("key " + key + " never appeared in cache")

def _wait_listed(self, inf):
stop = time.monotonic() + _TIMEOUT
while inf._resource_version is None and time.monotonic() < stop:
time.sleep(0.1)
self.assertIsNotNone(inf._resource_version, "initial list never completed")

# -------------------------------------------------------

def test_cache_populated_after_start(self):
"""Pre-existing ConfigMaps appear in the cache once the informer starts."""
name = "inf-pre-" + _uid()
self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
self.addCleanup(self._drop, name)

inf = SharedInformer(
list_func=self.api.list_namespaced_config_map,
namespace="default",
label_selector="inf-e2e=1",
)
inf.start()
self.addCleanup(inf.stop)

self._wait_in_cache(inf, "default/" + name)
self.assertEqual(_name_of(inf.cache.get_by_key("default/" + name)), name)

def test_added_event_and_cache_entry(self):
"""Creating a ConfigMap fires ADDED and the object appears in the cache."""
name = "inf-add-" + _uid()
seen = threading.Event()

inf = SharedInformer(
list_func=self.api.list_namespaced_config_map,
namespace="default",
label_selector="inf-e2e=1",
)
inf.add_event_handler(ADDED, lambda o: seen.set() if _name_of(o) == name else None)
inf.start()
self.addCleanup(inf.stop)
self.addCleanup(self._drop, name)

self._wait_listed(inf)
self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
self._expect(seen, "ADDED/" + name)
self.assertIsNotNone(inf.cache.get_by_key("default/" + name))

def test_modified_event_and_cache_refresh(self):
"""Patching a ConfigMap fires MODIFIED and the cache holds the updated object."""
name = "inf-mod-" + _uid()
seen = threading.Event()

inf = SharedInformer(
list_func=self.api.list_namespaced_config_map,
namespace="default",
label_selector="inf-e2e=1",
)
inf.add_event_handler(MODIFIED, lambda o: seen.set() if _name_of(o) == name else None)
inf.start()
self.addCleanup(inf.stop)
self.addCleanup(self._drop, name)

self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
self._wait_in_cache(inf, "default/" + name)

self.api.patch_namespaced_config_map(
name=name, namespace="default", body={"data": {"k": "updated"}}
)
self._expect(seen, "MODIFIED/" + name)
self.assertIsNotNone(inf.cache.get_by_key("default/" + name))

def test_deleted_event_removes_from_cache(self):
"""Deleting a ConfigMap fires DELETED and removes it from the cache."""
name = "inf-del-" + _uid()
seen = threading.Event()

inf = SharedInformer(
list_func=self.api.list_namespaced_config_map,
namespace="default",
label_selector="inf-e2e=1",
)
inf.add_event_handler(DELETED, lambda o: seen.set() if _name_of(o) == name else None)
inf.start()
self.addCleanup(inf.stop)

self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
self._wait_in_cache(inf, "default/" + name)

self.api.delete_namespaced_config_map(name=name, namespace="default")
self._expect(seen, "DELETED/" + name)
self.assertIsNone(inf.cache.get_by_key("default/" + name))

def test_resource_version_advances(self):
"""The stored resourceVersion advances after watch events are received."""
name = "inf-rv-" + _uid()
seen = threading.Event()

inf = SharedInformer(
list_func=self.api.list_namespaced_config_map,
namespace="default",
label_selector="inf-e2e=1",
)
inf.add_event_handler(ADDED, lambda o: seen.set() if _name_of(o) == name else None)
inf.start()
self.addCleanup(inf.stop)
self.addCleanup(self._drop, name)

self._wait_listed(inf)
rv_before = int(inf._resource_version)

self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
self._expect(seen, "ADDED/" + name)
self.assertGreater(int(inf._resource_version), rv_before)


if __name__ == "__main__":
unittest.main()
27 changes: 27 additions & 0 deletions kubernetes/informer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2024 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .cache import ObjectCache, _meta_namespace_key
from .informer import SharedInformer, ADDED, MODIFIED, DELETED, BOOKMARK, ERROR

__all__ = [
"ObjectCache",
"_meta_namespace_key",
"SharedInformer",
"ADDED",
"MODIFIED",
"DELETED",
"BOOKMARK",
"ERROR",
]
94 changes: 94 additions & 0 deletions kubernetes/informer/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Copyright 2024 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Thread-safe in-memory store for the Kubernetes informer."""

import threading


def _meta_namespace_key(obj):
"""Build a lookup key from object metadata.

Supports both dict-based objects and generated model objects.
Returns namespace/name for namespaced objects, just name otherwise.
"""
if isinstance(obj, dict):
meta = obj.get("metadata") or {}
ns = meta.get("namespace") or ""
name = meta.get("name") or ""
else:
meta = getattr(obj, "metadata", None)
if meta is None:
return ""
if hasattr(meta, "namespace"):
ns = getattr(meta, "namespace", None) or ""
name = getattr(meta, "name", None) or ""
else:
ns = meta.get("namespace") or ""
name = meta.get("name") or ""
if ns:
return "{}/{}".format(ns, name)
return name


class ObjectCache:
"""Thread-safe in-memory mapping of Kubernetes objects.

The SharedInformer keeps this store synchronised with the API server.
Consumers can call list() and get_by_key() from any thread safely.
"""

def __init__(self, key_func=None):
self._key_func = key_func if key_func is not None else _meta_namespace_key
self._objects = {}
self._rlock = threading.RLock()

# --- mutation helpers (called by SharedInformer) ---

def _put(self, obj):
key = self._key_func(obj)
with self._rlock:
self._objects[key] = obj

def _remove(self, obj):
key = self._key_func(obj)
with self._rlock:
self._objects.pop(key, None)

def _replace_all(self, objects):
rebuilt = {self._key_func(o): o for o in objects}
with self._rlock:
self._objects = rebuilt

# --- public read API ---

def list(self):
"""Return a snapshot list of all cached objects."""
with self._rlock:
return list(self._objects.values())

def list_keys(self):
"""Return a snapshot list of all cache keys."""
with self._rlock:
return list(self._objects.keys())

def get(self, obj):
"""Look up the cached copy of obj. Returns None when absent."""
key = self._key_func(obj)
return self.get_by_key(key)

def get_by_key(self, key):
"""Look up an object by key. Returns None when absent."""
with self._rlock:
return self._objects.get(key)
Loading