Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(microservices): add support for topic exchange (rabbitmq) #14540

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
38 changes: 38 additions & 0 deletions integration/microservices/e2e/topic-exchange-rmq.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { INestApplication } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import * as request from 'supertest';
import { RMQTopicExchangeController } from '../src/rmq/topic-exchange-rmq.controller';

describe('RabbitMQ transport (Topic Exchange - wildcards)', () => {
let server: any;
let app: INestApplication;

beforeEach(async () => {
const module = await Test.createTestingModule({
controllers: [RMQTopicExchangeController],
}).compile();

app = module.createNestApplication();
server = app.getHttpAdapter().getInstance();

app.connectMicroservice<MicroserviceOptions>({
transport: Transport.RMQ,
options: {
urls: [`amqp://0.0.0.0:5672`],
queue: 'test2',
wildcards: true,
},
});
await app.startAllMicroservices();
await app.init();
});

it(`should send message to wildcard topic exchange`, () => {
return request(server).get('/topic-exchange').expect(200, 'wildcard.a.b');
});

afterEach(async () => {
await app.close();
});
});
36 changes: 36 additions & 0 deletions integration/microservices/src/rmq/topic-exchange-rmq.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Controller, Get } from '@nestjs/common';
import {
ClientProxy,
ClientProxyFactory,
Ctx,
MessagePattern,
RmqContext,
Transport,
} from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';

