Skip to content

Commit

Permalink
Merge pull request #250 from nolar/249-cluster-resources-in-namespaces
Browse files Browse the repository at this point in the history
Hot-fix the cluster-scoped resource discovery in namespaced operators
  • Loading branch information
nolar authored Nov 20, 2019
2 parents dc804e0 + 0622e24 commit 067ddab
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 9 deletions.
8 changes: 7 additions & 1 deletion kopf/clients/auth.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import base64
import functools
import os
Expand All @@ -10,6 +11,7 @@
import aiohttp

from kopf.structs import credentials
from kopf.structs import resources

# Per-operator storage and exchange point for authentication methods.
# Used by the client wrappers to retrieve the credentials and report the failures.
Expand Down Expand Up @@ -85,8 +87,10 @@ class APISession(aiohttp.ClientSession):
performed inside of those threads: everything is in the main thread/loop.
"""
server: str
default_namespace: Optional[str] = None
default_namespace: Optional[str]
_tempfiles: "_TempFiles"
_discovery_lock: asyncio.Lock
_discovered_resources: Dict[resources.Resource, Dict[str, object]]

@classmethod
def from_connection_info(
Expand Down Expand Up @@ -177,6 +181,8 @@ def from_connection_info(
session.server = info.server
session.default_namespace = info.default_namespace
session._tempfiles = tempfiles # for purging on garbage collection
session._discovery_lock = asyncio.Lock()
session._discovered_resources = {}

return session

Expand Down
35 changes: 35 additions & 0 deletions kopf/clients/discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from typing import Dict, Optional, cast

from kopf.clients import auth
from kopf.structs import resources


async def discover(
*,
resource: resources.Resource,
session: auth.APISession,
) -> Optional[Dict[str, object]]:
if resource not in session._discovered_resources:
async with session._discovery_lock:
if resource not in session._discovered_resources:

response = await session.get(
url=resource.get_version_url(server=session.server),
)
response.raise_for_status()
respdata = await response.json()

session._discovered_resources.update({
resources.Resource(resource.group, resource.version, info['name']): info
for info in respdata['resources']
})
return session._discovered_resources.get(resource, None)


async def is_namespaced(
*,
resource: resources.Resource,
session: auth.APISession,
) -> bool:
info = await discover(resource=resource, session=session)
return cast(bool, info['namespaced']) if info is not None else True # assume namespaced
10 changes: 8 additions & 2 deletions kopf/clients/fetching.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import aiohttp

from kopf.clients import auth
from kopf.clients import discovery
from kopf.structs import bodies
from kopf.structs import resources

Expand All @@ -25,6 +26,7 @@ async def read_crd(
) -> Union[bodies.Body, _T]:
if session is None:
raise RuntimeError("API instance is not injected by the decorator.")

try:
response = await session.get(
url=CRD_CRD.get_url(server=session.server, name=resource.name),
Expand All @@ -51,8 +53,10 @@ async def read_obj(
if session is None:
raise RuntimeError("API instance is not injected by the decorator.")

is_namespaced = await discovery.is_namespaced(resource=resource, session=session)
namespace = namespace if is_namespaced else None

try:
# TODO: also add cluster-wide resource when --namespace is set?
response = await session.get(
url=resource.get_url(server=session.server, namespace=namespace, name=name),
)
Expand Down Expand Up @@ -88,7 +92,9 @@ async def list_objs_rv(
if session is None:
raise RuntimeError("API instance is not injected by the decorator.")

# TODO: also add cluster-wide resource when --namespace is set?
is_namespaced = await discovery.is_namespaced(resource=resource, session=session)
namespace = namespace if is_namespaced else None

response = await session.get(
url=resource.get_url(server=session.server, namespace=namespace),
)
Expand Down
5 changes: 5 additions & 0 deletions kopf/clients/patching.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import aiohttp

from kopf.clients import auth
from kopf.clients import discovery
from kopf.structs import bodies
from kopf.structs import patches
from kopf.structs import resources
Expand Down Expand Up @@ -36,6 +37,10 @@ async def patch_obj(

namespace = body.get('metadata', {}).get('namespace') if body is not None else namespace
name = body.get('metadata', {}).get('name') if body is not None else name

is_namespaced = await discovery.is_namespaced(resource=resource, session=session)
namespace = namespace if is_namespaced else None

if body is None:
body = cast(bodies.Body, {'metadata': {'name': name}})
if namespace is not None:
Expand Down
5 changes: 4 additions & 1 deletion kopf/clients/watching.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from kopf import config
from kopf.clients import auth
from kopf.clients import discovery
from kopf.clients import fetching
from kopf.structs import bodies
from kopf.structs import resources
Expand Down Expand Up @@ -140,14 +141,16 @@ async def watch_objs(
if session is None:
raise RuntimeError("API instance is not injected by the decorator.")

is_namespaced = await discovery.is_namespaced(resource=resource, session=session)
namespace = namespace if is_namespaced else None

params: Dict[str, str] = {}
params['watch'] = 'true'
if since is not None:
params['resourceVersion'] = since
if timeout is not None:
params['timeoutSeconds'] = str(timeout)

# TODO: also add cluster-wide resource when --namespace is set?
response = await session.get(
url=resource.get_url(server=session.server, namespace=namespace, params=params),
timeout=aiohttp.ClientTimeout(total=None),
Expand Down
23 changes: 21 additions & 2 deletions kopf/structs/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,34 @@ def get_url(
name: Optional[str] = None,
params: Optional[Mapping[str, str]] = None,
) -> str:
parts: List[Optional[str]] = [
return self._build_url(server, params, [
'/api' if self.group == '' and self.version == 'v1' else '/apis',
self.group,
self.version,
'namespaces' if namespace is not None else None,
namespace,
self.plural,
name,
]
])

def get_version_url(
self,
*,
server: Optional[str] = None,
params: Optional[Mapping[str, str]] = None,
) -> str:
return self._build_url(server, params, [
'/api' if self.group == '' and self.version == 'v1' else '/apis',
self.group,
self.version,
])

def _build_url(
self,
server: Optional[str],
params: Optional[Mapping[str, str]],
parts: List[Optional[str]],
) -> str:
query = urllib.parse.urlencode(params, encoding='utf-8') if params else ''
path = '/'.join([part for part in parts if part])
url = path + ('?' if query else '') + query
Expand Down
14 changes: 12 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import time
from unittest.mock import Mock

import aiohttp
import aiohttp.web
import asynctest
import pytest
import pytest_mock
Expand Down Expand Up @@ -175,7 +175,17 @@ async def resp_mock_effect(request):


@pytest.fixture()
def stream(fake_vault, resp_mocker, aresponses, hostname, resource):
def version_api(resp_mocker, aresponses, hostname, resource):
result = {'resources': [{
'name': resource.plural,
'namespaced': True,
}]}
list_mock = resp_mocker(return_value=aiohttp.web.json_response(result))
aresponses.add(hostname, resource.get_version_url(), 'get', list_mock)


@pytest.fixture()
def stream(fake_vault, resp_mocker, aresponses, hostname, resource, version_api):
""" A mock for the stream of events as if returned by K8s client. """

def feed(*args, namespace=None):
Expand Down
2 changes: 1 addition & 1 deletion tests/k8s/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@


@pytest.fixture(autouse=True)
def _autouse_resp_mocker(resp_mocker):
def _autouse_resp_mocker(resp_mocker, version_api):
pass

0 comments on commit 067ddab

Please sign in to comment.