diff --git a/queue_job/README.rst b/queue_job/README.rst index 1a03778d45..a0d7bc3153 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -1,7 +1,3 @@ -.. image:: https://odoo-community.org/readme-banner-image - :target: https://odoo-community.org/get-involved?utm_source=readme - :alt: Odoo Community Association - ========= Job Queue ========= @@ -17,7 +13,7 @@ Job Queue .. |badge1| image:: https://img.shields.io/badge/maturity-Mature-brightgreen.png :target: https://odoo-community.org/page/development-status :alt: Mature -.. |badge2| image:: https://img.shields.io/badge/license-LGPL--3-blue.png +.. |badge2| image:: https://img.shields.io/badge/licence-LGPL--3-blue.png :target: http://www.gnu.org/licenses/lgpl-3.0-standalone.html :alt: License: LGPL-3 .. |badge3| image:: https://img.shields.io/badge/github-OCA%2Fqueue-lightgray.png?logo=github @@ -65,20 +61,20 @@ instantaneous if no other job is running. Features: -- Views for jobs, jobs are stored in PostgreSQL -- Jobrunner: execute the jobs, highly efficient thanks to PostgreSQL's - NOTIFY -- Channels: give a capacity for the root channel and its sub-channels - and segregate jobs in them. Allow for instance to restrict heavy jobs - to be executed one at a time while little ones are executed 4 at a - times. -- Retries: Ability to retry jobs by raising a type of exception -- Retry Pattern: the 3 first tries, retry after 10 seconds, the 5 next - tries, retry after 1 minutes, ... -- Job properties: priorities, estimated time of arrival (ETA), custom - description, number of retries -- Related Actions: link an action on the job view, such as open the - record concerned by the job +- Views for jobs, jobs are stored in PostgreSQL +- Jobrunner: execute the jobs, highly efficient thanks to PostgreSQL's + NOTIFY +- Channels: give a capacity for the root channel and its sub-channels + and segregate jobs in them. 
Allow for instance to restrict heavy jobs + to be executed one at a time while little ones are executed 4 at a + times. +- Retries: Ability to retry jobs by raising a type of exception +- Retry Pattern: the 3 first tries, retry after 10 seconds, the 5 next + tries, retry after 1 minutes, ... +- Job properties: priorities, estimated time of arrival (ETA), custom + description, number of retries +- Related Actions: link an action on the job view, such as open the + record concerned by the job **Table of contents** @@ -93,18 +89,18 @@ Be sure to have the ``requests`` library. Configuration ============= -- Using environment variables and command line: +- Using environment variables and command line: - - Adjust environment variables (optional): + - Adjust environment variables (optional): - - ``ODOO_QUEUE_JOB_CHANNELS=root:4`` or any other channels - configuration. The default is ``root:1`` - - if ``xmlrpc_port`` is not set: ``ODOO_QUEUE_JOB_PORT=8069`` + - ``ODOO_QUEUE_JOB_CHANNELS=root:4`` or any other channels + configuration. The default is ``root:1`` + - if ``xmlrpc_port`` is not set: ``ODOO_QUEUE_JOB_PORT=8069`` - - Start Odoo with ``--load=web,queue_job`` and ``--workers`` greater - than 1. [1]_ + - Start Odoo with ``--load=web,queue_job`` and ``--workers`` greater + than 1. [1]_ -- Using the Odoo configuration file: +- Using the Odoo configuration file: .. code:: ini @@ -117,8 +113,8 @@ Configuration [queue_job] channels = root:2 -- Confirm the runner is starting correctly by checking the odoo log - file: +- Confirm the runner is starting correctly by checking the odoo log + file: :: @@ -127,10 +123,14 @@ Configuration ...INFO...queue_job.jobrunner.runner: queue job runner ready for db ...INFO...queue_job.jobrunner.runner: database connections ready -- Create jobs (eg using ``base_import_async``) and observe they start - immediately and in parallel. 
-- Tip: to enable debug logging for the queue job, use - ``--log-handler=odoo.addons.queue_job:DEBUG`` +- Create jobs (eg using ``base_import_async``) and observe they start + immediately and in parallel. +- Tip: to enable debug logging for the queue job, use + ``--log-handler=odoo.addons.queue_job:DEBUG`` + +- Jobs that remain in ``enqueued`` or ``started`` state (because, for + instance, their worker has been killed) will be automatically + re-queued. .. [1] It works with the threaded Odoo server too, although this way of @@ -287,20 +287,20 @@ only start when the previous one is done: Enqueing Job Options ~~~~~~~~~~~~~~~~~~~~ -- priority: default is 10, the closest it is to 0, the faster it will be - executed -- eta: Estimated Time of Arrival of the job. It will not be executed - before this date/time -- max_retries: default is 5, maximum number of retries before giving up - and set the job state to 'failed'. A value of 0 means infinite - retries. -- description: human description of the job. If not set, description is - computed from the function doc or method name -- channel: the complete name of the channel to use to process the - function. If specified it overrides the one defined on the function -- identity_key: key uniquely identifying the job, if specified and a job - with the same key has not yet been run, the new job will not be - created +- priority: default is 10, the closest it is to 0, the faster it will + be executed +- eta: Estimated Time of Arrival of the job. It will not be executed + before this date/time +- max_retries: default is 5, maximum number of retries before giving up + and set the job state to 'failed'. A value of 0 means infinite + retries. +- description: human description of the job. If not set, description is + computed from the function doc or method name +- channel: the complete name of the channel to use to process the + function. 
If specified it overrides the one defined on the function +- identity_key: key uniquely identifying the job, if specified and a + job with the same key has not yet been run, the new job will not be + created Configure default options for jobs ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -371,11 +371,11 @@ dictionary on the job function: "kwargs": {"name": "Partner"}, } -- ``enable``: when ``False``, the button has no effect (default: - ``True``) -- ``func_name``: name of the method on ``queue.job`` that returns an - action -- ``kwargs``: extra arguments to pass to the related action method +- ``enable``: when ``False``, the button has no effect (default: + ``True``) +- ``func_name``: name of the method on ``queue.job`` that returns an + action +- ``kwargs``: extra arguments to pass to the related action method Example of related action code: @@ -419,10 +419,10 @@ integers: Based on this configuration, we can tell that: -- 5 first retries are postponed 10 seconds later -- retries 5 to 10 postponed 20 seconds later -- retries 10 to 15 postponed 30 seconds later -- all subsequent retries postponed 5 minutes later +- 5 first retries are postponed 10 seconds later +- retries 5 to 10 postponed 20 seconds later +- retries 10 to 15 postponed 30 seconds later +- all subsequent retries postponed 5 minutes later **Job Context** @@ -469,11 +469,11 @@ Testing The recommended way to test jobs, rather than running them directly and synchronously is to split the tests in two parts: - - one test where the job is mocked (trap jobs with ``trap_jobs()`` - and the test only verifies that the job has been delayed with the - expected arguments - - one test that only calls the method of the job synchronously, to - validate the proper behavior of this method only + - one test where the job is mocked (trap jobs with ``trap_jobs()`` + and the test only verifies that the job has been delayed with the + expected arguments + - one test that only calls the method of the job synchronously, to + validate 
the proper behavior of this method only Proceeding this way means that you can prove that jobs will be enqueued properly at runtime, and it ensures your code does not have a different @@ -597,14 +597,14 @@ synchronously Tips and tricks ~~~~~~~~~~~~~~~ -- **Idempotency** - (https://www.restapitutorial.com/lessons/idempotency.html): The - queue_job should be idempotent so they can be retried several times - without impact on the data. -- **The job should test at the very beginning its relevance**: the - moment the job will be executed is unknown by design. So the first - task of a job should be to check if the related work is still relevant - at the moment of the execution. +- **Idempotency** + (https://www.restapitutorial.com/lessons/idempotency.html): The + queue_job should be idempotent so they can be retried several times + without impact on the data. +- **The job should test at the very beginning its relevance**: the + moment the job will be executed is unknown by design. So the first + task of a job should be to check if the related work is still + relevant at the moment of the execution. Patterns ~~~~~~~~ @@ -621,19 +621,20 @@ Through the time, two main patterns emerged: Known issues / Roadmap ====================== -- After creating a new database or installing ``queue_job`` on an - existing database, Odoo must be restarted for the runner to detect it. -- When Odoo shuts down normally, it waits for running jobs to finish. - However, when the Odoo server crashes or is otherwise force-stopped, - running jobs are interrupted while the runner has no chance to know - they have been aborted. In such situations, jobs may remain in - ``started`` or ``enqueued`` state after the Odoo server is halted. - Since the runner has no way to know if they are actually running or - not, and does not know for sure if it is safe to restart the jobs, it - does not attempt to restart them automatically. 
Such stale jobs - therefore fill the running queue and prevent other jobs to start. You - must therefore requeue them manually, either from the Jobs view, or by - running the following SQL statement *before starting Odoo*: +- After creating a new database or installing ``queue_job`` on an + existing database, Odoo must be restarted for the runner to detect + it. +- When Odoo shuts down normally, it waits for running jobs to finish. + However, when the Odoo server crashes or is otherwise force-stopped, + running jobs are interrupted while the runner has no chance to know + they have been aborted. In such situations, jobs may remain in + ``started`` or ``enqueued`` state after the Odoo server is halted. + Since the runner has no way to know if they are actually running or + not, and does not know for sure if it is safe to restart the jobs, it + does not attempt to restart them automatically. Such stale jobs + therefore fill the running queue and prevent other jobs to start. You + must therefore requeue them manually, either from the Jobs view, or + by running the following SQL statement *before starting Odoo*: .. code:: sql @@ -645,11 +646,11 @@ Changelog Next ---- -- [ADD] Run jobrunner as a worker process instead of a thread in the - main process (when running with --workers > 0) -- [REF] ``@job`` and ``@related_action`` deprecated, any method can be - delayed, and configured using ``queue.job.function`` records -- [MIGRATION] from 13.0 branched at rev. e24ff4b +- [ADD] Run jobrunner as a worker process instead of a thread in the + main process (when running with --workers > 0) +- [REF] ``@job`` and ``@related_action`` deprecated, any method can be + delayed, and configured using ``queue.job.function`` records +- [MIGRATION] from 13.0 branched at rev. 
e24ff4b Bug Tracker =========== @@ -673,21 +674,21 @@ Authors Contributors ------------ -- Guewen Baconnier -- Stéphane Bidoul -- Matthieu Dietrich -- Jos De Graeve -- David Lefever -- Laurent Mignon -- Laetitia Gangloff -- Cédric Pigeon -- Tatiana Deribina -- Souheil Bejaoui -- Eric Antones -- Simone Orsi -- Nguyen Minh Chien -- Tran Quoc Duong -- Vo Hong Thien +- Guewen Baconnier +- Stéphane Bidoul +- Matthieu Dietrich +- Jos De Graeve +- David Lefever +- Laurent Mignon +- Laetitia Gangloff +- Cédric Pigeon +- Tatiana Deribina +- Souheil Bejaoui +- Eric Antones +- Simone Orsi +- Nguyen Minh Chien +- Tran Quoc Duong +- Vo Hong Thien Other credits ------------- diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index ee60ca5ca0..21f045a7d2 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -32,6 +32,8 @@ def _try_perform_job(self, env, job): job.set_started() job.store() env.cr.commit() + job.lock() + _logger.debug("%s started", job) job.perform() diff --git a/queue_job/data/queue_data.xml b/queue_job/data/queue_data.xml index 216b6cf016..1fb70f106c 100644 --- a/queue_job/data/queue_data.xml +++ b/queue_job/data/queue_data.xml @@ -1,14 +1,6 @@ - - Jobs Garbage Collector - 5 - minutes - - code - model.requeue_stuck_jobs() - Job failed diff --git a/queue_job/job.py b/queue_job/job.py index e472d70db9..6cfe12f232 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -221,6 +221,61 @@ def load_many(cls, env, job_uuids): recordset = cls.db_records_from_uuids(env, job_uuids) return {cls._load_from_db_record(record) for record in recordset} + def add_lock_record(self): + """ + Create row in db to be locked while the job is being performed. 
+ """ + self.env.cr.execute( + """ + INSERT INTO + queue_job_lock (id, queue_job_id) + SELECT + id, id + FROM + queue_job + WHERE + uuid = %s + ON CONFLICT(id) + DO NOTHING; + """, + [self.uuid], + ) + + def lock(self): + """ + Lock row of job that is being performed + + If a job cannot be locked, + it means that the job wasn't started, + a RetryableJobError is thrown. + """ + self.env.cr.execute( + """ + SELECT + * + FROM + queue_job_lock + WHERE + queue_job_id in ( + SELECT + id + FROM + queue_job + WHERE + uuid = %s + AND state='started' + ) + FOR UPDATE; + """, + [self.uuid], + ) + + # 1 job should be locked + if 1 != len(self.env.cr.fetchall()): + raise RetryableJobError( + f"Trying to lock job that wasn't started, uuid: {self.uuid}" + ) + @classmethod def _load_from_db_record(cls, job_db_record): stored = job_db_record @@ -735,6 +790,7 @@ def set_started(self): self.state = STARTED self.date_started = datetime.now() self.worker_pid = os.getpid() + self.add_lock_record() def set_done(self, result=None): self.state = DONE diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 04b163bd4a..0024d28b36 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -114,22 +114,6 @@ * After creating a new database or installing queue_job on an existing database, Odoo must be restarted for the runner to detect it. -* When Odoo shuts down normally, it waits for running jobs to finish. - However, when the Odoo server crashes or is otherwise force-stopped, - running jobs are interrupted while the runner has no chance to know - they have been aborted. In such situations, jobs may remain in - ``started`` or ``enqueued`` state after the Odoo server is halted. - Since the runner has no way to know if they are actually running or - not, and does not know for sure if it is safe to restart the jobs, - it does not attempt to restart them automatically. 
Such stale jobs - therefore fill the running queue and prevent other jobs to start. - You must therefore requeue them manually, either from the Jobs view, - or by running the following SQL statement *before starting Odoo*: - -.. code-block:: sql - - update queue_job set state='pending' where state in ('started', 'enqueued') - .. rubric:: Footnotes .. [1] From a security standpoint, it is safe to have an anonymous HTTP @@ -155,7 +139,7 @@ from odoo.tools import config from . import queue_job_config -from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager +from .channels import ENQUEUED, NOT_DONE, ChannelManager SELECT_TIMEOUT = 60 ERROR_RECOVERY_DELAY = 5 @@ -212,33 +196,12 @@ def _connection_info_for(db_name): def _async_http_get(scheme, host, port, user, password, db_name, job_uuid): - # Method to set failed job (due to timeout, etc) as pending, - # to avoid keeping it as enqueued. - def set_job_pending(): - connection_info = _connection_info_for(db_name) - conn = psycopg2.connect(**connection_info) - conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) - with closing(conn.cursor()) as cr: - cr.execute( - "UPDATE queue_job SET state=%s, " - "date_enqueued=NULL, date_started=NULL " - "WHERE uuid=%s and state=%s " - "RETURNING uuid", - (PENDING, job_uuid, ENQUEUED), - ) - if cr.fetchone(): - _logger.warning( - "state of job %s was reset from %s to %s", - job_uuid, - ENQUEUED, - PENDING, - ) - # TODO: better way to HTTP GET asynchronously (grequest, ...)? 
# if this was python3 I would be doing this with # asyncio, aiohttp and aiopg def urlopen(): url = f"{scheme}://{host}:{port}/queue_job/runjob?db={db_name}&job_uuid={job_uuid}" + # pylint: disable=except-pass try: auth = None if user: @@ -252,10 +215,10 @@ def urlopen(): # for codes between 500 and 600 response.raise_for_status() except requests.Timeout: - set_job_pending() + # A timeout is a normal behaviour, it shouldn't be logged as an exception + pass except Exception: _logger.exception("exception in GET %s", url) - set_job_pending() thread = threading.Thread(target=urlopen) thread.daemon = True @@ -359,6 +322,96 @@ def set_job_enqueued(self, uuid): (ENQUEUED, uuid), ) + def _query_requeue_dead_jobs(self): + return """ + UPDATE + queue_job + SET + state=( + CASE + WHEN + max_retries IS NOT NULL AND + max_retries != 0 AND -- infinite retries if max_retries is 0 + retry IS NOT NULL AND + retry>max_retries + THEN 'failed' + ELSE 'pending' + END), + retry=( + CASE + WHEN state='started' + THEN COALESCE(retry,0)+1 ELSE retry + END), + exc_name=( + CASE + WHEN + max_retries IS NOT NULL AND + max_retries != 0 AND -- infinite retries if max_retries is 0 + retry IS NOT NULL AND + retry>max_retries + THEN 'JobFoundDead' + ELSE exc_name + END), + exc_info=( + CASE + WHEN + max_retries IS NOT NULL AND + max_retries != 0 AND -- infinite retries if max_retries is 0 + retry IS NOT NULL AND + retry>max_retries + THEN 'Job found dead after too many retries' + ELSE exc_info + END) + WHERE + id in ( + SELECT + queue_job_id + FROM + queue_job_lock + WHERE + queue_job_id in ( + SELECT + id + FROM + queue_job + WHERE + state IN ('enqueued','started') + AND date_enqueued < + (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') + ) + FOR UPDATE SKIP LOCKED + ) + RETURNING uuid + """ + + def requeue_dead_jobs(self): + """ + Set started and enqueued jobs but not locked to pending + + A job is locked when it's being executed + When a job is killed, it releases the lock + + If the number of 
retries exceeds the number of max retries, + the job is set as 'failed' with the error 'JobFoundDead'. + + Adding a buffer on 'date_enqueued' to check + that it has been enqueued for more than 10sec. + This prevents from requeuing jobs before they are actually started. + + When Odoo shuts down normally, it waits for running jobs to finish. + However, when the Odoo server crashes or is otherwise force-stopped, + running jobs are interrupted while the runner has no chance to know + they have been aborted. + """ + + with closing(self.conn.cursor()) as cr: + query = self._query_requeue_dead_jobs() + + cr.execute(query) + + for (uuid,) in cr.fetchall(): + _logger.warning("Re-queued dead job with uuid: %s", uuid) + class QueueJobRunner: def __init__( @@ -454,6 +507,11 @@ def initialize_databases(self): else: db.close() + def requeue_dead_jobs(self): + for db in self.db_by_name.values(): + if db.has_queue_job: + db.requeue_dead_jobs() + def run_jobs(self): now = _odoo_now() for job in self.channel_manager.get_jobs_to_run(now): @@ -546,6 +604,7 @@ def run(self): _logger.info("database connections ready") # inner loop does the normal processing while not self._stop: + self.requeue_dead_jobs() self.process_notifications() self.run_jobs() self.wait_notification() diff --git a/queue_job/migrations/18.0.1.7.0/pre-migration.py b/queue_job/migrations/18.0.1.7.0/pre-migration.py new file mode 100644 index 0000000000..931c336866 --- /dev/null +++ b/queue_job/migrations/18.0.1.7.0/pre-migration.py @@ -0,0 +1,11 @@ +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +from openupgradelib import openupgrade + + +@openupgrade.migrate() +def migrate(env, version): + # Remove cron garbage collector + openupgrade.delete_records_safely_by_xml_id( + env, + ["queue_job.ir_cron_queue_job_garbage_collector"], + ) diff --git a/queue_job/models/__init__.py b/queue_job/models/__init__.py index 4744e7ab46..6265dfe9cb 100644 --- a/queue_job/models/__init__.py +++ 
b/queue_job/models/__init__.py @@ -3,3 +3,4 @@ from . import queue_job from . import queue_job_channel from . import queue_job_function +from . import queue_job_lock diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 4be3820ad6..411ae43af5 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -6,7 +6,6 @@ from datetime import datetime, timedelta from odoo import _, api, exceptions, fields, models -from odoo.osv import expression from odoo.tools import config, html_escape, index_exists from odoo.addons.base_sparse_field.models.fields import Serialized @@ -410,55 +409,6 @@ def autovacuum(self): break return True - def requeue_stuck_jobs(self, enqueued_delta=5, started_delta=0): - """Fix jobs that are in a bad states - - :param in_queue_delta: lookup time in minutes for jobs - that are in enqueued state - - :param started_delta: lookup time in minutes for jobs - that are in enqueued state, - 0 means that it is not checked - """ - self._get_stuck_jobs_to_requeue( - enqueued_delta=enqueued_delta, started_delta=started_delta - ).requeue() - return True - - def _get_stuck_jobs_domain(self, queue_dl, started_dl): - domain = [] - now = fields.datetime.now() - if queue_dl: - queue_dl = now - timedelta(minutes=queue_dl) - domain.append( - [ - "&", - ("date_enqueued", "<=", fields.Datetime.to_string(queue_dl)), - ("state", "=", "enqueued"), - ] - ) - if started_dl: - started_dl = now - timedelta(minutes=started_dl) - domain.append( - [ - "&", - ("date_started", "<=", fields.Datetime.to_string(started_dl)), - ("state", "=", "started"), - ] - ) - if not domain: - raise exceptions.ValidationError( - _("If both parameters are 0, ALL jobs will be requeued!") - ) - return expression.OR(domain) - - def _get_stuck_jobs_to_requeue(self, enqueued_delta, started_delta): - job_model = self.env["queue.job"] - stuck_jobs = job_model.search( - self._get_stuck_jobs_domain(enqueued_delta, started_delta) - ) - return stuck_jobs - def 
related_action_open_record(self): """Open a form view with the record(s) of the job. diff --git a/queue_job/models/queue_job_lock.py b/queue_job/models/queue_job_lock.py new file mode 100644 index 0000000000..b01c7f3a91 --- /dev/null +++ b/queue_job/models/queue_job_lock.py @@ -0,0 +1,16 @@ +# Copyright 2025 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from odoo import fields, models + + +class QueueJobLock(models.Model): + _name = "queue.job.lock" + _description = "Queue Job Lock" + + queue_job_id = fields.Many2one( + comodel_name="queue.job", + required=True, + ondelete="cascade", + index=True, + ) diff --git a/queue_job/readme/CONFIGURE.md b/queue_job/readme/CONFIGURE.md index 07b7b84126..216b5358af 100644 --- a/queue_job/readme/CONFIGURE.md +++ b/queue_job/readme/CONFIGURE.md @@ -35,3 +35,6 @@ channels = root:2 [^1]: It works with the threaded Odoo server too, although this way of running Odoo is obviously not for production purposes. + +* Jobs that remain in `enqueued` or `started` state (because, for instance, + their worker has been killed) will be automatically re-queued. 
diff --git a/queue_job/security/ir.model.access.csv b/queue_job/security/ir.model.access.csv index 634daf8ede..4def7dc38a 100644 --- a/queue_job/security/ir.model.access.csv +++ b/queue_job/security/ir.model.access.csv @@ -1,5 +1,6 @@ id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink access_queue_job_manager,queue job manager,queue_job.model_queue_job,queue_job.group_queue_job_manager,1,1,1,1 +access_queue_job_lock_manager,queue job lock manager,queue_job.model_queue_job_lock,queue_job.group_queue_job_manager,1,0,0,0 access_queue_job_function_manager,queue job functions manager,queue_job.model_queue_job_function,queue_job.group_queue_job_manager,1,1,1,1 access_queue_job_channel_manager,queue job channel manager,queue_job.model_queue_job_channel,queue_job.group_queue_job_manager,1,1,1,1 access_queue_requeue_job,queue requeue job manager,queue_job.model_queue_requeue_job,queue_job.group_queue_job_manager,1,1,1,1 diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html index f1196c7d89..caac6fddea 100644 --- a/queue_job/static/description/index.html +++ b/queue_job/static/description/index.html @@ -3,7 +3,7 @@ -README.rst +Job Queue -
+
+

Job Queue

- - -Odoo Community Association - -
-

Job Queue

-

Mature License: LGPL-3 OCA/queue Translate me on Weblate Try me on Runboat

+

Mature License: LGPL-3 OCA/queue Translate me on Weblate Try me on Runboat

This addon adds an integrated Job Queue to Odoo.

It allows to postpone method calls executed asynchronously.

Jobs are executed in the background by a Jobrunner, in their own @@ -450,11 +445,11 @@

Job Queue

-

Installation

+

Installation

Be sure to have the requests library.

-

Configuration

+

Configuration

  • Using environment variables and command line:
    • Adjust environment variables (optional):
        @@ -494,6 +489,9 @@

        Configuration

        immediately and in parallel.
      • Tip: to enable debug logging for the queue job, use --log-handler=odoo.addons.queue_job:DEBUG
      • +
      • Jobs that remain in enqueued or started state (because, for +instance, their worker has been killed) will be automatically +re-queued.
      @@ -504,15 +502,15 @@

      Configuration

-

Usage

+

Usage

To use this module, you need to:

  1. Go to Job Queue menu
-

Developers

+

Developers

-

Delaying jobs

+

Delaying jobs

The fast way to enqueue a job for a method is to use with_delay() on a record or model:

@@ -632,10 +630,10 @@ 

Delaying jobs

-

Enqueing Job Options

+

Enqueing Job Options

    -
  • priority: default is 10, the closest it is to 0, the faster it will be -executed
  • +
  • priority: default is 10, the closest it is to 0, the faster it will +be executed
  • eta: Estimated Time of Arrival of the job. It will not be executed before this date/time
  • max_retries: default is 5, maximum number of retries before giving up @@ -645,13 +643,13 @@

    Enqueing Job Options

    computed from the function doc or method name
  • channel: the complete name of the channel to use to process the function. If specified it overrides the one defined on the function
  • -
  • identity_key: key uniquely identifying the job, if specified and a job -with the same key has not yet been run, the new job will not be +
  • identity_key: key uniquely identifying the job, if specified and a +job with the same key has not yet been run, the new job will not be created
-

Configure default options for jobs

+

Configure default options for jobs

In earlier versions, jobs could be configured using the @job decorator. This is now obsolete, they can be configured using optional queue.job.function and queue.job.channel XML records.

@@ -779,7 +777,7 @@

Configure default options for job delaying any jobs.

-

Testing

+

Testing

Asserting enqueued jobs

The recommended way to test jobs, rather than running them directly and synchronously is to split the tests in two parts:

@@ -894,7 +892,7 @@

Testing

synchronously

-

Tips and tricks

+

Tips and tricks

  • Idempotency (https://www.restapitutorial.com/lessons/idempotency.html): The @@ -902,12 +900,12 @@

    Tips and tricks

    without impact on the data.
  • The job should test at the very beginning its relevance: the moment the job will be executed is unknown by design. So the first -task of a job should be to check if the related work is still relevant -at the moment of the execution.
  • +task of a job should be to check if the related work is still +relevant at the moment of the execution.
-

Patterns

+

Patterns

Through the time, two main patterns emerged:

  1. For data exposed to users, a model should store the data and the @@ -921,10 +919,11 @@

    Patterns

-

Known issues / Roadmap

+

Known issues / Roadmap

  • After creating a new database or installing queue_job on an -existing database, Odoo must be restarted for the runner to detect it.
  • +existing database, Odoo must be restarted for the runner to detect +it.
  • When Odoo shuts down normally, it waits for running jobs to finish. However, when the Odoo server crashes or is otherwise force-stopped, running jobs are interrupted while the runner has no chance to know @@ -934,17 +933,17 @@

    Known issues / Roadmap

    not, and does not know for sure if it is safe to restart the jobs, it does not attempt to restart them automatically. Such stale jobs therefore fill the running queue and prevent other jobs to start. You -must therefore requeue them manually, either from the Jobs view, or by -running the following SQL statement before starting Odoo:
  • +must therefore requeue them manually, either from the Jobs view, or +by running the following SQL statement before starting Odoo:
 update queue_job set state='pending' where state in ('started', 'enqueued')
 
-

Changelog

+

Changelog

-

Bug Tracker

+

Bug Tracker

Bugs are tracked on GitHub Issues. In case of trouble, please check there if your issue has already been reported. If you spotted it first, help us to smash it by providing a detailed and welcomed @@ -963,16 +962,16 @@

Bug Tracker

Do not contact contributors directly about support or help with technical issues.

-

Credits

+

Credits

-

Authors

+

Authors

  • Camptocamp
  • ACSONE SA/NV
-

Contributors

+

Contributors

-

Other credits

+

Other credits

The migration of this module from 17.0 to 18.0 was financially supported by Camptocamp.

-

Maintainers

+

Maintainers

This module is maintained by the OCA.

Odoo Community Association @@ -1012,6 +1011,5 @@

Maintainers

-
diff --git a/test_queue_job/__manifest__.py b/test_queue_job/__manifest__.py index 89d4c931fa..f00c419627 100644 --- a/test_queue_job/__manifest__.py +++ b/test_queue_job/__manifest__.py @@ -13,6 +13,7 @@ "data/queue_job_channel_data.xml", "data/queue_job_function_data.xml", "security/ir.model.access.csv", + "data/queue_job_test_job.xml", ], "installable": True, } diff --git a/test_queue_job/data/queue_job_test_job.xml b/test_queue_job/data/queue_job_test_job.xml new file mode 100644 index 0000000000..8a28ab70a0 --- /dev/null +++ b/test_queue_job/data/queue_job_test_job.xml @@ -0,0 +1,18 @@ + + + + + + diff --git a/test_queue_job/models/test_models.py b/test_queue_job/models/test_models.py index 4c0dd6b2d3..03e8e8a8f9 100644 --- a/test_queue_job/models/test_models.py +++ b/test_queue_job/models/test_models.py @@ -1,6 +1,8 @@ # Copyright 2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +from datetime import datetime, timedelta + from odoo import api, fields, models from odoo.addons.queue_job.delay import chain @@ -28,6 +30,35 @@ def testing_related__url(self, **kwargs): "url": kwargs["url"].format(subject=subject), } + @api.model + def _create_test_started_job(self, uuid): + """Create started jobs to be used within tests""" + self.env["queue.job"].with_context( + _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, + ).create( + { + "uuid": uuid, + "state": "started", + "model_name": "queue.job", + "method_name": "write", + } + ) + + @api.model + def _create_test_enqueued_job(self, uuid): + """Create enqueued jobs to be used within tests""" + self.env["queue.job"].with_context( + _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, + ).create( + { + "uuid": uuid, + "state": "enqueued", + "model_name": "queue.job", + "method_name": "write", + "date_enqueued": datetime.now() - timedelta(minutes=1), + } + ) + class ModelTestQueueJob(models.Model): _name = "test.queue.job" diff --git a/test_queue_job/tests/__init__.py 
b/test_queue_job/tests/__init__.py index 0405022ce0..62347148e5 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -7,3 +7,4 @@ from . import test_job_function from . import test_related_actions from . import test_delay_mocks +from . import test_requeue_dead_job diff --git a/test_queue_job/tests/test_autovacuum.py b/test_queue_job/tests/test_autovacuum.py index 09730a4fea..97aebcba1e 100644 --- a/test_queue_job/tests/test_autovacuum.py +++ b/test_queue_job/tests/test_autovacuum.py @@ -28,12 +28,16 @@ def test_autovacuum(self): date_done = datetime.now() - timedelta(days=29) stored.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 1) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 1 + ) date_done = datetime.now() - timedelta(days=31) stored.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 0) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 0 + ) def test_autovacuum_multi_channel(self): root_channel = self.env.ref("queue_job.channel_root") @@ -48,11 +52,17 @@ def test_autovacuum_multi_channel(self): {"channel": channel_60days.complete_name, "date_done": date_done} ) - self.assertEqual(len(self.env["queue.job"].search([])), 2) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 2 + ) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 1) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 1 + ) date_done = datetime.now() - timedelta(days=61) job_60days.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 0) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 0 + ) diff --git 
a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py new file mode 100644 index 0000000000..a6328fed76 --- /dev/null +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -0,0 +1,101 @@ +# Copyright 2025 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +from contextlib import closing +from datetime import datetime, timedelta + +from odoo.tests import tagged + +from odoo.addons.queue_job.job import Job +from odoo.addons.queue_job.jobrunner.runner import Database + +from .common import JobCommonCase + + +@tagged("post_install", "-at_install") +class TestRequeueDeadJob(JobCommonCase): + def _get_demo_job(self, uuid): + # job created during load of demo data + job = self.env["queue.job"].search( + [ + ("uuid", "=", uuid), + ], + limit=1, + ) + + self.assertTrue( + job, + f"Demo data queue job {uuid} should be loaded in order" + " to make this tests work", + ) + + return job + + def get_locks(self, uuid, cr=None): + """ + Retrieve lock rows + """ + if cr is None: + cr = self.env.cr + + cr.execute( + """ + SELECT + queue_job_id + FROM + queue_job_lock + WHERE + queue_job_id IN ( + SELECT + id + FROM + queue_job + WHERE + uuid = %s + ) + FOR UPDATE SKIP LOCKED + """, + [uuid], + ) + + return cr.fetchall() + + def test_add_lock_record(self): + queue_job = self._get_demo_job("test_started_job") + self.assertEqual(len(queue_job), 1) + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_started() + self.assertEqual(job_obj.state, "started") + + locks = self.get_locks(job_obj.uuid) + + self.assertEqual(1, len(locks)) + + def test_lock(self): + queue_job = self._get_demo_job("test_started_job") + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_started() + job_obj.lock() + + with closing(self.env.registry.cursor()) as new_cr: + locks = self.get_locks(job_obj.uuid, new_cr) + + # Row should be locked + self.assertEqual(0, len(locks)) + + def test_requeue_dead_jobs(self): + queue_job = 
self._get_demo_job("test_enqueued_job") + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_enqueued() + job_obj.set_started() + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) + job_obj.store() + + # requeue dead jobs using current cursor + query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() + self.env.cr.execute(query) + + uuids_requeued = self.env.cr.fetchall() + self.assertIn(queue_job.uuid, [j[0] for j in uuids_requeued])