From 336cd38be863730043e432221f019a457604c9fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=82ngelo=20Tadeucci?= Date: Tue, 3 Feb 2026 15:11:24 -0300 Subject: [PATCH] Add async send queue and write timeout Introduce a non-blocking send pipeline and timeout handling to prevent the server from blocking on socket writes. Changes include: - Add SEND_TIMEOUT_MS constant and a BlockingCollection sendQueue to enqueue raw packets. - Start a background SendWorker thread in the Session constructor to encrypt and write queued packets. - Modify Send to copy packets into the sendQueue instead of performing encryption/sends inline. - Update SendRaw to use WriteAsync with a timeout, observe task faults, log warnings on timeouts, and disconnect on failure. - Ensure sendQueue is completed on Dispose and handle worker exceptions safely. - Add extra logging for cancelled pipeline tasks and adjust log levels for write failures. - Add SendOp.Insignia and RecvOp.Insignia to the lists that skip verbose packet logging. These changes offload CPU work and IO to a background thread, improve resiliency against blocking writes, and add safer shutdown behavior. --- Maple2.Server.Core/Network/Session.cs | 89 ++++++++++++++++++++++++--- 1 file changed, 80 insertions(+), 9 deletions(-) diff --git a/Maple2.Server.Core/Network/Session.cs b/Maple2.Server.Core/Network/Session.cs index 54c0f568..81dd86ae 100644 --- a/Maple2.Server.Core/Network/Session.cs +++ b/Maple2.Server.Core/Network/Session.cs @@ -26,6 +26,9 @@ public abstract class Session : IDisposable { private const int HANDSHAKE_SIZE = 19; private const int STOP_TIMEOUT = 2000; + // Send timeout to prevent indefinite blocking (in milliseconds) + private const int SEND_TIMEOUT_MS = 5000; + public SessionState State { get; set; } public EventHandler? OnError; @@ -47,6 +50,9 @@ public abstract class Session : IDisposable { private readonly QueuedPipeScheduler pipeScheduler; private readonly Pipe recvPipe; + // Send queue for non-blocking sends + private readonly BlockingCollection<(byte[] packet, int length)> sendQueue = new(new ConcurrentQueue<(byte[], int)>()); + public long AccountId { get; protected set; } public long CharacterId { get; protected set; } private readonly ConcurrentDictionary lastSentPackets = []; @@ -77,6 +83,13 @@ protected Session(TcpClient tcpClient) { networkStream = tcpClient.GetStream(); sendCipher = new MapleCipher.Encryptor(VERSION, siv, BLOCK_IV); recvCipher = new MapleCipher.Decryptor(VERSION, riv, BLOCK_IV); + + // Start send worker thread + var sendWorkerThread1 = new Thread(SendWorker) { + Name = $"SendWorker-{name}", + IsBackground = true, + }; + sendWorkerThread1.Start(); } ~Session() => Dispose(false); @@ -112,13 +125,14 @@ protected void Complete() { recvPipe.Writer.Complete(); recvPipe.Reader.Complete(); pipeScheduler.Complete(); + sendQueue.CompleteAdding(); } public void Disconnect([CallerMemberName] string caller = "", [CallerLineNumber] int line = 0, [CallerFilePath] string filePath = "") { if (disposed) return; - Logger.Information("Disconnected {Session} at {Caller} in {FilePath} on line {LineNumber}", this, caller, filePath, line); if (Interlocked.Exchange(ref disconnecting, 1) == 1) return; + Logger.Information("Disconnected {Session} at {Caller} in {FilePath} on line {LineNumber}", this, caller, filePath, line); Dispose(); } @@ -159,6 +173,8 @@ private void StartInternal() { Task.WhenAll(writeTask, readTask).ContinueWith(t => { if (t.IsFaulted) { Logger.Debug(t.Exception, "Pipeline aggregate fault account={AccountId} char={CharacterId}", AccountId, CharacterId); + } else if (t.IsCanceled) { + Logger.Debug("Pipeline tasks cancelled account={AccountId} char={CharacterId}", AccountId, CharacterId); } CloseClient(); }); @@ -202,7 +218,10 @@ private async Task WriteRecvPipe(Socket socket, PipeWriter writer) { result = await writer.FlushAsync(); } while (!disposed && !result.IsCompleted); - } catch (Exception ex) { Logger.Debug(ex, "WriteRecvPipe exception account={AccountId} char={CharacterId}", AccountId, CharacterId); Disconnect(); } + } catch (Exception ex) { + Logger.Debug(ex, "WriteRecvPipe exception account={AccountId} char={CharacterId}", AccountId, CharacterId); + Disconnect(); + } } private async Task ReadRecvPipe(PipeReader reader) { @@ -262,24 +281,74 @@ private void SendInternal(byte[] packet, int length) { lastSentPackets[op] = packet.Take(length).ToArray(); } - lock (sendCipher) { - // re-check after potential delay acquiring lock - if (disposed || disconnecting == 1) return; - using PoolByteWriter encryptedPacket = sendCipher.Encrypt(packet, 0, length); - SendRaw(encryptedPacket); + // Queue the raw packet for background processing + // Make a copy since the caller may reuse the buffer + byte[] packetCopy = packet.Take(length).ToArray(); + try { + sendQueue.Add((packetCopy, length)); + } catch (InvalidOperationException) { + // Queue was completed/disposed + Logger.Debug("SendQueue add failed - queue completed"); } } private void SendRaw(ByteWriter packet) { if (disposed || disconnecting == 1) return; + try { - networkStream.Write(packet.Buffer, 0, packet.Length); + // Use async write with timeout to prevent indefinite blocking + Task writeTask = networkStream.WriteAsync(packet.Buffer, 0, packet.Length); + if (!writeTask.Wait(SEND_TIMEOUT_MS)) { + Logger.Warning("SendRaw timeout after {Timeout}ms, disconnecting account={AccountId} char={CharacterId}", + SEND_TIMEOUT_MS, AccountId, CharacterId); + + // Observe the task exception to prevent unobserved task exception + // when the task eventually completes/faults after timeout + _ = writeTask.ContinueWith(t => { + if (t.IsFaulted) { + Logger.Debug(t.Exception, "WriteAsync faulted after timeout account={AccountId} char={CharacterId}", + AccountId, CharacterId); + } + }, TaskContinuationOptions.OnlyOnFaulted); + + Disconnect(); + return; + } + + // Check if write actually failed + if (writeTask.IsFaulted) { + throw writeTask.Exception?.GetBaseException() ?? new Exception("Write task faulted"); + } } catch (Exception ex) { - Logger.Debug(ex, "[LIFECYCLE] SendRaw write failed account={AccountId} char={CharacterId}", AccountId, CharacterId); + Logger.Warning(ex, "[LIFECYCLE] SendRaw write failed account={AccountId} char={CharacterId}", AccountId, CharacterId); Disconnect(); } } + private void SendWorker() { + try { + foreach ((byte[] packet, int length) in sendQueue.GetConsumingEnumerable()) { + if (disposed || disconnecting == 1) break; + + // Encrypt outside lock, then send with timeout + PoolByteWriter encryptedPacket; + lock (sendCipher) { + if (disposed || disconnecting == 1) break; + encryptedPacket = sendCipher.Encrypt(packet, 0, length); + } + try { + SendRaw(encryptedPacket); + } finally { + encryptedPacket.Dispose(); + } + } + } catch (Exception ex) { + if (!disposed) { + Logger.Error(ex, "SendWorker exception account={AccountId} char={CharacterId}", AccountId, CharacterId); + } + } + } + public byte[]? GetLastSentPacket(SendOp op) { lastSentPackets.TryGetValue(op, out byte[]? packet); return packet; @@ -304,6 +373,7 @@ private void LogSend(byte[] packet, int length) { case SendOp.FurnishingInventory: case SendOp.FurnishingStorage: case SendOp.Vibrate: + case SendOp.Insignia: break; default: Logger.Verbose("{Mode} ({Name} - {OpCode}): {Packet}", "SEND".ColorRed(), opcode, $"0x{op:X4}", packet.ToHexString(length, ' ')); @@ -320,6 +390,7 @@ private void LogRecv(byte[] packet) { case RecvOp.GuideObjectSync: case RecvOp.RideSync: case RecvOp.ResponseHeartbeat: + case RecvOp.Insignia: break; default: Logger.Verbose("{Mode} ({Name} - {OpCode}): {Packet}", "RECV".ColorGreen(), opcode, $"0x{op:X4}", packet.ToHexString(packet.Length, ' '));