Skip to content

Commit

Permalink
Reduce apiserver request count for pod_service_mapper
Browse files Browse the repository at this point in the history
- disable podmapper freshness check from SD, rely on events from the kubernetes check
- add a delay parameter to throttle requests to the apiserver
- remove check_services_cache_freshness and use process_events only for cache invalidation
- add collect_service_tags to disable collection completely

Needs changes in integration-core
  • Loading branch information
xvello authored and olivielpeau committed Jun 15, 2017
1 parent 4e80bfc commit 6e6164f
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 80 deletions.
27 changes: 27 additions & 0 deletions tests/core/test_kube_event_retriever.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# stdlib
import time # noqa: F401

# 3rd party
from mock import patch

Expand Down Expand Up @@ -38,6 +41,30 @@ def test_events_resversion_filtering(self):
self.assertEquals(0, len(events)) # No new event
self.assertEquals(2709, retr.last_resversion)

@patch('time.time')
def test_events_delay(self, mock_time):
    """Requests are throttled: events are only fetched when `delay` seconds elapsed."""
    jsons = self._load_json_array(
        ['service_cache_events1.json', 'service_cache_events2.json'])
    with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
        retr = KubeEventRetriever(self.kube, delay=500)

        # First lookup always hits the apiserver
        mock_time.return_value = 10000
        events = retr.get_event_array()
        self.assertEqual(3, len(events))
        self.assertEqual(2707, retr.last_resversion)

        # Must skip request: only 400s elapsed, below the 500s delay
        mock_time.return_value = 10400
        events = retr.get_event_array()
        self.assertEqual(0, len(events))
        self.assertEqual(2707, retr.last_resversion)

        # Must retrieve events: 600s elapsed since the last served request
        mock_time.return_value = 10600
        events = retr.get_event_array()
        self.assertEqual(2, len(events))
        self.assertEqual(2709, retr.last_resversion)

def test_namespace_serverside_filtering(self):
with patch.object(self.kube, 'retrieve_json_auth', return_value={}) as mock_method:
retr = KubeEventRetriever(self.kube, namespaces=['testns'])
Expand Down
38 changes: 9 additions & 29 deletions tests/core/test_kube_pod_service_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_init(self):
self.assertEqual(0, len(mapper._pod_services_mapping))

def test_service_cache_fill(self):
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
mapper._fill_services_cache()
Expand All @@ -46,36 +46,16 @@ def test_service_cache_fill(self):
self.assertEqual('hello', redis['app'])
self.assertEqual('db', redis['tier'])

def test_service_cache_invalidation_true(self):
    """New service events seen after the cache fill must mark the cache as stale."""
    jsons = self._load_json_array(
        ['service_cache_events1.json', 'service_cache_services1.json', 'service_cache_events2.json'])
    with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
        mapper = PodServiceMapper(self.kube)
        mapper._fill_services_cache()
        mapper.check_services_cache_freshness()
        # events2 holds events unseen at fill time -> cache invalidated
        self.assertTrue(mapper._service_cache_invalidated)

def test_service_cache_invalidation_false(self):
    """Replaying already-seen events must not invalidate the service cache."""
    jsons = self._load_json_array(
        ['service_cache_events1.json', 'service_cache_services1.json', 'service_cache_events1.json'])
    with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
        mapper = PodServiceMapper(self.kube)
        # Cache starts out invalidated until the first fill
        self.assertTrue(mapper._service_cache_invalidated)
        mapper._fill_services_cache()
        self.assertFalse(mapper._service_cache_invalidated)
        mapper.check_services_cache_freshness()
        # Same events replayed -> nothing new -> cache still fresh
        self.assertFalse(mapper._service_cache_invalidated)

def test_pod_to_service_no_match(self):
    """A pod whose labels match no service selector maps to an empty service list."""
    jsons = self._load_json_array(['service_cache_services2.json'])
    with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
        mapper = PodServiceMapper(self.kube)
        mapper._fill_services_cache()
        no_match = self._build_pod_metadata(0, {'app': 'unknown'})
        self.assertEqual(0, len(mapper.match_services_for_pod(no_match)))

def test_pod_to_service_two_matches(self):
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
two_matches = self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'})
Expand All @@ -86,7 +66,7 @@ def test_pod_to_service_two_matches(self):
sorted(mapper.match_services_for_pod(two_matches, names=True)))

def test_pod_to_service_cache(self):
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
two_matches = self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'})
Expand All @@ -97,7 +77,7 @@ def test_pod_to_service_cache(self):
sorted(mapper.match_services_for_pod({'uid': 0}, names=True)))

def test_pods_for_service(self):
jsons = self._load_json_array(['service_cache_events2.json', 'service_cache_services2.json'])
jsons = self._load_json_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
# Fill pod label cache
mapper = PodServiceMapper(self.kube)
Expand All @@ -123,7 +103,7 @@ def _prepare_events_tests(self, jsonfiles):
return mapper

