Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Linux Connection ID based RSS #4551

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/core/binding.c
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,8 @@ QuicBindingProcessStatelessOperation(
RecvPacket->Route,
SendData,
SendDatagram->Length,
1);
1,
FALSE); // ?
SendData = NULL;

Exit:
Expand Down Expand Up @@ -1787,9 +1788,11 @@ QuicBindingSend(
_In_ const CXPLAT_ROUTE* Route,
_In_ CXPLAT_SEND_DATA* SendData,
_In_ uint32_t BytesToSend,
_In_ uint32_t DatagramsToSend
_In_ uint32_t DatagramsToSend,
_In_ BOOLEAN Connected
)
{
CxPlatSetHandshakeDone(SendData, Connected);
#if QUIC_TEST_DATAPATH_HOOKS_ENABLED
QUIC_TEST_DATAPATH_HOOKS* Hooks = MsQuicLib.TestDatapathHooks;
if (Hooks != NULL) {
Expand Down Expand Up @@ -1817,6 +1820,7 @@ QuicBindingSend(
#if QUIC_TEST_DATAPATH_HOOKS_ENABLED
}
#endif
CxPlatSetHandshakeDone(SendData, FALSE);

QuicPerfCounterAdd(QUIC_PERF_COUNTER_UDP_SEND, DatagramsToSend);
QuicPerfCounterAdd(QUIC_PERF_COUNTER_UDP_SEND_BYTES, BytesToSend);
Expand Down
3 changes: 2 additions & 1 deletion src/core/binding.h
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,8 @@ QuicBindingSend(
_In_ const CXPLAT_ROUTE* Route,
_In_ CXPLAT_SEND_DATA* SendData,
_In_ uint32_t BytesToSend,
_In_ uint32_t DatagramsToSend
_In_ uint32_t DatagramsToSend,
_In_ BOOLEAN Connected // hack to identify whether the connection handshake is done
);

//
Expand Down
13 changes: 13 additions & 0 deletions src/core/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -4553,6 +4553,19 @@
&Frame);
if (QUIC_SUCCEEDED(Status)) {
AckEliciting = TRUE;
if (QuicConnIsServer(Connection) && Connection->State.Connected) {
QUIC_CID_HASH_ENTRY* SourceCid =
CXPLAT_CONTAINING_RECORD(
Connection->SourceCids.Next,
QUIC_CID_HASH_ENTRY,
Link);
CxPlatUpdateRSSRule(
Connection->Paths[0].Binding->Socket,
SourceCid->CID.Length,
SourceCid->CID.Data
);
// CXPLAT_FREE(SourceCid, QUIC_POOL_CIDHASH);

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
}
} else if (Status == QUIC_STATUS_OUT_OF_MEMORY) {
QuicPacketLogDrop(Connection, Packet, "Crypto frame process OOM");
return FALSE;
Expand Down
4 changes: 3 additions & 1 deletion src/core/packet_builder.c
Original file line number Diff line number Diff line change
Expand Up @@ -1062,12 +1062,14 @@ QuicPacketBuilderSendBatch(
"Sending batch. %hu datagrams",
(uint16_t)Builder->TotalCountDatagrams);

QUIC_CONNECTION *Connection = Builder->Connection;
QuicBindingSend(
Builder->Path->Binding,
&Builder->Path->Route,
Builder->SendData,
Builder->TotalDatagramsLength,
Builder->TotalCountDatagrams);
Builder->TotalCountDatagrams,
Connection->State.Connected);

Builder->PacketBatchSent = TRUE;
Builder->SendData = NULL;
Expand Down
15 changes: 15 additions & 0 deletions src/inc/quic_datapath.h
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,14 @@ CxPlatSocketCreateUdp(
_Out_ CXPLAT_SOCKET** Socket
);

//
// Takes a socket together with a connection ID (its length and its bytes) and
// returns a QUIC_STATUS result.
// NOTE(review): presumably installs or refreshes a receive side scaling (RSS)
// steering rule keyed on the given connection ID - confirm against the
// platform implementation, which is not declared here.
//
_IRQL_requires_max_(PASSIVE_LEVEL)
QUIC_STATUS
CxPlatUpdateRSSRule(
_In_ CXPLAT_SOCKET* Socket,
_In_ uint8_t CIDLength,
_In_ uint8_t* CIDData
);

//
// Creates a TCP socket for the given (optional) local address and (required)
// remote address. This function immediately registers for upcalls from the
Expand Down Expand Up @@ -722,6 +730,13 @@ CxPlatSendDataIsFull(
_In_ CXPLAT_SEND_DATA* SendData
);

//
// Tags the send data with a flag indicating whether the handshake is done.
// NOTE(review): presumably the flag refers to the connection that owns this
// send data and is consumed by the datapath when the data is sent - confirm
// against the platform implementations.
//
_IRQL_requires_max_(DISPATCH_LEVEL)
void
CxPlatSetHandshakeDone(
_In_ CXPLAT_SEND_DATA* SendData,
_In_ BOOLEAN HandshakeDone
);

//
// Sends the data over the socket.
//
Expand Down
1 change: 1 addition & 0 deletions src/platform/datapath_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -2731,6 +2731,7 @@ CxPlatDataPathSocketProcessIoCompletion(
if (SocketContext->Binding->Type == CXPLAT_SOCKET_TCP_LISTENER) {
CxPlatSocketContextAcceptCompletion(SocketContext, Cqe);
} else {
fprintf(stderr, "[EPOLL] Partition: %d, Thread: %d\n", SocketContext->DatapathPartition->PartitionIndex, gettid());
CxPlatSocketReceive(SocketContext);
}
}
Expand Down
13 changes: 12 additions & 1 deletion src/platform/datapath_raw.c
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,16 @@ RawSendDataIsFull(

#define TH_ACK 0x10

//
// Records the caller-supplied handshake-done flag on the send data. The flag
// is only stored here; RawSocketSend later reads it back from the send data
// and forwards it to CxPlatFramingWriteHeaders.
//
_IRQL_requires_max_(DISPATCH_LEVEL)
void
RawSetHandshakeDone(
    _In_ CXPLAT_SEND_DATA* SendData,
    _In_ BOOLEAN HandshakeDone
    )
{
    SendData->HandshakeDone = HandshakeDone;
}

_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
RawSocketSend(
Expand Down Expand Up @@ -376,7 +386,8 @@ RawSocketSend(
Interface->OffloadStatus.Transmit.TransportLayerXsum,
Route->TcpState.SequenceNumber,
Route->TcpState.AckNumber,
TH_ACK);
TH_ACK,
SendData->HandshakeDone);
CxPlatDpRawTxEnqueue(SendData);
return QUIC_STATUS_SUCCESS;
}
3 changes: 2 additions & 1 deletion src/platform/datapath_raw.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ CxPlatFramingWriteHeaders(
_In_ BOOLEAN SkipTransportLayerXsum,
_In_ uint32_t TcpSeqNum,
_In_ uint32_t TcpAckNum,
_In_ uint8_t TcpFlags
_In_ uint8_t TcpFlags,
_In_ BOOLEAN FakeNatRebinding
);


Expand Down
1 change: 1 addition & 0 deletions src/platform/datapath_raw_linux.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ RawSocketCreateUdp(
NewSocket->CibirIdOffsetSrc = Config->CibirIdOffsetSrc;
NewSocket->CibirIdOffsetDst = Config->CibirIdOffsetDst;
NewSocket->AuxSocket = INVALID_SOCKET;
NewSocket->ServerOwned = !!(Config->Flags & CXPLAT_SOCKET_SERVER_OWNED);
NewSocket->UseTcp = Raw->UseTcp;
if (Config->CibirIdLength) {
memcpy(NewSocket->CibirId, Config->CibirId, Config->CibirIdLength);
Expand Down
53 changes: 45 additions & 8 deletions src/platform/datapath_raw_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#pragma warning(disable:4116) // unnamed type definition in parentheses
#pragma warning(disable:4100) // unreferenced formal parameter

//
// Debug-only state for the fake NAT rebinding experiment. Holds the socket's
// real local UDP port (network byte order, copied from sin_port) captured when
// an outgoing packet's source port is rewritten to 55555, so that received
// packets addressed to port 55555 can have their local port restored.
// NOTE(review): a single file-scope value with no synchronization - appears to
// assume one client socket; confirm before relying on it.
//
static int ClientPort;

uint32_t
CxPlatGetRawSocketSize(void) {
Expand Down Expand Up @@ -179,7 +180,15 @@ CxPlatDpRawParseUdp(
Packet->Reserved = L4_TYPE_UDP;

Packet->Route->RemoteAddress.Ipv4.sin_port = Udp->SourcePort;
Packet->Route->LocalAddress.Ipv4.sin_port = Udp->DestinationPort;
if (htons(55555) == Udp->SourcePort) {
fprintf(stderr, "Source port 55555 arrived\n");
}
if (htons(55555) == Udp->DestinationPort) {
fprintf(stderr, "Destination port 55555 arrived, rewrite to %d\n", ntohs(ClientPort));
Packet->Route->LocalAddress.Ipv4.sin_port = ClientPort;
} else {
Packet->Route->LocalAddress.Ipv4.sin_port = Udp->DestinationPort;
}

Packet->Buffer = (uint8_t*)Udp->Data;
Packet->BufferLength = QuicNetByteSwapShort(Udp->Length) - sizeof(UDP_HEADER);
Expand Down Expand Up @@ -561,7 +570,7 @@ CxPlatDpRawSocketAckFin(
Interface->OffloadStatus.Transmit.TransportLayerXsum,
ReceivedTcpHeader->AckNumber,
CxPlatByteSwapUint32(CxPlatByteSwapUint32(ReceivedTcpHeader->SequenceNumber) + 1),
TH_FIN | TH_ACK);
TH_FIN | TH_ACK, FALSE);
CxPlatDpRawTxEnqueue(SendData);
}

Expand Down Expand Up @@ -602,7 +611,7 @@ CxPlatDpRawSocketAckSyn(
Interface->OffloadStatus.Transmit.TransportLayerXsum,
ReceivedTcpHeader->AckNumber,
CxPlatByteSwapUint32(CxPlatByteSwapUint32(ReceivedTcpHeader->SequenceNumber) + 1),
TcpFlags);
TcpFlags, FALSE);
CxPlatDpRawTxEnqueue(SendData);

SendData = InterlockedFetchAndClearPointer((void*)&Socket->PausedTcpSend);
Expand All @@ -622,7 +631,7 @@ CxPlatDpRawSocketAckSyn(
Interface->OffloadStatus.Transmit.TransportLayerXsum,
CxPlatByteSwapUint32(CxPlatByteSwapUint32(ReceivedTcpHeader->AckNumber) + 1),
CxPlatByteSwapUint32(CxPlatByteSwapUint32(ReceivedTcpHeader->SequenceNumber) + 1),
TH_ACK);
TH_ACK, FALSE);
CxPlatDpRawTxEnqueue(SendData);

SendData = CxPlatSendDataAlloc(CxPlatRawToSocket(Socket), &SendConfig);
Expand All @@ -645,7 +654,7 @@ CxPlatDpRawSocketAckSyn(
Interface->OffloadStatus.Transmit.TransportLayerXsum,
ReceivedTcpHeader->AckNumber,
CxPlatByteSwapUint32(CxPlatByteSwapUint32(ReceivedTcpHeader->SequenceNumber) + 1),
TH_RST | TH_ACK);
TH_RST | TH_ACK, FALSE);
Socket->CachedRstSend = SendData;
}
}
Expand Down Expand Up @@ -679,7 +688,7 @@ CxPlatDpRawSocketSyn(
Socket, Route, &SendData->Buffer, SendData->ECN,
Interface->OffloadStatus.Transmit.NetworkLayerXsum,
Interface->OffloadStatus.Transmit.TransportLayerXsum,
Route->TcpState.SequenceNumber, 0, TH_SYN);
Route->TcpState.SequenceNumber, 0, TH_SYN, FALSE);
CxPlatDpRawTxEnqueue(SendData);
}

