Skip to content

Commit

Permalink
Merge pull request #23 from neighbourhoodie/chore/rename-consumer-to-…
Browse files Browse the repository at this point in the history
…consumer-group

Rename Consumer to ConsumerGroup
  • Loading branch information
janl authored Jun 4, 2024
2 parents 59ed985 + c418092 commit a73f421
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 67 deletions.
2 changes: 1 addition & 1 deletion index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@

export { configure } from './configure.ts'
export { Kafka } from './src/index.ts'
export { Consumer } from './src/consumer.ts'
export { ConsumerGroup } from './src/consumer_group.ts'
export { Producer } from './src/producer.ts'
export { defineConfig } from './src/define_config.ts'
14 changes: 4 additions & 10 deletions providers/kafka_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,18 @@ export default class KafkaProvider implements ContainerProviderContract {

async boot() {
const kafka = await this.app.container.make('kafka')
await kafka.start()
await kafka.boot()
}

// Has to be ready to make use of preloads:
async ready() {
const kafka = await this.app.container.make('kafka')

for (const producer in kafka.producers) {
await kafka.producers[producer].start()
}

for (const consumer of kafka.consumers) {
await consumer.start()
}
await kafka.startProducers()
await kafka.startConsumerGroups()
}

async shutdown() {
const kafka = await this.app.container.make('kafka')
await kafka.disconnect()
await kafka.stop()
}
}
2 changes: 1 addition & 1 deletion src/consumer.ts → src/consumer_group.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type {
ConsumerCommitCallback,
} from './types.ts'

export class Consumer {
export class ConsumerGroup {
config: ConsumerGroupConfig
topics: string[]
events: Record<string, ConsumerCallback[]>
Expand Down
34 changes: 19 additions & 15 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ import type { Logger } from '@adonisjs/core/logger'
import { ApplicationService, KafkaConfig, KafkaContract } from '@adonisjs/core/types'

import type { ProducerConfig, ConsumerGroupConfig } from './types.ts'
import { Consumer } from './consumer.ts'
import { ConsumerGroup } from './consumer_group.ts'
import { Producer } from './producer.ts'
import { defineConfig } from './define_config.ts'
import { type KafkaLogLevel, toAdonisLoggerLevel, toKafkaLogLevel } from './logging.ts'

export class Kafka implements KafkaContract {
protected application!: ApplicationService

#consumers: Consumer[]
#consumerGroups: ConsumerGroup[]
#producers: {
[key: string]: Producer
}
Expand All @@ -23,11 +23,11 @@ export class Kafka implements KafkaContract {
constructor(config: KafkaConfig, logger: Logger) {
this.#config = defineConfig(config)
this.#logger = logger.child({ module: 'kafka' })
this.#consumers = []
this.#consumerGroups = []
this.#producers = {}
}

async start() {
async boot() {
this.createKafka()
}

Expand All @@ -42,21 +42,25 @@ export class Kafka implements KafkaContract {
return producer
}

createConsumer(config: ConsumerGroupConfig) {
createConsumerGroup(config: ConsumerGroupConfig) {
// TODO: Assert that consumers have different groupId's
const consumer = new Consumer(this.#kafka, config)
const consumerGroup = new ConsumerGroup(this.#kafka, config)

this.#consumers.push(consumer)
this.#consumerGroups.push(consumerGroup)

return consumer
return consumerGroup
}

get producers() {
return this.#producers
async startConsumerGroups() {
for (const consumerGroup of this.#consumerGroups) {
await consumerGroup.start()
}
}

get consumers() {
return this.#consumers
async startProducers() {
for (const producer in this.#producers) {
await this.#producers[producer].start()
}
}

private getBrokers() {
Expand Down Expand Up @@ -94,9 +98,9 @@ export class Kafka implements KafkaContract {
})
}

async disconnect() {
for await (let consumer of this.#consumers) {
await consumer.stop()
async stop() {
for await (let consumerGroup of this.#consumerGroups) {
await consumerGroup.stop()
}

for (let producer in this.#producers) {
Expand Down
11 changes: 7 additions & 4 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type {
import type tls from 'node:tls'

import type { Level } from '@adonisjs/logger/types'
import type { Consumer } from './consumer.ts'
import type { ConsumerGroup } from './consumer_group.ts'
import type { Producer } from './producer.ts'

import { Kafka } from './index.ts'
Expand Down Expand Up @@ -54,10 +54,13 @@ declare module '@adonisjs/core/types' {
}

export interface KafkaContract {
start: (...args: any[]) => void
disconnect: () => void
boot(...args: any[]): void
startConsumerGroups(): Promise<void>
startProducers(): Promise<void>
stop(): Promise<void>

createProducer(name: string, config?: ProducerConfig): Producer
createConsumer(config: ConsumerGroupConfig): Consumer
createConsumerGroup(config: ConsumerGroupConfig): ConsumerGroup
}
}

Expand Down
42 changes: 21 additions & 21 deletions tests/consumer.spec.ts → tests/consumer_group.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { test } from '@japa/runner'
import * as sinon from 'sinon'

import { Consumer } from '../src/consumer.ts'
import { ConsumerGroup } from '../src/consumer_group.ts'
import { Kafka as Kafkajs } from 'kafkajs'
process.env['KAFKAJS_NO_PARTITIONER_WARNING'] = '1'

Expand All @@ -15,7 +15,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})
const consumer = sinon.spy(kafkajs, 'consumer')
new Consumer(kafkajs, { groupId: 'test' })
new ConsumerGroup(kafkajs, { groupId: 'test' })
assert.isTrue(consumer.called)
})

Expand All @@ -24,7 +24,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })
const connect = sinon.replace(consumer.consumer, 'connect', sinon.fake())
const run = sinon.replace(consumer.consumer, 'run', sinon.fake())
await consumer.start()
Expand All @@ -40,7 +40,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })
const handler = sinon.spy()
consumer.onError('test', handler)
const error = new Error('test')
Expand All @@ -56,7 +56,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })

const handler1 = sinon.spy()
consumer.onError('topic-1', handler1)
Expand Down Expand Up @@ -106,7 +106,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test', autoCommit: false })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test', autoCommit: false })
const eachMessage = sinon.spy(consumer, 'eachMessage')

