Skip to content

Commit

Permalink
[LI-HOTFIX] Add quota bound and utilization rate sensor
Browse files Browse the repository at this point in the history
This patch adds QuotaBound sensor and QuotaUtilization sensor, in
addition to the existing byte-rate & throttle-count sensors.
The QuotaBound sensor records the value of of
`org.apache.kafka.common.metrics.Quota#bound` if it exists.

This process happens on the
`kafka.server.ClientQuotaManager#recordAndGetThrottleTimeMs` code path,
where quota check actually takes place.

TICKET = N/A
LI_DESCRIPTION = LIKAFKA-35289
EXIT_CRITERIA = When upstream implement similar sensors
  • Loading branch information
lmr3796 committed Apr 22, 2021
1 parent a8d5db4 commit 51cf430
Showing 1 changed file with 53 additions and 3 deletions.
56 changes: 53 additions & 3 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
import org.apache.kafka.common.{Cluster, MetricName}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate, Value}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
Expand All @@ -40,7 +40,8 @@ import scala.collection.JavaConverters._
* @param quotaSensor @Sensor that tracks the quota
* @param throttleTimeSensor @Sensor that tracks the throttle time
*/
case class ClientSensors(metricTags: Map[String, String], quotaSensor: Sensor, throttleTimeSensor: Sensor)
case class ClientSensors(metricTags: Map[String, String], quotaSensor: Sensor, throttleTimeSensor: Sensor,
quotaBoundSensor: Sensor, quotaUtilizationSensor: Sensor)

/**
* Configuration settings for quota management
Expand Down Expand Up @@ -287,12 +288,23 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
def recordAndGetThrottleTimeMs(session: Session, clientId: String, value: Double, timeMs: Long): Int = {
var throttleTimeMs = 0
val clientSensors = getOrCreateQuotaSensors(session, clientId)
val clientMetric = metrics.metrics().get(clientRateMetricName(clientSensors.metricTags))

// Quotas may 1) not be configured or 2) not be applicable to all metrics,
// so it's necessary to check for the bound's validity beforehand.
Option(clientMetric.config).flatMap(cfg => Option(cfg.quota)).map(_.bound) match {
case Some(quotaBoundVal) =>
clientSensors.quotaBoundSensor.record(quotaBoundVal)
clientSensors.quotaUtilizationSensor.record(
if (quotaBoundVal.isNaN || quotaBoundVal == 0.0) 0.0 else value / quotaBoundVal
)
}

try {
clientSensors.quotaSensor.record(value, timeMs)
} catch {
case _: QuotaViolationException =>
// Compute the delay
val clientMetric = metrics.metrics().get(clientRateMetricName(clientSensors.metricTags))
throttleTimeMs = throttleTime(clientMetric).toInt
info("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
}
Expand Down Expand Up @@ -416,6 +428,20 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
throttleMetricName(metricTags),
None,
new Avg
),
sensorAccessor.getOrCreate(
getQuotaBoundSensorName(metricTags),
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
clientRateQuotaBoundMetricName(metricTags),
None,
new Value
),
sensorAccessor.getOrCreate(
getQuotaUtilizationSensorName(metricTags),
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
clientRateQuotaUtilizationMetricName(metricTags),
None,
new Value
)
)
if (quotaCallback.quotaResetRequired(clientQuotaType))
Expand All @@ -429,6 +455,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private def getThrottleTimeSensorName(metricTags: Map[String, String]): String =
s"${quotaType}ThrottleTime-${metricTagsToSensorSuffix(metricTags)}"

private def getQuotaBoundSensorName(metricTags: Map[String, String]): String =
s"${quotaType}QuotaBound-${metricTagsToSensorSuffix(metricTags)}"

private def getQuotaUtilizationSensorName(metricTags: Map[String, String]): String =
s"${quotaType}QuotaUtilization-${metricTagsToSensorSuffix(metricTags)}"

private def getQuotaSensorName(metricTags: Map[String, String]): String =
s"$quotaType-${metricTagsToSensorSuffix(metricTags)}"

Expand Down Expand Up @@ -561,6 +593,24 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
quotaMetricTags.asJava)
}

private def clientRateQuotaBoundMetricName(quotaMetricTags: Map[String, String]): MetricName = {
metrics.metricName(
"byte-rate-quota-bound",
quotaType.toString,
"Tracking the byte-rate quota bound per user/client-id",
quotaMetricTags.asJava
)
}

private def clientRateQuotaUtilizationMetricName(quotaMetricTags: Map[String, String]): MetricName = {
metrics.metricName(
"byte-rate-quota-utilization",
quotaType.toString,
"Tracking the utilization rate of byte-rate quota bound per user/client-id",
quotaMetricTags.asJava
)
}

private def throttleMetricName(quotaMetricTags: Map[String, String]): MetricName = {
metrics.metricName("throttle-time",
quotaType.toString,
Expand Down

0 comments on commit 51cf430

Please sign in to comment.