-
Notifications
You must be signed in to change notification settings - Fork 61
Add async send queue and write timeout #626
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -26,6 +26,9 @@ public abstract class Session : IDisposable { | |||||||||||||||||||||||||||
| private const int HANDSHAKE_SIZE = 19; | ||||||||||||||||||||||||||||
| private const int STOP_TIMEOUT = 2000; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Send timeout to prevent indefinite blocking (in milliseconds) | ||||||||||||||||||||||||||||
| private const int SEND_TIMEOUT_MS = 5000; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| public SessionState State { get; set; } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| public EventHandler<string>? OnError; | ||||||||||||||||||||||||||||
|
|
@@ -47,6 +50,9 @@ public abstract class Session : IDisposable { | |||||||||||||||||||||||||||
| private readonly QueuedPipeScheduler pipeScheduler; | ||||||||||||||||||||||||||||
| private readonly Pipe recvPipe; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Send queue for non-blocking sends | ||||||||||||||||||||||||||||
| private readonly BlockingCollection<(byte[] packet, int length)> sendQueue = new(new ConcurrentQueue<(byte[], int)>()); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| public long AccountId { get; protected set; } | ||||||||||||||||||||||||||||
| public long CharacterId { get; protected set; } | ||||||||||||||||||||||||||||
| private readonly ConcurrentDictionary<SendOp, byte[]> lastSentPackets = []; | ||||||||||||||||||||||||||||
|
|
@@ -77,6 +83,13 @@ protected Session(TcpClient tcpClient) { | |||||||||||||||||||||||||||
| networkStream = tcpClient.GetStream(); | ||||||||||||||||||||||||||||
| sendCipher = new MapleCipher.Encryptor(VERSION, siv, BLOCK_IV); | ||||||||||||||||||||||||||||
| recvCipher = new MapleCipher.Decryptor(VERSION, riv, BLOCK_IV); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Start send worker thread | ||||||||||||||||||||||||||||
| var sendWorkerThread1 = new Thread(SendWorker) { | ||||||||||||||||||||||||||||
| Name = $"SendWorker-{name}", | ||||||||||||||||||||||||||||
| IsBackground = true, | ||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||
| sendWorkerThread1.Start(); | ||||||||||||||||||||||||||||
|
Comment on lines
+86
to
+92
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Send worker thread is not joined during disposal. The 🔧 Proposed fix to track and join send workerAdd field: private readonly Thread thread;
+ private readonly Thread sendWorkerThread;
private readonly QueuedPipeScheduler pipeScheduler;Update constructor: - var sendWorkerThread1 = new Thread(SendWorker) {
+ sendWorkerThread = new Thread(SendWorker) {
Name = $"SendWorker-{name}",
IsBackground = true,
};
- sendWorkerThread1.Start();
+ sendWorkerThread.Start();Add join in try {
thread.Join(STOP_TIMEOUT);
} catch (Exception ex) {
Logger.Debug(ex, "thread.Join failed");
}
+ try {
+ sendWorkerThread.Join(STOP_TIMEOUT);
+ } catch (Exception ex) {
+ Logger.Debug(ex, "sendWorkerThread.Join failed");
+ }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| ~Session() => Dispose(false); | ||||||||||||||||||||||||||||
|
|
@@ -112,13 +125,14 @@ protected void Complete() { | |||||||||||||||||||||||||||
| recvPipe.Writer.Complete(); | ||||||||||||||||||||||||||||
| recvPipe.Reader.Complete(); | ||||||||||||||||||||||||||||
| pipeScheduler.Complete(); | ||||||||||||||||||||||||||||
| sendQueue.CompleteAdding(); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| public void Disconnect([CallerMemberName] string caller = "", [CallerLineNumber] int line = 0, [CallerFilePath] string filePath = "") { | ||||||||||||||||||||||||||||
| if (disposed) return; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Logger.Information("Disconnected {Session} at {Caller} in {FilePath} on line {LineNumber}", this, caller, filePath, line); | ||||||||||||||||||||||||||||
| if (Interlocked.Exchange(ref disconnecting, 1) == 1) return; | ||||||||||||||||||||||||||||
| Logger.Information("Disconnected {Session} at {Caller} in {FilePath} on line {LineNumber}", this, caller, filePath, line); | ||||||||||||||||||||||||||||
| Dispose(); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
@@ -159,6 +173,8 @@ private void StartInternal() { | |||||||||||||||||||||||||||
| Task.WhenAll(writeTask, readTask).ContinueWith(t => { | ||||||||||||||||||||||||||||
| if (t.IsFaulted) { | ||||||||||||||||||||||||||||
| Logger.Debug(t.Exception, "Pipeline aggregate fault account={AccountId} char={CharacterId}", AccountId, CharacterId); | ||||||||||||||||||||||||||||
| } else if (t.IsCanceled) { | ||||||||||||||||||||||||||||
| Logger.Debug("Pipeline tasks cancelled account={AccountId} char={CharacterId}", AccountId, CharacterId); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| CloseClient(); | ||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||
|
|
@@ -202,7 +218,10 @@ private async Task WriteRecvPipe(Socket socket, PipeWriter writer) { | |||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| result = await writer.FlushAsync(); | ||||||||||||||||||||||||||||
| } while (!disposed && !result.IsCompleted); | ||||||||||||||||||||||||||||
| } catch (Exception ex) { Logger.Debug(ex, "WriteRecvPipe exception account={AccountId} char={CharacterId}", AccountId, CharacterId); Disconnect(); } | ||||||||||||||||||||||||||||
| } catch (Exception ex) { | ||||||||||||||||||||||||||||
| Logger.Debug(ex, "WriteRecvPipe exception account={AccountId} char={CharacterId}", AccountId, CharacterId); | ||||||||||||||||||||||||||||
| Disconnect(); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| private async Task ReadRecvPipe(PipeReader reader) { | ||||||||||||||||||||||||||||
|
|
@@ -262,24 +281,74 @@ private void SendInternal(byte[] packet, int length) { | |||||||||||||||||||||||||||
| lastSentPackets[op] = packet.Take(length).ToArray(); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| lock (sendCipher) { | ||||||||||||||||||||||||||||
| // re-check after potential delay acquiring lock | ||||||||||||||||||||||||||||
| if (disposed || disconnecting == 1) return; | ||||||||||||||||||||||||||||
| using PoolByteWriter encryptedPacket = sendCipher.Encrypt(packet, 0, length); | ||||||||||||||||||||||||||||
| SendRaw(encryptedPacket); | ||||||||||||||||||||||||||||
| // Queue the raw packet for background processing | ||||||||||||||||||||||||||||
| // Make a copy since the caller may reuse the buffer | ||||||||||||||||||||||||||||
| byte[] packetCopy = packet.Take(length).ToArray(); | ||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||
| sendQueue.Add((packetCopy, length)); | ||||||||||||||||||||||||||||
| } catch (InvalidOperationException) { | ||||||||||||||||||||||||||||
| // Queue was completed/disposed | ||||||||||||||||||||||||||||
| Logger.Debug("SendQueue add failed - queue completed"); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| private void SendRaw(ByteWriter packet) { | ||||||||||||||||||||||||||||
| if (disposed || disconnecting == 1) return; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||
| networkStream.Write(packet.Buffer, 0, packet.Length); | ||||||||||||||||||||||||||||
| // Use async write with timeout to prevent indefinite blocking | ||||||||||||||||||||||||||||
| Task writeTask = networkStream.WriteAsync(packet.Buffer, 0, packet.Length); | ||||||||||||||||||||||||||||
| if (!writeTask.Wait(SEND_TIMEOUT_MS)) { | ||||||||||||||||||||||||||||
| Logger.Warning("SendRaw timeout after {Timeout}ms, disconnecting account={AccountId} char={CharacterId}", | ||||||||||||||||||||||||||||
| SEND_TIMEOUT_MS, AccountId, CharacterId); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Observe the task exception to prevent unobserved task exception | ||||||||||||||||||||||||||||
| // when the task eventually completes/faults after timeout | ||||||||||||||||||||||||||||
| _ = writeTask.ContinueWith(t => { | ||||||||||||||||||||||||||||
| if (t.IsFaulted) { | ||||||||||||||||||||||||||||
| Logger.Debug(t.Exception, "WriteAsync faulted after timeout account={AccountId} char={CharacterId}", | ||||||||||||||||||||||||||||
| AccountId, CharacterId); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| }, TaskContinuationOptions.OnlyOnFaulted); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Disconnect(); | ||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Check if write actually failed | ||||||||||||||||||||||||||||
| if (writeTask.IsFaulted) { | ||||||||||||||||||||||||||||
| throw writeTask.Exception?.GetBaseException() ?? new Exception("Write task faulted"); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } catch (Exception ex) { | ||||||||||||||||||||||||||||
| Logger.Debug(ex, "[LIFECYCLE] SendRaw write failed account={AccountId} char={CharacterId}", AccountId, CharacterId); | ||||||||||||||||||||||||||||
| Logger.Warning(ex, "[LIFECYCLE] SendRaw write failed account={AccountId} char={CharacterId}", AccountId, CharacterId); | ||||||||||||||||||||||||||||
| Disconnect(); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| private void SendWorker() { | ||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||
| foreach ((byte[] packet, int length) in sendQueue.GetConsumingEnumerable()) { | ||||||||||||||||||||||||||||
| if (disposed || disconnecting == 1) break; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Encrypt outside lock, then send with timeout | ||||||||||||||||||||||||||||
| PoolByteWriter encryptedPacket; | ||||||||||||||||||||||||||||
| lock (sendCipher) { | ||||||||||||||||||||||||||||
| if (disposed || disconnecting == 1) break; | ||||||||||||||||||||||||||||
| encryptedPacket = sendCipher.Encrypt(packet, 0, length); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||
| SendRaw(encryptedPacket); | ||||||||||||||||||||||||||||
| } finally { | ||||||||||||||||||||||||||||
| encryptedPacket.Dispose(); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } catch (Exception ex) { | ||||||||||||||||||||||||||||
| if (!disposed) { | ||||||||||||||||||||||||||||
| Logger.Error(ex, "SendWorker exception account={AccountId} char={CharacterId}", AccountId, CharacterId); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| public byte[]? GetLastSentPacket(SendOp op) { | ||||||||||||||||||||||||||||
| lastSentPackets.TryGetValue(op, out byte[]? packet); | ||||||||||||||||||||||||||||
| return packet; | ||||||||||||||||||||||||||||
|
|
@@ -304,6 +373,7 @@ private void LogSend(byte[] packet, int length) { | |||||||||||||||||||||||||||
| case SendOp.FurnishingInventory: | ||||||||||||||||||||||||||||
| case SendOp.FurnishingStorage: | ||||||||||||||||||||||||||||
| case SendOp.Vibrate: | ||||||||||||||||||||||||||||
| case SendOp.Insignia: | ||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||
| Logger.Verbose("{Mode} ({Name} - {OpCode}): {Packet}", "SEND".ColorRed(), opcode, $"0x{op:X4}", packet.ToHexString(length, ' ')); | ||||||||||||||||||||||||||||
|
|
@@ -320,6 +390,7 @@ private void LogRecv(byte[] packet) { | |||||||||||||||||||||||||||
| case RecvOp.GuideObjectSync: | ||||||||||||||||||||||||||||
| case RecvOp.RideSync: | ||||||||||||||||||||||||||||
| case RecvOp.ResponseHeartbeat: | ||||||||||||||||||||||||||||
| case RecvOp.Insignia: | ||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||
| Logger.Verbose("{Mode} ({Name} - {OpCode}): {Packet}", "RECV".ColorGreen(), opcode, $"0x{op:X4}", packet.ToHexString(packet.Length, ' ')); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BlockingCollectionis not disposed, causing potential resource leak.BlockingCollection<T>implementsIDisposableand should be disposed inDispose(bool). WhileCompleteAdding()is called, the underlying resources are not released.Additionally, the coding guidelines recommend using
System.IO.Pipelinesfor high-performance async I/O in the Session networking layer. Consider using aPipefor the send path similar torecvPipe, which would provide better integration with the existing pipeline architecture.🔧 Proposed fix to dispose sendQueue
protected virtual void Dispose(bool disposing) { if (disposed) return; disposed = true; State = SessionState.Disconnected; try { Complete(); } catch (Exception ex) { Logger.Debug(ex, "Complete() threw during Dispose"); } + try { + sendQueue.Dispose(); + } catch (Exception ex) { + Logger.Debug(ex, "sendQueue.Dispose() failed"); + } try { thread.Join(STOP_TIMEOUT);As per coding guidelines: "Use System.IO.Pipelines for high-performance async I/O in Session networking layer"
🤖 Prompt for AI Agents