const message = {
Expand Down Expand Up @@ -138,7 +138,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test', autoCommit: true })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test', autoCommit: true })
sinon.replace(consumer.consumer, 'commitOffsets', sinon.spy())
const callback = sinon.stub().callsArg(1)
consumer.events['test'] = [callback]
Expand Down Expand Up @@ -167,7 +167,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })
const callback = sinon.stub().callsArg(1)
consumer.events['test'] = [callback]

Expand Down Expand Up @@ -196,7 +196,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })
const callback = sinon.stub().callsArg(1)
consumer.events['test'] = [callback]
const handleError = sinon.replace(consumer, 'handleError', sinon.fake())
Expand Down Expand Up @@ -230,7 +230,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test', autoCommit: false })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test', autoCommit: false })
const commitOffset = sinon.replace(consumer.consumer, 'commitOffsets', sinon.spy())
const callback = sinon.stub().callsArgWith(1, true)
consumer.events['test'] = [callback]
Expand Down Expand Up @@ -268,7 +268,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test', autoCommit: false })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test', autoCommit: false })
const subscribe = sinon.replace(consumer.consumer, 'subscribe', sinon.spy())
const commitOffset = sinon.replace(consumer.consumer, 'commitOffsets', sinon.spy())
// Note: we are not calling the commit() function here:
Expand Down Expand Up @@ -308,7 +308,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test', autoCommit: true })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test', autoCommit: true })
const subscribe = sinon.replace(consumer.consumer, 'subscribe', sinon.spy())
const commitOffset = sinon.replace(consumer.consumer, 'commitOffsets', sinon.spy())
// Note: we are not calling the commit() function here:
Expand Down Expand Up @@ -348,7 +348,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })
sinon.replace(consumer.consumer, 'commitOffsets', sinon.spy())
const callback = sinon.stub().callsFake(async function (_result, commit, heartbeat, pause) {
await heartbeat()
Expand Down Expand Up @@ -377,7 +377,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })
const wrongCallback = 123
assert.rejects(
async () =>
Expand All @@ -397,7 +397,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })
const subscribe = sinon.replace(consumer.consumer, 'subscribe', sinon.spy())
const callback = sinon.spy()
assert.doesNotReject(
Expand Down Expand Up @@ -426,7 +426,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })
const subscribe = sinon.replace(consumer.consumer, 'subscribe', sinon.spy())
const callback = sinon.spy()
assert.doesNotReject(
Expand Down Expand Up @@ -454,7 +454,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })
const subscribe = sinon.replace(consumer.consumer, 'subscribe', sinon.spy())
const callback = sinon.spy()
assert.doesNotReject(
Expand All @@ -481,7 +481,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })
const subscribe = sinon.replace(consumer.consumer, 'subscribe', sinon.spy())
const callback = sinon.spy()
assert.doesNotReject(
Expand Down Expand Up @@ -509,7 +509,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })
const subscribe = sinon.replace(consumer.consumer, 'subscribe', sinon.spy())
const callback = sinon.spy()
assert.doesNotReject(
Expand Down Expand Up @@ -537,7 +537,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })
const subscribe = sinon.replace(consumer.consumer, 'subscribe', sinon.spy())
const callback = sinon.spy()
assert.doesNotReject(
Expand Down Expand Up @@ -565,7 +565,7 @@ test.group('Kafka Consumer', (group) => {
brokers: ['asd'],
})

const consumer = new Consumer(kafkajs, { groupId: 'test' })
const consumer = new ConsumerGroup(kafkajs, { groupId: 'test' })
const subscribe = sinon.replace(consumer.consumer, 'subscribe', sinon.spy())
const handleError = sinon.replace(consumer, 'handleError', sinon.fake())
const callback = sinon.stub().throws()
Expand Down
Loading

0 comments on commit a73f421

Please sign in to comment.