From 62ddb857244ea4a27a7fcf22f082414565da51a5 Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Fri, 31 May 2024 00:27:27 +0200 Subject: [PATCH 1/3] Fix KafkaEnv.schema.brokers validator --- package.json | 1 + src/env/index.ts | 28 ++++++++++++---- tests/kafka_env.spec.ts | 72 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 7 deletions(-) create mode 100644 tests/kafka_env.spec.ts diff --git a/package.json b/package.json index fba8e3a..ea3d6a2 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "typescript": "^5.3.3" }, "dependencies": { + "@poppinss/validator-lite": "^1.0.3", "kafkajs": "^2.2.4" }, "peerDependencies": { diff --git a/src/env/index.ts b/src/env/index.ts index 0647d9e..34147c7 100644 --- a/src/env/index.ts +++ b/src/env/index.ts @@ -1,3 +1,5 @@ +import { schema } from '@poppinss/validator-lite' + /** * This is used to validate the start/env.ts for Kafka Brokers */ @@ -14,17 +16,29 @@ export const KafkaEnv: KafkaEnv = { brokers() { return (name: string, value?: string): string[] => { if (!value) { - throw new Error(`value for $${name} is required`) + throw new Error(`Missing environment variable "${name}"`) } - const urls = value.split(',') - const valid = urls.every((url) => { - return URL.canParse(url) + const hostValidator = schema.string({ + format: 'host', + message: `Value for environment variable "${name}" must only contain valid domain or ip addresses, instead received "${value}"`, }) - if (!valid) { - throw new Error(`Invalid URLs in $${name}`) - } + const portValidator = schema.number() + + const urls = value.split(',') + urls.every((url) => { + if (!url.includes(':')) { + throw new Error( + `Value for environment variable "${name}" must include both hostnames and port numbers, instead received ${value}` + ) + } + + const [hostname, port] = url.split(':', 2) + + hostValidator(name, hostname) + portValidator(name, port) + }) return urls } diff --git a/tests/kafka_env.spec.ts b/tests/kafka_env.spec.ts new file mode 100644 index 0000000..66f7f9e --- /dev/null +++ b/tests/kafka_env.spec.ts @@ -0,0 +1,72 @@ +import { test } from '@japa/runner' +import { KafkaEnv } from '../src/env/index.ts' + +interface BrokersAssertion { + input: string + expected: string[] + error: boolean +} + +test.group('KafkaEnv', () => { + const cases: BrokersAssertion[] = [ + { input: 'localhost:9092', error: false, expected: ['localhost:9092'] }, + { + input: + 'b-1.example.foobar.c19.kafka.us-east-1.amazonaws.com:9096,b-2.example.foobar.c19.kafka.us-east-1.amazonaws.com:9096', + error: false, + expected: [ + 'b-1.example.foobar.c19.kafka.us-east-1.amazonaws.com:9096', + 'b-2.example.foobar.c19.kafka.us-east-1.amazonaws.com:9096', + ], + }, + { input: '0.0.0.0:9092', error: false, expected: ['0.0.0.0:9092'] }, + { + input: '172.17.0.2:9092,172.17.0.3:9092', + error: false, + expected: ['172.17.0.2:9092', '172.17.0.3:9092'], + }, + { + input: 'localhost:9092,localhost:9093', + error: false, + expected: ['localhost:9092', 'localhost:9093'], + }, + // missing port: + { input: 'localhost', error: true, expected: [] }, + { input: '0.0.0.0', error: true, expected: [] }, + // missing host + { input: ':9092', error: true, expected: [] }, + // missing port + { input: 'localhost:', error: true, expected: [] }, + // invalid port: + { input: 'localhost:aaa', error: true, expected: [] }, + // empty element: + { input: 'localhost,,', error: true, expected: [] }, + ] + + for (const testcase of cases) { + test(`schema.brokers with "${testcase.input}" should be ${testcase.error ? 'invalid' : 'valid'}`, async ({ + assert, + }) => { + const key = 'KAFKA_BROKERS' + const validator = KafkaEnv.schema.brokers() + if (testcase.error) { + assert.throws(() => { + validator(key, testcase.input) + }) + } else { + assert.doesNotThrow(() => { + try { + validator(key, testcase.input) + } catch (err) { + console.log(err) + throw err + } + }) + + const result = validator(key, testcase.input) + + assert.sameMembers(result, testcase.expected) + } + }) + } +}) From fadca3c6af51deaa56527d404d5148b540ef9793 Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Fri, 31 May 2024 00:56:01 +0200 Subject: [PATCH 2/3] Improve environment validation and variable definitions --- configure.ts | 13 +++++++++++-- src/types.ts | 20 +++++++++++++++++++- stubs/config/kafka.stub | 23 +++++++++++++++++++++-- 3 files changed, 51 insertions(+), 5 deletions(-) diff --git a/configure.ts b/configure.ts index 5bca2ae..f3b93fb 100644 --- a/configure.ts +++ b/configure.ts @@ -56,11 +56,20 @@ export async function configure(command: ConfigureCommand) { await codemods.defineEnvValidations({ variables: { KAFKA_BROKERS: `KafkaEnv.schema.brokers()`, + KAFKA_CLIENT_ID: `Env.schema.string.optional()`, - KAFKA_GROUP_ID: `Env.schema.string.optional()`, + KAFKA_LOG_LEVEL: `Env.schema.enum.optional(['fatal', 'error', 'warn', 'info', 'debug', 'trace'])`, + KAFKA_CONNECTION_TIMEOUT: `Env.schema.number.optional()`, KAFKA_REQUEST_TIMEOUT: `Env.schema.number.optional()`, - KAFKA_LOG_LEVEL: `Env.schema.enum.optional(['fatal', 'error', 'warn', 'info', 'debug', 'trace'])`, + KAFKA_AUTHENTICATION_TIMEOUT: `Env.schema.number.optional()`, + KAFKA_REAUTHENTICATION_TIMEOUT: `Env.schema.number.optional()`, + + // We don't yet support AWS IAM or OAuth Bearer or custom mechanisms + KAFKA_SECURITY_PROTOCOL: `Env.schema.enum.optional(['PLAIN', 'SSL'])`, + KAFKA_SASL_MECHANISM: `Env.schema.enum.optional(['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'])`, + KAFKA_SASL_USERNAME: `Env.schema.string.optional()`, + KAFKA_SASL_PASSWORD: `Env.schema.string.optional()`, }, leadingComment: 'Variables for configuring kafka package', }) diff --git a/src/types.ts b/src/types.ts index f873d5e..bd2a3f0 100644 --- a/src/types.ts +++ b/src/types.ts @@ -4,7 +4,8 @@ import type { ConsumerRunConfig as KafkaConsumerRunConfig, Message as KafkaMessage, EachMessagePayload as KafkaEachMessagePayload, - SASLOptions, + SASLOptions as KafkaSASLOptions, + OauthbearerProviderResponse as KafkaOauthbearerProviderResponse, } from 'kafkajs' import type tls from 'node:tls' @@ -15,6 +16,23 @@ import type { Producer } from './producer.ts' import { Kafka } from './index.ts' +// AWS mechanism isn't currently well supported: +type SASLAWSOption = { mechanism: 'aws' } & { + authorizationIdentity: string + accessKeyId: string + secretAccessKey: string + sessionToken?: string +} + +// OAuth Bearer mechanism isn't currently well supported: +type SASLOAuthOption = { + mechanism: 'oauthbearer' +} & { + oauthBearerProvider: () => Promise +} + +type SASLOptions = Exclude + export type ProducerConfig = KafkaProducerConfig export type ConsumerGroupConfig = KafkaConsumerConfig & diff --git a/stubs/config/kafka.stub b/stubs/config/kafka.stub index 19172bd..71879ae 100644 --- a/stubs/config/kafka.stub +++ b/stubs/config/kafka.stub @@ -3,11 +3,30 @@ }}} import env from '#start/env' +const kafkaSecurityProtocol = env.get('KAFKA_SECURITY_PROTOCOL', 'PLAIN') +const useSSL = kafkaSecurityProtocol === 'SSL' +const useSasl = env.get('KAFKA_SASL_MECHANISM') !== undefined + +const saslOptions = useSasl + ? { + mechanism: env.get('KAFKA_SASL_MECHANISM')?.toLowerCase(), + username: env.get('KAFKA_SASL_USERNAME'), + password: env.get('KAFKA_SASL_PASSWORD'), + } + : undefined + const kafkaConfig = { brokers: env.get('KAFKA_BROKERS', 'localhost:9092'), - clientId: env.get('KAFKA_CLIENT_ID'), + ssl: useSSL, + sasl: saslOptions, + clientId: env.get('KAFKA_CLIENT_ID', 'local'), + timeouts: { + connection: env.get('KAFKA_CONNECTION_TIMEOUT'), + authentication: env.get('KAFKA_AUTHENTICATION_TIMEOUT'), + reauthentication: env.get('KAFKA_REAUTHENTICATION_TIMEOUT'), + request: env.get('KAFKA_REQUEST_TIMEOUT'), + }, logLevel: env.get('KAFKA_LOG_LEVEL', env.get('LOG_LEVEL')), } export default kafkaConfig - From 18acbed84709596ea1c571d1ae528114d958a60a Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Fri, 31 May 2024 00:57:21 +0200 Subject: [PATCH 3/3] Improve comment for env variables --- configure.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configure.ts b/configure.ts index f3b93fb..6190416 100644 --- a/configure.ts +++ b/configure.ts @@ -71,7 +71,7 @@ export async function configure(command: ConfigureCommand) { KAFKA_SASL_USERNAME: `Env.schema.string.optional()`, KAFKA_SASL_PASSWORD: `Env.schema.string.optional()`, }, - leadingComment: 'Variables for configuring kafka package', + leadingComment: `Variables for configuring ${command.name}`, }) /**