diff --git a/queue_job/README.rst b/queue_job/README.rst
index 1a44189b2c..befcb8bf73 100644
--- a/queue_job/README.rst
+++ b/queue_job/README.rst
@@ -61,20 +61,20 @@ instantaneous if no other job is running.
Features:
-- Views for jobs, jobs are stored in PostgreSQL
-- Jobrunner: execute the jobs, highly efficient thanks to PostgreSQL's
- NOTIFY
-- Channels: give a capacity for the root channel and its sub-channels
- and segregate jobs in them. Allow for instance to restrict heavy jobs
- to be executed one at a time while little ones are executed 4 at a
- times.
-- Retries: Ability to retry jobs by raising a type of exception
-- Retry Pattern: the 3 first tries, retry after 10 seconds, the 5 next
- tries, retry after 1 minutes, ...
-- Job properties: priorities, estimated time of arrival (ETA), custom
- description, number of retries
-- Related Actions: link an action on the job view, such as open the
- record concerned by the job
+- Views for jobs, jobs are stored in PostgreSQL
+- Jobrunner: execute the jobs, highly efficient thanks to PostgreSQL's
+ NOTIFY
+- Channels: give a capacity for the root channel and its sub-channels
+ and segregate jobs in them. This allows, for instance, restricting
+ heavy jobs to run one at a time while little ones run 4 at a time.
+- Retries: Ability to retry jobs by raising a specific type of exception
+- Retry Pattern: the first 3 tries retry after 10 seconds, the next 5
+ tries retry after 1 minute, ...
+- Job properties: priorities, estimated time of arrival (ETA), custom
+ description, number of retries
+- Related Actions: link an action on the job view, such as open the
+ record concerned by the job
**Table of contents**
@@ -89,18 +89,18 @@ Be sure to have the ``requests`` library.
Configuration
=============
-- Using environment variables and command line:
+- Using environment variables and command line:
- - Adjust environment variables (optional):
+ - Adjust environment variables (optional):
- - ``ODOO_QUEUE_JOB_CHANNELS=root:4`` or any other channels
- configuration. The default is ``root:1``
- - if ``xmlrpc_port`` is not set: ``ODOO_QUEUE_JOB_PORT=8069``
+ - ``ODOO_QUEUE_JOB_CHANNELS=root:4`` or any other channels
+ configuration. The default is ``root:1``
+ - if ``xmlrpc_port`` is not set: ``ODOO_QUEUE_JOB_PORT=8069``
- - Start Odoo with ``--load=web,queue_job`` and ``--workers`` greater
- than 1. [1]_
+ - Start Odoo with ``--load=web,queue_job`` and ``--workers`` greater
+ than 1. [1]_
-- Using the Odoo configuration file:
+- Using the Odoo configuration file:
.. code:: ini
@@ -113,8 +113,8 @@ Configuration
[queue_job]
channels = root:2
-- Confirm the runner is starting correctly by checking the odoo log
- file:
+- Confirm the runner is starting correctly by checking the odoo log
+ file:
::
@@ -123,10 +123,10 @@ Configuration
...INFO...queue_job.jobrunner.runner: queue job runner ready for db
...INFO...queue_job.jobrunner.runner: database connections ready
-- Create jobs (eg using ``base_import_async``) and observe they start
- immediately and in parallel.
-- Tip: to enable debug logging for the queue job, use
- ``--log-handler=odoo.addons.queue_job:DEBUG``
+- Create jobs (e.g. using ``base_import_async``) and observe that they
+ start immediately and in parallel.
+- Tip: to enable debug logging for the queue job, use
+ ``--log-handler=odoo.addons.queue_job:DEBUG``
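+
+For example, a complete development command line combining these
+options might look like this (the database name is a placeholder):
+
+::
+
+    ODOO_QUEUE_JOB_CHANNELS=root:4 \
+    odoo --load=web,queue_job --workers=2 --database=mydb \
+        --log-handler=odoo.addons.queue_job:DEBUG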
.. [1]
It works with the threaded Odoo server too, although this way of
@@ -247,23 +247,56 @@ is at the top of the graph. In the example above, if it was called on
``group_a``, then ``group_b`` would never be delayed (but a warning
would be shown).
+It is also possible to split a job into several jobs, each one
+processing a part of the work. This can be useful to avoid very long
+jobs, to parallelize some tasks, and to get more specific errors. Usage
+is as follows:
+
+.. code:: python
+
+ def button_split_delayable(self):
+ (
+ self # Can be a big recordset, let's say 1000 records
+ .delayable()
+ .generate_thumbnail((50, 50))
+ .set(priority=30)
+ .set(description=_("generate xxx"))
+ .split(50) # Split the job into 20 jobs of 50 records each
+ .delay()
+ )
+
+The ``split()`` method takes a ``chain`` boolean keyword argument. If
+set to ``True``, the jobs are chained, meaning that the next job only
+starts when the previous one is done:
+
+.. code:: python
+
+ def button_increment_var(self):
+ (
+ self
+ .delayable()
+ .increment_counter()
+ .split(1, chain=True) # Will execute the jobs one after the other
+ .delay()
+ )
+
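+The ``split()`` method returns a ``DelayableGroup`` (or a
+``DelayableChain`` when ``chain=True``), so the result can still be
+composed like any other delayable, for instance with ``on_done()``. A
+sketch, where ``notify_done()`` is a hypothetical method:
+
+.. code:: python
+
+    group = records.delayable().generate_thumbnail((50, 50)).split(50)
+    group.on_done(records.delayable().notify_done())
+    group.delay()
+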
Enqueing Job Options
~~~~~~~~~~~~~~~~~~~~
-- priority: default is 10, the closest it is to 0, the faster it will
- be executed
-- eta: Estimated Time of Arrival of the job. It will not be executed
- before this date/time
-- max_retries: default is 5, maximum number of retries before giving up
- and set the job state to 'failed'. A value of 0 means infinite
- retries.
-- description: human description of the job. If not set, description is
- computed from the function doc or method name
-- channel: the complete name of the channel to use to process the
- function. If specified it overrides the one defined on the function
-- identity_key: key uniquely identifying the job, if specified and a
- job with the same key has not yet been run, the new job will not be
- created
+- priority: default is 10; the closer it is to 0, the sooner the job
+ will be executed
+- eta: Estimated Time of Arrival of the job. It will not be executed
+ before this date/time
+- max_retries: default is 5, the maximum number of retries before giving
+ up and setting the job state to 'failed'. A value of 0 means infinite
+ retries.
+- description: human-readable description of the job. If not set, the
+ description is computed from the function docstring or the method name
+- channel: the complete name of the channel to use to process the
+ function. If specified it overrides the one defined on the function
+- identity_key: key uniquely identifying the job, if specified and a job
+ with the same key has not yet been run, the new job will not be
+ created
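+
+These options are keyword arguments of ``with_delay()`` (or ``set()`` on
+a delayable). A sketch, where the ``recompute_prices`` method and the
+``root.heavy`` channel are hypothetical:
+
+.. code:: python
+
+    from odoo.addons.queue_job.job import identity_exact
+
+    self.with_delay(
+        priority=5,
+        max_retries=3,
+        description="Recompute the prices",
+        channel="root.heavy",
+        identity_key=identity_exact,
+    ).recompute_prices()
+
+``identity_exact`` computes the key from the method name, its arguments
+and the recordset, so an identical call is not queued again while a job
+with the same key has not yet been run.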
Configure default options for jobs
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -334,11 +367,11 @@ dictionary on the job function:
"kwargs": {"name": "Partner"},
}
-- ``enable``: when ``False``, the button has no effect (default:
- ``True``)
-- ``func_name``: name of the method on ``queue.job`` that returns an
- action
-- ``kwargs``: extra arguments to pass to the related action method
+- ``enable``: when ``False``, the button has no effect (default:
+ ``True``)
+- ``func_name``: name of the method on ``queue.job`` that returns an
+ action
+- ``kwargs``: extra arguments to pass to the related action method
Example of related action code:
@@ -382,10 +415,10 @@ integers:
Based on this configuration, we can tell that:
-- 5 first retries are postponed 10 seconds later
-- retries 5 to 10 postponed 20 seconds later
-- retries 10 to 15 postponed 30 seconds later
-- all subsequent retries postponed 5 minutes later
+- the first 5 retries are postponed by 10 seconds
+- retries 5 to 10 are postponed by 20 seconds
+- retries 10 to 15 are postponed by 30 seconds
+- all subsequent retries are postponed by 5 minutes
**Job Context**
@@ -432,11 +465,11 @@ Testing
The recommended way to test jobs, rather than running them directly and
synchronously is to split the tests in two parts:
- - one test where the job is mocked (trap jobs with ``trap_jobs()``
- and the test only verifies that the job has been delayed with the
- expected arguments
- - one test that only calls the method of the job synchronously, to
- validate the proper behavior of this method only
+ - one test where the job is mocked (trap jobs with ``trap_jobs()``)
+ and the test only verifies that the job has been delayed with the
+ expected arguments, as sketched below
+ - one test that only calls the method of the job synchronously, to
+ validate the proper behavior of this method only
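+
+For example, the first kind of test could look like this sketch (the
+``button_generate_thumbnails`` and ``generate_thumbnail`` methods are
+hypothetical):
+
+.. code:: python
+
+    from odoo.addons.queue_job.tests.common import trap_jobs
+
+    def test_delays_thumbnail_job(self):
+        with trap_jobs() as trap:
+            self.records.button_generate_thumbnails()
+            trap.assert_jobs_count(1)
+            trap.assert_enqueued_job(
+                self.records.generate_thumbnail,
+                args=((50, 50),),
+            )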
Proceeding this way means that you can prove that jobs will be enqueued
properly at runtime, and it ensures your code does not have a different
@@ -560,14 +593,14 @@ synchronously
Tips and tricks
~~~~~~~~~~~~~~~
-- **Idempotency**
- (https://www.restapitutorial.com/lessons/idempotency.html): The
- queue_job should be idempotent so they can be retried several times
- without impact on the data.
-- **The job should test at the very beginning its relevance**: the
- moment the job will be executed is unknown by design. So the first
- task of a job should be to check if the related work is still
- relevant at the moment of the execution.
+- **Idempotency**
+ (https://www.restapitutorial.com/lessons/idempotency.html): jobs
+ should be idempotent so they can be retried several times without
+ impact on the data.
+- **The job should test its relevance at the very beginning**: the
+ moment a job will be executed is unknown by design, so its first task
+ should be to check whether the related work is still relevant at the
+ moment of execution, as sketched below.
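+
+A minimal sketch of such a check (model and method names are
+hypothetical):
+
+.. code:: python
+
+    def process_invoice(self):
+        """Process the invoice in a job."""
+        if self.state != "open":
+            # The record changed between enqueue time and execution
+            # time: nothing is left to do, return instead of failing.
+            return "Invoice no longer open, nothing to do"
+        self._do_process()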
Patterns
~~~~~~~~
@@ -584,20 +617,19 @@ Through the time, two main patterns emerged:
Known issues / Roadmap
======================
-- After creating a new database or installing ``queue_job`` on an
- existing database, Odoo must be restarted for the runner to detect
- it.
-- When Odoo shuts down normally, it waits for running jobs to finish.
- However, when the Odoo server crashes or is otherwise force-stopped,
- running jobs are interrupted while the runner has no chance to know
- they have been aborted. In such situations, jobs may remain in
- ``started`` or ``enqueued`` state after the Odoo server is halted.
- Since the runner has no way to know if they are actually running or
- not, and does not know for sure if it is safe to restart the jobs, it
- does not attempt to restart them automatically. Such stale jobs
- therefore fill the running queue and prevent other jobs to start. You
- must therefore requeue them manually, either from the Jobs view, or
- by running the following SQL statement *before starting Odoo*:
+- After creating a new database or installing ``queue_job`` on an
+ existing database, Odoo must be restarted for the runner to detect it.
+- When Odoo shuts down normally, it waits for running jobs to finish.
+ However, when the Odoo server crashes or is otherwise force-stopped,
+ running jobs are interrupted while the runner has no chance to know
+ they have been aborted. In such situations, jobs may remain in
+ ``started`` or ``enqueued`` state after the Odoo server is halted.
+ Since the runner has no way to know if they are actually running or
+ not, and does not know for sure if it is safe to restart the jobs, it
+ does not attempt to restart them automatically. Such stale jobs
+ therefore fill the running queue and prevent other jobs from starting.
+ You must requeue them manually, either from the Jobs view, or by
+ running the following SQL statement *before starting Odoo*:
.. code:: sql
@@ -609,11 +641,11 @@ Changelog
Next
----
-- [ADD] Run jobrunner as a worker process instead of a thread in the
- main process (when running with --workers > 0)
-- [REF] ``@job`` and ``@related_action`` deprecated, any method can be
- delayed, and configured using ``queue.job.function`` records
-- [MIGRATION] from 13.0 branched at rev. e24ff4b
+- [ADD] Run jobrunner as a worker process instead of a thread in the
+ main process (when running with --workers > 0)
+- [REF] ``@job`` and ``@related_action`` deprecated, any method can be
+ delayed, and configured using ``queue.job.function`` records
+- [MIGRATION] from 13.0 branched at rev. e24ff4b
Bug Tracker
===========
@@ -637,21 +669,21 @@ Authors
Contributors
------------
-- Guewen Baconnier
-- Stéphane Bidoul
-- Matthieu Dietrich
-- Jos De Graeve
-- David Lefever
-- Laurent Mignon
-- Laetitia Gangloff
-- Cédric Pigeon
-- Tatiana Deribina
-- Souheil Bejaoui
-- Eric Antones
-- Simone Orsi
-- Nguyen Minh Chien
-- Tran Quoc Duong
-- Vo Hong Thien
+- Guewen Baconnier
+- Stéphane Bidoul
+- Matthieu Dietrich
+- Jos De Graeve
+- David Lefever
+- Laurent Mignon
+- Laetitia Gangloff
+- Cédric Pigeon
+- Tatiana Deribina
+- Souheil Bejaoui
+- Eric Antones
+- Simone Orsi
+- Nguyen Minh Chien
+- Tran Quoc Duong
+- Vo Hong Thien
Other credits
-------------
diff --git a/queue_job/delay.py b/queue_job/delay.py
index 9b596b1665..251e4f7a84 100644
--- a/queue_job/delay.py
+++ b/queue_job/delay.py
@@ -525,6 +525,51 @@ def delay(self):
"""Delay the whole graph"""
self._graph.delay()
+ def split(self, size, chain=False):
+ """Split the Delayables.
+
+ Use `DelayableGroup` or `DelayableChain`
+ if `chain` is True containing batches of size `size`
+ """
+ if not self._job_method:
+ raise ValueError("No method set on the Delayable")
+
+ total_records = len(self.recordset)
+
+ delayables = []
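+ # Slice the recordset into consecutive batches of at most `size` records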
+ for index in range(0, total_records, size):
+ recordset = self.recordset[index : index + size]
+ delayable = Delayable(
+ recordset,
+ priority=self.priority,
+ eta=self.eta,
+ max_retries=self.max_retries,
+ description=self.description,
+ channel=self.channel,
+ identity_key=self.identity_key,
+ )
+ # Rebind the job method to the sliced recordset (updates its __self__)
+ delayable._job_method = getattr(recordset, self._job_method.__name__)
+ delayable._job_args = self._job_args
+ delayable._job_kwargs = self._job_kwargs
+
+ delayables.append(delayable)
+
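+ # Base description: the explicit description, the first docstring line,
+ # or "model.method" as a fallback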
+ description = self.description or (
+ self._job_method.__doc__.splitlines()[0].strip()
+ if self._job_method.__doc__
+ else f"{self.recordset._name}.{self._job_method.__name__}"
+ )
+ for index, delayable in enumerate(delayables):
+ delayable.set(
+ description=f"{description} (split {index + 1}/{len(delayables)})"
+ )
+
+ # Prevent warning on deletion
+ self._generated_job = True
+
+ return (DelayableChain if chain else DelayableGroup)(*delayables)
+
def _build_job(self):
if self._generated_job:
return self._generated_job
diff --git a/queue_job/readme/USAGE.md b/queue_job/readme/USAGE.md
index fb160bfa48..c08374b9fc 100644
--- a/queue_job/readme/USAGE.md
+++ b/queue_job/readme/USAGE.md
@@ -108,6 +108,38 @@ is at the top of the graph. In the example above, if it was called on
`group_a`, then `group_b` would never be delayed (but a warning would be
shown).
+It is also possible to split a job into several jobs, each one processing
+a part of the work. This can be useful to avoid very long jobs, to
+parallelize some tasks, and to get more specific errors. Usage is as follows:
+
+``` python
+def button_split_delayable(self):
+ (
+ self # Can be a big recordset, let's say 1000 records
+ .delayable()
+ .generate_thumbnail((50, 50))
+ .set(priority=30)
+ .set(description=_("generate xxx"))
+ .split(50) # Split the job into 20 jobs of 50 records each
+ .delay()
+ )
+```
+
+The `split()` method takes a `chain` boolean keyword argument. If set to
+`True`, the jobs are chained, meaning that the next job only starts when
+the previous one is done:
+
+``` python
+def button_increment_var(self):
+ (
+ self
+ .delayable()
+ .increment_counter()
+ .split(1, chain=True) # Will execute the jobs one after the other
+ .delay()
+ )
+```
+
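+The `split()` method returns a `DelayableGroup` (or a `DelayableChain` when
+`chain=True`), so the result can still be composed like any other delayable,
+for instance with `on_done()`. A sketch, where `notify_done()` is a
+hypothetical method:
+
+``` python
+group = records.delayable().generate_thumbnail((50, 50)).split(50)
+group.on_done(records.delayable().notify_done())
+group.delay()
+```
+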
### Enqueing Job Options
- priority: default is 10, the closest it is to 0, the faster it will be
diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html
index 27faa78270..d9f8b6d46f 100644
--- a/queue_job/static/description/index.html
+++ b/queue_job/static/description/index.html
@@ -596,12 +596,41 @@
is at the top of the graph. In the example above, if it was called on
group_a, then group_b would never be delayed (but a warning
would be shown).
+It is also possible to split a job into several jobs, each one
+processing a part of the work. This can be useful to avoid very long
+jobs, to parallelize some tasks, and to get more specific errors. Usage
+is as follows:
+
+def button_split_delayable(self):
+ (
+ self # Can be a big recordset, let's say 1000 records
+ .delayable()
+ .generate_thumbnail((50, 50))
+ .set(priority=30)
+ .set(description=_("generate xxx"))
+ .split(50) # Split the job into 20 jobs of 50 records each
+ .delay()
+ )
+
+The split() method takes a chain boolean keyword argument. If
+set to True, the jobs are chained, meaning that the next job
+only starts when the previous one is done:
+
+def button_increment_var(self):
+ (
+ self
+ .delayable()
+ .increment_counter()
+ .split(1, chain=True) # Will execute the jobs one after the other
+ .delay()
+ )
+
-- priority: default is 10, the closest it is to 0, the faster it will
-be executed
+- priority: default is 10, the closer it is to 0, the sooner the job
+will be executed
- eta: Estimated Time of Arrival of the job. It will not be executed
before this date/time
- max_retries: default is 5, maximum number of retries before giving up
@@ -611,8 +640,8 @@
computed from the function doc or method name
- channel: the complete name of the channel to use to process the
function. If specified it overrides the one defined on the function
-- identity_key: key uniquely identifying the job, if specified and a
-job with the same key has not yet been run, the new job will not be
+- identity_key: key uniquely identifying the job, if specified and a job
+with the same key has not yet been run, the new job will not be
created
@@ -868,8 +897,8 @@
without impact on the data.
The job should test at the very beginning its relevance: the
moment the job will be executed is unknown by design. So the first
-task of a job should be to check if the related work is still
-relevant at the moment of the execution.
+task of a job should be to check whether the related work is still
+relevant at the moment of execution.
@@ -890,8 +919,7 @@
- After creating a new database or installing queue_job on an
-existing database, Odoo must be restarted for the runner to detect
-it.
+existing database, Odoo must be restarted for the runner to detect it.
- When Odoo shuts down normally, it waits for running jobs to finish.
However, when the Odoo server crashes or is otherwise force-stopped,
running jobs are interrupted while the runner has no chance to know
@@ -901,8 +929,8 @@
not, and does not know for sure if it is safe to restart the jobs, it
does not attempt to restart them automatically. Such stale jobs
therefore fill the running queue and prevent other jobs to start. You
-must therefore requeue them manually, either from the Jobs view, or
-by running the following SQL statement before starting Odoo:
+must requeue them manually, either from the Jobs view, or by
+running the following SQL statement before starting Odoo:
update queue_job set state='pending' where state in ('started', 'enqueued')
diff --git a/queue_job/tests/__init__.py b/queue_job/tests/__init__.py
index e0ff9576a5..db53ac3a60 100644
--- a/queue_job/tests/__init__.py
+++ b/queue_job/tests/__init__.py
@@ -1,6 +1,7 @@
from . import test_runner_channels
from . import test_runner_runner
from . import test_delayable
+from . import test_delayable_split
from . import test_json_field
from . import test_model_job_channel
from . import test_model_job_function
diff --git a/queue_job/tests/test_delayable_split.py b/queue_job/tests/test_delayable_split.py
new file mode 100644
index 0000000000..25fd9c3ebc
--- /dev/null
+++ b/queue_job/tests/test_delayable_split.py
@@ -0,0 +1,103 @@
+# Copyright 2024 Akretion (http://www.akretion.com).
+# @author Florian Mounier
+# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
+
+from odoo.tests import common
+
+from odoo.addons.queue_job.delay import Delayable
+
+
+class TestDelayableSplit(common.BaseCase):
+ def setUp(self):
+ super().setUp()
+
+ class FakeRecordSet(list):
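+ """Minimal stand-in for an Odoo recordset: slicing returns the
+ same type and _name mimics the model name."""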
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._name = "recordset"
+
+ def __getitem__(self, key):
+ if isinstance(key, slice):
+ return FakeRecordSet(super().__getitem__(key))
+ return super().__getitem__(key)
+
+ def method(self, arg, kwarg=None):
+ """Method to be called"""
+ return arg, kwarg
+
+ self.FakeRecordSet = FakeRecordSet
+
+ def _cancel_delayables(self, *delayables):
+ # Prevent warning at deletion
+ for delayable in delayables:
+ delayable._generated_job = True
+
+ def test_delayable_split_no_method_call_beforehand(self):
+ dl = Delayable(self.FakeRecordSet(range(20)))
+ with self.assertRaises(ValueError):
+ dl.split(3)
+ self._cancel_delayables(dl)
+
+ def test_delayable_split_10_3(self):
+ dl = Delayable(self.FakeRecordSet(range(10)))
+ dl.method("arg", kwarg="kwarg")
+ group = dl.split(3)
+ self.assertEqual(len(group._delayables), 4)
+ delayables = sorted(list(group._delayables), key=lambda x: x.description)
+ self.assertEqual(delayables[0].recordset, self.FakeRecordSet([0, 1, 2]))
+ self.assertEqual(delayables[1].recordset, self.FakeRecordSet([3, 4, 5]))
+ self.assertEqual(delayables[2].recordset, self.FakeRecordSet([6, 7, 8]))
+ self.assertEqual(delayables[3].recordset, self.FakeRecordSet([9]))
+ self.assertEqual(delayables[0].description, "Method to be called (split 1/4)")
+ self.assertEqual(delayables[1].description, "Method to be called (split 2/4)")
+ self.assertEqual(delayables[2].description, "Method to be called (split 3/4)")
+ self.assertEqual(delayables[3].description, "Method to be called (split 4/4)")
+ self.assertNotEqual(delayables[0]._job_method, dl._job_method)
+ self.assertNotEqual(delayables[1]._job_method, dl._job_method)
+ self.assertNotEqual(delayables[2]._job_method, dl._job_method)
+ self.assertNotEqual(delayables[3]._job_method, dl._job_method)
+ self.assertEqual(delayables[0]._job_method.__name__, dl._job_method.__name__)
+ self.assertEqual(delayables[1]._job_method.__name__, dl._job_method.__name__)
+ self.assertEqual(delayables[2]._job_method.__name__, dl._job_method.__name__)
+ self.assertEqual(delayables[3]._job_method.__name__, dl._job_method.__name__)
+ self.assertEqual(delayables[0]._job_args, ("arg",))
+ self.assertEqual(delayables[1]._job_args, ("arg",))
+ self.assertEqual(delayables[2]._job_args, ("arg",))
+ self.assertEqual(delayables[3]._job_args, ("arg",))
+ self.assertEqual(delayables[0]._job_kwargs, {"kwarg": "kwarg"})
+ self.assertEqual(delayables[1]._job_kwargs, {"kwarg": "kwarg"})
+ self.assertEqual(delayables[2]._job_kwargs, {"kwarg": "kwarg"})
+ self.assertEqual(delayables[3]._job_kwargs, {"kwarg": "kwarg"})
+ self._cancel_delayables(*group._delayables)
+
+ def test_delayable_split_10_5(self):
+ dl = Delayable(self.FakeRecordSet(range(10)))
+ dl.method("arg", kwarg="kwarg")
+ group = dl.split(5)
+ self.assertEqual(len(group._delayables), 2)
+ delayables = sorted(list(group._delayables), key=lambda x: x.description)
+ self.assertEqual(delayables[0].recordset, self.FakeRecordSet([0, 1, 2, 3, 4]))
+ self.assertEqual(delayables[1].recordset, self.FakeRecordSet([5, 6, 7, 8, 9]))
+ self.assertEqual(delayables[0].description, "Method to be called (split 1/2)")
+ self.assertEqual(delayables[1].description, "Method to be called (split 2/2)")
+ self._cancel_delayables(*group._delayables)
+
+ def test_delayable_split_10_10(self):
+ dl = Delayable(self.FakeRecordSet(range(10)))
+ dl.method("arg", kwarg="kwarg")
+ group = dl.split(10)
+ self.assertEqual(len(group._delayables), 1)
+ delayables = sorted(list(group._delayables), key=lambda x: x.description)
+ self.assertEqual(delayables[0].recordset, self.FakeRecordSet(range(10)))
+ self.assertEqual(delayables[0].description, "Method to be called (split 1/1)")
+ self._cancel_delayables(*group._delayables)
+
+ def test_delayable_split_10_20(self):
+ dl = Delayable(self.FakeRecordSet(range(10)))
+ dl.method("arg", kwarg="kwarg")
+ group = dl.split(20)
+ self.assertEqual(len(group._delayables), 1)
+ delayables = sorted(list(group._delayables), key=lambda x: x.description)
+ self.assertEqual(delayables[0].recordset, self.FakeRecordSet(range(10)))
+ self.assertEqual(delayables[0].description, "Method to be called (split 1/1)")
+ self._cancel_delayables(*group._delayables)