From f10020b83b1d27a85d1fbc168400a67b787cbc93 Mon Sep 17 00:00:00 2001 From: Zina Rasoamanana Date: Fri, 6 Dec 2024 11:32:24 +0100 Subject: [PATCH 1/7] [IMP] queue_job: remove cron garbage collector and automatically requeue jobs in timeout [IMP] queue_job: increment 'retry' when re-queuing job that have been killed --- queue_job/README.rst | 205 +++++++++--------- queue_job/controllers/main.py | 2 + queue_job/data/queue_data.xml | 8 - queue_job/job.py | 56 +++++ queue_job/jobrunner/runner.py | 141 ++++++++---- .../migrations/18.0.1.3.0/pre-migration.py | 22 ++ queue_job/models/__init__.py | 1 + queue_job/models/queue_job.py | 50 ----- queue_job/models/queue_job_locks.py | 24 ++ queue_job/readme/CONFIGURE.md | 3 + queue_job/static/description/index.html | 74 +++---- queue_job/tests/test_requeue_dead_job.py | 133 ++++++++++++ 12 files changed, 480 insertions(+), 239 deletions(-) create mode 100644 queue_job/migrations/18.0.1.3.0/pre-migration.py create mode 100644 queue_job/models/queue_job_locks.py create mode 100644 queue_job/tests/test_requeue_dead_job.py diff --git a/queue_job/README.rst b/queue_job/README.rst index 1a03778d45..a0d7bc3153 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -1,7 +1,3 @@ -.. image:: https://odoo-community.org/readme-banner-image - :target: https://odoo-community.org/get-involved?utm_source=readme - :alt: Odoo Community Association - ========= Job Queue ========= @@ -17,7 +13,7 @@ Job Queue .. |badge1| image:: https://img.shields.io/badge/maturity-Mature-brightgreen.png :target: https://odoo-community.org/page/development-status :alt: Mature -.. |badge2| image:: https://img.shields.io/badge/license-LGPL--3-blue.png +.. |badge2| image:: https://img.shields.io/badge/licence-LGPL--3-blue.png :target: http://www.gnu.org/licenses/lgpl-3.0-standalone.html :alt: License: LGPL-3 .. 
|badge3| image:: https://img.shields.io/badge/github-OCA%2Fqueue-lightgray.png?logo=github @@ -65,20 +61,20 @@ instantaneous if no other job is running. Features: -- Views for jobs, jobs are stored in PostgreSQL -- Jobrunner: execute the jobs, highly efficient thanks to PostgreSQL's - NOTIFY -- Channels: give a capacity for the root channel and its sub-channels - and segregate jobs in them. Allow for instance to restrict heavy jobs - to be executed one at a time while little ones are executed 4 at a - times. -- Retries: Ability to retry jobs by raising a type of exception -- Retry Pattern: the 3 first tries, retry after 10 seconds, the 5 next - tries, retry after 1 minutes, ... -- Job properties: priorities, estimated time of arrival (ETA), custom - description, number of retries -- Related Actions: link an action on the job view, such as open the - record concerned by the job +- Views for jobs, jobs are stored in PostgreSQL +- Jobrunner: execute the jobs, highly efficient thanks to PostgreSQL's + NOTIFY +- Channels: give a capacity for the root channel and its sub-channels + and segregate jobs in them. Allow for instance to restrict heavy jobs + to be executed one at a time while little ones are executed 4 at a + times. +- Retries: Ability to retry jobs by raising a type of exception +- Retry Pattern: the 3 first tries, retry after 10 seconds, the 5 next + tries, retry after 1 minutes, ... +- Job properties: priorities, estimated time of arrival (ETA), custom + description, number of retries +- Related Actions: link an action on the job view, such as open the + record concerned by the job **Table of contents** @@ -93,18 +89,18 @@ Be sure to have the ``requests`` library. Configuration ============= -- Using environment variables and command line: +- Using environment variables and command line: - - Adjust environment variables (optional): + - Adjust environment variables (optional): - - ``ODOO_QUEUE_JOB_CHANNELS=root:4`` or any other channels - configuration. 
The default is ``root:1`` - - if ``xmlrpc_port`` is not set: ``ODOO_QUEUE_JOB_PORT=8069`` + - ``ODOO_QUEUE_JOB_CHANNELS=root:4`` or any other channels + configuration. The default is ``root:1`` + - if ``xmlrpc_port`` is not set: ``ODOO_QUEUE_JOB_PORT=8069`` - - Start Odoo with ``--load=web,queue_job`` and ``--workers`` greater - than 1. [1]_ + - Start Odoo with ``--load=web,queue_job`` and ``--workers`` greater + than 1. [1]_ -- Using the Odoo configuration file: +- Using the Odoo configuration file: .. code:: ini @@ -117,8 +113,8 @@ Configuration [queue_job] channels = root:2 -- Confirm the runner is starting correctly by checking the odoo log - file: +- Confirm the runner is starting correctly by checking the odoo log + file: :: @@ -127,10 +123,14 @@ Configuration ...INFO...queue_job.jobrunner.runner: queue job runner ready for db ...INFO...queue_job.jobrunner.runner: database connections ready -- Create jobs (eg using ``base_import_async``) and observe they start - immediately and in parallel. -- Tip: to enable debug logging for the queue job, use - ``--log-handler=odoo.addons.queue_job:DEBUG`` +- Create jobs (eg using ``base_import_async``) and observe they start + immediately and in parallel. +- Tip: to enable debug logging for the queue job, use + ``--log-handler=odoo.addons.queue_job:DEBUG`` + +- Jobs that remain in ``enqueued`` or ``started`` state (because, for + instance, their worker has been killed) will be automatically + re-queued. .. [1] It works with the threaded Odoo server too, although this way of @@ -287,20 +287,20 @@ only start when the previous one is done: Enqueing Job Options ~~~~~~~~~~~~~~~~~~~~ -- priority: default is 10, the closest it is to 0, the faster it will be - executed -- eta: Estimated Time of Arrival of the job. It will not be executed - before this date/time -- max_retries: default is 5, maximum number of retries before giving up - and set the job state to 'failed'. A value of 0 means infinite - retries. 
-- description: human description of the job. If not set, description is - computed from the function doc or method name -- channel: the complete name of the channel to use to process the - function. If specified it overrides the one defined on the function -- identity_key: key uniquely identifying the job, if specified and a job - with the same key has not yet been run, the new job will not be - created +- priority: default is 10, the closest it is to 0, the faster it will + be executed +- eta: Estimated Time of Arrival of the job. It will not be executed + before this date/time +- max_retries: default is 5, maximum number of retries before giving up + and set the job state to 'failed'. A value of 0 means infinite + retries. +- description: human description of the job. If not set, description is + computed from the function doc or method name +- channel: the complete name of the channel to use to process the + function. If specified it overrides the one defined on the function +- identity_key: key uniquely identifying the job, if specified and a + job with the same key has not yet been run, the new job will not be + created Configure default options for jobs ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -371,11 +371,11 @@ dictionary on the job function: "kwargs": {"name": "Partner"}, } -- ``enable``: when ``False``, the button has no effect (default: - ``True``) -- ``func_name``: name of the method on ``queue.job`` that returns an - action -- ``kwargs``: extra arguments to pass to the related action method +- ``enable``: when ``False``, the button has no effect (default: + ``True``) +- ``func_name``: name of the method on ``queue.job`` that returns an + action +- ``kwargs``: extra arguments to pass to the related action method Example of related action code: @@ -419,10 +419,10 @@ integers: Based on this configuration, we can tell that: -- 5 first retries are postponed 10 seconds later -- retries 5 to 10 postponed 20 seconds later -- retries 10 to 15 postponed 30 seconds 
later -- all subsequent retries postponed 5 minutes later +- 5 first retries are postponed 10 seconds later +- retries 5 to 10 postponed 20 seconds later +- retries 10 to 15 postponed 30 seconds later +- all subsequent retries postponed 5 minutes later **Job Context** @@ -469,11 +469,11 @@ Testing The recommended way to test jobs, rather than running them directly and synchronously is to split the tests in two parts: - - one test where the job is mocked (trap jobs with ``trap_jobs()`` - and the test only verifies that the job has been delayed with the - expected arguments - - one test that only calls the method of the job synchronously, to - validate the proper behavior of this method only + - one test where the job is mocked (trap jobs with ``trap_jobs()`` + and the test only verifies that the job has been delayed with the + expected arguments + - one test that only calls the method of the job synchronously, to + validate the proper behavior of this method only Proceeding this way means that you can prove that jobs will be enqueued properly at runtime, and it ensures your code does not have a different @@ -597,14 +597,14 @@ synchronously Tips and tricks ~~~~~~~~~~~~~~~ -- **Idempotency** - (https://www.restapitutorial.com/lessons/idempotency.html): The - queue_job should be idempotent so they can be retried several times - without impact on the data. -- **The job should test at the very beginning its relevance**: the - moment the job will be executed is unknown by design. So the first - task of a job should be to check if the related work is still relevant - at the moment of the execution. +- **Idempotency** + (https://www.restapitutorial.com/lessons/idempotency.html): The + queue_job should be idempotent so they can be retried several times + without impact on the data. +- **The job should test at the very beginning its relevance**: the + moment the job will be executed is unknown by design. 
So the first + task of a job should be to check if the related work is still + relevant at the moment of the execution. Patterns ~~~~~~~~ @@ -621,19 +621,20 @@ Through the time, two main patterns emerged: Known issues / Roadmap ====================== -- After creating a new database or installing ``queue_job`` on an - existing database, Odoo must be restarted for the runner to detect it. -- When Odoo shuts down normally, it waits for running jobs to finish. - However, when the Odoo server crashes or is otherwise force-stopped, - running jobs are interrupted while the runner has no chance to know - they have been aborted. In such situations, jobs may remain in - ``started`` or ``enqueued`` state after the Odoo server is halted. - Since the runner has no way to know if they are actually running or - not, and does not know for sure if it is safe to restart the jobs, it - does not attempt to restart them automatically. Such stale jobs - therefore fill the running queue and prevent other jobs to start. You - must therefore requeue them manually, either from the Jobs view, or by - running the following SQL statement *before starting Odoo*: +- After creating a new database or installing ``queue_job`` on an + existing database, Odoo must be restarted for the runner to detect + it. +- When Odoo shuts down normally, it waits for running jobs to finish. + However, when the Odoo server crashes or is otherwise force-stopped, + running jobs are interrupted while the runner has no chance to know + they have been aborted. In such situations, jobs may remain in + ``started`` or ``enqueued`` state after the Odoo server is halted. + Since the runner has no way to know if they are actually running or + not, and does not know for sure if it is safe to restart the jobs, it + does not attempt to restart them automatically. Such stale jobs + therefore fill the running queue and prevent other jobs to start. 
You + must therefore requeue them manually, either from the Jobs view, or + by running the following SQL statement *before starting Odoo*: .. code:: sql @@ -645,11 +646,11 @@ Changelog Next ---- -- [ADD] Run jobrunner as a worker process instead of a thread in the - main process (when running with --workers > 0) -- [REF] ``@job`` and ``@related_action`` deprecated, any method can be - delayed, and configured using ``queue.job.function`` records -- [MIGRATION] from 13.0 branched at rev. e24ff4b +- [ADD] Run jobrunner as a worker process instead of a thread in the + main process (when running with --workers > 0) +- [REF] ``@job`` and ``@related_action`` deprecated, any method can be + delayed, and configured using ``queue.job.function`` records +- [MIGRATION] from 13.0 branched at rev. e24ff4b Bug Tracker =========== @@ -673,21 +674,21 @@ Authors Contributors ------------ -- Guewen Baconnier -- Stéphane Bidoul -- Matthieu Dietrich -- Jos De Graeve -- David Lefever -- Laurent Mignon -- Laetitia Gangloff -- Cédric Pigeon -- Tatiana Deribina -- Souheil Bejaoui -- Eric Antones -- Simone Orsi -- Nguyen Minh Chien -- Tran Quoc Duong -- Vo Hong Thien +- Guewen Baconnier +- Stéphane Bidoul +- Matthieu Dietrich +- Jos De Graeve +- David Lefever +- Laurent Mignon +- Laetitia Gangloff +- Cédric Pigeon +- Tatiana Deribina +- Souheil Bejaoui +- Eric Antones +- Simone Orsi +- Nguyen Minh Chien +- Tran Quoc Duong +- Vo Hong Thien Other credits ------------- diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index ee60ca5ca0..21f045a7d2 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -32,6 +32,8 @@ def _try_perform_job(self, env, job): job.set_started() job.store() env.cr.commit() + job.lock() + _logger.debug("%s started", job) job.perform() diff --git a/queue_job/data/queue_data.xml b/queue_job/data/queue_data.xml index 216b6cf016..1fb70f106c 100644 --- a/queue_job/data/queue_data.xml +++ b/queue_job/data/queue_data.xml @@ 
-1,14 +1,6 @@ - - Jobs Garbage Collector - 5 - minutes - - code - model.requeue_stuck_jobs() - Job failed diff --git a/queue_job/job.py b/queue_job/job.py index e472d70db9..fa789091d7 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -221,6 +221,61 @@ def load_many(cls, env, job_uuids): recordset = cls.db_records_from_uuids(env, job_uuids) return {cls._load_from_db_record(record) for record in recordset} + def add_lock_record(self): + """ + Create row in db to be locked once the job is performed + """ + self.env.cr.execute( + """ + INSERT INTO + queue_job_locks (id) + SELECT + id + FROM + queue_job + WHERE + uuid = %s + ON CONFLICT(id) + DO NOTHING; + """, + [self.uuid], + ) + + def lock(self): + """ + Lock row of job that is being performed + + If a job cannot be locked, + it means that the job wasn't started, + a RetryableJobError is thrown. + """ + self.env.cr.execute( + """ + SELECT + * + FROM + queue_job_locks + WHERE + id in ( + SELECT + id + FROM + queue_job + WHERE + uuid = %s + AND state='started' + ) + FOR UPDATE; + """, + [self.uuid], + ) + + # 1 job should be locked + if 1 != len(self.env.cr.fetchall()): + raise RetryableJobError( + f"Trying to lock job that wasn't started, uuid: {self.uuid}" + ) + @classmethod def _load_from_db_record(cls, job_db_record): stored = job_db_record @@ -735,6 +790,7 @@ def set_started(self): self.state = STARTED self.date_started = datetime.now() self.worker_pid = os.getpid() + self.add_lock_record() def set_done(self, result=None): self.state = DONE diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 04b163bd4a..d148b05447 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -114,22 +114,6 @@ * After creating a new database or installing queue_job on an existing database, Odoo must be restarted for the runner to detect it. -* When Odoo shuts down normally, it waits for running jobs to finish. 
- However, when the Odoo server crashes or is otherwise force-stopped, - running jobs are interrupted while the runner has no chance to know - they have been aborted. In such situations, jobs may remain in - ``started`` or ``enqueued`` state after the Odoo server is halted. - Since the runner has no way to know if they are actually running or - not, and does not know for sure if it is safe to restart the jobs, - it does not attempt to restart them automatically. Such stale jobs - therefore fill the running queue and prevent other jobs to start. - You must therefore requeue them manually, either from the Jobs view, - or by running the following SQL statement *before starting Odoo*: - -.. code-block:: sql - - update queue_job set state='pending' where state in ('started', 'enqueued') - .. rubric:: Footnotes .. [1] From a security standpoint, it is safe to have an anonymous HTTP @@ -155,7 +139,7 @@ from odoo.tools import config from . import queue_job_config -from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager +from .channels import ENQUEUED, NOT_DONE, ChannelManager SELECT_TIMEOUT = 60 ERROR_RECOVERY_DELAY = 5 @@ -212,33 +196,12 @@ def _connection_info_for(db_name): def _async_http_get(scheme, host, port, user, password, db_name, job_uuid): - # Method to set failed job (due to timeout, etc) as pending, - # to avoid keeping it as enqueued. - def set_job_pending(): - connection_info = _connection_info_for(db_name) - conn = psycopg2.connect(**connection_info) - conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) - with closing(conn.cursor()) as cr: - cr.execute( - "UPDATE queue_job SET state=%s, " - "date_enqueued=NULL, date_started=NULL " - "WHERE uuid=%s and state=%s " - "RETURNING uuid", - (PENDING, job_uuid, ENQUEUED), - ) - if cr.fetchone(): - _logger.warning( - "state of job %s was reset from %s to %s", - job_uuid, - ENQUEUED, - PENDING, - ) - # TODO: better way to HTTP GET asynchronously (grequest, ...)? 
# if this was python3 I would be doing this with # asyncio, aiohttp and aiopg def urlopen(): url = f"{scheme}://{host}:{port}/queue_job/runjob?db={db_name}&job_uuid={job_uuid}" + # pylint: disable=except-pass try: auth = None if user: @@ -252,10 +215,10 @@ def urlopen(): # for codes between 500 and 600 response.raise_for_status() except requests.Timeout: - set_job_pending() + # A timeout is a normal behaviour, it shouldn't be logged as an exception + pass except Exception: _logger.exception("exception in GET %s", url) - set_job_pending() thread = threading.Thread(target=urlopen) thread.daemon = True @@ -359,6 +322,96 @@ def set_job_enqueued(self, uuid): (ENQUEUED, uuid), ) + def _query_requeue_dead_jobs(self): + return """ + UPDATE + queue_job + SET + state=( + CASE + WHEN + max_retries IS NOT NULL AND + retry IS NOT NULL AND + retry>max_retries + THEN 'failed' + ELSE 'pending' + END), + retry=( + CASE + WHEN state='started' + THEN COALESCE(retry,0)+1 ELSE retry + END), + exc_name=( + CASE + WHEN + max_retries IS NOT NULL AND + retry IS NOT NULL AND + retry>max_retries + THEN 'JobFoundDead' + ELSE exc_name + END), + exc_info=( + CASE + WHEN + max_retries IS NOT NULL AND + retry IS NOT NULL AND + retry>max_retries + THEN 'Job found dead after too many retries' + ELSE exc_info + END) + WHERE + id in ( + SELECT + id + FROM + queue_job_locks + WHERE + id in ( + SELECT + id + FROM + queue_job + WHERE + state IN ('enqueued','started') + AND date_enqueued < + (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') + ) + FOR UPDATE SKIP LOCKED + ) + RETURNING uuid + """ + + def requeue_dead_jobs(self): + """ + Set started and enqueued jobs but not locked to pending + + A job is locked when it's being executed + When a job is killed, it releases the lock + + If the number of retries exceeds the number of max retries, + the job is set as 'failed' with the error 'JobFoundDead'. + + Adding a buffer on 'date_enqueued' to check + that it has been enqueued for more than 10sec. 
+ This prevents from requeuing jobs before they are actually started. + + When Odoo shuts down normally, it waits for running jobs to finish. + However, when the Odoo server crashes or is otherwise force-stopped, + running jobs are interrupted while the runner has no chance to know + they have been aborted. + """ + + with closing(self.conn.cursor()) as cr: + query = self._query_requeue_dead_jobs() + + cr.execute(query) + + for (uuid,) in cr.fetchall(): + _logger.warning( + "Re-queued job with uuid: %s", + uuid, + ) + class QueueJobRunner: def __init__( @@ -454,6 +507,11 @@ def initialize_databases(self): else: db.close() + def requeue_dead_jobs(self): + for db in self.db_by_name.values(): + if db.has_queue_job: + db.requeue_dead_jobs() + def run_jobs(self): now = _odoo_now() for job in self.channel_manager.get_jobs_to_run(now): @@ -546,6 +604,7 @@ def run(self): _logger.info("database connections ready") # inner loop does the normal processing while not self._stop: + self.requeue_dead_jobs() self.process_notifications() self.run_jobs() self.wait_notification() diff --git a/queue_job/migrations/18.0.1.3.0/pre-migration.py b/queue_job/migrations/18.0.1.3.0/pre-migration.py new file mode 100644 index 0000000000..8dbb6ff7f1 --- /dev/null +++ b/queue_job/migrations/18.0.1.3.0/pre-migration.py @@ -0,0 +1,22 @@ +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + + +def migrate(cr, version): + # Deactivate cron garbage collector + cr.execute( + """ + UPDATE + ir_cron + SET + active=False + WHERE id IN ( + SELECT res_id + FROM + ir_model_data + WHERE + module='queue_job' + AND model='ir.cron' + AND name='ir_cron_queue_job_garbage_collector' + ); + """ + ) diff --git a/queue_job/models/__init__.py b/queue_job/models/__init__.py index 4744e7ab46..9048fd3959 100644 --- a/queue_job/models/__init__.py +++ b/queue_job/models/__init__.py @@ -3,3 +3,4 @@ from . import queue_job from . import queue_job_channel from . import queue_job_function +from . 
import queue_job_locks diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 4be3820ad6..411ae43af5 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -6,7 +6,6 @@ from datetime import datetime, timedelta from odoo import _, api, exceptions, fields, models -from odoo.osv import expression from odoo.tools import config, html_escape, index_exists from odoo.addons.base_sparse_field.models.fields import Serialized @@ -410,55 +409,6 @@ def autovacuum(self): break return True - def requeue_stuck_jobs(self, enqueued_delta=5, started_delta=0): - """Fix jobs that are in a bad states - - :param in_queue_delta: lookup time in minutes for jobs - that are in enqueued state - - :param started_delta: lookup time in minutes for jobs - that are in enqueued state, - 0 means that it is not checked - """ - self._get_stuck_jobs_to_requeue( - enqueued_delta=enqueued_delta, started_delta=started_delta - ).requeue() - return True - - def _get_stuck_jobs_domain(self, queue_dl, started_dl): - domain = [] - now = fields.datetime.now() - if queue_dl: - queue_dl = now - timedelta(minutes=queue_dl) - domain.append( - [ - "&", - ("date_enqueued", "<=", fields.Datetime.to_string(queue_dl)), - ("state", "=", "enqueued"), - ] - ) - if started_dl: - started_dl = now - timedelta(minutes=started_dl) - domain.append( - [ - "&", - ("date_started", "<=", fields.Datetime.to_string(started_dl)), - ("state", "=", "started"), - ] - ) - if not domain: - raise exceptions.ValidationError( - _("If both parameters are 0, ALL jobs will be requeued!") - ) - return expression.OR(domain) - - def _get_stuck_jobs_to_requeue(self, enqueued_delta, started_delta): - job_model = self.env["queue.job"] - stuck_jobs = job_model.search( - self._get_stuck_jobs_domain(enqueued_delta, started_delta) - ) - return stuck_jobs - def related_action_open_record(self): """Open a form view with the record(s) of the job. 
diff --git a/queue_job/models/queue_job_locks.py b/queue_job/models/queue_job_locks.py new file mode 100644 index 0000000000..d2c3d73437 --- /dev/null +++ b/queue_job/models/queue_job_locks.py @@ -0,0 +1,24 @@ +# Copyright 2025 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from odoo import models + + +class QueueJobLocks(models.AbstractModel): + + _name = "queue.job.locks" + _description = "Queue Job Locks" + + def init(self): + # Create job lock table + self.env.cr.execute( + """ + CREATE TABLE IF NOT EXISTS queue_job_locks ( + id INT PRIMARY KEY, + CONSTRAINT + queue_job_locks_queue_job_id_fkey + FOREIGN KEY (id) + REFERENCES queue_job (id) ON DELETE CASCADE + ); + """ + ) diff --git a/queue_job/readme/CONFIGURE.md b/queue_job/readme/CONFIGURE.md index 07b7b84126..216b5358af 100644 --- a/queue_job/readme/CONFIGURE.md +++ b/queue_job/readme/CONFIGURE.md @@ -35,3 +35,6 @@ channels = root:2 [^1]: It works with the threaded Odoo server too, although this way of running Odoo is obviously not for production purposes. + +* Jobs that remain in `enqueued` or `started` state (because, for instance, + their worker has been killed) will be automatically re-queued. diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html index f1196c7d89..caac6fddea 100644 --- a/queue_job/static/description/index.html +++ b/queue_job/static/description/index.html @@ -3,7 +3,7 @@ -README.rst +Job Queue -
+
+

Job Queue

- - -Odoo Community Association - -
-

Job Queue

-

Mature License: LGPL-3 OCA/queue Translate me on Weblate Try me on Runboat

+

Mature License: LGPL-3 OCA/queue Translate me on Weblate Try me on Runboat

This addon adds an integrated Job Queue to Odoo.

It allows to postpone method calls executed asynchronously.

Jobs are executed in the background by a Jobrunner, in their own @@ -450,11 +445,11 @@

Job Queue

-

Installation

+

Installation

Be sure to have the requests library.

-

Configuration

+

Configuration

  • Using environment variables and command line:
    • Adjust environment variables (optional):
        @@ -494,6 +489,9 @@

        Configuration

        immediately and in parallel.
      • Tip: to enable debug logging for the queue job, use --log-handler=odoo.addons.queue_job:DEBUG
      • +
      • Jobs that remain in enqueued or started state (because, for +instance, their worker has been killed) will be automatically +re-queued.
      @@ -504,15 +502,15 @@

      Configuration

-

Usage

+

Usage

To use this module, you need to:

  1. Go to Job Queue menu
-

Developers

+

Developers

-

Delaying jobs

+

Delaying jobs

The fast way to enqueue a job for a method is to use with_delay() on a record or model:

@@ -632,10 +630,10 @@ 

Delaying jobs

-

Enqueing Job Options

+

Enqueing Job Options

    -
  • priority: default is 10, the closest it is to 0, the faster it will be -executed
  • +
  • priority: default is 10, the closest it is to 0, the faster it will +be executed
  • eta: Estimated Time of Arrival of the job. It will not be executed before this date/time
  • max_retries: default is 5, maximum number of retries before giving up @@ -645,13 +643,13 @@

    Enqueing Job Options

    computed from the function doc or method name
  • channel: the complete name of the channel to use to process the function. If specified it overrides the one defined on the function
  • -
  • identity_key: key uniquely identifying the job, if specified and a job -with the same key has not yet been run, the new job will not be +
  • identity_key: key uniquely identifying the job, if specified and a +job with the same key has not yet been run, the new job will not be created
-

Configure default options for jobs

+

Configure default options for jobs

In earlier versions, jobs could be configured using the @job decorator. This is now obsolete, they can be configured using optional queue.job.function and queue.job.channel XML records.

@@ -779,7 +777,7 @@

Configure default options for job delaying any jobs.

-

Testing

+

Testing

Asserting enqueued jobs

The recommended way to test jobs, rather than running them directly and synchronously is to split the tests in two parts:

@@ -894,7 +892,7 @@

Testing

synchronously

-

Tips and tricks

+

Tips and tricks

  • Idempotency (https://www.restapitutorial.com/lessons/idempotency.html): The @@ -902,12 +900,12 @@

    Tips and tricks

    without impact on the data.
  • The job should test at the very beginning its relevance: the moment the job will be executed is unknown by design. So the first -task of a job should be to check if the related work is still relevant -at the moment of the execution.
  • +task of a job should be to check if the related work is still +relevant at the moment of the execution.
-

Patterns

+

Patterns

Through the time, two main patterns emerged:

  1. For data exposed to users, a model should store the data and the @@ -921,10 +919,11 @@

    Patterns

-

Known issues / Roadmap

+

Known issues / Roadmap

  • After creating a new database or installing queue_job on an -existing database, Odoo must be restarted for the runner to detect it.
  • +existing database, Odoo must be restarted for the runner to detect +it.
  • When Odoo shuts down normally, it waits for running jobs to finish. However, when the Odoo server crashes or is otherwise force-stopped, running jobs are interrupted while the runner has no chance to know @@ -934,17 +933,17 @@

    Known issues / Roadmap

    not, and does not know for sure if it is safe to restart the jobs, it does not attempt to restart them automatically. Such stale jobs therefore fill the running queue and prevent other jobs to start. You -must therefore requeue them manually, either from the Jobs view, or by -running the following SQL statement before starting Odoo:
  • +must therefore requeue them manually, either from the Jobs view, or +by running the following SQL statement before starting Odoo:
 update queue_job set state='pending' where state in ('started', 'enqueued')
 
-

Changelog

+

Changelog

-

Bug Tracker

+

Bug Tracker

Bugs are tracked on GitHub Issues. In case of trouble, please check there if your issue has already been reported. If you spotted it first, help us to smash it by providing a detailed and welcomed @@ -963,16 +962,16 @@

Bug Tracker

Do not contact contributors directly about support or help with technical issues.

-

Credits

+

Credits

-

Authors

+

Authors

  • Camptocamp
  • ACSONE SA/NV
-

Contributors

+

Contributors

-

Other credits

+

Other credits

The migration of this module from 17.0 to 18.0 was financially supported by Camptocamp.

-

Maintainers

+

Maintainers

This module is maintained by the OCA.

Odoo Community Association @@ -1012,6 +1011,5 @@

Maintainers

-
diff --git a/queue_job/tests/test_requeue_dead_job.py b/queue_job/tests/test_requeue_dead_job.py new file mode 100644 index 0000000000..3d63dd8780 --- /dev/null +++ b/queue_job/tests/test_requeue_dead_job.py @@ -0,0 +1,133 @@ +# Copyright 2025 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +from contextlib import closing +from datetime import datetime, timedelta + +from odoo.tests.common import TransactionCase + +from odoo.addons.queue_job.job import Job +from odoo.addons.queue_job.jobrunner.runner import Database + + +class TestRequeueDeadJob(TransactionCase): + def create_dummy_job(self, uuid): + """ + Create dummy job for tests + """ + return ( + self.env["queue.job"] + .with_context( + _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, + ) + .create( + { + "uuid": uuid, + "user_id": self.env.user.id, + "state": "pending", + "model_name": "queue.job", + "method_name": "write", + } + ) + ) + + def get_locks(self, uuid, cr=None): + """ + Retrieve lock rows + """ + if cr is None: + cr = self.env.cr + + cr.execute( + """ + SELECT + id + FROM + queue_job_locks + WHERE + id IN ( + SELECT + id + FROM + queue_job + WHERE + uuid = %s + ) + FOR UPDATE SKIP LOCKED + """, + [uuid], + ) + + return cr.fetchall() + + def test_add_lock_record(self): + queue_job = self.create_dummy_job("test_add_lock") + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_started() + self.assertEqual(job_obj.state, "started") + + locks = self.get_locks(job_obj.uuid) + + self.assertEqual(1, len(locks)) + + def test_lock(self): + queue_job = self.create_dummy_job("test_lock") + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_started() + job_obj.store() + + locks = self.get_locks(job_obj.uuid) + + self.assertEqual(1, len(locks)) + + # commit to update queue_job records in DB + self.env.cr.commit() # pylint: disable=E8102 + + job_obj.lock() + + with closing(self.env.registry.cursor()) as new_cr: + locks = self.get_locks(job_obj.uuid, new_cr) 
+ + # Row should be locked + self.assertEqual(0, len(locks)) + + # clean up + queue_job.unlink() + + self.env.cr.commit() # pylint: disable=E8102 + + # because we committed the cursor, the savepoint of the test method is + # gone, and this would break TransactionCase cleanups + self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id) + + def test_requeue_dead_jobs(self): + uuid = "test_requeue_dead_jobs" + + queue_job = self.create_dummy_job(uuid) + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_enqueued() + # simulate enqueuing was in the past + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) + job_obj.set_started() + + job_obj.store() + self.env.cr.commit() # pylint: disable=E8102 + + # requeue dead jobs using current cursor + query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() + self.env.cr.execute(query) + + uuids_requeued = self.env.cr.fetchall() + + self.assertEqual(len(uuids_requeued), 1) + self.assertEqual(uuids_requeued[0][0], uuid) + + # clean up + queue_job.unlink() + self.env.cr.commit() # pylint: disable=E8102 + + # because we committed the cursor, the savepoint of the test method is + # gone, and this would break TransactionCase cleanups + self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id) From 4e517f7a2f3d1eea3d1fb867294fdedf72fc0bb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 1 Feb 2025 11:08:18 +0100 Subject: [PATCH 2/7] [IMP] queue_job: use queue_job_lock model A model is better than a manually managed table as it will protect the table from deletion by database_cleanup. 
--- queue_job/job.py | 8 ++++---- queue_job/jobrunner/runner.py | 6 +++--- queue_job/models/__init__.py | 2 +- queue_job/models/queue_job_lock.py | 16 ++++++++++++++++ queue_job/models/queue_job_locks.py | 24 ------------------------ queue_job/security/ir.model.access.csv | 1 + queue_job/tests/test_requeue_dead_job.py | 6 +++--- 7 files changed, 28 insertions(+), 35 deletions(-) create mode 100644 queue_job/models/queue_job_lock.py delete mode 100644 queue_job/models/queue_job_locks.py diff --git a/queue_job/job.py b/queue_job/job.py index fa789091d7..51b9846b19 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -228,9 +228,9 @@ def add_lock_record(self): self.env.cr.execute( """ INSERT INTO - queue_job_locks (id) + queue_job_lock (id, queue_job_id) SELECT - id + id, id FROM queue_job WHERE @@ -254,9 +254,9 @@ def lock(self): SELECT * FROM - queue_job_locks + queue_job_lock WHERE - id in ( + queue_job_id in ( SELECT id FROM diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index d148b05447..85475b04ad 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -362,11 +362,11 @@ def _query_requeue_dead_jobs(self): WHERE id in ( SELECT - id + queue_job_id FROM - queue_job_locks + queue_job_lock WHERE - id in ( + queue_job_id in ( SELECT id FROM diff --git a/queue_job/models/__init__.py b/queue_job/models/__init__.py index 9048fd3959..6265dfe9cb 100644 --- a/queue_job/models/__init__.py +++ b/queue_job/models/__init__.py @@ -3,4 +3,4 @@ from . import queue_job from . import queue_job_channel from . import queue_job_function -from . import queue_job_locks +from . import queue_job_lock diff --git a/queue_job/models/queue_job_lock.py b/queue_job/models/queue_job_lock.py new file mode 100644 index 0000000000..b01c7f3a91 --- /dev/null +++ b/queue_job/models/queue_job_lock.py @@ -0,0 +1,16 @@ +# Copyright 2025 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). 
+ +from odoo import fields, models + + +class QueueJobLock(models.Model): + _name = "queue.job.lock" + _description = "Queue Job Lock" + + queue_job_id = fields.Many2one( + comodel_name="queue.job", + required=True, + ondelete="cascade", + index=True, + ) diff --git a/queue_job/models/queue_job_locks.py b/queue_job/models/queue_job_locks.py deleted file mode 100644 index d2c3d73437..0000000000 --- a/queue_job/models/queue_job_locks.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright 2025 ACSONE SA/NV -# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). - -from odoo import models - - -class QueueJobLocks(models.AbstractModel): - - _name = "queue.job.locks" - _description = "Queue Job Locks" - - def init(self): - # Create job lock table - self.env.cr.execute( - """ - CREATE TABLE IF NOT EXISTS queue_job_locks ( - id INT PRIMARY KEY, - CONSTRAINT - queue_job_locks_queue_job_id_fkey - FOREIGN KEY (id) - REFERENCES queue_job (id) ON DELETE CASCADE - ); - """ - ) diff --git a/queue_job/security/ir.model.access.csv b/queue_job/security/ir.model.access.csv index 634daf8ede..4def7dc38a 100644 --- a/queue_job/security/ir.model.access.csv +++ b/queue_job/security/ir.model.access.csv @@ -1,5 +1,6 @@ id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink access_queue_job_manager,queue job manager,queue_job.model_queue_job,queue_job.group_queue_job_manager,1,1,1,1 +access_queue_job_lock_manager,queue job lock manager,queue_job.model_queue_job_lock,queue_job.group_queue_job_manager,1,0,0,0 access_queue_job_function_manager,queue job functions manager,queue_job.model_queue_job_function,queue_job.group_queue_job_manager,1,1,1,1 access_queue_job_channel_manager,queue job channel manager,queue_job.model_queue_job_channel,queue_job.group_queue_job_manager,1,1,1,1 access_queue_requeue_job,queue requeue job manager,queue_job.model_queue_requeue_job,queue_job.group_queue_job_manager,1,1,1,1 diff --git a/queue_job/tests/test_requeue_dead_job.py 
b/queue_job/tests/test_requeue_dead_job.py index 3d63dd8780..c6c82a2f4d 100644 --- a/queue_job/tests/test_requeue_dead_job.py +++ b/queue_job/tests/test_requeue_dead_job.py @@ -40,11 +40,11 @@ def get_locks(self, uuid, cr=None): cr.execute( """ SELECT - id + queue_job_id FROM - queue_job_locks + queue_job_lock WHERE - id IN ( + queue_job_id IN ( SELECT id FROM From 5aaf3995c9fe4f31d6bbc10bbec8a47915d33337 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 1 Feb 2025 11:30:18 +0100 Subject: [PATCH 3/7] [IMP] queue_job: tweak comment and warning message --- queue_job/job.py | 2 +- queue_job/jobrunner/runner.py | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/queue_job/job.py b/queue_job/job.py index 51b9846b19..6cfe12f232 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -223,7 +223,7 @@ def load_many(cls, env, job_uuids): def add_lock_record(self): """ - Create row in db to be locked once the job is performed + Create row in db to be locked while the job is being performed. 
""" self.env.cr.execute( """ diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 85475b04ad..0a73b57e0a 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -407,10 +407,7 @@ def requeue_dead_jobs(self): cr.execute(query) for (uuid,) in cr.fetchall(): - _logger.warning( - "Re-queued job with uuid: %s", - uuid, - ) + _logger.warning("Re-queued dead job with uuid: %s", uuid) class QueueJobRunner: From 10395fd1510450ae3d77f4e29f5fca9942641fc1 Mon Sep 17 00:00:00 2001 From: Bastian Guenther Date: Thu, 10 Apr 2025 14:34:56 +0200 Subject: [PATCH 4/7] [FIX] queue_job: Adapted migration to properly delete the invalid cron job --- .../migrations/18.0.1.3.0/pre-migration.py | 25 ++++++------------- 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/queue_job/migrations/18.0.1.3.0/pre-migration.py b/queue_job/migrations/18.0.1.3.0/pre-migration.py index 8dbb6ff7f1..931c336866 100644 --- a/queue_job/migrations/18.0.1.3.0/pre-migration.py +++ b/queue_job/migrations/18.0.1.3.0/pre-migration.py @@ -1,22 +1,11 @@ # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +from openupgradelib import openupgrade -def migrate(cr, version): - # Deactivate cron garbage collector - cr.execute( - """ - UPDATE - ir_cron - SET - active=False - WHERE id IN ( - SELECT res_id - FROM - ir_model_data - WHERE - module='queue_job' - AND model='ir.cron' - AND name='ir_cron_queue_job_garbage_collector' - ); - """ +@openupgrade.migrate() +def migrate(env, version): + # Remove cron garbage collector + openupgrade.delete_records_safely_by_xml_id( + env, + ["queue_job.ir_cron_queue_job_garbage_collector"], ) From 8a346da51bf9adca5cd7582808486f41c94eb4e7 Mon Sep 17 00:00:00 2001 From: Zina Rasoamanana Date: Tue, 3 Jun 2025 16:11:21 +0200 Subject: [PATCH 5/7] [IMP] queue_job: remove DB commits within test of requeue --- .../pre-migration.py | 0 test_queue_job/__manifest__.py | 1 + test_queue_job/data/queue_job_test_job.xml | 18 
++++ test_queue_job/models/test_models.py | 31 ++++++ test_queue_job/tests/__init__.py | 1 + test_queue_job/tests/test_autovacuum.py | 20 +++- test_queue_job/tests/test_requeue_dead_job.py | 101 ++++++++++++++++++ 7 files changed, 167 insertions(+), 5 deletions(-) rename queue_job/migrations/{18.0.1.3.0 => 18.0.1.7.0}/pre-migration.py (100%) create mode 100644 test_queue_job/data/queue_job_test_job.xml create mode 100644 test_queue_job/tests/test_requeue_dead_job.py diff --git a/queue_job/migrations/18.0.1.3.0/pre-migration.py b/queue_job/migrations/18.0.1.7.0/pre-migration.py similarity index 100% rename from queue_job/migrations/18.0.1.3.0/pre-migration.py rename to queue_job/migrations/18.0.1.7.0/pre-migration.py diff --git a/test_queue_job/__manifest__.py b/test_queue_job/__manifest__.py index 89d4c931fa..f00c419627 100644 --- a/test_queue_job/__manifest__.py +++ b/test_queue_job/__manifest__.py @@ -13,6 +13,7 @@ "data/queue_job_channel_data.xml", "data/queue_job_function_data.xml", "security/ir.model.access.csv", + "data/queue_job_test_job.xml", ], "installable": True, } diff --git a/test_queue_job/data/queue_job_test_job.xml b/test_queue_job/data/queue_job_test_job.xml new file mode 100644 index 0000000000..8a28ab70a0 --- /dev/null +++ b/test_queue_job/data/queue_job_test_job.xml @@ -0,0 +1,18 @@ + + + + + + diff --git a/test_queue_job/models/test_models.py b/test_queue_job/models/test_models.py index 4c0dd6b2d3..03e8e8a8f9 100644 --- a/test_queue_job/models/test_models.py +++ b/test_queue_job/models/test_models.py @@ -1,6 +1,8 @@ # Copyright 2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +from datetime import datetime, timedelta + from odoo import api, fields, models from odoo.addons.queue_job.delay import chain @@ -28,6 +30,35 @@ def testing_related__url(self, **kwargs): "url": kwargs["url"].format(subject=subject), } + @api.model + def _create_test_started_job(self, uuid): + """Create started jobs to be used within 
tests""" + self.env["queue.job"].with_context( + _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, + ).create( + { + "uuid": uuid, + "state": "started", + "model_name": "queue.job", + "method_name": "write", + } + ) + + @api.model + def _create_test_enqueued_job(self, uuid): + """Create enqueued jobs to be used within tests""" + self.env["queue.job"].with_context( + _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, + ).create( + { + "uuid": uuid, + "state": "enqueued", + "model_name": "queue.job", + "method_name": "write", + "date_enqueued": datetime.now() - timedelta(minutes=1), + } + ) + class ModelTestQueueJob(models.Model): _name = "test.queue.job" diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py index 0405022ce0..62347148e5 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -7,3 +7,4 @@ from . import test_job_function from . import test_related_actions from . import test_delay_mocks +from . import test_requeue_dead_job diff --git a/test_queue_job/tests/test_autovacuum.py b/test_queue_job/tests/test_autovacuum.py index 09730a4fea..97aebcba1e 100644 --- a/test_queue_job/tests/test_autovacuum.py +++ b/test_queue_job/tests/test_autovacuum.py @@ -28,12 +28,16 @@ def test_autovacuum(self): date_done = datetime.now() - timedelta(days=29) stored.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 1) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 1 + ) date_done = datetime.now() - timedelta(days=31) stored.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 0) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 0 + ) def test_autovacuum_multi_channel(self): root_channel = self.env.ref("queue_job.channel_root") @@ -48,11 +52,17 @@ def test_autovacuum_multi_channel(self): 
{"channel": channel_60days.complete_name, "date_done": date_done} ) - self.assertEqual(len(self.env["queue.job"].search([])), 2) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 2 + ) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 1) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 1 + ) date_done = datetime.now() - timedelta(days=61) job_60days.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 0) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 0 + ) diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py new file mode 100644 index 0000000000..a6328fed76 --- /dev/null +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -0,0 +1,101 @@ +# Copyright 2025 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). 
+from contextlib import closing +from datetime import datetime, timedelta + +from odoo.tests import tagged + +from odoo.addons.queue_job.job import Job +from odoo.addons.queue_job.jobrunner.runner import Database + +from .common import JobCommonCase + + +@tagged("post_install", "-at_install") +class TestRequeueDeadJob(JobCommonCase): + def _get_demo_job(self, uuid): + # job created during load of demo data + job = self.env["queue.job"].search( + [ + ("uuid", "=", uuid), + ], + limit=1, + ) + + self.assertTrue( + job, + f"Demo data queue job {uuid} should be loaded in order" + " to make this tests work", + ) + + return job + + def get_locks(self, uuid, cr=None): + """ + Retrieve lock rows + """ + if cr is None: + cr = self.env.cr + + cr.execute( + """ + SELECT + queue_job_id + FROM + queue_job_lock + WHERE + queue_job_id IN ( + SELECT + id + FROM + queue_job + WHERE + uuid = %s + ) + FOR UPDATE SKIP LOCKED + """, + [uuid], + ) + + return cr.fetchall() + + def test_add_lock_record(self): + queue_job = self._get_demo_job("test_started_job") + self.assertEqual(len(queue_job), 1) + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_started() + self.assertEqual(job_obj.state, "started") + + locks = self.get_locks(job_obj.uuid) + + self.assertEqual(1, len(locks)) + + def test_lock(self): + queue_job = self._get_demo_job("test_started_job") + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_started() + job_obj.lock() + + with closing(self.env.registry.cursor()) as new_cr: + locks = self.get_locks(job_obj.uuid, new_cr) + + # Row should be locked + self.assertEqual(0, len(locks)) + + def test_requeue_dead_jobs(self): + queue_job = self._get_demo_job("test_enqueued_job") + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_enqueued() + job_obj.set_started() + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) + job_obj.store() + + # requeue dead jobs using current cursor + query = 
Database(self.env.cr.dbname)._query_requeue_dead_jobs() + self.env.cr.execute(query) + + uuids_requeued = self.env.cr.fetchall() + self.assertTrue(any(j[0] == queue_job.uuid for j in uuids_requeued)) From ef00b658fa1b147476325985b9b2f4770cc02d46 Mon Sep 17 00:00:00 2001 From: Jose Zambudio Date: Mon, 18 Aug 2025 11:47:31 +0200 Subject: [PATCH 6/7] fix(queue.job): Handle zero max_retries in job retry logic --- queue_job/jobrunner/runner.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 0a73b57e0a..0024d28b36 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -331,6 +331,7 @@ def _query_requeue_dead_jobs(self): CASE WHEN max_retries IS NOT NULL AND + max_retries != 0 AND -- infinite retries if max_retries is 0 retry IS NOT NULL AND retry>max_retries THEN 'failed' @@ -345,6 +346,7 @@ def _query_requeue_dead_jobs(self): CASE WHEN max_retries IS NOT NULL AND + max_retries != 0 AND -- infinite retries if max_retries is 0 retry IS NOT NULL AND retry>max_retries THEN 'JobFoundDead' @@ -354,6 +356,7 @@ def _query_requeue_dead_jobs(self): CASE WHEN max_retries IS NOT NULL AND + max_retries != 0 AND -- infinite retries if max_retries is 0 retry IS NOT NULL AND retry>max_retries THEN 'Job found dead after too many retries' From 7f44c75ff23013e0143597bb5b421fc7ec73819a Mon Sep 17 00:00:00 2001 From: Zina Rasoamanana Date: Mon, 15 Sep 2025 10:45:39 +0200 Subject: [PATCH 7/7] [RMV] queue_job: remove test_requeue_dead_job --- queue_job/tests/test_requeue_dead_job.py | 133 ----------------------- 1 file changed, 133 deletions(-) delete mode 100644 queue_job/tests/test_requeue_dead_job.py diff --git a/queue_job/tests/test_requeue_dead_job.py b/queue_job/tests/test_requeue_dead_job.py deleted file mode 100644 index c6c82a2f4d..0000000000 --- a/queue_job/tests/test_requeue_dead_job.py +++ /dev/null @@ -1,133 +0,0 @@ -# Copyright 2025 ACSONE SA/NV -# License AGPL-3.0 or later 
(https://www.gnu.org/licenses/agpl). -from contextlib import closing -from datetime import datetime, timedelta - -from odoo.tests.common import TransactionCase - -from odoo.addons.queue_job.job import Job -from odoo.addons.queue_job.jobrunner.runner import Database - - -class TestRequeueDeadJob(TransactionCase): - def create_dummy_job(self, uuid): - """ - Create dummy job for tests - """ - return ( - self.env["queue.job"] - .with_context( - _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, - ) - .create( - { - "uuid": uuid, - "user_id": self.env.user.id, - "state": "pending", - "model_name": "queue.job", - "method_name": "write", - } - ) - ) - - def get_locks(self, uuid, cr=None): - """ - Retrieve lock rows - """ - if cr is None: - cr = self.env.cr - - cr.execute( - """ - SELECT - queue_job_id - FROM - queue_job_lock - WHERE - queue_job_id IN ( - SELECT - id - FROM - queue_job - WHERE - uuid = %s - ) - FOR UPDATE SKIP LOCKED - """, - [uuid], - ) - - return cr.fetchall() - - def test_add_lock_record(self): - queue_job = self.create_dummy_job("test_add_lock") - job_obj = Job.load(self.env, queue_job.uuid) - - job_obj.set_started() - self.assertEqual(job_obj.state, "started") - - locks = self.get_locks(job_obj.uuid) - - self.assertEqual(1, len(locks)) - - def test_lock(self): - queue_job = self.create_dummy_job("test_lock") - job_obj = Job.load(self.env, queue_job.uuid) - - job_obj.set_started() - job_obj.store() - - locks = self.get_locks(job_obj.uuid) - - self.assertEqual(1, len(locks)) - - # commit to update queue_job records in DB - self.env.cr.commit() # pylint: disable=E8102 - - job_obj.lock() - - with closing(self.env.registry.cursor()) as new_cr: - locks = self.get_locks(job_obj.uuid, new_cr) - - # Row should be locked - self.assertEqual(0, len(locks)) - - # clean up - queue_job.unlink() - - self.env.cr.commit() # pylint: disable=E8102 - - # because we committed the cursor, the savepoint of the test method is - # gone, and this would break 
TransactionCase cleanups - self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id) - - def test_requeue_dead_jobs(self): - uuid = "test_requeue_dead_jobs" - - queue_job = self.create_dummy_job(uuid) - job_obj = Job.load(self.env, queue_job.uuid) - - job_obj.set_enqueued() - # simulate enqueuing was in the past - job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) - job_obj.set_started() - - job_obj.store() - self.env.cr.commit() # pylint: disable=E8102 - - # requeue dead jobs using current cursor - query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() - self.env.cr.execute(query) - - uuids_requeued = self.env.cr.fetchall() - - self.assertEqual(len(uuids_requeued), 1) - self.assertEqual(uuids_requeued[0][0], uuid) - - # clean up - queue_job.unlink() - self.env.cr.commit() # pylint: disable=E8102 - - # because we committed the cursor, the savepoint of the test method is - # gone, and this would break TransactionCase cleanups - self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id)