Skip to content

Commit

Permalink
🔖 1.24.10221.11803
Browse files Browse the repository at this point in the history
  • Loading branch information
AigioL committed Feb 21, 2024
1 parent 8704705 commit 4fd4056
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
// https://github.com/dotnet/runtime/blob/v7.0.3/src/libraries/Microsoft.Extensions.Http/src/DefaultHttpClientFactory.cs

using Fusillade;
using Punchclock;
using Splat;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

#if ANDROID
using HttpHandlerType = Xamarin.Android.Net.AndroidMessageHandler;
#elif IOS || MACCATALYST
Expand Down Expand Up @@ -30,6 +34,10 @@ public FusilladeHttpClientFactory(HttpMessageHandler handler)
{
this.handler = handler;
Locator.CurrentMutable.RegisterConstant(handler, typeof(HttpMessageHandler));
OperationQueue operationQueue = new(12);
NetCache.Speculative = new RateLimitedHttpMessageHandler2(handler, Priority.Speculative, 0, 5242880L, opQueue: operationQueue);
NetCache.UserInitiated = new RateLimitedHttpMessageHandler2(handler, Priority.UserInitiated, opQueue: operationQueue);
NetCache.Background = new RateLimitedHttpMessageHandler2(handler, Priority.Background, opQueue: operationQueue);
}

public static HttpMessageHandler CreateHandler()
Expand Down Expand Up @@ -179,3 +187,229 @@ public void Dispose()
GC.SuppressFinalize(this);
}
}

sealed class InflightRequest(Action onFullyCancelled)
{
int _refCount = 1;

public AsyncSubject<HttpResponseMessage> Response { get; protected set; }
= new AsyncSubject<HttpResponseMessage>();

public void AddRef() => Interlocked.Increment(ref _refCount);

public void Cancel()
{
if (Interlocked.Decrement(ref _refCount) <= 0)
{
onFullyCancelled();
}
}
}