def test_event_pod_invalidation(self):
mapper = self._prepare_events_tests(['service_cache_events2.json', 'service_cache_services2.json'])
mapper = self._prepare_events_tests(['service_cache_services2.json'])
self.assertTrue(0 in mapper._pod_labels_cache)
self.assertTrue(0 in mapper._pod_services_mapping)
self.assertTrue(1 in mapper._pod_labels_cache)
Expand All @@ -138,7 +118,7 @@ def test_event_pod_invalidation(self):
self.assertTrue(1 in mapper._pod_services_mapping)

def test_event_service_deleted_invalidation(self):
mapper = self._prepare_events_tests(['service_cache_events2.json', 'service_cache_services2.json'])
mapper = self._prepare_events_tests(['service_cache_services2.json'])
self.assertEqual(2, len(mapper.match_services_for_pod({'uid': 0})))

event = {'involvedObject': {'kind': 'Service', 'uid': REDIS_HELLO_UID},
Expand All @@ -149,13 +129,13 @@ def test_event_service_deleted_invalidation(self):
self.assertEqual(1, len(mapper.match_services_for_pod({'uid': 0})))

def test_event_service_created_invalidation(self):
    """A service-creation event must trigger a mapping reload for the impacted pods."""
    mapper = self._prepare_events_tests(['service_cache_services1.json'])
    self.assertEqual(1, len(mapper.match_services_for_pod(
        self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'}))))

    event = {'involvedObject': {'kind': 'Service', 'uid': ALL_HELLO_UID},
             'reason': 'CreatedLoadBalancer'}
    jsons = self._load_json_array(['service_cache_services2.json'])
    with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
        # Three pods must be reloaded
        self.assertEqual(set([0, 1, 3]), mapper.process_events([event]))
Expand Down
21 changes: 20 additions & 1 deletion utils/kubernetes/kube_event_retriever.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import time

log = logging.getLogger('collector')

Expand All @@ -25,12 +26,22 @@ class KubeEventRetriever:
[3] https://github.com/kubernetes/kubernetes/issues/1362
"""

def __init__(self, kubeutil_object, namespaces=None, kinds=None, delay=None):
    """
    :param kubeutil_object: valid, initialised KubeUtil object to route requests through
    :param namespaces: namespace(s) to watch (string or list)
    :param kinds: kind(s) to watch (string or list)
    :param delay: minimum time (in seconds) between two apiserver requests, return [] in the meantime
    """
    self.kubeutil = kubeutil_object
    self.last_resversion = -1
    self.set_namespaces(namespaces)
    self.set_kinds(kinds)

    # Request throttling to reduce apiserver traffic
    self._request_interval = delay
    # -1 guarantees the very first get_event_array() call is never throttled
    self._last_lookup_timestamp = -1

def set_namespaces(self, namespaces):
self.request_url = self.kubeutil.kubernetes_api_url + '/events'
self.namespace_filter = None
Expand Down Expand Up @@ -60,6 +71,14 @@ def get_event_array(self):
Fetch latest events from the apiserver for the namespaces and kinds set on init
and returns an array of event objects
"""

# Request throttling
if self._request_interval:
if (time.time() - self._last_lookup_timestamp) < self._request_interval:
return []
else:
self._last_lookup_timestamp = time.time()

lastest_resversion = None
filtered_events = []

Expand Down
25 changes: 10 additions & 15 deletions utils/kubernetes/kubeutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class KubeUtil:
DEFAULT_MASTER_PORT = 8080
DEFAULT_MASTER_NAME = 'kubernetes' # DNS name to reach the master from a pod.
DEFAULT_LABEL_PREFIX = 'kube_'
DEFAULT_COLLECT_SERVICE_TAG = True
CA_CRT_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'
AUTH_TOKEN_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/token'

Expand Down Expand Up @@ -102,6 +103,9 @@ def __init__(self, instance=None):
self.metrics_url = urljoin(self.cadvisor_url, KubeUtil.METRICS_PATH)
self.machine_info_url = urljoin(self.cadvisor_url, KubeUtil.MACHINE_INFO_PATH)

from config import _is_affirmative
self.collect_service_tag = _is_affirmative(instance.get('collect_service_tags', KubeUtil.DEFAULT_COLLECT_SERVICE_TAG))

# keep track of the latest k8s event we collected and posted
# default value is 0 but TTL for k8s events is one hour anyways
self.last_event_collection_ts = 0
Expand Down Expand Up @@ -231,9 +235,10 @@ def extract_kube_pod_tags(self, pods_list, excluded_keys=None, label_prefix=None
podtags = self.get_pod_creator_tags(metadata)

# Extract services tags
for service in self.match_services_for_pod(metadata):
if service is not None:
podtags.append(u'kube_service:%s' % service)
if self.collect_service_tag:
for service in self.match_services_for_pod(metadata):
if service is not None:
podtags.append(u'kube_service:%s' % service)

# Extract labels
for k, v in labels.iteritems():
Expand Down Expand Up @@ -387,16 +392,6 @@ def get_auth_token(cls):

return None

def check_services_cache_freshness(self):
    """
    Entry point for sd_docker_backend to check whether to invalidate the cached services
    For now, we remove the whole cache as the fill_service_cache logic
    doesn't handle partial lookups
    We use the event's resourceVersion, as using the service's version wouldn't catch deletion

    Thin delegation: the actual freshness logic lives in the PodServiceMapper
    held in self._service_mapper.
    """
    return self._service_mapper.check_services_cache_freshness()

def match_services_for_pod(self, pod_metadata, refresh=False):
"""
Match the pods labels with services' label selectors to determine the list
Expand All @@ -408,11 +403,11 @@ def match_services_for_pod(self, pod_metadata, refresh=False):
#log.warning("Matches for %s: %s" % (pod_metadata.get('name'), str(s)))
return s

def get_event_retriever(self, namespaces=None, kinds=None, delay=None):
    """
    Returns a KubeEventRetriever object ready for action

    :param namespaces: namespace(s) to watch (string or list), passed through
    :param kinds: involved-object kind(s) to watch (string or list), passed through
    :param delay: minimum time (in seconds) between two apiserver requests,
                  None disables throttling
    """
    # Forward delay so callers can throttle apiserver traffic
    return KubeEventRetriever(self, namespaces, kinds, delay)

def match_containers_for_pods(self, pod_uids, podlist=None):
"""
Expand Down
28 changes: 3 additions & 25 deletions utils/kubernetes/pod_service_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,16 @@ def __init__(self, kubeutil_object):
The apiserver requests are routed through the given KubeUtil instance
"""
self.kube = kubeutil_object
self._event_retriever = self.kube.get_event_retriever(kinds=['Service'])
self._service_cache_selectors = defaultdict(dict) # {service_uid:{selectors}}
self._service_cache_names = {} # {service_uid:service_name
self._service_cache_invalidated = False # True to trigger service parsing
self._service_cache_invalidated = True
self._pod_labels_cache = defaultdict(dict) # {pod_uid:{label}}
self._pod_services_mapping = defaultdict(list) # {pod_uid:[service_uid]}

# Consume past events
self.check_services_cache_freshness()
self._service_cache_invalidated = True

def _fill_services_cache(self):
"""
Get the list of services from the kubelet API and store the label selector dicts.
The cache is to be invalidated by the user class by calling check_services_cache_freshness
The cache is to be invalidated by the user class by calling process_events
"""
try:
reply = self.kube.retrieve_json_auth(self.kube.kubernetes_api_url + '/services')
Expand All @@ -51,23 +46,6 @@ def _fill_services_cache(self):
self._service_cache_names = {}
self._service_cache_invalidated = False

def check_services_cache_freshness(self):
    """
    Let sd_docker_backend ask whether the cached services should be refreshed.
    Any new service event invalidates the whole cache, because the
    fill-services logic cannot refresh individual entries.
    """
    if self._service_cache_invalidated:
        # Already flagged as stale, no need to poll the apiserver again
        return

    try:
        fresh_events = self._event_retriever.get_event_array()
    except Exception as e:
        log.warning("Exception while parsing service events, not invalidating cache: %s", e)
        return

    if fresh_events:
        self._service_cache_invalidated = True

def match_services_for_pod(self, pod_metadata, refresh=False, names=False):
"""
Match the pods labels with services' label selectors to determine the list
Expand Down Expand Up @@ -168,7 +146,7 @@ def process_events(self, event_array):
service_uid = event.get('involvedObject', {}).get('uid', None)

if service_cache_checked is False:
self.check_services_cache_freshness()
self._service_cache_invalidated = True
service_cache_checked = True

# Possible values in kubernetes/pkg/controller/service/servicecontroller.go
Expand Down
15 changes: 5 additions & 10 deletions utils/service_discovery/sd_docker_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,6 @@ def _make_fetch_state(self):
def update_checks(self, changed_containers):
state = self._make_fetch_state()

if Platform.is_k8s():
self.kubeutil.check_services_cache_freshness()

conf_reload_set = set()
for c_id in changed_containers:
checks = self._get_checks_to_refresh(state, c_id)
Expand Down Expand Up @@ -305,10 +302,11 @@ def get_tags(self, state, c_id):
tags.extend(creator_tags)

# add services tags
services = self.kubeutil.match_services_for_pod(pod_metadata)
for s in services:
if s is not None:
tags.append('kube_service:%s' % s)
if self.kubeutil.collect_service_tag:
services = self.kubeutil.match_services_for_pod(pod_metadata)
for s in services:
if s is not None:
tags.append('kube_service:%s' % s)

elif Platform.is_swarm():
c_labels = c_inspect.get('Config', {}).get('Labels', {})
Expand Down Expand Up @@ -360,9 +358,6 @@ def get_configs(self):
container.get('Id'), container.get('Labels')
) for container in self.docker_client.containers()]

if Platform.is_k8s():
self.kubeutil.check_services_cache_freshness()

for image, cid, labels in containers:
try:
# value of the DATADOG_ID tag or the image name if the label is missing
Expand Down

0 comments on commit 6e6164f

Please sign in to comment.