Error: InvalidOperationException: Cannot add OnCommit action on a completed transaction context. #1169
Replies: 1 comment
-
The error must be because this part _ = Task.Run(() =>
{
var auditEvent = new AuditEvent
{
AuditEventCode = AuditEventCode.EmailSent,
EventData = System.Text.Json.JsonSerializer.Serialize(new
{
// Our data
})
};
After calling this line we get the issue.
_auditor.AuditEvent(auditEvent);
}); kicks off an asynchronous task which is NOT awaited. This means that the execution of it may continue beyond the execution of the Please either _ = Task.Run(() => {
using var scope = new RebusTransactionScope();
// queue up stuff to do
await bus.Send(...);
await bus.Send(...);
await bus.Send(...);
// do it
await scope.CompleteAsync();
}); or use a transaction scope suppressor to perform the bus operations immediately: _ = Task.Run(() => {
using var _ = new RebusTransactionScopeSuppressor();
// do stuff
await bus.Send(...);
await bus.Send(...);
await bus.Send(...);
}); |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Error: InvalidOperationException: Cannot add OnCommit action on a completed transaction context.
Rebus.Exceptions.RebusApplicationException: 'Could not 'GetOrAdd' item with key 'outgoing-messages' as type System.Collections.Concurrent.ConcurrentQueue`1[Rebus.Transport.OutgoingTransportMessage]'
We are getting the above error when running the following piece of code.
await retryPolicy.ExecuteAsync(async () =>
{
if (messageType == MessageType.Direct)
//This is the line that's executed and where the error is thrown.
await _messageBus.Send(message);
else
await _messageBus.Publish(message);
});
A bit of context:
We use Rabbit MQ to communicate between our different microservices.
We host our applications in both Azure and AKS
We have an NVC project which fires off a direct message to our Mail Microservice to send out emails. (This works perfectly)
After each mail is sent we audit the result but putting an direct audit message on the message queue.(This is where the problem is and causes the error seen above.)
MVC configuration (The application that puts the direct email message on the queue)
var messageQueuesConfig = configuration.GetRequiredSection("MessageQueues");
var transport = messageQueuesConfig.GetValue("Transport");
var hostName = messageQueuesConfig.GetValue("HostName");
var userName = messageQueuesConfig.GetValue("UserName");
var password = messageQueuesConfig.GetValue("Password");
var queuePrefix = messageQueuesConfig.GetValue("QueuePrefix");
var connectionString = MessageBusConnectionStringFactory.GetConnectionString(transport, hostName, userName, password);
services.AddRebus((config, provider) => config
.Transport(t => t.UseRabbitMq(connectionString, MessageBusQueueNameFactory.GetQueueName(MessageBusQueueIdentifier.Web, queuePrefix, MessageBusQueueType.PubSub)))
.Options(o => o.RetryStrategy(secondLevelRetriesEnabled: true))
.Routing(r => r.TypeBased()
.Map(MessageBusQueueNameFactory.GetQueueName(MessageBusQueueIdentifier.Audit, queuePrefix, MessageBusQueueType.Direct))
.Map(MessageBusQueueNameFactory.GetQueueName(MessageBusQueueIdentifier.Mail, queuePrefix, MessageBusQueueType.Direct))
),
onCreated: async bus =>
{
await bus.Subscribe();
}
);
Mail Microservice configuration (The hosted service that picks up the direct email message from the queue)
var messageQueuesConfig = configuration.GetRequiredSection("MessageQueues");
var transport = messageQueuesConfig.GetValue("Transport");
var hostName = messageQueuesConfig.GetValue("HostName");
var userName = messageQueuesConfig.GetValue("UserName");
var password = messageQueuesConfig.GetValue("Password");
var queuePrefix = messageQueuesConfig.GetValue("QueuePrefix");
var connectionString = MessageBusConnectionStringFactory.GetConnectionString(transport, hostName, userName, password);
services.AddRebus((config, provider) => config
.Transport(t => t.UseRabbitMq(connectionString, MessageBusQueueNameFactory.GetQueueName(MessageBusQueueIdentifier.Mail, queuePrefix, MessageBusQueueType.Direct)))
.Options(o => o.RetryStrategy(secondLevelRetriesEnabled: true))
.Routing(r => r.TypeBased()
.Map(MessageBusQueueNameFactory.GetQueueName(MessageBusQueueIdentifier.Audit, queuePrefix, MessageBusQueueType.Direct))
),
onCreated: async bus =>
{
await bus.Subscribe();
}
);
Here is the code that gets the direct message from the queue and processes the mail. Finally a new direct message is put on the queue to the Audit Microservice
public void AuditEvent(AuditEvent auditEvent)
{
ArgumentNullException.ThrowIfNull(auditEvent);
if (auditEvent.AuditEventCode == AuditEventCode.NotSet)
throw new InvalidOperationException("AuditEventCode must be set before calling this method.");
_messageBus.SendDirectMessage(auditEvent);
}
It's a puzzler as we can't solve the problem.
Can you someone help or point us in the right direction?
Beta Was this translation helpful? Give feedback.
All reactions