diff --git a/Sources/Queues/Queue.swift b/Sources/Queues/Queue.swift
index a1b6f0d..7826963 100644
--- a/Sources/Queues/Queue.swift
+++ b/Sources/Queues/Queue.swift
@@ -1,4 +1,3 @@
-
 import Foundation
 import Logging
 import Metrics
@@ -29,6 +28,14 @@ public protocol Queue: Sendable {
     /// Pushes the next job into a queue
     /// - Parameter id: The ID of the job
     func push(_ id: JobIdentifier) -> EventLoopFuture<Void>
+
+    /// Recovers stale jobs from the processing queue. This is called on worker startup.
+    /// Jobs older than `staleJobTimeout` will be requeued.
+    ///
+    /// Default implementation returns 0 (no recovery support).
+    /// Drivers that support recovery should implement this method.
+    /// - Returns: The number of stale jobs recovered
+    func recoverStaleJobs() -> EventLoopFuture<Int>
 }
 
 extension Queue {
@@ -98,7 +105,7 @@ extension Queue {
         }.flatMapWithEventLoop { _, eventLoop in
             Counter(label: "dispatched.jobs.counter", dimensions: [
                 ("queueName", self.queueName.string),
-                ("jobName", J.name),
+                ("jobName", J.name)
             ]).increment()
             self.logger.info("Dispatched queue job")
             return self.sendNotification(of: "dispatch", logger: logger) {
@@ -135,4 +142,12 @@ extension Queue {
             }
         }
     }
+
+    /// Default implementation of ``Queue/recoverStaleJobs()``.
+    /// Drivers that support recovery should provide their own implementation.
+    /// - Returns: The number of stale jobs recovered (default: 0)
+    func recoverStaleJobs() -> EventLoopFuture<Int> {
+        // Default implementation: no recovery support
+        return self.eventLoop.makeSucceededFuture(0)
+    }
 }
diff --git a/Sources/Queues/QueuesCommand.swift b/Sources/Queues/QueuesCommand.swift
index c865bfc..a6ced72 100644
--- a/Sources/Queues/QueuesCommand.swift
+++ b/Sources/Queues/QueuesCommand.swift
@@ -9,32 +9,33 @@ import Atomics
 public final class QueuesCommand: AsyncCommand, Sendable {
     // See `Command.signature`.
     public let signature = Signature()
-    
+
     // See `Command.Signature`.
     public struct Signature: CommandSignature {
         public init() {}
-        
+
         @Option(name: "queue", help: "Specifies a single queue to run")
         var queue: String?
-        
+
        @Flag(name: "scheduled", help: "Runs the scheduled queue jobs")
        var scheduled: Bool
     }
-    
+
     // See `Command.help`.
     public var help: String { "Starts the Vapor Queues worker" }
-    
+
     private let application: Application
-    
+
     private let box: NIOLockedValueBox<Box>
-    
+
     struct Box: Sendable {
         var jobTasks: [RepeatedTask]
         var scheduledTasks: [String: AnyScheduledJob.Task]
+        var recoveryTask: RepeatedTask? // Periodic stale job recovery task (like Sidekiq Beat)
         var signalSources: [any DispatchSourceSignal]
         var didShutdown: Bool
     }
-    
+
     /// Create a new ``QueuesCommand``.
     ///
     /// - Parameters:
@@ -42,15 +43,15 @@ public final class QueuesCommand: AsyncCommand, Sendable {
     ///   - scheduled: This parameter is a historical artifact and has no effect.
     public init(application: Application, scheduled: Bool = false) {
         self.application = application
-        self.box = .init(.init(jobTasks: [], scheduledTasks: [:], signalSources: [], didShutdown: false))
+        self.box = .init(.init(jobTasks: [], scheduledTasks: [:], recoveryTask: nil, signalSources: [], didShutdown: false))
     }
-    
+
     // See `AsyncCommand.run(using:signature:)`.
     public func run(using context: CommandContext, signature: QueuesCommand.Signature) async throws {
         // shutdown future
         let promise = self.application.eventLoopGroup.any().makePromise(of: Void.self)
         self.application.running = .start(using: promise)
-        
+
         // setup signal sources for shutdown
         let signalQueue = DispatchQueue(label: "codes.vapor.jobs.command")
         func makeSignalSource(_ code: Int32) {
@@ -58,7 +59,7 @@ public final class QueuesCommand: AsyncCommand, Sendable {
             /// https://github.com/swift-server/swift-service-lifecycle/blob/main/Sources/UnixSignals/UnixSignalsSequence.swift#L77-L82
             signal(code, SIG_IGN)
             #endif
-            
+
             let source = DispatchSource.makeSignalSource(signal: code, queue: signalQueue)
             source.setEventHandler {
                 print() // clear ^C
@@ -69,22 +70,48 @@ public final class QueuesCommand: AsyncCommand, Sendable {
         }
         makeSignalSource(SIGTERM)
         makeSignalSource(SIGINT)
-        
+
         if signature.scheduled {
             self.application.logger.info("Starting scheduled jobs worker")
             try self.startScheduledJobs()
         } else {
             let queue: QueueName = signature.queue.map { .init(string: $0) } ?? .default
-            
+
             self.application.logger.info("Starting jobs worker", metadata: ["queue": .string(queue.string)])
             try self.startJobs(on: queue)
         }
     }
-    
+
     /// Starts an in-process jobs worker for queued tasks
     ///
     /// - Parameter queueName: The queue to run the jobs on
     public func startJobs(on queueName: QueueName) throws {
+        let queue = self.application.queues.queue(queueName, on: self.application.eventLoopGroup.any())
+
+        // Recover stale jobs on startup (if recovery is enabled)
+        if self.application.queues.configuration.enableStaleJobRecovery {
+            let recoveryFuture = queue.recoverStaleJobs()
+
+            // Log recovery result, but don't block - start workers regardless
+            recoveryFuture.whenComplete { result in
+                switch result {
+                case .success(let count):
+                    if count > 0 {
+                        self.application.logger.info("Recovered stale jobs", metadata: ["count": "\(count)", "queue": .string(queueName.string)])
+                    } else {
+                        self.application.logger.trace("No stale jobs to recover", metadata: ["queue": .string(queueName.string)])
+                    }
+                case .failure(let error):
+                    self.application.logger.error("Failed to recover stale jobs", metadata: [
+                        "queue": .string(queueName.string),
+                        "error": "\(String(reflecting: error))"
+                    ])
+                }
+            }
+        } else {
+            self.application.logger.trace("Stale job recovery is disabled", metadata: ["queue": .string(queueName.string)])
+        }
+
         let workerCount: Int
         switch self.application.queues.configuration.workerCount {
         case .default:
@@ -121,8 +148,45 @@ public final class QueuesCommand: AsyncCommand, Sendable {
 
         self.box.withLockedValue { $0.jobTasks = tasks }
         self.application.logger.trace("Finished adding jobTasks, total count: \(tasks.count)")
+
+        // Schedule periodic stale job recovery (like Sidekiq Beat - runs every 15 seconds by default)
+        // Only if recovery is enabled
+        if self.application.queues.configuration.enableStaleJobRecovery {
+            let recoveryInterval = self.application.queues.configuration.staleJobRecoveryInterval
+            let recoveryQueue = self.application.queues.queue(queueName, on: self.application.eventLoopGroup.any())
+
+            let recoveryTask = self.application.eventLoopGroup.any().scheduleRepeatedAsyncTask(
+                initialDelay: recoveryInterval, // Wait before first check (after startup recovery)
+                delay: recoveryInterval
+            ) { task in
+                recoveryQueue.logger.trace("Running periodic stale job recovery check")
+
+                return recoveryQueue.recoverStaleJobs().map { count in
+                    if count > 0 {
+                        recoveryQueue.logger.info("Periodic recovery: recovered stale jobs", metadata: ["count": "\(count)"])
stale jobs", metadata: ["count": "\(count)"]) + } + }.recover { error in + recoveryQueue.logger.error("Periodic recovery failed", metadata: ["error": "\(String(reflecting: error))"]) + }.map { + // Check if shutdown was requested + if self.box.withLockedValue({ $0.didShutdown }) { + recoveryQueue.logger.trace("Shutting down, cancelling recovery task") + task.cancel() + } + } + } + + self.box.withLockedValue { $0.recoveryTask = recoveryTask } + let recoveryIntervalSeconds = Double(recoveryInterval.nanoseconds) / 1_000_000_000.0 + self.application.logger.info("Started periodic stale job recovery", metadata: [ + "interval": "\(Int(recoveryIntervalSeconds))s", + "queue": .string(queueName.string) + ]) + } else { + self.application.logger.trace("Periodic stale job recovery is disabled", metadata: ["queue": .string(queueName.string)]) + } } - + /// Starts the scheduled jobs in-process public func startScheduledJobs() throws { self.application.logger.trace("Checking for scheduled jobs to begin the worker") @@ -138,7 +202,7 @@ public final class QueuesCommand: AsyncCommand, Sendable { self.schedule($0) } } - + private func schedule(_ job: AnyScheduledJob) { self.box.withLockedValue { box in if box.didShutdown { @@ -153,7 +217,7 @@ public final class QueuesCommand: AsyncCommand, Sendable { logger: self.application.logger, on: self.application.eventLoopGroup.any() ) - + guard let task = job.schedule(context: context) else { return } @@ -174,19 +238,19 @@ public final class QueuesCommand: AsyncCommand, Sendable { } } } - + /// Shuts down the jobs worker public func shutdown() { self.box.withLockedValue { box in box.didShutdown = true - + // stop running in case shutting down from signal self.application.running?.stop() - + // clear signal sources box.signalSources.forEach { $0.cancel() } // clear refs box.signalSources = [] - + // stop all job queue workers box.jobTasks.forEach { $0.syncCancel(on: self.application.eventLoopGroup.any()) @@ -195,24 +259,28 @@ public final class QueuesCommand: AsyncCommand, Sendable { box.scheduledTasks.values.forEach { $0.task.syncCancel(on: self.application.eventLoopGroup.any()) } + // stop periodic recovery task + if let recoveryTask = box.recoveryTask { + recoveryTask.syncCancel(on: self.application.eventLoopGroup.any()) + } } } - + public func asyncShutdown() async { - let (jobTasks, scheduledTasks) = self.box.withLockedValue { box in + let (jobTasks, scheduledTasks, recoveryTask) = self.box.withLockedValue { box in box.didShutdown = true - + // stop running in case shutting down from signal self.application.running?.stop() - + // clear signal sources box.signalSources.forEach { $0.cancel() } // clear refs box.signalSources = [] - + // Release the lock before we start any suspensions - return (box.jobTasks, box.scheduledTasks) + return (box.jobTasks, box.scheduledTasks, box.recoveryTask) } - + // stop all job queue workers for jobTask in jobTasks { await jobTask.asyncCancel(on: self.application.eventLoopGroup.any()) @@ -221,8 +289,12 @@ public final class QueuesCommand: AsyncCommand, Sendable { for scheduledTask in scheduledTasks.values { await scheduledTask.task.asyncCancel(on: self.application.eventLoopGroup.any()) } + // stop periodic recovery task + if let recoveryTask = recoveryTask { + await recoveryTask.asyncCancel(on: self.application.eventLoopGroup.any()) + } } - + deinit { assert(self.box.withLockedValue { $0.didShutdown }, "JobsCommand did not shutdown before deinit") } diff --git a/Sources/Queues/QueuesConfiguration.swift 
diff --git a/Sources/Queues/QueuesConfiguration.swift b/Sources/Queues/QueuesConfiguration.swift
index f9a995a..23fdd84 100644
--- a/Sources/Queues/QueuesConfiguration.swift
+++ b/Sources/Queues/QueuesConfiguration.swift
@@ -9,15 +9,18 @@ public struct QueuesConfiguration: Sendable {
         var refreshInterval: TimeAmount = .seconds(1)
         var persistenceKey: String = "vapor_queues"
         var workerCount: WorkerCount = .default
+        var staleJobTimeout: TimeAmount = .seconds(300) // 5 minutes default (like Sidekiq)
+        var staleJobRecoveryInterval: TimeAmount = .seconds(15) // 15 seconds default (like Sidekiq Beat)
+        var enableStaleJobRecovery: Bool = true // Enable recovery by default (can be disabled)
         var userInfo: [AnySendableHashable: any Sendable] = [:]
-        
+
         var jobs: [String: any AnyJob] = [:]
         var scheduledJobs: [AnyScheduledJob] = []
         var notificationHooks: [any JobEventDelegate] = []
     }
-    
+
     private let dataBox: NIOLockedValueBox<DataBox> = .init(.init())
-    
+
     /// The number of seconds to wait before checking for the next job. Defaults to `1`
     public var refreshInterval: TimeAmount {
         get { self.dataBox.withLockedValue { $0.refreshInterval } }
@@ -49,44 +52,72 @@ public struct QueuesConfiguration: Sendable {
         get { self.dataBox.withLockedValue { $0.workerCount } }
         set { self.dataBox.withLockedValue { $0.workerCount = newValue } }
     }
-    
+
+    /// The timeout for considering a job in the processing queue as stale and eligible for recovery.
+    /// Defaults to 5 minutes (300 seconds). Jobs older than this timeout will be requeued on worker startup.
+    public var staleJobTimeout: TimeAmount {
+        get { self.dataBox.withLockedValue { $0.staleJobTimeout } }
+        set { self.dataBox.withLockedValue { $0.staleJobTimeout = newValue } }
+    }
+
+    /// The interval at which stale jobs are checked and recovered. Defaults to 15 seconds (like Sidekiq Beat).
+    /// This periodic check ensures that stale jobs are recovered even if no workers restart.
+    public var staleJobRecoveryInterval: TimeAmount {
+        get { self.dataBox.withLockedValue { $0.staleJobRecoveryInterval } }
+        set { self.dataBox.withLockedValue { $0.staleJobRecoveryInterval = newValue } }
+    }
+
+    /// Enable or disable stale job recovery. Defaults to `true`.
+    /// When disabled, stale jobs will not be recovered on startup or periodically.
+    /// This can be useful for simple use cases where recovery overhead is not needed.
+    public var enableStaleJobRecovery: Bool {
+        get { self.dataBox.withLockedValue { $0.enableStaleJobRecovery } }
+        set { self.dataBox.withLockedValue { $0.enableStaleJobRecovery = newValue } }
+    }
+
     /// A logger
     public let logger: Logger
-    
+
     // Arbitrary user info to be stored
     public var userInfo: [AnySendableHashable: any Sendable] {
         get { self.dataBox.withLockedValue { $0.userInfo } }
         set { self.dataBox.withLockedValue { $0.userInfo = newValue } }
     }
-    
+
     var jobs: [String: any AnyJob] {
         get { self.dataBox.withLockedValue { $0.jobs } }
         set { self.dataBox.withLockedValue { $0.jobs = newValue } }
     }
-    
+
     var scheduledJobs: [AnyScheduledJob] {
         get { self.dataBox.withLockedValue { $0.scheduledJobs } }
         set { self.dataBox.withLockedValue { $0.scheduledJobs = newValue } }
     }
-    
+
     var notificationHooks: [any JobEventDelegate] {
         get { self.dataBox.withLockedValue { $0.notificationHooks } }
         set { self.dataBox.withLockedValue { $0.notificationHooks = newValue } }
     }
-    
+
     /// Creates an empty ``QueuesConfiguration``.
     public init(
         refreshInterval: TimeAmount = .seconds(1),
         persistenceKey: String = "vapor_queues",
         workerCount: WorkerCount = .default,
+        staleJobTimeout: TimeAmount = .seconds(300), // 5 minutes default
+        staleJobRecoveryInterval: TimeAmount = .seconds(15), // 15 seconds default (like Sidekiq Beat)
+        enableStaleJobRecovery: Bool = true, // Enabled by default
         logger: Logger = .init(label: "codes.vapor.queues")
     ) {
         self.logger = logger
         self.refreshInterval = refreshInterval
         self.persistenceKey = persistenceKey
         self.workerCount = workerCount
+        self.staleJobTimeout = staleJobTimeout
+        self.staleJobRecoveryInterval = staleJobRecoveryInterval
+        self.enableStaleJobRecovery = enableStaleJobRecovery
     }
-    
+
     /// Adds a new ``Job`` to the queue configuration.
     ///
     /// This must be called on all ``Job`` objects before they can be run in a queue.
@@ -99,7 +130,7 @@ public struct QueuesConfiguration: Sendable {
         }
         self.jobs[J.name] = job
     }
-    
+
     /// Schedules a new job for execution at a later date.
     ///
     ///     config.schedule(Cleanup())
diff --git a/Sources/XCTQueues/TestQueueDriver.swift b/Sources/XCTQueues/TestQueueDriver.swift
index de6f335..ad1c41d 100644
--- a/Sources/XCTQueues/TestQueueDriver.swift
+++ b/Sources/XCTQueues/TestQueueDriver.swift
@@ -20,7 +20,6 @@ extension Application.Queues.Provider {
 }
 
 struct TestQueuesDriver: QueuesDriver {
-    init() {}
     func makeQueue(with context: QueueContext) -> any Queue {
         TestQueue(_context: .init(context))
     }
@@ -32,7 +31,6 @@
 
 struct AsyncTestQueuesDriver: QueuesDriver {
-    init() {}
 
     func makeQueue(with context: QueueContext) -> any Queue { AsyncTestQueue(_context: .init(context)) }
     func shutdown() {}
 }
@@ -144,6 +142,11 @@ struct TestQueue: Queue {
             return context.eventLoop.makeSucceededVoidFuture()
         }
     }
+
+    // Test queue doesn't support recovery - uses default implementation
+    func recoverStaleJobs() -> EventLoopFuture<Int> {
+        return self.eventLoop.makeSucceededFuture(0)
+    }
 }
 
 struct AsyncTestQueue: AsyncQueue {
@@ -155,4 +158,9 @@
     func clear(_ id: JobIdentifier) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id] = nil } }
     func pop() async throws -> JobIdentifier? { self._context.withLockedValue { $0.application.queues.asyncTest.queue.popLast() } }
     func push(_ id: JobIdentifier) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.queue.append(id) } }
+
+    // Test queue doesn't support recovery - uses default implementation
+    func recoverStaleJobs() -> EventLoopFuture<Int> {
+        return self.eventLoop.makeSucceededFuture(0)
+    }
 }
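For reference, a minimal sketch of how an application might tune the new settings from its `configure.swift`, assuming the defaults introduced above and the usual mutable `app.queues.configuration` access; the numbers are illustrative only.

```swift
import Queues
import Vapor

public func configure(_ app: Application) throws {
    // Consider a job stale after 10 minutes in the processing queue
    // instead of the 5-minute default.
    app.queues.configuration.staleJobTimeout = .minutes(10)

    // Run the periodic recovery check every 30 seconds instead of every 15.
    app.queues.configuration.staleJobRecoveryInterval = .seconds(30)

    // Or opt out of recovery entirely.
    // app.queues.configuration.enableStaleJobRecovery = false
}
```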