From ab8664589f366295f4008bef5c4fd5bd322fca01 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Feb 2026 18:34:37 +0000 Subject: [PATCH 1/5] Initial plan From ef3f21c30459e265659e8993b8c934aeabf5a23b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Feb 2026 18:41:42 +0000 Subject: [PATCH 2/5] Add informer implementation: SharedInformer, ObjectCache, tests, and example Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com> --- examples/informer_example.py | 75 ++++++++ kubernetes/__init__.py | 1 + kubernetes/informer/__init__.py | 26 +++ kubernetes/informer/cache.py | 94 ++++++++++ kubernetes/informer/informer.py | 251 ++++++++++++++++++++++++++ kubernetes/test/test_informer.py | 294 +++++++++++++++++++++++++++++++ 6 files changed, 741 insertions(+) create mode 100644 examples/informer_example.py create mode 100644 kubernetes/informer/__init__.py create mode 100644 kubernetes/informer/cache.py create mode 100644 kubernetes/informer/informer.py create mode 100644 kubernetes/test/test_informer.py diff --git a/examples/informer_example.py b/examples/informer_example.py new file mode 100644 index 0000000000..73cbab9bc6 --- /dev/null +++ b/examples/informer_example.py @@ -0,0 +1,75 @@ +# Copyright 2024 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Example: use SharedInformer to watch pods in the default namespace. + +The informer runs a background daemon thread that keeps a local cache +synchronised with the Kubernetes API server. The main thread is free to +query the cache at any time without worrying about connectivity or retries. +""" + +import time + +import kubernetes +from kubernetes import config +from kubernetes.client import CoreV1Api +from kubernetes.informer import ADDED, DELETED, MODIFIED, SharedInformer + + +def on_pod_added(pod): + name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"] + print("[ADDED] ", name) + + +def on_pod_modified(pod): + name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"] + print("[MODIFIED]", name) + + +def on_pod_deleted(pod): + name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"] + print("[DELETED] ", name) + + +def main(): + config.load_kube_config() + + v1 = CoreV1Api() + informer = SharedInformer( + list_func=v1.list_namespaced_pod, + namespace="default", + resync_period=60, + ) + + informer.add_event_handler(ADDED, on_pod_added) + informer.add_event_handler(MODIFIED, on_pod_modified) + informer.add_event_handler(DELETED, on_pod_deleted) + + informer.start() + print("Informer started. 
Watching pods in "default" namespace ...") + + try: + while True: + cached = informer.cache.list() + print("Cached pods: {}".format(len(cached))) + time.sleep(10) + except KeyboardInterrupt: + pass + finally: + informer.stop() + print("Informer stopped.") + + +if __name__ == "__main__": + main() diff --git a/kubernetes/__init__.py b/kubernetes/__init__.py index 9ad37e9903..3281e1fdf7 100644 --- a/kubernetes/__init__.py +++ b/kubernetes/__init__.py @@ -23,3 +23,4 @@ from . import stream from . import utils from . import leaderelection +from . import informer diff --git a/kubernetes/informer/__init__.py b/kubernetes/informer/__init__.py new file mode 100644 index 0000000000..3b6013e14b --- /dev/null +++ b/kubernetes/informer/__init__.py @@ -0,0 +1,26 @@ +# Copyright 2024 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .cache import ObjectCache, _meta_namespace_key +from .informer import SharedInformer, ADDED, MODIFIED, DELETED, ERROR + +__all__ = [ + "ObjectCache", + "_meta_namespace_key", + "SharedInformer", + "ADDED", + "MODIFIED", + "DELETED", + "ERROR", +] diff --git a/kubernetes/informer/cache.py b/kubernetes/informer/cache.py new file mode 100644 index 0000000000..c161575211 --- /dev/null +++ b/kubernetes/informer/cache.py @@ -0,0 +1,94 @@ +# Copyright 2024 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Thread-safe in-memory store for the Kubernetes informer.""" + +import threading + + +def _meta_namespace_key(obj): + """Build a lookup key from object metadata. + + Supports both dict-based objects and generated model objects. + Returns namespace/name for namespaced objects, just name otherwise. + """ + if isinstance(obj, dict): + meta = obj.get("metadata") or {} + ns = meta.get("namespace") or "" + name = meta.get("name") or "" + else: + meta = getattr(obj, "metadata", None) + if meta is None: + return "" + if hasattr(meta, "namespace"): + ns = getattr(meta, "namespace", None) or "" + name = getattr(meta, "name", None) or "" + else: + ns = meta.get("namespace") or "" + name = meta.get("name") or "" + if ns: + return "{}/{}".format(ns, name) + return name + + +class ObjectCache: + """Thread-safe in-memory mapping of Kubernetes objects. + + The SharedInformer keeps this store synchronised with the API server. + Consumers can call list() and get_by_key() from any thread safely. 
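+
+    A minimal read-side sketch, assuming the informer has already cached a
+    pod named "web" in the "default" namespace:
+
+        cache.get_by_key("default/web")   # the cached object, or None
+        for pod in cache.list():          # point-in-time snapshot, safe to iterate
+            ...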
+ """ + + def __init__(self, key_func=None): + self._key_func = key_func if key_func is not None else _meta_namespace_key + self._objects = {} + self._rlock = threading.RLock() + + # --- mutation helpers (called by SharedInformer) --- + + def _put(self, obj): + key = self._key_func(obj) + with self._rlock: + self._objects[key] = obj + + def _remove(self, obj): + key = self._key_func(obj) + with self._rlock: + self._objects.pop(key, None) + + def _replace_all(self, objects): + rebuilt = {self._key_func(o): o for o in objects} + with self._rlock: + self._objects = rebuilt + + # --- public read API --- + + def list(self): + """Return a snapshot list of all cached objects.""" + with self._rlock: + return list(self._objects.values()) + + def list_keys(self): + """Return a snapshot list of all cache keys.""" + with self._rlock: + return list(self._objects.keys()) + + def get(self, obj): + """Look up the cached copy of obj. Returns None when absent.""" + key = self._key_func(obj) + return self.get_by_key(key) + + def get_by_key(self, key): + """Look up an object by key. Returns None when absent.""" + with self._rlock: + return self._objects.get(key) diff --git a/kubernetes/informer/informer.py b/kubernetes/informer/informer.py new file mode 100644 index 0000000000..43f50844e4 --- /dev/null +++ b/kubernetes/informer/informer.py @@ -0,0 +1,251 @@ +# Copyright 2024 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Informer implementation for the Kubernetes Python client. + +Provides SharedInformer: a background watcher that keeps a local +ObjectCache in sync with the Kubernetes API server and notifies +registered event-handler callbacks. +""" + +import logging +import threading +import time + +from kubernetes.client.exceptions import ApiException +from kubernetes.watch import Watch + +from .cache import ObjectCache, _meta_namespace_key + +logger = logging.getLogger(__name__) + + +# Event types emitted to registered handlers +ADDED = "ADDED" +MODIFIED = "MODIFIED" +DELETED = "DELETED" +ERROR = "ERROR" + + +class SharedInformer: + """Watch a Kubernetes resource and maintain a local cache. + + The informer starts a daemon thread that continuously watches the + given resource via ``list_func``. On each event the local + :class:`ObjectCache` is updated and registered + event-handler callbacks are invoked. + + Parameters + ---------- + list_func: + Bound API method used for the initial list **and** as the watch + source. It must accept a watch keyword argument (e.g. + CoreV1Api().list_namespaced_pod). + namespace: + Kubernetes namespace to watch. Pass None for cluster-scoped + or all-namespace list functions. + resync_period: + How often (seconds) to perform a full re-list from the API server. + Defaults to 0 which disables periodic resyncs. + label_selector: + Optional label selector string forwarded to the API server. + field_selector: + Optional field selector string forwarded to the API server. + key_func: + Optional callable (obj) -> str used to key objects in the + cache. 
Defaults to namespace/name. + """ + + def __init__( + self, + list_func, + namespace=None, + resync_period=0, + label_selector=None, + field_selector=None, + key_func=None, + ): + self._list_func = list_func + self._namespace = namespace + self._resync_period = resync_period + self._label_selector = label_selector + self._field_selector = field_selector + + self._cache = ObjectCache(key_func=key_func) + self._handlers = {ADDED: [], MODIFIED: [], DELETED: [], ERROR: []} + self._handler_lock = threading.Lock() + + self._watch = None + self._thread = None + self._stop_event = threading.Event() + + # ---------------------------------------------------------------- # + # Public API # + # ---------------------------------------------------------------- # + + @property + def cache(self): + """The :class:`ObjectCache` maintained by this informer.""" + return self._cache + + def add_event_handler(self, event_type, handler): + """Register a callback for a specific event type. + + Parameters + ---------- + event_type: + One of :data:`ADDED`, :data:`MODIFIED`, :data:`DELETED` or + :data:`ERROR`. + handler: + Callable invoked with the event object (or the raw exception for + ERROR events). + """ + if event_type not in self._handlers: + raise ValueError( + "Unknown event_type {!r}. Use one of: {}".format( + event_type, ", ".join(sorted(self._handlers)), + ) + ) + with self._handler_lock: + self._handlers[event_type].append(handler) + + def remove_event_handler(self, event_type, handler): + """Deregister a previously registered *handler*. + + No-op if *handler* is not registered. + """ + with self._handler_lock: + try: + self._handlers[event_type].remove(handler) + except (KeyError, ValueError): + pass + + def start(self): + """Start the background watch loop in a daemon thread. + + Calling :meth:`start` more than once without an intervening + :meth:`stop` is a no-op. 
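+
+        A sketch of the usual lifecycle, assuming ``informer`` is a
+        constructed :class:`SharedInformer`; the cache can be read from
+        any thread while the watch loop runs::
+
+            informer.start()
+            pods = informer.cache.list()
+            informer.stop()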
+ """ + if self._thread is not None and self._thread.is_alive(): + return + self._stop_event.clear() + self._thread = threading.Thread( + target=self._run_loop, + name="SharedInformer", + daemon=True, + ) + self._thread.start() + + def stop(self): + """Ask the background watch loop to stop and join the thread.""" + self._stop_event.set() + if self._watch is not None: + self._watch.stop() + if self._thread is not None: + self._thread.join() + self._thread = None + + # ---------------------------------------------------------------- # + # Internal helpers # + # ---------------------------------------------------------------- # + + def _build_kwargs(self): + kw = {} + if self._namespace is not None: + kw["namespace"] = self._namespace + if self._label_selector is not None: + kw["label_selector"] = self._label_selector + if self._field_selector is not None: + kw["field_selector"] = self._field_selector + return kw + + def _fire(self, event_type, obj): + with self._handler_lock: + handlers = list(self._handlers.get(event_type, [])) + for fn in handlers: + try: + fn(obj) + except Exception: + logger.exception( + "Exception in informer handler for %s", event_type + ) + + def _initial_list(self): + """Do the initial list and populate the cache.""" + kw = self._build_kwargs() + resource_version = "0" + resp = self._list_func(**kw) + items = getattr(resp, "items", []) or [] + self._cache._replace_all(items) + rv = None + meta = getattr(resp, "metadata", None) + if meta is not None: + rv = getattr(meta, "resource_version", None) + if rv: + resource_version = rv + return resource_version + + def _run_loop(self): + """Background loop: list then watch, reconnect on errors.""" + while not self._stop_event.is_set(): + try: + resource_version = self._initial_list() + except Exception as exc: + logger.exception("Error during initial list; retrying") + self._fire(ERROR, exc) + self._stop_event.wait(timeout=5) + continue + + # Watch loop + last_resync = time.monotonic() + self._watch = Watch() + kw = self._build_kwargs() + kw["resource_version"] = resource_version + try: + for event in self._watch.stream(self._list_func, **kw): + if self._stop_event.is_set(): + break + evt_type = event.get("type") + obj = event.get("object") + if evt_type == ADDED: + self._cache._put(obj) + self._fire(ADDED, obj) + elif evt_type == MODIFIED: + self._cache._put(obj) + self._fire(MODIFIED, obj) + elif evt_type == DELETED: + self._cache._remove(obj) + self._fire(DELETED, obj) + elif evt_type == ERROR: + self._fire(ERROR, obj) + # Periodic resync: re-list and fire MODIFIED for all cached objects + if ( + self._resync_period > 0 + and (time.monotonic() - last_resync) >= self._resync_period + ): + logger.debug("Informer resync triggered") + for cached_obj in self._cache.list(): + self._fire(MODIFIED, cached_obj) + last_resync = time.monotonic() + except ApiException as exc: + logger.warning( + "Watch stream ended with ApiException (status=%s); reconnecting", + exc.status, + ) + self._fire(ERROR, exc) + except Exception as exc: + logger.exception("Unexpected error in watch loop; reconnecting") + self._fire(ERROR, exc) + finally: + self._watch = None diff --git a/kubernetes/test/test_informer.py b/kubernetes/test/test_informer.py new file mode 100644 index 0000000000..665a7e4101 --- /dev/null +++ b/kubernetes/test/test_informer.py @@ -0,0 +1,294 @@ +# Copyright 2024 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for kubernetes.informer.""" + +import threading +import time +import unittest +from unittest.mock import MagicMock, patch + +from kubernetes.informer.cache import ObjectCache, _meta_namespace_key +from kubernetes.informer.informer import ( + ADDED, + DELETED, + ERROR, + MODIFIED, + SharedInformer, +) + + +def _make_pod(namespace, name): + """Return a simple dict-based pod object.""" + return {"metadata": {"namespace": namespace, "name": name}} + + +class TestMetaNamespaceKey(unittest.TestCase): + def test_namespaced_dict(self): + obj = {"metadata": {"namespace": "ns", "name": "pod"}} + self.assertEqual(_meta_namespace_key(obj), "ns/pod") + + def test_cluster_scoped_dict(self): + obj = {"metadata": {"name": "node1"}} + self.assertEqual(_meta_namespace_key(obj), "node1") + + def test_no_metadata(self): + obj = MagicMock() + obj.metadata = None + self.assertEqual(_meta_namespace_key(obj), "") + + def test_model_object(self): + meta = MagicMock() + meta.namespace = "default" + meta.name = "mypod" + obj = MagicMock() + obj.metadata = meta + self.assertEqual(_meta_namespace_key(obj), "default/mypod") + + +class TestObjectCache(unittest.TestCase): + def setUp(self): + self.cache = ObjectCache() + + def test_put_and_list(self): + pod = _make_pod("default", "p1") + self.cache._put(pod) + self.assertIn(pod, self.cache.list()) + + def test_remove(self): + pod = _make_pod("default", "p1") + self.cache._put(pod) + self.cache._remove(pod) + self.assertEqual(self.cache.list(), []) + + def test_remove_nonexistent_is_noop(self): + pod = _make_pod("default", "missing") + self.cache._remove(pod) # should not raise + + def test_replace_all(self): + pod1 = _make_pod("default", "p1") + pod2 = _make_pod("default", "p2") + self.cache._put(pod1) + self.cache._replace_all([pod2]) + keys = self.cache.list_keys() + self.assertNotIn("default/p1", keys) + self.assertIn("default/p2", keys) + + def test_get_by_key(self): + pod = _make_pod("default", "p1") + self.cache._put(pod) + self.assertIs(self.cache.get_by_key("default/p1"), pod) + self.assertIsNone(self.cache.get_by_key("default/ghost")) + + def test_get(self): + pod = _make_pod("kube-system", "coredns") + self.cache._put(pod) + self.assertIs(self.cache.get(pod), pod) + + def test_thread_safety(self): + """Concurrent puts should not raise exceptions.""" + errors = [] + + def worker(n): + try: + for i in range(50): + self.cache._put(_make_pod("default", "pod-{}-{}".format(n, i))) + except Exception as exc: + errors.append(exc) + + threads = [threading.Thread(target=worker, args=(t,)) for t in range(5)] + for t in threads: + t.start() + for t in threads: + t.join() + self.assertEqual(errors, []) + + +class TestSharedInformerHandlers(unittest.TestCase): + def setUp(self): + self.list_func = MagicMock() + # Minimal list response + list_resp = MagicMock() + list_resp.items = [] + list_resp.metadata = MagicMock(resource_version="1") + self.list_func.return_value = list_resp + + self.informer = SharedInformer(list_func=self.list_func) + + def test_add_handler_and_fire(self): + received = [] + self.informer.add_event_handler(ADDED, received.append) 
+ pod = _make_pod("default", "p1") + self.informer._fire(ADDED, pod) + self.assertEqual(received, [pod]) + + def test_remove_handler(self): + received = [] + self.informer.add_event_handler(ADDED, received.append) + self.informer.remove_event_handler(ADDED, received.append) + self.informer._fire(ADDED, _make_pod("default", "p1")) + self.assertEqual(received, []) + + def test_remove_unknown_handler_noop(self): + self.informer.remove_event_handler(MODIFIED, lambda x: x) # should not raise + + def test_invalid_event_type_raises(self): + with self.assertRaises(ValueError): + self.informer.add_event_handler("UNKNOWN", lambda x: x) + + def test_handler_exception_is_swallowed(self): + """A crashing handler must not stop the informer loop.""" + def bad_handler(obj): + raise RuntimeError("boom") + + self.informer.add_event_handler(ADDED, bad_handler) + # Should not raise + self.informer._fire(ADDED, _make_pod("default", "p1")) + + +class TestSharedInformerWatchLoop(unittest.TestCase): + """Test the watch loop by mocking Watch.stream.""" + + def _make_informer_with_events(self, events): + list_func = MagicMock() + list_resp = MagicMock() + list_resp.items = [] + list_resp.metadata = MagicMock(resource_version="1") + list_func.return_value = list_resp + + informer = SharedInformer(list_func=list_func) + + def fake_stream(func, **kw): + yield from events + informer.stop() + + mock_watch = MagicMock() + mock_watch.stream.side_effect = fake_stream + informer._watch_factory = lambda: mock_watch + return informer, mock_watch + + def test_added_event_updates_cache(self): + pod = _make_pod("default", "new-pod") + events = [{"type": "ADDED", "object": pod}] + informer, _ = self._make_informer_with_events(events) + + received = [] + informer.add_event_handler(ADDED, received.append) + + with patch("kubernetes.informer.informer.Watch") as MockWatch: + mock_w = MagicMock() + + def fake_stream(func, **kw): + yield from events + informer._stop_event.set() + + mock_w.stream.side_effect = fake_stream + MockWatch.return_value = mock_w + + informer.start() + informer._thread.join(timeout=3) + + self.assertIn(pod, informer.cache.list()) + self.assertIn(pod, received) + + def test_deleted_event_removes_from_cache(self): + pod = _make_pod("default", "gone-pod") + events = [ + {"type": "ADDED", "object": pod}, + {"type": "DELETED", "object": pod}, + ] + + deleted = [] + list_func = MagicMock() + list_resp = MagicMock() + list_resp.items = [] + list_resp.metadata = MagicMock(resource_version="1") + list_func.return_value = list_resp + + informer = SharedInformer(list_func=list_func) + informer.add_event_handler(DELETED, deleted.append) + + with patch("kubernetes.informer.informer.Watch") as MockWatch: + mock_w = MagicMock() + + def fake_stream(func, **kw): + yield from events + informer._stop_event.set() + + mock_w.stream.side_effect = fake_stream + MockWatch.return_value = mock_w + + informer.start() + informer._thread.join(timeout=3) + + self.assertEqual(informer.cache.list(), []) + self.assertIn(pod, deleted) + + def test_modified_event_updates_cache(self): + pod_v1 = _make_pod("default", "mod-pod") + pod_v2 = dict(pod_v1) + pod_v2["status"] = "Running" + + events = [ + {"type": "ADDED", "object": pod_v1}, + {"type": "MODIFIED", "object": pod_v2}, + ] + + list_func = MagicMock() + list_resp = MagicMock() + list_resp.items = [] + list_resp.metadata = MagicMock(resource_version="1") + list_func.return_value = list_resp + + informer = SharedInformer(list_func=list_func) + + with patch("kubernetes.informer.informer.Watch") 
as MockWatch: + mock_w = MagicMock() + + def fake_stream(func, **kw): + yield from events + informer._stop_event.set() + + mock_w.stream.side_effect = fake_stream + MockWatch.return_value = mock_w + + informer.start() + informer._thread.join(timeout=3) + + cached = informer.cache.get_by_key("default/mod-pod") + self.assertIs(cached, pod_v2) + + def test_start_is_idempotent(self): + list_func = MagicMock() + list_resp = MagicMock() + list_resp.items = [] + list_resp.metadata = MagicMock(resource_version="1") + list_func.return_value = list_resp + + informer = SharedInformer(list_func=list_func) + + with patch("kubernetes.informer.informer.Watch") as MockWatch: + mock_w = MagicMock() + mock_w.stream.return_value = iter([]) + MockWatch.return_value = mock_w + + informer.start() + first_thread = informer._thread + informer.start() # should be a no-op + self.assertIs(informer._thread, first_thread) + informer.stop() + + +if __name__ == "__main__": + unittest.main() From 9c708adab84896fd38a92b6d6b2edbf62756542b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Feb 2026 18:52:50 +0000 Subject: [PATCH 3/5] Add BOOKMARK event support to SharedInformer Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com> --- kubernetes/informer/__init__.py | 3 +- kubernetes/informer/informer.py | 12 ++++-- kubernetes/test/test_informer.py | 71 ++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 4 deletions(-) diff --git a/kubernetes/informer/__init__.py b/kubernetes/informer/__init__.py index 3b6013e14b..77d1b7adc2 100644 --- a/kubernetes/informer/__init__.py +++ b/kubernetes/informer/__init__.py @@ -13,7 +13,7 @@ # limitations under the License. from .cache import ObjectCache, _meta_namespace_key -from .informer import SharedInformer, ADDED, MODIFIED, DELETED, ERROR +from .informer import SharedInformer, ADDED, MODIFIED, DELETED, BOOKMARK, ERROR __all__ = [ "ObjectCache", @@ -22,5 +22,6 @@ "ADDED", "MODIFIED", "DELETED", + "BOOKMARK", "ERROR", ] diff --git a/kubernetes/informer/informer.py b/kubernetes/informer/informer.py index 43f50844e4..48237858bf 100644 --- a/kubernetes/informer/informer.py +++ b/kubernetes/informer/informer.py @@ -35,6 +35,7 @@ ADDED = "ADDED" MODIFIED = "MODIFIED" DELETED = "DELETED" +BOOKMARK = "BOOKMARK" ERROR = "ERROR" @@ -83,7 +84,7 @@ def __init__( self._field_selector = field_selector self._cache = ObjectCache(key_func=key_func) - self._handlers = {ADDED: [], MODIFIED: [], DELETED: [], ERROR: []} + self._handlers = {ADDED: [], MODIFIED: [], DELETED: [], BOOKMARK: [], ERROR: []} self._handler_lock = threading.Lock() self._watch = None @@ -105,8 +106,8 @@ def add_event_handler(self, event_type, handler): Parameters ---------- event_type: - One of :data:`ADDED`, :data:`MODIFIED`, :data:`DELETED` or - :data:`ERROR`. + One of :data:`ADDED`, :data:`MODIFIED`, :data:`DELETED`, + :data:`BOOKMARK` or :data:`ERROR`. handler: Callable invoked with the event object (or the raw exception for ERROR events). @@ -227,6 +228,11 @@ def _run_loop(self): elif evt_type == DELETED: self._cache._remove(obj) self._fire(DELETED, obj) + elif evt_type == BOOKMARK: + # BOOKMARK events carry an updated resource version but + # no object state change; the Watch instance already + # records the new resource_version internally. 
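+                        # Handlers receive the raw bookmark dict when the Watch
+                        # provides one, falling back to the decoded object.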
+ self._fire(BOOKMARK, event.get("raw_object", obj)) elif evt_type == ERROR: self._fire(ERROR, obj) # Periodic resync: re-list and fire MODIFIED for all cached objects diff --git a/kubernetes/test/test_informer.py b/kubernetes/test/test_informer.py index 665a7e4101..c4f7f527a3 100644 --- a/kubernetes/test/test_informer.py +++ b/kubernetes/test/test_informer.py @@ -22,6 +22,7 @@ from kubernetes.informer.cache import ObjectCache, _meta_namespace_key from kubernetes.informer.informer import ( ADDED, + BOOKMARK, DELETED, ERROR, MODIFIED, @@ -269,6 +270,7 @@ def fake_stream(func, **kw): cached = informer.cache.get_by_key("default/mod-pod") self.assertIs(cached, pod_v2) + def test_start_is_idempotent(self): list_func = MagicMock() list_resp = MagicMock() @@ -289,6 +291,75 @@ def test_start_is_idempotent(self): self.assertIs(informer._thread, first_thread) informer.stop() + def test_bookmark_event_fires_handler(self): + bookmark_obj = {"metadata": {"resourceVersion": "42"}} + events = [ + {"type": "BOOKMARK", "object": bookmark_obj, "raw_object": bookmark_obj}, + ] + + received = [] + list_func = MagicMock() + list_resp = MagicMock() + list_resp.items = [] + list_resp.metadata = MagicMock(resource_version="1") + list_func.return_value = list_resp + + informer = SharedInformer(list_func=list_func) + informer.add_event_handler(BOOKMARK, received.append) + + with patch("kubernetes.informer.informer.Watch") as MockWatch: + mock_w = MagicMock() + + def fake_stream(func, **kw): + yield from events + informer._stop_event.set() + + mock_w.stream.side_effect = fake_stream + MockWatch.return_value = mock_w + + informer.start() + informer._thread.join(timeout=3) + + self.assertEqual(len(received), 1) + self.assertEqual(received[0], bookmark_obj) + # Cache should be unchanged (BOOKMARK does not add/modify/delete objects) + self.assertEqual(informer.cache.list(), []) + + def test_bookmark_event_does_not_modify_cache(self): + pod = _make_pod("default", "stable-pod") + bookmark_obj = {"metadata": {"resourceVersion": "99"}} + events = [ + {"type": "ADDED", "object": pod}, + {"type": "BOOKMARK", "object": bookmark_obj, "raw_object": bookmark_obj}, + ] + + list_func = MagicMock() + list_resp = MagicMock() + list_resp.items = [] + list_resp.metadata = MagicMock(resource_version="1") + list_func.return_value = list_resp + + informer = SharedInformer(list_func=list_func) + + with patch("kubernetes.informer.informer.Watch") as MockWatch: + mock_w = MagicMock() + + def fake_stream(func, **kw): + yield from events + informer._stop_event.set() + + mock_w.stream.side_effect = fake_stream + MockWatch.return_value = mock_w + + informer.start() + informer._thread.join(timeout=3) + + # BOOKMARK must not have altered the cache content + cached = informer.cache.list() + self.assertEqual(len(cached), 1) + self.assertIs(cached[0], pod) + if __name__ == "__main__": unittest.main() + From d21aa1be59b16eb86e63eeae6932ec5933f892fe Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 20 Feb 2026 18:59:04 +0000 Subject: [PATCH 4/5] Track and reuse resourceVersion across watch reconnects; re-list only on 410 Gone Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com> --- kubernetes/informer/informer.py | 58 ++++++++++++----- kubernetes/test/test_informer.py | 107 +++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 17 deletions(-) diff --git a/kubernetes/informer/informer.py b/kubernetes/informer/informer.py index 48237858bf..3f1a2c4cee 
100644 --- a/kubernetes/informer/informer.py +++ b/kubernetes/informer/informer.py @@ -90,6 +90,7 @@ def __init__( self._watch = None self._thread = None self._stop_event = threading.Event() + self._resource_version = None # most recent RV seen; None forces a full re-list # ---------------------------------------------------------------- # # Public API # @@ -185,7 +186,6 @@ def _fire(self, event_type, obj): def _initial_list(self): """Do the initial list and populate the cache.""" kw = self._build_kwargs() - resource_version = "0" resp = self._list_func(**kw) items = getattr(resp, "items", []) or [] self._cache._replace_all(items) @@ -193,26 +193,33 @@ def _initial_list(self): meta = getattr(resp, "metadata", None) if meta is not None: rv = getattr(meta, "resource_version", None) - if rv: - resource_version = rv - return resource_version + self._resource_version = rv or "0" def _run_loop(self): - """Background loop: list then watch, reconnect on errors.""" + """Background loop: list then watch, reconnect on errors. + + A full re-list is only performed when ``self._resource_version`` is + ``None`` (first start or after a 410 Gone response). On all other + reconnects the most recent ``resourceVersion`` is reused so that no + events are missed and the API server does not need to send a full + object snapshot. + """ while not self._stop_event.is_set(): - try: - resource_version = self._initial_list() - except Exception as exc: - logger.exception("Error during initial list; retrying") - self._fire(ERROR, exc) - self._stop_event.wait(timeout=5) - continue + # Full re-list only when we have no resource version to resume from. + if self._resource_version is None: + try: + self._initial_list() + except Exception as exc: + logger.exception("Error during initial list; retrying") + self._fire(ERROR, exc) + self._stop_event.wait(timeout=5) + continue # Watch loop last_resync = time.monotonic() self._watch = Watch() kw = self._build_kwargs() - kw["resource_version"] = resource_version + kw["resource_version"] = self._resource_version try: for event in self._watch.stream(self._list_func, **kw): if self._stop_event.is_set(): @@ -245,13 +252,30 @@ def _run_loop(self): self._fire(MODIFIED, cached_obj) last_resync = time.monotonic() except ApiException as exc: - logger.warning( - "Watch stream ended with ApiException (status=%s); reconnecting", - exc.status, - ) + if exc.status == 410: + # The stored resource version is too old; force a full re-list. + logger.warning( + "Watch expired (410 Gone); will re-list from scratch" + ) + self._resource_version = None + else: + logger.warning( + "Watch stream ended with ApiException (status=%s); reconnecting", + exc.status, + ) self._fire(ERROR, exc) except Exception as exc: logger.exception("Unexpected error in watch loop; reconnecting") self._fire(ERROR, exc) finally: + # Capture the most recent resource version seen by the Watch + # (updated on every ADDED/MODIFIED/DELETED/BOOKMARK event) so + # that the next watch connection can resume without re-listing. + # Do not overwrite a None that was set by a 410 handler above. 
+ if ( + self._resource_version is not None + and self._watch is not None + and self._watch.resource_version + ): + self._resource_version = self._watch.resource_version self._watch = None diff --git a/kubernetes/test/test_informer.py b/kubernetes/test/test_informer.py index c4f7f527a3..df61df7c56 100644 --- a/kubernetes/test/test_informer.py +++ b/kubernetes/test/test_informer.py @@ -359,6 +359,113 @@ def fake_stream(func, **kw): self.assertEqual(len(cached), 1) self.assertIs(cached[0], pod) + def test_resource_version_stored_from_watch(self): + """After the watch stream ends the latest RV is preserved for reconnect.""" + pod = _make_pod("default", "rv-pod") + events = [{"type": "ADDED", "object": pod}] + + list_func = MagicMock() + list_resp = MagicMock() + list_resp.items = [] + list_resp.metadata = MagicMock(resource_version="10") + list_func.return_value = list_resp + + informer = SharedInformer(list_func=list_func) + + call_count = {"n": 0} + + with patch("kubernetes.informer.informer.Watch") as MockWatch: + mock_w = MagicMock() + mock_w.resource_version = "99" + + def fake_stream(func, **kw): + call_count["n"] += 1 + yield from events + informer._stop_event.set() + + mock_w.stream.side_effect = fake_stream + MockWatch.return_value = mock_w + + informer.start() + informer._thread.join(timeout=3) + + # The Watch reported RV "99"; the informer should have stored it. + self.assertEqual(informer._resource_version, "99") + # list_func should have been called once for the initial list only. + self.assertEqual(list_func.call_count, 1) + + def test_reconnect_skips_relist_when_rv_known(self): + """On reconnect without 410 the informer must NOT call the list function again.""" + pod = _make_pod("default", "reconnect-pod") + + list_func = MagicMock() + list_resp = MagicMock() + list_resp.items = [pod] + list_resp.metadata = MagicMock(resource_version="5") + list_func.return_value = list_resp + + informer = SharedInformer(list_func=list_func) + + stream_calls = {"n": 0} + + with patch("kubernetes.informer.informer.Watch") as MockWatch: + mock_w = MagicMock() + mock_w.resource_version = "7" + + def fake_stream(func, **kw): + stream_calls["n"] += 1 + if stream_calls["n"] == 1: + # First stream: yield nothing then let it reconnect + return iter([]) + # Second stream: stop the informer + informer._stop_event.set() + return iter([]) + + mock_w.stream.side_effect = fake_stream + MockWatch.return_value = mock_w + + informer.start() + informer._thread.join(timeout=3) + + # list_func is called only once (initial list); reconnect reuses the RV. 
+ self.assertEqual(list_func.call_count, 1) + self.assertEqual(stream_calls["n"], 2) + + def test_410_gone_triggers_relist(self): + """A 410 Gone ApiException must reset resource_version and trigger re-list.""" + from kubernetes.client.exceptions import ApiException + + list_func = MagicMock() + list_resp = MagicMock() + list_resp.items = [] + list_resp.metadata = MagicMock(resource_version="3") + list_func.return_value = list_resp + + informer = SharedInformer(list_func=list_func) + + stream_calls = {"n": 0} + + with patch("kubernetes.informer.informer.Watch") as MockWatch: + mock_w = MagicMock() + mock_w.resource_version = "3" + + def fake_stream(func, **kw): + stream_calls["n"] += 1 + if stream_calls["n"] == 1: + raise ApiException(status=410, reason="Gone") + # Second stream (after re-list): stop cleanly + informer._stop_event.set() + return iter([]) + + mock_w.stream.side_effect = fake_stream + MockWatch.return_value = mock_w + + informer.start() + informer._thread.join(timeout=3) + + # list_func called twice: initial list + re-list after 410. + self.assertEqual(list_func.call_count, 2) + if __name__ == "__main__": unittest.main() From 63bd3d50736dd0029bc53855a6ff6107c25cdf34 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 21 Feb 2026 02:39:10 +0000 Subject: [PATCH 5/5] Add e2e tests for SharedInformer against a real cluster Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com> --- kubernetes/e2e_test/test_informer.py | 177 +++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 kubernetes/e2e_test/test_informer.py diff --git a/kubernetes/e2e_test/test_informer.py b/kubernetes/e2e_test/test_informer.py new file mode 100644 index 0000000000..06283a4d5e --- /dev/null +++ b/kubernetes/e2e_test/test_informer.py @@ -0,0 +1,177 @@ +# Copyright 2024 The Kubernetes Authors. +# Licensed under the Apache License, Version 2.0 (the "License"). +# End-to-end tests for kubernetes.informer.SharedInformer. 
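+#
+# These tests expect a reachable cluster configured through
+# kubernetes.e2e_test.base.get_e2e_configuration(); they create, patch and
+# delete ConfigMaps labelled inf-e2e=1 in the "default" namespace and clean
+# up after themselves via addCleanup.
+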
+ +import threading +import time +import unittest +import uuid + +from kubernetes.client import api_client +from kubernetes.client.api import core_v1_api +from kubernetes.e2e_test import base +from kubernetes.informer import ADDED, DELETED, MODIFIED, SharedInformer + +_TIMEOUT = 30 + + +def _uid(): + return str(uuid.uuid4())[-12:] + + +def _cm(name, payload=None): + return { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": name, "labels": {"inf-e2e": "1"}}, + "data": payload or {"k": "v"}, + } + + +def _name_of(obj): + if hasattr(obj, "metadata"): + return obj.metadata.name + return (obj.get("metadata") or {}).get("name") + + +class TestSharedInformerE2E(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.cfg = base.get_e2e_configuration() + cls.apiclient = api_client.ApiClient(configuration=cls.cfg) + cls.api = core_v1_api.CoreV1Api(cls.apiclient) + + def _drop(self, cm_name): + try: + self.api.delete_namespaced_config_map(name=cm_name, namespace="default") + except Exception: + pass + + def _expect(self, ev, label): + if not ev.wait(timeout=_TIMEOUT): + self.fail("Timeout waiting for: " + label) + + def _wait_in_cache(self, inf, key): + stop = time.monotonic() + _TIMEOUT + while time.monotonic() < stop: + if inf.cache.get_by_key(key) is not None: + return + time.sleep(0.25) + self.fail("key " + key + " never appeared in cache") + + def _wait_listed(self, inf): + stop = time.monotonic() + _TIMEOUT + while inf._resource_version is None and time.monotonic() < stop: + time.sleep(0.1) + self.assertIsNotNone(inf._resource_version, "initial list never completed") + + # ------------------------------------------------------- + + def test_cache_populated_after_start(self): + """Pre-existing ConfigMaps appear in the cache once the informer starts.""" + name = "inf-pre-" + _uid() + self.api.create_namespaced_config_map(body=_cm(name), namespace="default") + self.addCleanup(self._drop, name) + + inf = SharedInformer( + list_func=self.api.list_namespaced_config_map, + namespace="default", + label_selector="inf-e2e=1", + ) + inf.start() + self.addCleanup(inf.stop) + + self._wait_in_cache(inf, "default/" + name) + self.assertEqual(_name_of(inf.cache.get_by_key("default/" + name)), name) + + def test_added_event_and_cache_entry(self): + """Creating a ConfigMap fires ADDED and the object appears in the cache.""" + name = "inf-add-" + _uid() + seen = threading.Event() + + inf = SharedInformer( + list_func=self.api.list_namespaced_config_map, + namespace="default", + label_selector="inf-e2e=1", + ) + inf.add_event_handler(ADDED, lambda o: seen.set() if _name_of(o) == name else None) + inf.start() + self.addCleanup(inf.stop) + self.addCleanup(self._drop, name) + + self._wait_listed(inf) + self.api.create_namespaced_config_map(body=_cm(name), namespace="default") + self._expect(seen, "ADDED/" + name) + self.assertIsNotNone(inf.cache.get_by_key("default/" + name)) + + def test_modified_event_and_cache_refresh(self): + """Patching a ConfigMap fires MODIFIED and the cache holds the updated object.""" + name = "inf-mod-" + _uid() + seen = threading.Event() + + inf = SharedInformer( + list_func=self.api.list_namespaced_config_map, + namespace="default", + label_selector="inf-e2e=1", + ) + inf.add_event_handler(MODIFIED, lambda o: seen.set() if _name_of(o) == name else None) + inf.start() + self.addCleanup(inf.stop) + self.addCleanup(self._drop, name) + + self.api.create_namespaced_config_map(body=_cm(name), namespace="default") + self._wait_in_cache(inf, "default/" + name) + + 
self.api.patch_namespaced_config_map( + name=name, namespace="default", body={"data": {"k": "updated"}} + ) + self._expect(seen, "MODIFIED/" + name) + self.assertIsNotNone(inf.cache.get_by_key("default/" + name)) + + def test_deleted_event_removes_from_cache(self): + """Deleting a ConfigMap fires DELETED and removes it from the cache.""" + name = "inf-del-" + _uid() + seen = threading.Event() + + inf = SharedInformer( + list_func=self.api.list_namespaced_config_map, + namespace="default", + label_selector="inf-e2e=1", + ) + inf.add_event_handler(DELETED, lambda o: seen.set() if _name_of(o) == name else None) + inf.start() + self.addCleanup(inf.stop) + + self.api.create_namespaced_config_map(body=_cm(name), namespace="default") + self._wait_in_cache(inf, "default/" + name) + + self.api.delete_namespaced_config_map(name=name, namespace="default") + self._expect(seen, "DELETED/" + name) + self.assertIsNone(inf.cache.get_by_key("default/" + name)) + + def test_resource_version_advances(self): + """The stored resourceVersion advances after watch events are received.""" + name = "inf-rv-" + _uid() + seen = threading.Event() + + inf = SharedInformer( + list_func=self.api.list_namespaced_config_map, + namespace="default", + label_selector="inf-e2e=1", + ) + inf.add_event_handler(ADDED, lambda o: seen.set() if _name_of(o) == name else None) + inf.start() + self.addCleanup(inf.stop) + self.addCleanup(self._drop, name) + + self._wait_listed(inf) + rv_before = int(inf._resource_version) + + self.api.create_namespaced_config_map(body=_cm(name), namespace="default") + self._expect(seen, "ADDED/" + name) + self.assertGreater(int(inf._resource_version), rv_before) + + +if __name__ == "__main__": + unittest.main()
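
For illustration only: a minimal sketch of the key_func hook that the
SharedInformer docstring documents but the series never exercises, assuming
CoreV1Api().list_node as the list function and a hypothetical node name:

    from kubernetes import config
    from kubernetes.client import CoreV1Api
    from kubernetes.informer import SharedInformer

    config.load_kube_config()
    v1 = CoreV1Api()

    # Nodes are cluster-scoped, so key the cache by bare node name.
    node_informer = SharedInformer(
        list_func=v1.list_node,
        key_func=lambda node: node.metadata.name,
    )
    node_informer.start()
    print(node_informer.cache.get_by_key("control-plane-1"))  # hypothetical name
    node_informer.stop()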