From a8e36ab2ad23b10b9574ccab0fd6736aa4008e63 Mon Sep 17 00:00:00 2001 From: "Shisong Yuan (Accenture International Limited)" Date: Thu, 23 Jan 2025 11:18:30 +0800 Subject: [PATCH 1/3] add param CUSTOM_OAUTH_CALLBACK_CLASS --- .../strimzi/operator/cluster/model/KafkaConnectCluster.java | 1 + .../operator/cluster/model/KafkaConnectClusterTest.java | 3 ++- .../kafka/scripts/kafka_connect_config_generator.sh | 5 +++++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectCluster.java index cfb8c5eb8f..6f3a70d174 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectCluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectCluster.java @@ -121,6 +121,7 @@ public class KafkaConnectCluster extends AbstractModel implements SupportsMetric protected static final String ENV_VAR_KAFKA_CONNECT_OAUTH_REFRESH_TOKEN = "KAFKA_CONNECT_OAUTH_REFRESH_TOKEN"; protected static final String ENV_VAR_KAFKA_CONNECT_OAUTH_PASSWORD_GRANT_PASSWORD = "KAFKA_CONNECT_OAUTH_PASSWORD_GRANT_PASSWORD"; protected static final String ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_ASSERTION = "KAFKA_CONNECT_OAUTH_CLIENT_ASSERTION"; + protected static final String ENV_VAR_CUSTOM_OAUTH_CALLBACK_CLASS = "CUSTOM_OAUTH_CALLBACK_CLASS"; protected static final String ENV_VAR_STRIMZI_TRACING = "STRIMZI_TRACING"; protected static final String CO_ENV_VAR_CUSTOM_CONNECT_POD_LABELS = "STRIMZI_CUSTOM_KAFKA_CONNECT_LABELS"; diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java index 26e0b32238..78cd002e6d 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java @@ -1824,7 +1824,8 @@ public void testPodSetWithOAuthWithClientAssertion() { assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_SASL_MECHANISM.equals(var.getName())).findFirst().orElseThrow().getValue(), is("oauth")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_ASSERTION.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getName(), is("my-secret-secret")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_ASSERTION.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getKey(), is("my-secret-key")); - assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CONFIG.equals(var.getName())).findFirst().orElseThrow().getValue().trim(), + assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CONFIG.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getKey(), is("my-secret-key")); + assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_CUSTOM_OAUTH_CALLBACK_CLASS.equals(var.getName())).findFirst().orElseThrow().getValue(), is("my-callback-class")); is(String.format("%s=\"%s\" %s=\"%s\" %s=\"%s\" %s=\"%s\"", ClientConfig.OAUTH_CLIENT_ID, "my-client-id", ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, "http://my-oauth-server", ClientConfig.OAUTH_SCOPE, "all", ClientConfig.OAUTH_AUDIENCE, "kafka"))); }); } diff --git a/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh b/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh index 86d5242a9b..75ead31f63 100755 --- a/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh +++ b/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh @@ -93,6 +93,11 @@ if [ -n "$KAFKA_CONNECT_SASL_MECHANISM" ]; then SASL_MECHANISM="OAUTHBEARER" JAAS_CONFIG="org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ${KAFKA_CONNECT_OAUTH_CONFIG} ${OAUTH_CLIENT_SECRET} ${OAUTH_REFRESH_TOKEN} ${OAUTH_ACCESS_TOKEN} ${OAUTH_PASSWORD_GRANT_PASSWORD} ${OAUTH_CLIENT_ASSERTION} ${OAUTH_TRUSTSTORE};" + if [ -n "$CUSTOM_OAUTH_CALLBACK_CLASS" ]; then + OAUTH_CLIENT_ASSERTION="sasl.login.callback.handler.class=\"$CUSTOM_OAUTH_CALLBACK_CLASS\"" + else + OAUTH_CALLBACK_CLASS="sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler" + fi OAUTH_CALLBACK_CLASS="sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler" OAUTH_CALLBACK_CLASS_PRODUCER="producer.${OAUTH_CALLBACK_CLASS}" OAUTH_CALLBACK_CLASS_CONSUMER="consumer.${OAUTH_CALLBACK_CLASS}" From 681c832851cbb46994b0903c50bfdf6e1d626f3d Mon Sep 17 00:00:00 2001 From: "Shisong Yuan (Accenture International Limited)" Date: Thu, 23 Jan 2025 12:07:10 +0800 Subject: [PATCH 2/3] add param CUSTOM_OAUTH_CALLBACK_CLASS --- .../kafka/scripts/kafka_connect_config_generator.sh | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh b/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh index 75ead31f63..924c2778fb 100755 --- a/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh +++ b/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh @@ -93,12 +93,10 @@ if [ -n "$KAFKA_CONNECT_SASL_MECHANISM" ]; then SASL_MECHANISM="OAUTHBEARER" JAAS_CONFIG="org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ${KAFKA_CONNECT_OAUTH_CONFIG} ${OAUTH_CLIENT_SECRET} ${OAUTH_REFRESH_TOKEN} ${OAUTH_ACCESS_TOKEN} ${OAUTH_PASSWORD_GRANT_PASSWORD} ${OAUTH_CLIENT_ASSERTION} ${OAUTH_TRUSTSTORE};" + OAUTH_CALLBACK_CLASS="sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler" if [ -n "$CUSTOM_OAUTH_CALLBACK_CLASS" ]; then OAUTH_CLIENT_ASSERTION="sasl.login.callback.handler.class=\"$CUSTOM_OAUTH_CALLBACK_CLASS\"" - else - OAUTH_CALLBACK_CLASS="sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler" fi - OAUTH_CALLBACK_CLASS="sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler" OAUTH_CALLBACK_CLASS_PRODUCER="producer.${OAUTH_CALLBACK_CLASS}" OAUTH_CALLBACK_CLASS_CONSUMER="consumer.${OAUTH_CALLBACK_CLASS}" OAUTH_CALLBACK_CLASS_ADMIN="admin.${OAUTH_CALLBACK_CLASS}" From 51337982bf1b4b68688b9c4a9c76d3f916c7a2ad Mon Sep 17 00:00:00 2001 From: "Shisong Yuan (Accenture International Limited)" Date: Wed, 5 Feb 2025 23:29:14 +0800 Subject: [PATCH 3/3] add param CUSTOM_OAUTH_CALLBACK_CLASS --- .../KafkaClientAuthenticationOAuth.java | 13 ++++++++++++- .../operator/cluster/model/AuthenticationUtils.java | 3 +++ .../cluster/model/KafkaConnectClusterTest.java | 3 ++- .../kafka/scripts/kafka_connect_config_generator.sh | 3 ++- 4 files changed, 19 insertions(+), 3 deletions(-) diff --git a/api/src/main/java/io/strimzi/api/kafka/model/common/authentication/KafkaClientAuthenticationOAuth.java b/api/src/main/java/io/strimzi/api/kafka/model/common/authentication/KafkaClientAuthenticationOAuth.java index 8566cc7548..38f756c545 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/common/authentication/KafkaClientAuthenticationOAuth.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/common/authentication/KafkaClientAuthenticationOAuth.java @@ -32,7 +32,7 @@ "readTimeoutSeconds", "httpRetries", "httpRetryPauseMs", "clientSecret", "passwordSecret", "accessToken", "refreshToken", "tlsTrustedCertificates", "disableTlsHostnameVerification", "maxTokenExpirySeconds", "accessTokenIsJwt", "enableMetrics", "includeAcceptHeader", "accessTokenLocation", - "clientAssertion", "clientAssertionLocation", "clientAssertionType", "saslExtensions"}) + "clientAssertion", "clientAssertionLocation", "clientAssertionType", "saslExtensions", "customOauthCallbackClass"}) @EqualsAndHashCode(callSuper = true) @ToString(callSuper = true) public class KafkaClientAuthenticationOAuth extends KafkaClientAuthentication { @@ -62,6 +62,7 @@ public class KafkaClientAuthenticationOAuth extends KafkaClientAuthentication { private String clientAssertionLocation; private String clientAssertionType; private Map saslExtensions; + private String customOauthCallbackClass; @Description("Must be `" + TYPE_OAUTH + "`") @JsonInclude(JsonInclude.Include.NON_NULL) @@ -314,4 +315,14 @@ public Map getSaslExtensions() { public void setSaslExtensions(Map saslExtensions) { this.saslExtensions = saslExtensions; } + + @Description("Custom oauth callback class. If not set, this value defaults to `io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler`.") + @JsonInclude(JsonInclude.Include.NON_NULL) + public String getCustomOauthCallbackClass() { + return customOauthCallbackClass; + } + + public void setCustomOauthCallbackClass(String customOauthCallbackClass) { + this.customOauthCallbackClass = customOauthCallbackClass; + } } diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/AuthenticationUtils.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/AuthenticationUtils.java index e680446641..0ad20ddc1c 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/AuthenticationUtils.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/AuthenticationUtils.java @@ -286,6 +286,9 @@ public static void configureClientAuthenticationEnvVars(KafkaClientAuthenticatio if (oauth.getPasswordSecret() != null) { varList.add(ContainerUtils.createEnvVarFromSecret(envVarNamer.apply("OAUTH_PASSWORD_GRANT_PASSWORD"), oauth.getPasswordSecret().getSecretName(), oauth.getPasswordSecret().getPassword())); } + if (oauth.getCustomOauthCallbackClass() != null) { + varList.add(ContainerUtils.createEnvVar(KafkaConnectCluster.ENV_VAR_CUSTOM_OAUTH_CALLBACK_CLASS, oauth.getCustomOauthCallbackClass())); + } if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty()) { varList.add(ContainerUtils.createEnvVar(envVarNamer.apply("OAUTH_TRUSTED_CERTS"), CertUtils.trustedCertsEnvVar(oauth.getTlsTrustedCertificates()))); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java index 78cd002e6d..4f6987d4de 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java @@ -1825,8 +1825,9 @@ public void testPodSetWithOAuthWithClientAssertion() { assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_ASSERTION.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getName(), is("my-secret-secret")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CLIENT_ASSERTION.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getKey(), is("my-secret-key")); assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CONFIG.equals(var.getName())).findFirst().orElseThrow().getValueFrom().getSecretKeyRef().getKey(), is("my-secret-key")); - assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_CUSTOM_OAUTH_CALLBACK_CLASS.equals(var.getName())).findFirst().orElseThrow().getValue(), is("my-callback-class")); + assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_KAFKA_CONNECT_OAUTH_CONFIG.equals(var.getName())).findFirst().orElseThrow().getValue().trim(), is(String.format("%s=\"%s\" %s=\"%s\" %s=\"%s\" %s=\"%s\"", ClientConfig.OAUTH_CLIENT_ID, "my-client-id", ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, "http://my-oauth-server", ClientConfig.OAUTH_SCOPE, "all", ClientConfig.OAUTH_AUDIENCE, "kafka"))); + assertThat(cont.getEnv().stream().filter(var -> KafkaConnectCluster.ENV_VAR_CUSTOM_OAUTH_CALLBACK_CLASS.equals(var.getName())).findFirst().orElseThrow().getValue(), is("my-callback-class")); }); } diff --git a/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh b/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh index 924c2778fb..e8a8a943c3 100755 --- a/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh +++ b/docker-images/kafka-based/kafka/scripts/kafka_connect_config_generator.sh @@ -95,7 +95,8 @@ if [ -n "$KAFKA_CONNECT_SASL_MECHANISM" ]; then JAAS_CONFIG="org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ${KAFKA_CONNECT_OAUTH_CONFIG} ${OAUTH_CLIENT_SECRET} ${OAUTH_REFRESH_TOKEN} ${OAUTH_ACCESS_TOKEN} ${OAUTH_PASSWORD_GRANT_PASSWORD} ${OAUTH_CLIENT_ASSERTION} ${OAUTH_TRUSTSTORE};" OAUTH_CALLBACK_CLASS="sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler" if [ -n "$CUSTOM_OAUTH_CALLBACK_CLASS" ]; then - OAUTH_CLIENT_ASSERTION="sasl.login.callback.handler.class=\"$CUSTOM_OAUTH_CALLBACK_CLASS\"" + JAAS_CONFIG="org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" + OAUTH_CALLBACK_CLASS="sasl.login.callback.handler.class=${CUSTOM_OAUTH_CALLBACK_CLASS}" fi OAUTH_CALLBACK_CLASS_PRODUCER="producer.${OAUTH_CALLBACK_CLASS}" OAUTH_CALLBACK_CLASS_CONSUMER="consumer.${OAUTH_CALLBACK_CLASS}"