/// <summary>
/// A http handler which will limit the rate at which we can read.
/// </summary>
/// <remarks>
/// Initializes a new instance of the <see cref="RateLimitedHttpMessageHandler2"/> class.
/// </remarks>
/// <param name="handler">The handler we are wrapping.</param>
/// <param name="basePriority">The base priority of the request.</param>
/// <param name="priority">The priority of the request.</param>
/// <param name="maxBytesToRead">The maximum number of bytes we can read.</param>
/// <param name="opQueue">The operation queue on which to run the operation.</param>
/// <param name="cacheResultFunc">A method that is called if we need to get cached results.</param>
sealed class RateLimitedHttpMessageHandler2(HttpMessageHandler handler, Priority basePriority, int priority = 0, long? maxBytesToRead = null, OperationQueue? opQueue = null, Func<HttpRequestMessage, HttpResponseMessage, string, CancellationToken, Task>? cacheResultFunc = null) : LimitingHttpMessageHandler(handler)
{
readonly int _priority = (int)basePriority + priority;
readonly Dictionary<string, InflightRequest> _inflightResponses = new();
long? _maxBytesToRead = maxBytesToRead;

/// <summary>
/// Generates a unique key for a <see cref="HttpRequestMessage"/>.
/// This assists with the caching.
/// </summary>
/// <param name="originalRequestUri"></param>
/// <param name="request">The request to generate a unique key for.</param>
/// <returns>The unique key.</returns>
public static string UniqueKeyForRequest(
string originalRequestUri,
HttpRequestMessage request)
{
// https://github.com/reactiveui/Fusillade/blob/2.4.67/src/Fusillade/RateLimitedHttpMessageHandler.cs#L54-L89

using var s = new MemoryStream();
s.Write(Encoding.UTF8.GetBytes(originalRequestUri));
s.Write("\r\n"u8);
s.Write(Encoding.UTF8.GetBytes(request.Method.Method));
s.Write("\r\n"u8);
static void Write(Stream s, IEnumerable<object> items)
{
foreach (var item in items)
{
var str = item.ToString();
if (!string.IsNullOrEmpty(str))
s.Write(Encoding.UTF8.GetBytes(str));
s.Write("|"u8);
}
}
Write(s, request.Headers.Accept);
s.Write("\r\n"u8);
Write(s, request.Headers.AcceptEncoding);
s.Write("\r\n"u8);
var referrer = request.Headers.Referrer;
if (referrer == default)
s.Write("http://example"u8);
else
s.Write(Encoding.UTF8.GetBytes(referrer.ToString()));
s.Write("\r\n"u8);
Write(s, request.Headers.UserAgent);
s.Write("\r\n"u8);
if (request.Headers.Authorization != null)
{
var parameter = request.Headers.Authorization.Parameter;
if (!string.IsNullOrEmpty(parameter))
s.Write(Encoding.UTF8.GetBytes(parameter));
s.Write(Encoding.UTF8.GetBytes(request.Headers.Authorization.Scheme));
s.Write("\r\n"u8);
}
s.Position = 0;
var bytes = SHA384.HashData(s);
var str = bytes.ToHexString();
return str;
}

/// <summary>
/// Generates a unique key for a <see cref="HttpRequestMessage"/>.
/// This assists with the caching.
/// </summary>
/// <param name="request">The request to generate a unique key for.</param>
/// <returns>The unique key.</returns>
public static string UniqueKeyForRequest(HttpRequestMessage request)
{
var requestUriString = ImageHttpClientService.ImageHttpRequestMessage.GetOriginalRequestUri(request);
return UniqueKeyForRequest(requestUriString, request);
}

/// <inheritdoc />
public override void ResetLimit(long? maxBytesToRead = null) => _maxBytesToRead = maxBytesToRead;

/// <inheritdoc />
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
if (request is null)
{
throw new ArgumentNullException(nameof(request));
}

var method = request.Method;
if (method != HttpMethod.Get && method != HttpMethod.Head && method != HttpMethod.Options)
{
return base.SendAsync(request, cancellationToken);
}

var cacheResult = cacheResultFunc;
if (cacheResult == null && NetCache.RequestCache != null)
{
cacheResult = NetCache.RequestCache.Save;
}

if (_maxBytesToRead < 0)
{
var tcs = new TaskCompletionSource<HttpResponseMessage>();
#if NETSTANDARD2_0
tcs.SetCanceled();
#else
tcs.SetCanceled(cancellationToken);
#endif
return tcs.Task;
}

var key = UniqueKeyForRequest(request);
var realToken = new CancellationTokenSource();
var ret = new InflightRequest(() =>
{
lock (_inflightResponses)
{
_inflightResponses.Remove(key);
}
realToken.Cancel();
});

lock (_inflightResponses)
{
if (_inflightResponses.TryGetValue(key, out var value))
{
var val = value;
val.AddRef();
cancellationToken.Register(val.Cancel);

return val.Response.ToTask(cancellationToken);
}

_inflightResponses[key] = ret;
}

cancellationToken.Register(ret.Cancel);

var queue = new OperationQueue();

queue.Enqueue(
_priority,
null!,
async () =>
{
try
{
var resp = await base.SendAsync(request, realToken.Token).ConfigureAwait(false);
if (_maxBytesToRead != null && resp.Content?.Headers.ContentLength != null)
{
_maxBytesToRead -= resp.Content.Headers.ContentLength;
}
if (cacheResult != null && resp.Content != null)
{
var ms = new MemoryStream();
#if NET5_0_OR_GREATER
var stream = await resp.Content.ReadAsStreamAsync(realToken.Token).ConfigureAwait(false);
#else
var stream = await resp.Content.ReadAsStreamAsync().ConfigureAwait(false);
#endif
await stream.CopyToAsync(ms, 32 * 1024, realToken.Token).ConfigureAwait(false);
realToken.Token.ThrowIfCancellationRequested();
var newResp = new HttpResponseMessage();
foreach (var kvp in resp.Headers)
{
newResp.Headers.TryAddWithoutValidation(kvp.Key, kvp.Value);
}
var newContent = new ByteArrayContent(ms.ToArray());
foreach (var kvp in resp.Content.Headers)
{
newContent.Headers.TryAddWithoutValidation(kvp.Key, kvp.Value);
}
newResp.Content = newContent;
resp = newResp;
await cacheResult(request, resp, key, realToken.Token).ConfigureAwait(false);
}
return resp;
}
finally
{
lock (_inflightResponses)
{
_inflightResponses.Remove(key);
}
}
},
realToken.Token).ToObservable().Subscribe(ret.Response);

return ret.Response.ToTask(cancellationToken);
}
}
28 changes: 28 additions & 0 deletions src/BD.Common/Net/Http/ImageHttpClientService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,34 @@ public ImageHttpRequestMessage(HttpMethod method, [StringSyntax(StringSyntaxAttr
}

public string OriginalRequestUri { get; }

/// <summary>
/// 默认请求地址
/// </summary>
public const string DefaultRequestUri = "/";

/// <summary>
/// 从请求消息中获取原始请求地址
/// </summary>
/// <param name="request"></param>
/// <param name="defaultRequestUri"></param>
/// <returns></returns>
public static string GetOriginalRequestUri(HttpRequestMessage request, string defaultRequestUri = DefaultRequestUri)
{
string? originalRequestUri;
if (request is ImageHttpRequestMessage request2)
{
// 对于一些重定向的 Url 使用原始 Url 进行唯一性的计算
originalRequestUri = request2.OriginalRequestUri;
}
else
{
originalRequestUri = request.RequestUri?.ToString()!;
}
if (string.IsNullOrEmpty(originalRequestUri))
originalRequestUri = defaultRequestUri;
return originalRequestUri;
}
}

async Task<MemoryStream?> GetImageMemoryStreamCoreAsync(
Expand Down
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<IsTrimmable>true</IsTrimmable>
<!--<Version>1.yy.1MMdd.1hhmm</Version>-->
<Version>1.23.11113.11410</Version>
<Version>1.24.10221.11803</Version>
<PackageIconUrl>https://avatars.githubusercontent.com/u/79355691?s=200&amp;v=4</PackageIconUrl>
<Company>江苏蒸汽凡星科技有限公司</Company>
<Copyright>©️ $(Company). All rights reserved.</Copyright>
Expand Down

0 comments on commit 4fd4056

Please sign in to comment.