-
Hi, I'm creating the following pipeline, which consists of a ConcurrencyLimiter and a RateLimiter:
var limiter = PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
{
string partitionKey = GetPartitionKey(context);
return RateLimitPartition.GetTokenBucketLimiter(
partitionKey,
key => options.Value.RateLimiterOptions);
});
return new ResiliencePipelineBuilder()
.AddConcurrencyLimiter(100,10)
.AddRateLimiter(new RateLimiterStrategyOptions
{
RateLimiter = args => limiter.AcquireAsync(args.Context, 1, args.Context.CancellationToken),
OnRejected = x => OnRejected(x,logger),
Name="RateLimiter"
}).Build();
static string GetPartitionKey(ResilienceContext context)
=> context.Properties.GetValue(UserProperty, "Default");
When the pipeline rejects a request by throwing a RateLimiterRejectedException, is there a way to know which limiter caused the rejection?
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 3 replies
-
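Found a solution: In the OnRejected callback, set a ResilienceContext property indicating that the rate limit has been hit. If the pipeline throws a RateLimiterRejectedException, check the ResilienceContext to see whether the property exists. If it exists, the rate limit has been hit; otherwise the concurrency limit has been hit. Example:
private static readonly ResiliencePropertyKey<string> UserProperty = new("UserName");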
private static readonly ResiliencePropertyKey<bool> RateLimitHitProperty = new("RateLimitHit");
private readonly ResiliencePipeline _resiliencePipeline = CreateResiliencePipeline(options);
/// <summary>
/// Builds a pipeline with a concurrency limiter (added first) followed by a
/// per-user token-bucket rate limiter (added second).
/// </summary>
/// <param name="options">
/// Supplies <c>ConcurrencyLimiterOptions</c> and <c>RateLimiterOptions</c>.
/// </param>
/// <param name="logger">
/// Currently unused by this method. Optional so that callers which only pass
/// <paramref name="options"/> (such as the <c>_resiliencePipeline</c> field
/// initializer) compile.
/// </param>
/// <returns>The built <see cref="ResiliencePipeline"/>.</returns>
private static ResiliencePipeline CreateResiliencePipeline(IOptions<MyOptions> options, ILogger? logger = null)
{
    // One token bucket per partition key; every partition uses the same configured options.
    var limiter = PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
    {
        string partitionKey = GetPartitionKey(context);
        return RateLimitPartition.GetTokenBucketLimiter(
            partitionKey,
            _ => options.Value.RateLimiterOptions);
    });

    return new ResiliencePipelineBuilder()
        .AddConcurrencyLimiter(options.Value.ConcurrencyLimiterOptions)
        .AddRateLimiter(new RateLimiterStrategyOptions
        {
            RateLimiter = args => limiter.AcquireAsync(args.Context, 1, args.Context.CancellationToken),
            // Only this strategy is wired to OnRejected, so the flag that callback sets
            // identifies rejections coming from the rate limiter.
            OnRejected = OnRejected,
            Name = "RateLimiter"
        })
        .Build();

    // Partition by the user stored on the context; "Default" when no user was set.
    static string GetPartitionKey(ResilienceContext context)
        => context.Properties.GetValue(UserProperty, "Default");
}
/// <summary>
/// Rejection callback: records on the resilience context that the rate limit was hit.
/// </summary>
/// <param name="arguments">Rejection arguments carrying the current context.</param>
/// <returns>An already-completed <see cref="ValueTask"/>.</returns>
private static ValueTask OnRejected(OnRateLimiterRejectedArguments arguments)
{
    // Flag the context so the caller can tell this rejection came from the rate limiter.
    var contextProperties = arguments.Context.Properties;
    contextProperties.Set(RateLimitHitProperty, true);

    // A default ValueTask is a successfully completed one.
    return default;
}
/// <summary>
/// Runs <c>HandleRequest</c> through the resilience pipeline and replies with an
/// error that distinguishes a rate-limit rejection from a concurrency-limit rejection.
/// </summary>
/// <param name="req">The incoming request; its <c>User</c> is stored on the context as <c>UserProperty</c>.</param>
/// <remarks>
/// Any exception other than <see cref="RateLimiterRejectedException"/> captured in
/// the outcome is rethrown rather than silently dropped.
/// </remarks>
public async Task Handle(Request req)
{
    var resilienceContext = ResilienceContextPool.Shared.Get(CancellationToken.None);
    resilienceContext.Properties.Set(UserProperty, req.User!);
    try
    {
        // The lambda captures 'req', so it must not be declared 'static'.
        var outcome = await _resiliencePipeline.ExecuteOutcomeAsync(async (ctx, state) =>
        {
            try
            {
                await HandleRequest(req); //Code for HandleRequest omitted for brevity
                return Outcome.FromResult<int>(0);
            }
            catch (Exception ex)
            {
                // Must use the same result type as the success branch.
                return Outcome.FromException<int>(ex);
            }
        }, resilienceContext, string.Empty);

        if (outcome.Exception is RateLimiterRejectedException rex)
        {
            // The flag is set only by the rate limiter's OnRejected callback.
            if (resilienceContext.Properties.TryGetValue(RateLimitHitProperty, out bool rateLimitHit) && rateLimitHit)
            {
                await req.SendErrorReply(-1, $"Rate limit exceeded, retry after: {rex.RetryAfter}.");
            }
            else
            {
                await req.SendErrorReply(-1, $"Service is busy, try again later.");
            }
        }
        else
        {
            // Surface handler failures instead of swallowing them; no-op when there is no exception.
            outcome.ThrowIfException();
        }
    }
    finally
    {
        ResilienceContextPool.Shared.Return(resilienceContext);
    }
}
|
Beta Was this translation helpful? Give feedback.
-
Can you please elaborate on why you need both kinds of limiters in the same pipeline?
Beta Was this translation helpful? Give feedback.
Found a solution:
In the OnRejected callback, set a ResilienceContext property indicating that the rate limit has been hit. If the pipeline throws a RateLimiterRejectedException, check the ResilienceContext to see whether the property exists. If it exists, the rate limit has been hit; otherwise the concurrency limit has been hit.
Example: