Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(microservices): add support for topic exchange (rabbitmq) #14540

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
38 changes: 38 additions & 0 deletions integration/microservices/e2e/topic-exchange-rmq.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { INestApplication } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import * as request from 'supertest';
import { RMQTopicExchangeController } from '../src/rmq/topic-exchange-rmq.controller';

describe('RabbitMQ transport (Topic Exchange)', () => {
let server: any;
let app: INestApplication;

beforeEach(async () => {
const module = await Test.createTestingModule({
controllers: [RMQTopicExchangeController],
}).compile();

app = module.createNestApplication();
server = app.getHttpAdapter().getInstance();

app.connectMicroservice<MicroserviceOptions>({
transport: Transport.RMQ,
options: {
urls: [`amqp://0.0.0.0:5672`],
queue: 'test',
topicExchange: 'test',
},
});
await app.startAllMicroservices();
await app.init();
});

it(`should send message to wildcard topic exchange`, () => {
return request(server).get('/topic-exchange').expect(200, 'wildcard.a.b');
});

afterEach(async () => {
await app.close();
});
});
36 changes: 36 additions & 0 deletions integration/microservices/src/rmq/topic-exchange-rmq.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Controller, Get } from '@nestjs/common';
import {
ClientProxy,
ClientProxyFactory,
Ctx,
MessagePattern,
RmqContext,
Transport,
} from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';

@Controller()
export class RMQTopicExchangeController {
client: ClientProxy;

constructor() {
this.client = ClientProxyFactory.create({
transport: Transport.RMQ,
options: {
urls: [`amqp://localhost:5672`],
queue: 'test',
topicExchange: 'test',
},
});
}

@Get('topic-exchange')
async topicExchange() {
return lastValueFrom(this.client.send<string>('wildcard.a.b', 1));
}

@MessagePattern('wildcard.*.*')
handleTopicExchange(@Ctx() ctx: RmqContext): string {
return ctx.getPattern();
}
}
107 changes: 67 additions & 40 deletions packages/microservices/client/client-rmq.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Logger } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
import { isFunction } from '@nestjs/common/utils/shared.utils';
import { isFunction, isString } from '@nestjs/common/utils/shared.utils';
import { EventEmitter } from 'events';
import {
EmptyError,
Expand Down Expand Up @@ -55,8 +55,8 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
protected readonly logger = new Logger(ClientProxy.name);
protected connection$: ReplaySubject<any>;
protected connectionPromise: Promise<void>;
protected client: AmqpConnectionManager = null;
protected channel: ChannelWrapper = null;
protected client: AmqpConnectionManager | null = null;
protected channel: ChannelWrapper | null = null;
protected pendingEventListeners: Array<{
event: keyof RmqEvents;
callback: RmqEvents[keyof RmqEvents];
Expand Down Expand Up @@ -113,7 +113,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
this.registerDisconnectListener(this.client);
this.registerConnectListener(this.client);
this.pendingEventListeners.forEach(({ event, callback }) =>
this.client.on(event, callback),
this.client!.on(event, callback),
);
this.pendingEventListeners = [];

Expand All @@ -140,7 +140,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {

public createChannel(): Promise<void> {
return new Promise(resolve => {
this.channel = this.client.createChannel({
this.channel = this.client!.createChannel({
json: false,
setup: (channel: Channel) => this.setupChannel(channel, resolve),
});
Expand Down Expand Up @@ -224,8 +224,8 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
await channel.consume(
this.replyQueue,
(msg: ConsumeMessage) =>
this.responseEmitter.emit(msg.properties.correlationId, msg),
(msg: ConsumeMessage | null) =>
this.responseEmitter.emit(msg!.properties.correlationId, msg),
{
noAck,
},
Expand Down Expand Up @@ -359,23 +359,35 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
delete serializedPacket.options;

this.responseEmitter.on(correlationId, listener);
this.channel
.sendToQueue(
this.queue,
Buffer.from(JSON.stringify(serializedPacket)),
{
replyTo: this.replyQueue,
persistent: this.getOptionsProp(
this.options,
'persistent',
RQM_DEFAULT_PERSISTENT,
),
...options,
headers: this.mergeHeaders(options?.headers),
correlationId,
},
)
.catch(err => callback({ err }));

const content = Buffer.from(JSON.stringify(serializedPacket));
const sendOptions = {
replyTo: this.replyQueue,
persistent: this.getOptionsProp(
this.options,
'persistent',
RQM_DEFAULT_PERSISTENT,
),
...options,
headers: this.mergeHeaders(options?.headers),
correlationId,
};

if (this.options.topicExchange) {
const stringifiedPattern = isString(message.pattern)
? message.pattern
: JSON.stringify(message.pattern);
this.channel!.publish(
this.options.topicExchange,
stringifiedPattern,
content,
sendOptions,
).catch(err => callback({ err }));
} else {
this.channel!.sendToQueue(this.queue, content, sendOptions).catch(err =>
callback({ err }),
);
}
return () => this.responseEmitter.removeListener(correlationId, listener);
} catch (err) {
callback({ err });
Expand All @@ -390,22 +402,37 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
const options = serializedPacket.options;
delete serializedPacket.options;

return new Promise<void>((resolve, reject) =>
this.channel.sendToQueue(
this.queue,
Buffer.from(JSON.stringify(serializedPacket)),
{
persistent: this.getOptionsProp(
this.options,
'persistent',
RQM_DEFAULT_PERSISTENT,
),
...options,
headers: this.mergeHeaders(options?.headers),
},
(err: unknown) => (err ? reject(err as Error) : resolve()),
),
);
return new Promise<void>((resolve, reject) => {
const content = Buffer.from(JSON.stringify(serializedPacket));
const sendOptions = {
persistent: this.getOptionsProp(
this.options,
'persistent',
RQM_DEFAULT_PERSISTENT,
),
...options,
headers: this.mergeHeaders(options?.headers),
};
const errorCallback = (err: unknown) =>
err ? reject(err as Error) : resolve();

return this.options.topicExchange
? this.channel!.publish(
this.options.topicExchange,
isString(packet.pattern)
? packet.pattern
: JSON.stringify(packet.pattern),
content,
sendOptions,
errorCallback,
)
: this.channel!.sendToQueue(
this.queue,
content,
sendOptions,
errorCallback,
);
});
}

protected initializeSerializer(options: RmqOptions['options']) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,13 @@ export interface RmqOptions {
persistent?: boolean;
headers?: Record<string, string>;
noAssert?: boolean;
/**
* Set only if you want to use Topic Exchange for routing messages to queues.
* Enabling this will allow you to use wildcards (*, #) as message and event patterns.
* Topic exchange can have any arbitrary name, but it should be the same for the producer (client) and consumer (server).
* @see https://www.rabbitmq.com/tutorials/tutorial-five-python#topic-exchange
*/
topicExchange?: string;
/**
* Maximum number of connection attempts.
* Applies only to the consumer configuration.
Expand Down
Loading
Loading