diff --git a/queue_job/README.rst b/queue_job/README.rst
index 791e3f05ce..5f143aa122 100644
--- a/queue_job/README.rst
+++ b/queue_job/README.rst
@@ -259,6 +259,38 @@
 Note: ``delay()`` must be called on the delayable, chain, or group which is at the top
 of the graph. In the example above, if it was called on ``group_a``, then ``group_b``
 would never be delayed (but a warning would be shown).
 
+It is also possible to split a job into several jobs, each one processing a part
+of the work. This can be useful to avoid very long jobs, to parallelize some
+tasks, and to get more specific errors. Usage is as follows:
+
+.. code-block:: python
+
+    def button_split_delayable(self):
+        (
+            self  # Can be a big recordset, let's say 1000 records
+            .delayable()
+            .generate_thumbnail((50, 50))
+            .set(priority=30)
+            .set(description=_("generate xxx"))
+            .split(50)  # Split the job into 20 jobs of 50 records each
+            .delay()
+        )
+
+The ``split()`` method takes a ``chain`` boolean keyword argument. If set to
+True, the jobs will be chained, so that the next job only starts when the
+previous one is done:
+
+.. code-block:: python
+
+    def button_increment_var(self):
+        (
+            self
+            .delayable()
+            .increment_counter()
+            .split(1, chain=True)  # Will execute the jobs one after the other
+            .delay()
+        )
+
 Enqueing Job Options
 --------------------
 
diff --git a/queue_job/delay.py b/queue_job/delay.py
index d8bffeb7eb..e46e95aed9 100644
--- a/queue_job/delay.py
+++ b/queue_job/delay.py
@@ -526,6 +526,52 @@
     def delay(self):
         """Delay the whole graph"""
         self._graph.delay()
 
+    def split(self, size, chain=False):
+        """Split the Delayable into batches of ``size`` records.
+
+        Return a ``DelayableGroup``, or a ``DelayableChain`` when ``chain``
+        is True, containing one Delayable per batch.
+        """
+        if not self._job_method:
+            raise ValueError("No method set on the Delayable")
+
+        total_records = len(self.recordset)
+
+        delayables = []
+        for index in range(0, total_records, size):
+            recordset = self.recordset[index : index + size]
+            delayable = Delayable(
+                recordset,
+                priority=self.priority,
+                eta=self.eta,
+                max_retries=self.max_retries,
+                description=self.description,
+                channel=self.channel,
+                identity_key=self.identity_key,
+            )
+            # Rebind the job method so that its __self__ is the batch recordset
+            delayable._job_method = getattr(recordset, self._job_method.__name__)
+            delayable._job_args = self._job_args
+            delayable._job_kwargs = self._job_kwargs
+
+            delayables.append(delayable)
+
+        description = self.description or (
+            self._job_method.__doc__.splitlines()[0].strip()
+            if self._job_method.__doc__
+            else "{}.{}".format(self.recordset._name, self._job_method.__name__)
+        )
+        for index, delayable in enumerate(delayables):
+            delayable.set(
+                description="%s (split %s/%s)"
+                % (description, index + 1, len(delayables))
+            )
+
+        # Prevent the warning emitted on deletion of a non-delayed Delayable
+        self._generated_job = True
+
+        return (DelayableChain if chain else DelayableGroup)(*delayables)
+
     def _build_job(self):
         if self._generated_job:
             return self._generated_job
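The batching loop above yields ``ceil(total_records / size)`` batches, the last of
which may be smaller than ``size``. A plain-Python sketch of the same slicing (not
part of the patch; a list stands in for the recordset, since slicing behaves the
same way):

.. code-block:: python

    # Slice a sequence into consecutive batches of `size` items, exactly
    # like the `range(0, total_records, size)` loop in split().
    records = list(range(10))  # stand-in for a recordset of 10 records
    size = 3
    batches = [records[i : i + size] for i in range(0, len(records), size)]
    assert batches == [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
    # 10 records with size=3 -> 4 sub-jobs; the last one gets the remainder.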
diff --git a/queue_job/readme/USAGE.rst b/queue_job/readme/USAGE.rst
index 0ac17f7b10..84eb38b315 100644
--- a/queue_job/readme/USAGE.rst
+++ b/queue_job/readme/USAGE.rst
@@ -104,6 +104,38 @@
 Note: ``delay()`` must be called on the delayable, chain, or group which is at the top
 of the graph. In the example above, if it was called on ``group_a``, then ``group_b``
 would never be delayed (but a warning would be shown).
 
+It is also possible to split a job into several jobs, each one processing a part
+of the work. This can be useful to avoid very long jobs, to parallelize some
+tasks, and to get more specific errors. Usage is as follows:
+
+.. code-block:: python
+
+    def button_split_delayable(self):
+        (
+            self  # Can be a big recordset, let's say 1000 records
+            .delayable()
+            .generate_thumbnail((50, 50))
+            .set(priority=30)
+            .set(description=_("generate xxx"))
+            .split(50)  # Split the job into 20 jobs of 50 records each
+            .delay()
+        )
+
+The ``split()`` method takes a ``chain`` boolean keyword argument. If set to
+True, the jobs will be chained, so that the next job only starts when the
+previous one is done:
+
+.. code-block:: python
+
+    def button_increment_var(self):
+        (
+            self
+            .delayable()
+            .increment_counter()
+            .split(1, chain=True)  # Will execute the jobs one after the other
+            .delay()
+        )
+
 Enqueing Job Options
 --------------------
 
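On the ``chain`` flag documented above: the only difference is the container
returned by ``split()``. A hypothetical sketch showing the two alternatives side
by side (``records`` and ``process()`` are illustrative stand-ins, not names from
the patch):

.. code-block:: python

    # Default: a DelayableGroup. The 100-record batches are independent
    # jobs; they may run in parallel (subject to the channel's capacity)
    # and one failing batch does not block the others.
    records.delayable().process().split(100).delay()

    # chain=True: a DelayableChain. Batch i+1 only runs once batch i has
    # completed successfully, trading parallelism for strict ordering.
    records.delayable().process().split(100, chain=True).delay()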
diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html
index b430c0ea4f..64c9787c73 100644
--- a/queue_job/static/description/index.html
+++ b/queue_job/static/description/index.html
@@ -603,6 +603,34 @@
 <p>Note: <tt class="docutils literal">delay()</tt> must be called on the delayable, chain, or group which is at the top
 of the graph. In the example above, if it was called on <tt class="docutils literal">group_a</tt>, then <tt class="docutils literal">group_b</tt>
 would never be delayed (but a warning would be shown).</p>
+<p>It is also possible to split a job into several jobs, each one processing a part
+of the work. This can be useful to avoid very long jobs, to parallelize some
+tasks, and to get more specific errors. Usage is as follows:</p>
+<pre class="code python literal-block">
+def button_split_delayable(self):
+    (
+        self  # Can be a big recordset, let's say 1000 records
+        .delayable()
+        .generate_thumbnail((50, 50))
+        .set(priority=30)
+        .set(description=_("generate xxx"))
+        .split(50)  # Split the job into 20 jobs of 50 records each
+        .delay()
+    )
+</pre>
+<p>The <tt class="docutils literal">split()</tt> method takes a <tt class="docutils literal">chain</tt> boolean keyword argument. If set to
+True, the jobs will be chained, so that the next job only starts when the
+previous one is done:</p>
+<pre class="code python literal-block">
+def button_increment_var(self):
+    (
+        self
+        .delayable()
+        .increment_counter()
+        .split(1, chain=True)  # Will execute the jobs one after the other
+        .delay()
+    )
+</pre>
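For reference, the sub-job descriptions generated by the patch follow the
``(split i/n)`` pattern. A standalone sketch mirroring the formatting code in
``split()`` (values are illustrative):

.. code-block:: python

    description = "generate xxx"
    n_jobs = 20  # e.g. 1000 records split into batches of 50
    labels = [
        "%s (split %s/%s)" % (description, index + 1, n_jobs)
        for index in range(n_jobs)
    ]
    assert labels[0] == "generate xxx (split 1/20)"
    assert labels[-1] == "generate xxx (split 20/20)"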