
flux-batch

Python SDK to generate Flux batch jobs and services


https://github.com/converged-computing/flux-batch/raw/main/img/flux-batch-small.png

Related Projects

  • flux-mcp: MCP functions for Flux.
  • flux-mcp-server: MCP server.
  • fractale-mcp: (fractale) MCP orchestration (agents, databases, UI interfaces).
  • hpc-mcp: HPC tools for a larger set of HPC and converged computing use cases.

If you are looking for the flux batch command itself, please see the documentation here. This library supports Flux Framework and is experimental.

Services

  • flux-scribe: Write job events to a local sqlite database via the JournalConsumer (written but not yet added; needs testing)
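The intended flow, consuming job events and persisting them to a local sqlite database, can be sketched with the standard library alone. This is illustrative only: the real flux-scribe service reads events from Flux's JournalConsumer, and the table name and columns here are hypothetical, not the service's actual schema.

```python
import json
import sqlite3


def record_events(db_path, events):
    """Persist job events (plain dicts here) to a local sqlite database.

    Sketch only: flux-scribe consumes events from Flux's JournalConsumer;
    this stand-in takes dicts so it runs without a Flux instance.
    """
    conn = sqlite3.connect(db_path)
    # Hypothetical schema for illustration
    conn.execute(
        "CREATE TABLE IF NOT EXISTS job_events "
        "(jobid TEXT, name TEXT, timestamp REAL, context TEXT)"
    )
    conn.executemany(
        "INSERT INTO job_events VALUES (?, ?, ?, ?)",
        [
            (e["jobid"], e["name"], e["timestamp"], json.dumps(e.get("context", {})))
            for e in events
        ],
    )
    conn.commit()
    return conn


conn = record_events(
    ":memory:",
    [{"jobid": "f123", "name": "submit", "timestamp": 0.0}],
)
print(conn.execute("SELECT COUNT(*) FROM job_events").fetchone()[0])  # 1
```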

Usage

This is a small Flux utility that makes it easy to create Flux batch jobs and services. The use case is to submit work (one or more jobs) under a Flux instance and to run a custom service, or prolog and epilog commands, alongside it. Several services are provisioned here, and you can also provide the name of your own service to start and stop.
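Conceptually, the library turns a list of commands into a batch wrapper script that submits each one under the instance, bracketed by the prolog and epilog. Here is a minimal sketch of that idea, not the library's actual implementation, mirroring the wrapper shown in the General Test output below:

```python
import shlex


def make_wrapper(jobs, prolog=None, epilog=None):
    """Render a bash batch wrapper that submits each job under flux.

    Illustrative sketch: jobs is a list of argv lists; prolog/epilog
    are optional shell lines run before and after the submissions.
    """
    lines = ["#!/bin/bash"]
    if prolog:
        lines.append(prolog)
    for cmd in jobs:
        # Quote each argument so the command survives the shell
        lines.append("flux submit --wait " + shlex.join(cmd))
    lines.append("flux job wait --all")
    if epilog:
        lines.append(epilog)
    return "\n".join(lines)


script = make_wrapper(
    [["echo", "Job 1 starting"], ["sleep", "5"]],
    prolog="echo 'Batch Wrapper Starting'",
    epilog="echo 'Batch Wrapper Finished'",
)
print(script)
```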

Setup

Install the library and start (or be in) a flux instance.

flux start
pip install -e . --break-system-packages

Examples

We have a few simple examples:

Saving Logs

python3 ./examples/save_logs.py

Usernetes

Here is an example to deploy the usernetes module.

# Create a Flux allocation
flux alloc -N2 --time 4h

# Ensure Flux Python bindings on path (e.g., import flux, flux.Flux() works)
export PYTHONPATH=/usr/lib64/python3.12/site-packages

# Ensure you have the code here!
git clone https://github.com/converged-computing/flux-batch
cd flux-batch
pip install -e .

# Run the example
python examples/usernetes_module.py

# The module files are written here. Delete them if you want them recreated.
ls ~/.flux-batch

# Join commands are here. We will need to get the flux job id instead of this uuid
ls ~/.usernetes/join-commands

# Logs are written to /tmp/control-plane.log and /tmp/worker.log
# Usernetes itself is in /tmp/<username>/usernetes
# Source source_env.sh, then use kubectl

Flux Scribe Module

export FLUX_SCRIBE_DATABASE=sqlite:///flux-batch-job.db
python3 ./examples/flux_scribe_module.py
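Once the scribe has run, the resulting database can be inspected with Python's built-in sqlite3 module. The table names depend on the service's schema, which is not documented here, so this just lists whatever exists:

```python
import sqlite3

# Path matches the FLUX_SCRIBE_DATABASE URL above (sqlite:///flux-batch-job.db)
conn = sqlite3.connect("flux-batch-job.db")

# List all tables the scribe created (empty if the scribe has not run yet)
tables = [
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
]
print(tables)
```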

General Test

Or run the controlled example to see a batch job with prolog and epilog run and complete:

python3 ./tests/test_flux_batch.py
Flux Batch Module Test
[OK] Connected to Flux.
[*] Creating batch jobs...
[*] Mapping attributes to BatchJobspecV1...
[*] Previewing submission (Dryrun -> Wrapper)...
#!/bin/bash
echo 'Batch Wrapper Starting'
flux submit --wait /bin/echo 'Job 1 starting'
flux submit --wait /bin/sleep 5
flux submit --wait /bin/echo 'Job 2 finished'
flux job wait --all
echo 'Batch Wrapper Finished'
[*] Performing submission (Dryrun -> Wrapper -> Submit)...
[SUCCESS] Batch submitted! Flux Job ID: ƒMX29AwFu
$ flux jobs -a
       JOBID USER     NAME       ST NTASKS NNODES     TIME INFO
   ƒMX29AwFu vscode   test-batch  R      1      1   4.213s 68e8c4399c15
$ flux jobs -a
       JOBID USER     NAME       ST NTASKS NNODES     TIME INFO
   ƒMX29AwFu vscode   test-batch CD      1      1   6.354s 68e8c4399c15

Here is an explicit (manual) example to do the same:

import flux
import flux_batch

# for pretty printing
# from rich import print

handle = flux.Flux()

# Create your batch job with some number of commands
batch = flux_batch.BatchJobV1()
batch.add_job(["echo", "Job 1 starting"])
batch.add_job(["sleep", "5"])
batch.add_job(["echo", "Job 2 finished"])

# Wrap it up into a jobspec
spec = flux_batch.BatchJobspecV1.from_jobs(
    batch,
    nodes=1,
    nslots=1,
    time_limit="10m",
    job_name="test-batch",
    # Add saving of logs, info, and metadata
    logs_dir="./logs",
)

# Add a prolog and epilog
spec.add_prolog("echo 'Batch Wrapper Starting'")
spec.add_epilog("echo 'Batch Wrapper Finished'")

# Add a service (this assumes a user-level service with this name exists)
spec.add_service("flux-scribe")

# Preview it (batch wrapper), or generate the jobspec (json)
print(flux_batch.submit(handle, spec, dry_run=True))
jobspec = flux_batch.jobspec(spec)

# Submit that bad boi.
jobid = flux_batch.submit(handle, jobspec)

See the examples directory for more script examples.
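The time_limit string in the example above ("10m") follows the Flux Standard Duration format: a floating-point number with an optional s/m/h/d suffix. A small helper to convert such a string to seconds, as an illustration of the format rather than part of the library, might look like:

```python
def fsd_to_seconds(duration):
    """Convert a Flux Standard Duration string (e.g. '10m', '4h') to seconds.

    A bare number is interpreted as seconds, per the format.
    """
    multipliers = {"s": 1, "m": 60, "h": 3600, "d": 86400}
    if duration and duration[-1] in multipliers:
        return float(duration[:-1]) * multipliers[duration[-1]]
    return float(duration)


print(fsd_to_seconds("10m"))  # 600.0
print(fsd_to_seconds("4h"))   # 14400.0
```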

TODO

  • How to control a variable number of ranks for different services (ref)
  • Need to add env directive to flux module start for the job id.

License

HPCIC DevTools is distributed under the terms of the MIT license. All new contributions must be made under this license.

See LICENSE, COPYRIGHT, and NOTICE for details.

SPDX-License-Identifier: MIT

LLNL-CODE-842614
