Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions Sources/Queues/Queue.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

import Foundation
import Logging
import Metrics
Expand Down Expand Up @@ -29,6 +28,14 @@ public protocol Queue: Sendable {
/// Pushes the next job into a queue
/// - Parameter id: The ID of the job
func push(_ id: JobIdentifier) -> EventLoopFuture<Void>

/// Recovers stale jobs from the processing queue.
///
/// When stale job recovery is enabled in the queues configuration, the worker calls this
/// once on startup and then again periodically while it is running.
/// Jobs older than `staleJobTimeout` will be requeued.
///
/// A default implementation is provided that returns 0 (no recovery support).
/// Drivers that support recovery should implement this method.
/// - Returns: The number of stale jobs recovered
func recoverStaleJobs() -> EventLoopFuture<Int>
}

extension Queue {
Expand Down Expand Up @@ -98,7 +105,7 @@ extension Queue {
}.flatMapWithEventLoop { _, eventLoop in
Counter(label: "dispatched.jobs.counter", dimensions: [
("queueName", self.queueName.string),
("jobName", J.name),
("jobName", J.name)
]).increment()
self.logger.info("Dispatched queue job")
return self.sendNotification(of: "dispatch", logger: logger) {
Expand Down Expand Up @@ -135,4 +142,12 @@ extension Queue {
}
}
}

/// Default implementation of ``Queue/recoverStaleJobs()`` for drivers without recovery support.
///
/// This is declared `public` so that `Queue` conformers defined outside this module can rely
/// on it to satisfy the protocol requirement. A default with narrower access is not visible
/// to them, which would force every external driver to add its own implementation.
/// Drivers that support recovery should provide their own implementation.
/// - Returns: A future that has already succeeded on this queue's event loop with the value `0`.
public func recoverStaleJobs() -> EventLoopFuture<Int> {
    // Default implementation: no recovery support, so nothing is inspected or requeued.
    self.eventLoop.makeSucceededFuture(0)
}
}
132 changes: 102 additions & 30 deletions Sources/Queues/QueuesCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,56 +9,57 @@ import Atomics
public final class QueuesCommand: AsyncCommand, Sendable {
// See `Command.signature`.
public let signature = Signature()

// See `Command.Signature`.
/// Command-line arguments accepted by ``QueuesCommand``.
public struct Signature: CommandSignature {
public init() {}

/// Name of the single queue to work on. When absent, `run(using:signature:)` falls back
/// to the default queue name.
@Option(name: "queue", help: "Specifies a single queue to run")
var queue: String?

/// When set, `run(using:signature:)` starts the scheduled jobs worker instead of a
/// queue worker.
@Flag(name: "scheduled", help: "Runs the scheduled queue jobs")
var scheduled: Bool
}

/// Help text for this command. See `Command.help`.
public var help: String {
    return "Starts the Vapor Queues worker"
}

private let application: Application

private let box: NIOLockedValueBox<Box>

/// Mutable worker state. The command keeps a single instance inside the
/// `NIOLockedValueBox` stored in `box` and reads or mutates it through `withLockedValue`.
struct Box: Sendable {
/// Repeating tasks for the queue workers; assigned by `startJobs(on:)` and cancelled on shutdown.
var jobTasks: [RepeatedTask]
/// Tasks for scheduled jobs, keyed by a `String`; each one is cancelled on shutdown.
var scheduledTasks: [String: AnyScheduledJob.Task]
/// `nil` until `startJobs(on:)` schedules periodic recovery; cancelled on shutdown when present.
var recoveryTask: RepeatedTask? // Periodic stale job recovery task (like Sidekiq Beat)
/// Signal sources; cancelled and emptied on shutdown.
var signalSources: [any DispatchSourceSignal]
/// Set to `true` by `shutdown()` and `asyncShutdown()`; asserted in `deinit`.
var didShutdown: Bool
}

/// Create a new ``QueuesCommand``.
///
/// The command starts with no job tasks, no scheduled tasks, no recovery task, no signal
/// sources, and `didShutdown` set to `false`.
///
/// - Parameters:
///   - application: The active Vapor `Application`.
///   - scheduled: This parameter is a historical artifact and has no effect.
public init(application: Application, scheduled: Bool = false) {
    self.application = application
    // `box` is a `let`, so it is assigned exactly once, with every `Box` field spelled out.
    self.box = .init(.init(
        jobTasks: [],
        scheduledTasks: [:],
        recoveryTask: nil,
        signalSources: [],
        didShutdown: false
    ))
}

// See `AsyncCommand.run(using:signature:)`.
public func run(using context: CommandContext, signature: QueuesCommand.Signature) async throws {
// shutdown future
let promise = self.application.eventLoopGroup.any().makePromise(of: Void.self)
self.application.running = .start(using: promise)

// setup signal sources for shutdown
let signalQueue = DispatchQueue(label: "codes.vapor.jobs.command")
func makeSignalSource(_ code: Int32) {
#if canImport(Darwin)
/// https://github.com/swift-server/swift-service-lifecycle/blob/main/Sources/UnixSignals/UnixSignalsSequence.swift#L77-L82
signal(code, SIG_IGN)
#endif

let source = DispatchSource.makeSignalSource(signal: code, queue: signalQueue)
source.setEventHandler {
print() // clear ^C
Expand All @@ -69,22 +70,48 @@ public final class QueuesCommand: AsyncCommand, Sendable {
}
makeSignalSource(SIGTERM)
makeSignalSource(SIGINT)

if signature.scheduled {
self.application.logger.info("Starting scheduled jobs worker")
try self.startScheduledJobs()
} else {
let queue: QueueName = signature.queue.map { .init(string: $0) } ?? .default

self.application.logger.info("Starting jobs worker", metadata: ["queue": .string(queue.string)])
try self.startJobs(on: queue)
}
}

/// Starts an in-process jobs worker for queued tasks
///
/// - Parameter queueName: The queue to run the jobs on
public func startJobs(on queueName: QueueName) throws {
let queue = self.application.queues.queue(queueName, on: self.application.eventLoopGroup.any())

// Recover stale jobs on startup (if recovery is enabled)
if self.application.queues.configuration.enableStaleJobRecovery {
let recoveryFuture = queue.recoverStaleJobs()

// Log recovery result, but don't block - start workers regardless
recoveryFuture.whenComplete { result in
switch result {
case .success(let count):
if count > 0 {
self.application.logger.info("Recovered stale jobs", metadata: ["count": "\(count)", "queue": .string(queueName.string)])
} else {
self.application.logger.trace("No stale jobs to recover", metadata: ["queue": .string(queueName.string)])
}
case .failure(let error):
self.application.logger.error("Failed to recover stale jobs", metadata: [
"queue": .string(queueName.string),
"error": "\(String(reflecting: error))"
])
}
}
} else {
self.application.logger.trace("Stale job recovery is disabled", metadata: ["queue": .string(queueName.string)])
}

let workerCount: Int
switch self.application.queues.configuration.workerCount {
case .default:
Expand Down Expand Up @@ -121,8 +148,45 @@ public final class QueuesCommand: AsyncCommand, Sendable {

self.box.withLockedValue { $0.jobTasks = tasks }
self.application.logger.trace("Finished adding jobTasks, total count: \(tasks.count)")

// Schedule periodic stale job recovery (like Sidekiq Beat - runs every 15 seconds by default)
// Only if recovery is enabled
if self.application.queues.configuration.enableStaleJobRecovery {
let recoveryInterval = self.application.queues.configuration.staleJobRecoveryInterval
let recoveryQueue = self.application.queues.queue(queueName, on: self.application.eventLoopGroup.any())

let recoveryTask = self.application.eventLoopGroup.any().scheduleRepeatedAsyncTask(
initialDelay: recoveryInterval, // Wait before first check (after startup recovery)
delay: recoveryInterval
) { task in
recoveryQueue.logger.trace("Running periodic stale job recovery check")

return recoveryQueue.recoverStaleJobs().map { count in
if count > 0 {
recoveryQueue.logger.info("Periodic recovery: recovered stale jobs", metadata: ["count": "\(count)"])
}
}.recover { error in
recoveryQueue.logger.error("Periodic recovery failed", metadata: ["error": "\(String(reflecting: error))"])
}.map {
// Check if shutdown was requested
if self.box.withLockedValue({ $0.didShutdown }) {
recoveryQueue.logger.trace("Shutting down, cancelling recovery task")
task.cancel()
}
}
}

self.box.withLockedValue { $0.recoveryTask = recoveryTask }
let recoveryIntervalSeconds = Double(recoveryInterval.nanoseconds) / 1_000_000_000.0
self.application.logger.info("Started periodic stale job recovery", metadata: [
"interval": "\(Int(recoveryIntervalSeconds))s",
"queue": .string(queueName.string)
])
} else {
self.application.logger.trace("Periodic stale job recovery is disabled", metadata: ["queue": .string(queueName.string)])
}
}

/// Starts the scheduled jobs in-process
public func startScheduledJobs() throws {
self.application.logger.trace("Checking for scheduled jobs to begin the worker")
Expand All @@ -138,7 +202,7 @@ public final class QueuesCommand: AsyncCommand, Sendable {
self.schedule($0)
}
}

private func schedule(_ job: AnyScheduledJob) {
self.box.withLockedValue { box in
if box.didShutdown {
Expand All @@ -153,7 +217,7 @@ public final class QueuesCommand: AsyncCommand, Sendable {
logger: self.application.logger,
on: self.application.eventLoopGroup.any()
)

guard let task = job.schedule(context: context) else {
return
}
Expand All @@ -174,19 +238,19 @@ public final class QueuesCommand: AsyncCommand, Sendable {
}
}
}

/// Shuts down the jobs worker
public func shutdown() {
self.box.withLockedValue { box in
box.didShutdown = true

// stop running in case shutting down from signal
self.application.running?.stop()

// clear signal sources
box.signalSources.forEach { $0.cancel() } // clear refs
box.signalSources = []

// stop all job queue workers
box.jobTasks.forEach {
$0.syncCancel(on: self.application.eventLoopGroup.any())
Expand All @@ -195,24 +259,28 @@ public final class QueuesCommand: AsyncCommand, Sendable {
box.scheduledTasks.values.forEach {
$0.task.syncCancel(on: self.application.eventLoopGroup.any())
}
// stop periodic recovery task
if let recoveryTask = box.recoveryTask {
recoveryTask.syncCancel(on: self.application.eventLoopGroup.any())
}
}
}

public func asyncShutdown() async {
let (jobTasks, scheduledTasks) = self.box.withLockedValue { box in
let (jobTasks, scheduledTasks, recoveryTask) = self.box.withLockedValue { box in
box.didShutdown = true

// stop running in case shutting down from signal
self.application.running?.stop()

// clear signal sources
box.signalSources.forEach { $0.cancel() } // clear refs
box.signalSources = []

// Release the lock before we start any suspensions
return (box.jobTasks, box.scheduledTasks)
return (box.jobTasks, box.scheduledTasks, box.recoveryTask)
}

// stop all job queue workers
for jobTask in jobTasks {
await jobTask.asyncCancel(on: self.application.eventLoopGroup.any())
Expand All @@ -221,8 +289,12 @@ public final class QueuesCommand: AsyncCommand, Sendable {
for scheduledTask in scheduledTasks.values {
await scheduledTask.task.asyncCancel(on: self.application.eventLoopGroup.any())
}
// stop periodic recovery task
if let recoveryTask = recoveryTask {
await recoveryTask.asyncCancel(on: self.application.eventLoopGroup.any())
}
}

deinit {
    // Sanity check (evaluated only where `assert` is active): the command must have gone
    // through `shutdown()` or `asyncShutdown()` before it is released.
    assert(
        self.box.withLockedValue { state in state.didShutdown },
        "JobsCommand did not shutdown before deinit"
    )
}
Expand Down
Loading