Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve env validation and definition #19

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions configure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,22 @@ export async function configure(command: ConfigureCommand) {
await codemods.defineEnvValidations({
variables: {
KAFKA_BROKERS: `KafkaEnv.schema.brokers()`,

KAFKA_CLIENT_ID: `Env.schema.string.optional()`,
KAFKA_GROUP_ID: `Env.schema.string.optional()`,
KAFKA_LOG_LEVEL: `Env.schema.enum.optional(['fatal', 'error', 'warn', 'info', 'debug', 'trace'])`,

KAFKA_CONNECTION_TIMEOUT: `Env.schema.number.optional()`,
KAFKA_REQUEST_TIMEOUT: `Env.schema.number.optional()`,
KAFKA_LOG_LEVEL: `Env.schema.enum.optional(['fatal', 'error', 'warn', 'info', 'debug', 'trace'])`,
KAFKA_AUTHENTICATION_TIMEOUT: `Env.schema.number.optional()`,
KAFKA_REAUTHENTICATION_TIMEOUT: `Env.schema.number.optional()`,

// We don't yet support AWS IAM or OAuth Bearer or custom mechanisms
KAFKA_SECURITY_PROTOCOL: `Env.schema.enum.optional(['PLAIN', 'SSL'])`,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may want:

KAFKA_SECURITY_PROTOCOL: the security protocol the hashing service should use when communicating with
the Kafka cluster. Can be set to ssl, plaintext, or sasl_plaintext and defaults to plaintext.

KAFKA_SASL_MECHANISM: `Env.schema.enum.optional(['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'])`,
KAFKA_SASL_USERNAME: `Env.schema.string.optional()`,
KAFKA_SASL_PASSWORD: `Env.schema.string.optional()`,
},
leadingComment: 'Variables for configuring kafka package',
leadingComment: `Variables for configuring ${command.name}`,
})

/**
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"typescript": "^5.3.3"
},
"dependencies": {
"@poppinss/validator-lite": "^1.0.3",
"kafkajs": "^2.2.4"
},
"peerDependencies": {
Expand Down
28 changes: 21 additions & 7 deletions src/env/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { schema } from '@poppinss/validator-lite'

/**
* This is used to validate the start/env.ts for Kafka Brokers
*/
Expand All @@ -14,17 +16,29 @@ export const KafkaEnv: KafkaEnv = {
brokers() {
return (name: string, value?: string): string[] => {
if (!value) {
throw new Error(`value for $${name} is required`)
throw new Error(`Missing environment variable "${name}"`)
}

const urls = value.split(',')
const valid = urls.every((url) => {
return URL.canParse(url)
const hostValidator = schema.string({
format: 'host',
message: `Value for environment variable "${name}" must only contain valid domain or ip addresses, instead received "${value}"`,
})

if (!valid) {
throw new Error(`Invalid URLs in $${name}`)
}
const portValidator = schema.number()

const urls = value.split(',')
urls.every((url) => {
if (!url.includes(':')) {
throw new Error(
`Value for environment variable "${name}" must include both hostnames and port numbers, instead received ${value}`
)
}

const [hostname, port] = url.split(':', 2)

hostValidator(name, hostname)
portValidator(name, port)
})

return urls
}
Expand Down
20 changes: 19 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import type {
ConsumerRunConfig as KafkaConsumerRunConfig,
Message as KafkaMessage,
EachMessagePayload as KafkaEachMessagePayload,
SASLOptions,
SASLOptions as KafkaSASLOptions,
OauthbearerProviderResponse as KafkaOauthbearerProviderResponse,
} from 'kafkajs'

import type tls from 'node:tls'
Expand All @@ -15,6 +16,23 @@ import type { Producer } from './producer.ts'

import { Kafka } from './index.ts'

// AWS mechanism isn't currently well supported:
type SASLAWSOption = { mechanism: 'aws' } & {
authorizationIdentity: string
accessKeyId: string
secretAccessKey: string
sessionToken?: string
}

// OAuth Bearer mechanism isn't currently well supported:
type SASLOAuthOption = {
mechanism: 'oauthbearer'
} & {
oauthBearerProvider: () => Promise<KafkaOauthbearerProviderResponse>
}

type SASLOptions = Exclude<KafkaSASLOptions, SASLAWSOption | SASLOAuthOption>

export type ProducerConfig = KafkaProducerConfig

export type ConsumerGroupConfig = KafkaConsumerConfig &
Expand Down
23 changes: 21 additions & 2 deletions stubs/config/kafka.stub
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,30 @@
}}}
import env from '#start/env'

const kafkaSecurityProtocol = env.get('KAFKA_SECURITY_PROTOCOL', 'PLAIN')
const useSSL = kafkaSecurityProtocol === 'SSL'
const useSasl = env.get('KAFKA_SASL_MECHANISM') !== undefined

const saslOptions = useSasl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure we want this in the default stub. I’d opt for getting folks going fast in a dev env. But we could leave this commented out so folks know they have this path for a live env

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm, maybe. Idk.

? {
mechanism: env.get('KAFKA_SASL_MECHANISM')?.toLowerCase(),
username: env.get('KAFKA_SASL_USERNAME'),
password: env.get('KAFKA_SASL_PASSWORD'),
}
: undefined

const kafkaConfig = {
brokers: env.get('KAFKA_BROKERS', 'localhost:9092'),
clientId: env.get('KAFKA_CLIENT_ID'),
ssl: useSSL,
sasl: saslOptions,
clientId: env.get('KAFKA_CLIENT_ID', 'local'),
timeouts: {
connection: env.get('KAFKA_CONNECTION_TIMEOUT'),
authentication: env.get('KAFKA_AUTHENTICATION_TIMEOUT'),
reauthentication: env.get('KAFKA_REAUTHENTICATION_TIMEOUT'),
request: env.get('KAFKA_REQUEST_TIMEOUT'),
},
logLevel: env.get('KAFKA_LOG_LEVEL', env.get('LOG_LEVEL')),
}

export default kafkaConfig

72 changes: 72 additions & 0 deletions tests/kafka_env.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { test } from '@japa/runner'
import { KafkaEnv } from '../src/env/index.ts'

interface BrokersAssertion {
input: string
expected: string[]
error: boolean
}

test.group('KafkaEnv', () => {
const cases: BrokersAssertion[] = [
{ input: 'localhost:9092', error: false, expected: ['localhost:9092'] },
{
input:
'b-1.example.foobar.c19.kafka.us-east-1.amazonaws.com:9096,b-2.example.foobar.c19.kafka.us-east-1.amazonaws.com:9096',
error: false,
expected: [
'b-1.example.foobar.c19.kafka.us-east-1.amazonaws.com:9096',
'b-2.example.foobar.c19.kafka.us-east-1.amazonaws.com:9096',
],
},
{ input: '0.0.0.0:9092', error: false, expected: ['0.0.0.0:9092'] },
{
input: '172.17.0.2:9092,172.17.0.3:9092',
error: false,
expected: ['172.17.0.2:9092', '172.17.0.3:9092'],
},
{
input: 'localhost:9092,localhost:9093',
error: false,
expected: ['localhost:9092', 'localhost:9093'],
},
// missing port:
{ input: 'localhost', error: true, expected: [] },
{ input: '0.0.0.0', error: true, expected: [] },
// missing host
{ input: ':9092', error: true, expected: [] },
// missing port
{ input: 'localhost:', error: true, expected: [] },
// invalid port:
{ input: 'localhost:aaa', error: true, expected: [] },
// empty element:
{ input: 'localhost,,', error: true, expected: [] },
]

for (const testcase of cases) {
test(`schema.brokers with "${testcase.input}" should be ${testcase.error ? 'invalid' : 'valid'}`, async ({
assert,
}) => {
const key = 'KAFKA_BROKERS'
const validator = KafkaEnv.schema.brokers()
if (testcase.error) {
assert.throws(() => {
validator(key, testcase.input)
})
} else {
assert.doesNotThrow(() => {
try {
validator(key, testcase.input)
} catch (err) {
console.log(err)
throw err
}
})

const result = validator(key, testcase.input)

assert.sameMembers(result, testcase.expected)
}
})
}
})
Loading