Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve env validation and definition #19

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions configure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,20 @@ export async function configure(command: ConfigureCommand) {
await codemods.defineEnvValidations({
variables: {
KAFKA_BROKERS: `KafkaEnv.schema.brokers()`,

KAFKA_CLIENT_ID: `Env.schema.string.optional()`,
KAFKA_GROUP_ID: `Env.schema.string.optional()`,
KAFKA_LOG_LEVEL: `Env.schema.enum.optional(['fatal', 'error', 'warn', 'info', 'debug', 'trace'])`,

KAFKA_CONNECTION_TIMEOUT: `Env.schema.number.optional()`,
KAFKA_REQUEST_TIMEOUT: `Env.schema.number.optional()`,
KAFKA_LOG_LEVEL: `Env.schema.enum.optional(['fatal', 'error', 'warn', 'info', 'debug', 'trace'])`,
KAFKA_AUTHENTICATION_TIMEOUT: `Env.schema.number.optional()`,
KAFKA_REAUTHENTICATION_TIMEOUT: `Env.schema.number.optional()`,

// We don't yet support AWS IAM or OAuth Bearer or custom mechanisms
KAFKA_SECURITY_PROTOCOL: `Env.schema.enum.optional(['PLAIN', 'SSL'])`,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may want:

KAFKA_SECURITY_PROTOCOL: the security protocol the hashing service should use when communicating with
the Kafka cluster. Can be set to ssl, plaintext, or sasl_plaintext and defaults to plaintext.

KAFKA_SASL_MECHANISM: `Env.schema.enum.optional(['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'])`,
KAFKA_SASL_USERNAME: `Env.schema.string.optional()`,
KAFKA_SASL_PASSWORD: `Env.schema.string.optional()`,
},
leadingComment: 'Variables for configuring kafka package',
})
Expand Down
20 changes: 19 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import type {
ConsumerRunConfig as KafkaConsumerRunConfig,
Message as KafkaMessage,
EachMessagePayload as KafkaEachMessagePayload,
SASLOptions,
SASLOptions as KafkaSASLOptions,
OauthbearerProviderResponse as KafkaOauthbearerProviderResponse,
} from 'kafkajs'

import type tls from 'node:tls'
Expand All @@ -15,6 +16,23 @@ import type { Producer } from './producer.ts'

import { Kafka } from './index.ts'

// AWS mechanism isn't currently well supported:
type SASLAWSOption = { mechanism: 'aws' } & {
authorizationIdentity: string
accessKeyId: string
secretAccessKey: string
sessionToken?: string
}

// OAuth Bearer mechanism isn't currently well supported:
type SASLOAuthOption = {
mechanism: 'oauthbearer'
} & {
oauthBearerProvider: () => Promise<KafkaOauthbearerProviderResponse>
}

type SASLOptions = Exclude<KafkaSASLOptions, SASLAWSOption | SASLOAuthOption>

export type ProducerConfig = KafkaProducerConfig

export type ConsumerGroupConfig = KafkaConsumerConfig &
Expand Down
23 changes: 21 additions & 2 deletions stubs/config/kafka.stub
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,30 @@
}}}
import env from '#start/env'

const kafkaSecurityProtocol = env.get('KAFKA_SECURITY_PROTOCOL', 'PLAIN')
const useSSL = kafkaSecurityProtocol === 'SSL'
const useSasl = env.get('KAFKA_SASL_MECHANISM') !== undefined

const saslOptions = useSasl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure we want this in the default stub. I’d opt for getting folks going fast in a dev env. But we could leave this commented out so folks know they have this path for a live env

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm, maybe. Idk.

? {
mechanism: env.get('KAFKA_SASL_MECHANISM')?.toLowerCase(),
username: env.get('KAFKA_SASL_USERNAME'),
password: env.get('KAFKA_SASL_PASSWORD'),
}
: undefined

const kafkaConfig = {
brokers: env.get('KAFKA_BROKERS', 'localhost:9092'),
clientId: env.get('KAFKA_CLIENT_ID'),
ssl: useSSL,
sasl: saslOptions,
clientId: env.get('KAFKA_CLIENT_ID', 'local'),
timeouts: {
connection: env.get('KAFKA_CONNECTION_TIMEOUT'),
authentication: env.get('KAFKA_AUTHENTICATION_TIMEOUT'),
reauthentication: env.get('KAFKA_REAUTHENTICATION_TIMEOUT'),
request: env.get('KAFKA_REQUEST_TIMEOUT'),
},
logLevel: env.get('KAFKA_LOG_LEVEL', env.get('LOG_LEVEL')),
}

export default kafkaConfig

Loading