From 51cf4304828ee918e04f08a459b5e683d7de4124 Mon Sep 17 00:00:00 2001 From: "Joseph (Ting-Chou) Lin" Date: Wed, 10 Mar 2021 17:45:41 -0800 Subject: [PATCH] [LI-HOTFIX] Add quota bound and utilization rate sensor This patch adds QuotaBound sensor and QuotaUtilization sensor, in addition to the existing byte-rate & throttle-count sensors. The QuotaBound sensor records the value of `org.apache.kafka.common.metrics.Quota#bound` if it exists. This process happens on the `kafka.server.ClientQuotaManager#recordAndGetThrottleTimeMs` code path, where quota check actually takes place. TICKET = N/A LI_DESCRIPTION = LIKAFKA-35289 EXIT_CRITERIA = When upstream implements similar sensors --- .../kafka/server/ClientQuotaManager.scala | 56 ++++++++++++++++++- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index eda7ac3f49a72..deb9d28db2aff 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -27,7 +27,7 @@ import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread} import org.apache.kafka.common.{Cluster, MetricName} import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate} +import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate, Value} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{Sanitizer, Time} import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType} @@ -40,7 +40,8 @@ import scala.collection.JavaConverters._ * @param quotaSensor @Sensor that tracks the quota * @param throttleTimeSensor @Sensor that tracks the throttle time */ -case class ClientSensors(metricTags: Map[String, String], quotaSensor: Sensor, throttleTimeSensor: Sensor) +case class 
ClientSensors(metricTags: Map[String, String], quotaSensor: Sensor, throttleTimeSensor: Sensor, + quotaBoundSensor: Sensor, quotaUtilizationSensor: Sensor) /** * Configuration settings for quota management @@ -287,12 +288,23 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, def recordAndGetThrottleTimeMs(session: Session, clientId: String, value: Double, timeMs: Long): Int = { var throttleTimeMs = 0 val clientSensors = getOrCreateQuotaSensors(session, clientId) + val clientMetric = metrics.metrics().get(clientRateMetricName(clientSensors.metricTags)) + + // Quotas may 1) not be configured or 2) not be applicable to all metrics, + // so it's necessary to check for the bound's validity beforehand. + Option(clientMetric.config).flatMap(cfg => Option(cfg.quota)).map(_.bound) match { + case Some(quotaBoundVal) => + clientSensors.quotaBoundSensor.record(quotaBoundVal) + clientSensors.quotaUtilizationSensor.record( + if (quotaBoundVal.isNaN || quotaBoundVal == 0.0) 0.0 else value / quotaBoundVal + ) + } + try { clientSensors.quotaSensor.record(value, timeMs) } catch { case _: QuotaViolationException => // Compute the delay - val clientMetric = metrics.metrics().get(clientRateMetricName(clientSensors.metricTags)) throttleTimeMs = throttleTime(clientMetric).toInt info("Quota violated for sensor (%s). 
Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs)) } @@ -416,6 +428,20 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, throttleMetricName(metricTags), None, new Avg + ), + sensorAccessor.getOrCreate( + getQuotaBoundSensorName(metricTags), + ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds, + clientRateQuotaBoundMetricName(metricTags), + None, + new Value + ), + sensorAccessor.getOrCreate( + getQuotaUtilizationSensorName(metricTags), + ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds, + clientRateQuotaUtilizationMetricName(metricTags), + None, + new Value ) ) if (quotaCallback.quotaResetRequired(clientQuotaType)) @@ -429,6 +455,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, private def getThrottleTimeSensorName(metricTags: Map[String, String]): String = s"${quotaType}ThrottleTime-${metricTagsToSensorSuffix(metricTags)}" + private def getQuotaBoundSensorName(metricTags: Map[String, String]): String = + s"${quotaType}QuotaBound-${metricTagsToSensorSuffix(metricTags)}" + + private def getQuotaUtilizationSensorName(metricTags: Map[String, String]): String = + s"${quotaType}QuotaUtilization-${metricTagsToSensorSuffix(metricTags)}" + private def getQuotaSensorName(metricTags: Map[String, String]): String = s"$quotaType-${metricTagsToSensorSuffix(metricTags)}" @@ -561,6 +593,24 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, quotaMetricTags.asJava) } + private def clientRateQuotaBoundMetricName(quotaMetricTags: Map[String, String]): MetricName = { + metrics.metricName( + "byte-rate-quota-bound", + quotaType.toString, + "Tracking the byte-rate quota bound per user/client-id", + quotaMetricTags.asJava + ) + } + + private def clientRateQuotaUtilizationMetricName(quotaMetricTags: Map[String, String]): MetricName = { + metrics.metricName( + "byte-rate-quota-utilization", + quotaType.toString, + "Tracking the utilization rate of byte-rate quota bound 
per user/client-id", + quotaMetricTags.asJava + ) + } + private def throttleMetricName(quotaMetricTags: Map[String, String]): MetricName = { metrics.metricName("throttle-time", quotaType.toString,