@Controller()
export class RMQTopicExchangeController {
client: ClientProxy;

constructor() {
this.client = ClientProxyFactory.create({
transport: Transport.RMQ,
options: {
urls: [`amqp://localhost:5672`],
queue: 'test2',
wildcards: true,
},
});
}

@Get('topic-exchange')
async topicExchange() {
return lastValueFrom(this.client.send<string>('wildcard.a.b', 1));
}

@MessagePattern('wildcard.*.*')
handleTopicExchange(@Ctx() ctx: RmqContext): string {
return ctx.getPattern();
}
}
133 changes: 94 additions & 39 deletions packages/microservices/client/client-rmq.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/* eslint-disable @typescript-eslint/no-redundant-type-constituents */
import { Logger } from '@nestjs/common/services/logger.service';
import { loadPackage } from '@nestjs/common/utils/load-package.util';
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
import { isFunction } from '@nestjs/common/utils/shared.utils';
import { isFunction, isString } from '@nestjs/common/utils/shared.utils';
import { EventEmitter } from 'events';
import {
EmptyError,
Expand Down Expand Up @@ -55,8 +56,8 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
protected readonly logger = new Logger(ClientProxy.name);
protected connection$: ReplaySubject<any>;
protected connectionPromise: Promise<void>;
protected client: AmqpConnectionManager = null;
protected channel: ChannelWrapper = null;
protected client: AmqpConnectionManager | null = null;
protected channel: ChannelWrapper | null = null;
protected pendingEventListeners: Array<{
event: keyof RmqEvents;
callback: RmqEvents[keyof RmqEvents];
Expand Down Expand Up @@ -113,7 +114,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
this.registerDisconnectListener(this.client);
this.registerConnectListener(this.client);
this.pendingEventListeners.forEach(({ event, callback }) =>
this.client.on(event, callback),
this.client!.on(event, callback),
);
this.pendingEventListeners = [];

Expand All @@ -140,7 +141,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {

public createChannel(): Promise<void> {
return new Promise(resolve => {
this.channel = this.client.createChannel({
this.channel = this.client!.createChannel({
json: false,
setup: (channel: Channel) => this.setupChannel(channel, resolve),
});
Expand Down Expand Up @@ -215,6 +216,22 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
);
}

if (this.options.wildcards) {
const exchange = this.getOptionsProp(
this.options,
'exchange',
this.options.queue,
);
const exchangeType = this.getOptionsProp(
this.options,
'exchangeType',
'topic',
);
await channel.assertExchange(exchange, exchangeType, {
durable: true,
});
}

await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
await this.consumeChannel(channel);
resolve();
Expand All @@ -224,8 +241,8 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
await channel.consume(
this.replyQueue,
(msg: ConsumeMessage) =>
this.responseEmitter.emit(msg.properties.correlationId, msg),
(msg: ConsumeMessage | null) =>
this.responseEmitter.emit(msg!.properties.correlationId, msg),
{
noAck,
},
Expand Down Expand Up @@ -359,23 +376,44 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
delete serializedPacket.options;

this.responseEmitter.on(correlationId, listener);
this.channel
.sendToQueue(

const content = Buffer.from(JSON.stringify(serializedPacket));
const sendOptions = {
replyTo: this.replyQueue,
persistent: this.getOptionsProp(
this.options,
'persistent',
RQM_DEFAULT_PERSISTENT,
),
...options,
headers: this.mergeHeaders(options?.headers),
correlationId,
};

if (this.options.wildcards) {
const stringifiedPattern = isString(message.pattern)
? message.pattern
: JSON.stringify(message.pattern);

// The exchange is the same as the queue when wildcards are enabled
// and the exchange is not explicitly set
const exchange = this.getOptionsProp(
this.options,
'exchange',
this.queue,
Buffer.from(JSON.stringify(serializedPacket)),
{
replyTo: this.replyQueue,
persistent: this.getOptionsProp(
this.options,
'persistent',
RQM_DEFAULT_PERSISTENT,
),
...options,
headers: this.mergeHeaders(options?.headers),
correlationId,
},
)
.catch(err => callback({ err }));
);

this.channel!.publish(
exchange,
stringifiedPattern,
content,
sendOptions,
).catch(err => callback({ err }));
} else {
this.channel!.sendToQueue(this.queue, content, sendOptions).catch(err =>
callback({ err }),
);
}
return () => this.responseEmitter.removeListener(correlationId, listener);
} catch (err) {
callback({ err });
Expand All @@ -390,22 +428,39 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
const options = serializedPacket.options;
delete serializedPacket.options;

return new Promise<void>((resolve, reject) =>
this.channel.sendToQueue(
this.queue,
Buffer.from(JSON.stringify(serializedPacket)),
{
persistent: this.getOptionsProp(
this.options,
'persistent',
RQM_DEFAULT_PERSISTENT,
),
...options,
headers: this.mergeHeaders(options?.headers),
},
(err: unknown) => (err ? reject(err as Error) : resolve()),
),
);
return new Promise<void>((resolve, reject) => {
const content = Buffer.from(JSON.stringify(serializedPacket));
const sendOptions = {
persistent: this.getOptionsProp(
this.options,
'persistent',
RQM_DEFAULT_PERSISTENT,
),
...options,
headers: this.mergeHeaders(options?.headers),
};
const errorCallback = (err: unknown) =>
err ? reject(err as Error) : resolve();

return this.options.wildcards
? this.channel!.publish(
// The exchange is the same as the queue when wildcards are enabled
// and the exchange is not explicitly set
this.getOptionsProp(this.options, 'exchange', this.queue),
isString(packet.pattern)
? packet.pattern
: JSON.stringify(packet.pattern),
content,
sendOptions,
errorCallback,
)
: this.channel!.sendToQueue(
this.queue,
content,
sendOptions,
errorCallback,
);
});
}

protected initializeSerializer(options: RmqOptions['options']) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,22 +215,89 @@ export interface NatsOptions {
export interface RmqOptions {
transport?: Transport.RMQ;
options?: {
/**
* An array of connection URLs to try in order.
*/
urls?: string[] | RmqUrl[];
/**
* The name of the queue.
*/
queue?: string;
/**
* A prefetch count for this channel. The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement;
* once there are count messages outstanding, the server will not send more messages on this channel until one or more have been acknowledged.
*/
prefetchCount?: number;
/**
* Sets the per-channel behavior for prefetching messages.
*/
isGlobalPrefetchCount?: boolean;
/**
* Amqplib queue options.
* @see https://amqp-node.github.io/amqplib/channel_api.html#channel_assertQueue
*/
queueOptions?: AmqplibQueueOptions;
/**
* AMQP Connection Manager socket options.
*/
socketOptions?: AmqpConnectionManagerSocketOptions;
exchange?: string;
routingKey?: string;
/**
* Iif true, the broker won’t expect an acknowledgement of messages delivered to this consumer; i.e., it will dequeue messages as soon as they’ve been sent down the wire.
* @default false
*/
noAck?: boolean;
/**
* A name which the server will use to distinguish message deliveries for the consumer; mustn’t be already in use on the channel. It’s usually easier to omit this, in which case the server will create a random name and supply it in the reply.
*/
consumerTag?: string;
/**
* A serializer for the message payload.
*/
serializer?: Serializer;
/**
* A deserializer for the message payload.
*/
deserializer?: Deserializer;
/**
* A reply queue for the producer.
* @default 'amq.rabbitmq.reply-to'
*/
replyQueue?: string;
/**
* If truthy, the message will survive broker restarts provided it’s in a queue that also survives restarts.
*/
persistent?: boolean;
/**
* Additional headers to be sent with every message.
* Applies only to the producer configuration.
*/
headers?: Record<string, string>;
/**
* When false, a queue will not be asserted before consuming.
* @default false
*/
noAssert?: boolean;
/**
* Name for the exchange. Defaults to the queue name when "wildcards" is set to true.
* @default ''
*/
exchange?: string;
/**
* Type of the exchange
* @default 'topic'
*/
exchangeType?: 'direct' | 'fanout' | 'topic' | 'headers';
/**
* Additional routing key for the topic exchange.
*/
routingKey?: string;
/**
* Set to true only if you want to use Topic Exchange for routing messages to queues.
* Enabling this will allow you to use wildcards (*, #) as message and event patterns.
* @see https://www.rabbitmq.com/tutorials/tutorial-five-python#topic-exchange
* @default false
*/
wildcards?: boolean;
/**
* Maximum number of connection attempts.
* Applies only to the consumer configuration.
Expand Down
Loading
Loading