Add async send queue and write timeout#626
Conversation
Introduce a non-blocking send pipeline and timeout handling to prevent the server from blocking on socket writes. Changes include: - Add SEND_TIMEOUT_MS constant and a BlockingCollection sendQueue to enqueue raw packets. - Start a background SendWorker thread in the Session constructor to encrypt and write queued packets. - Modify Send to copy packets into the sendQueue instead of performing encryption/sends inline. - Update SendRaw to use WriteAsync with a timeout, observe task faults, log warnings on timeouts, and disconnect on failure. - Ensure sendQueue is completed on Dispose and handle worker exceptions safely. - Add extra logging for cancelled pipeline tasks and adjust log levels for write failures. - Add SendOp.Insignia and RecvOp.Insignia to the lists that skip verbose packet logging. These changes offload CPU work and IO to a background thread, improve resiliency against blocking writes, and add safer shutdown behavior.
📝 WalkthroughWalkthroughA single file modification that introduces an asynchronous send pipeline with background worker thread and non-blocking queue, replacing synchronous writes with timeout protection and improved error handling throughout the session's lifecycle. Changes
Sequence DiagramsequenceDiagram
participant Client as Client/Session
participant Queue as SendQueue<br/>(BlockingCollection)
participant Worker as SendWorker<br/>(Background Thread)
participant Crypt as Encryption
participant SendRaw as SendRaw
Client->>Queue: Enqueue Raw Packet
loop Background Worker
Worker->>Queue: Dequeue Packet<br/>(with timeout)
alt Packet Available
Queue-->>Worker: Return Packet
Worker->>Crypt: Encrypt Packet
Crypt-->>Worker: Encrypted Data
Worker->>SendRaw: Send Encrypted Data<br/>(timeout protected)
SendRaw-->>Worker: Success/Error
else Timeout/Cancel
Worker->>Worker: Log & Continue
end
end
Client->>Queue: Complete Queue<br/>(on disposal)
Queue-->>Worker: Signal completion
Worker->>Worker: Exit worker loop
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Important Action Needed: IP Allowlist UpdateIf your organization protects your Git platform with IP whitelisting, please add the new CodeRabbit IP address to your allowlist:
Reviews will stop working after February 8, 2026 if the new IP is not added to your allowlist. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@Maple2.Server.Core/Network/Session.cs`:
- Around line 53-54: The sendQueue BlockingCollection<(byte[] packet, int
length)> is not disposed, causing a resource leak; update the
Session.Dispose(bool) implementation to call sendQueue.CompleteAdding() (if not
already) and then sendQueue.Dispose() to release resources, and ensure any
producers/consumers handle ObjectDisposedException accordingly; additionally,
consider refactoring the send path to use a System.IO.Pipelines.Pipe (similar to
the existing recvPipe) by replacing sendQueue usage in the send loop and
producers with a PipeWriter/PipeReader pair for higher-performance async I/O and
better integration with the Session networking pipeline.
- Around line 86-92: The SendWorker thread started as sendWorkerThread1 is not
stored or joined on disposal causing it to potentially run after Dispose
completes; update the class to store the thread as a field (e.g., private Thread
_sendWorkerThread), assign it when creating the thread in the constructor where
SendWorker is started, and in Dispose(bool) ensure you check _sendWorkerThread
!= null and call Join (or interrupt then Join) similar to how the main thread
field is handled so the send worker is properly stopped before disposal returns.
🧹 Nitpick comments (1)
Maple2.Server.Core/Network/Session.cs (1)
278-292: Double packet copy creates unnecessary allocations.The packet is copied twice: once for
lastSentPackets(line 281) and again for the send queue (line 286). This doubles memory allocation and GC pressure for every sent packet.♻️ Proposed fix to reuse the single copy
// Track last sent packet by opcode if (length >= 2) { var op = (SendOp) (packet[1] << 8 | packet[0]); - // Store a copy to avoid mutation issues - lastSentPackets[op] = packet.Take(length).ToArray(); } // Queue the raw packet for background processing // Make a copy since the caller may reuse the buffer - byte[] packetCopy = packet.Take(length).ToArray(); + byte[] packetCopy = new byte[length]; + Buffer.BlockCopy(packet, 0, packetCopy, 0, length); + + // Track last sent packet by opcode (reuse the copy) + if (length >= 2) { + var op = (SendOp) (packetCopy[1] << 8 | packetCopy[0]); + lastSentPackets[op] = packetCopy; + } + try { sendQueue.Add((packetCopy, length));Note: Using
Buffer.BlockCopyis more efficient than LINQ's.Take().ToArray()for byte arrays.
| // Send queue for non-blocking sends | ||
| private readonly BlockingCollection<(byte[] packet, int length)> sendQueue = new(new ConcurrentQueue<(byte[], int)>()); |
There was a problem hiding this comment.
BlockingCollection is not disposed, causing potential resource leak.
BlockingCollection<T> implements IDisposable and should be disposed in Dispose(bool). While CompleteAdding() is called, the underlying resources are not released.
Additionally, the coding guidelines recommend using System.IO.Pipelines for high-performance async I/O in the Session networking layer. Consider using a Pipe for the send path similar to recvPipe, which would provide better integration with the existing pipeline architecture.
🔧 Proposed fix to dispose sendQueue
protected virtual void Dispose(bool disposing) {
if (disposed) return;
disposed = true;
State = SessionState.Disconnected;
try {
Complete();
} catch (Exception ex) {
Logger.Debug(ex, "Complete() threw during Dispose");
}
+ try {
+ sendQueue.Dispose();
+ } catch (Exception ex) {
+ Logger.Debug(ex, "sendQueue.Dispose() failed");
+ }
try {
thread.Join(STOP_TIMEOUT);As per coding guidelines: "Use System.IO.Pipelines for high-performance async I/O in Session networking layer"
🤖 Prompt for AI Agents
In `@Maple2.Server.Core/Network/Session.cs` around lines 53 - 54, The sendQueue
BlockingCollection<(byte[] packet, int length)> is not disposed, causing a
resource leak; update the Session.Dispose(bool) implementation to call
sendQueue.CompleteAdding() (if not already) and then sendQueue.Dispose() to
release resources, and ensure any producers/consumers handle
ObjectDisposedException accordingly; additionally, consider refactoring the send
path to use a System.IO.Pipelines.Pipe (similar to the existing recvPipe) by
replacing sendQueue usage in the send loop and producers with a
PipeWriter/PipeReader pair for higher-performance async I/O and better
integration with the Session networking pipeline.
|
|
||
| // Start send worker thread | ||
| var sendWorkerThread1 = new Thread(SendWorker) { | ||
| Name = $"SendWorker-{name}", | ||
| IsBackground = true, | ||
| }; | ||
| sendWorkerThread1.Start(); |
There was a problem hiding this comment.
Send worker thread is not joined during disposal.
The Dispose method joins the main thread but not sendWorkerThread1. This could leave the send worker running briefly after disposal completes. Consider storing the thread reference and joining it in Dispose.
🔧 Proposed fix to track and join send worker
Add field:
private readonly Thread thread;
+ private readonly Thread sendWorkerThread;
private readonly QueuedPipeScheduler pipeScheduler;Update constructor:
- var sendWorkerThread1 = new Thread(SendWorker) {
+ sendWorkerThread = new Thread(SendWorker) {
Name = $"SendWorker-{name}",
IsBackground = true,
};
- sendWorkerThread1.Start();
+ sendWorkerThread.Start();Add join in Dispose(bool):
try {
thread.Join(STOP_TIMEOUT);
} catch (Exception ex) {
Logger.Debug(ex, "thread.Join failed");
}
+ try {
+ sendWorkerThread.Join(STOP_TIMEOUT);
+ } catch (Exception ex) {
+ Logger.Debug(ex, "sendWorkerThread.Join failed");
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Start send worker thread | |
| var sendWorkerThread1 = new Thread(SendWorker) { | |
| Name = $"SendWorker-{name}", | |
| IsBackground = true, | |
| }; | |
| sendWorkerThread1.Start(); | |
| // Start send worker thread | |
| sendWorkerThread = new Thread(SendWorker) { | |
| Name = $"SendWorker-{name}", | |
| IsBackground = true, | |
| }; | |
| sendWorkerThread.Start(); |
🤖 Prompt for AI Agents
In `@Maple2.Server.Core/Network/Session.cs` around lines 86 - 92, The SendWorker
thread started as sendWorkerThread1 is not stored or joined on disposal causing
it to potentially run after Dispose completes; update the class to store the
thread as a field (e.g., private Thread _sendWorkerThread), assign it when
creating the thread in the constructor where SendWorker is started, and in
Dispose(bool) ensure you check _sendWorkerThread != null and call Join (or
interrupt then Join) similar to how the main thread field is handled so the send
worker is properly stopped before disposal returns.
Introduce a non-blocking send pipeline and timeout handling to prevent the server from blocking on socket writes. Changes include:
These changes offload CPU work and IO to a background thread, improve resiliency against blocking writes, and add safer shutdown behavior.
Summary by CodeRabbit
Release Notes
Performance Improvements
Bug Fixes & Stability