Expand All @@ -694,7 +703,8 @@ CxPlatFramingWriteHeaders(
_In_ BOOLEAN SkipTransportLayerXsum,
_In_ uint32_t TcpSeqNum,
_In_ uint32_t TcpAckNum,
_In_ uint8_t TcpFlags
_In_ uint8_t TcpFlags,
_In_ BOOLEAN FakeNatRebinding
)
{
uint8_t* Transport;
Expand Down Expand Up @@ -735,7 +745,34 @@ CxPlatFramingWriteHeaders(
//
UDP = (UDP_HEADER*)(Buffer->Buffer - sizeof(UDP_HEADER));
UDP->DestinationPort = Route->RemoteAddress.Ipv4.sin_port;
UDP->SourcePort = Route->LocalAddress.Ipv4.sin_port;

char* EnvPath = getenv("MSQUIC_FAKE_NAT_REBINDING");
if (EnvPath && EnvPath[0] == '1')
{
static BOOLEAN toggleRebinding = FALSE;
static uint64_t waitUntil = 1000;
static uint64_t count = 0;
if (FakeNatRebinding && !Socket->ServerOwned) {
if (toggleRebinding) {
fprintf(stderr, "FakeNatRebinding. set to 55555 (%d)\n", htons(55555));
ClientPort = Route->LocalAddress.Ipv4.sin_port;
UDP->SourcePort = htons(55555);
} else {
fprintf(stderr, "No FakeNatRebinding. set to %d (%d)\n", ntohs(Route->LocalAddress.Ipv4.sin_port), Route->LocalAddress.Ipv4.sin_port);
UDP->SourcePort = Route->LocalAddress.Ipv4.sin_port;
}
count++;
if (count > waitUntil && count % 2 == 0) {
count = waitUntil;
toggleRebinding = !toggleRebinding;
}
} else {
UDP->SourcePort = Route->LocalAddress.Ipv4.sin_port;
}
} else {
UDP->SourcePort = Route->LocalAddress.Ipv4.sin_port;
}

UDP->Length = QuicNetByteSwapShort((uint16_t)Buffer->Length + sizeof(UDP_HEADER));
UDP->Checksum = 0;
Transport = (uint8_t*)UDP;
Expand Down
Loading
Loading