diff --git a/storage/clients.mdx b/storage/clients.mdx
index 62a5fc5..b51a5bd 100644
--- a/storage/clients.mdx
+++ b/storage/clients.mdx
@@ -4,10 +4,15 @@ description: Learn about the different storage clients available in Tilebox to a
icon: hard-drive
---
-Tilebox does not host the actual open data satellite products but instead relies on publicly accessible storage providers for data access. Instead Tilebox ingests available metadata as [datasets](/datasets/concepts/datasets) to enable high performance querying and structured access of the data as [xarray.Dataset](/sdks/python/xarray).
+Tilebox does not host the actual open data satellite products but instead relies on publicly accessible storage providers for data access.
+Tilebox ingests available metadata as [datasets](/datasets/concepts/datasets) to enable high performance querying and structured access of the data as [xarray.Dataset](/sdks/python/xarray).
Below is a list of the storage providers currently supported by Tilebox.
+
+ This feature is only available in the Python SDK.
+
+
## Copernicus Data Space
The [Copernicus Data Space](https://dataspace.copernicus.eu/) is an open ecosystem that provides free instant access to data and services from the Copernicus Sentinel missions. Check out the [ASF Open Data datasets](/datasets/open-data#copernicus-data-space) that are available in Tilebox.
@@ -25,7 +30,7 @@ from tilebox.datasets import Client
from tilebox.storage import CopernicusStorageClient
# Creating clients
-client = Client(token="YOUR_TILEBOX_API_KEY")
+client = Client()
datasets = client.datasets()
storage_client = CopernicusStorageClient(
access_key="YOUR_ACCESS_KEY",
@@ -70,7 +75,7 @@ Contents:
For cases where only a subset of the available file objects for a product is needed, you may restrict your download to just that subset. First, list available objects using `list_objects`, filter them, and then download using `download_objects`.
-For example, a Sentinel-2 L2A product includes many files such as metadata, different bands in various resolutions, masks, and quicklook images. The following example shows how to download only specific files from a Sentinel-2 L2A product.
+For example, a Sentinel-2 L2A product includes many files such as metadata, different bands in multiple resolutions, masks, and quicklook images. The following example shows how to download only specific files from a Sentinel-2 L2A product.
```python Python {4, 15}
collection = datasets.open_data.copernicus.sentinel2_msi.collections()["S2A_S2MSI2A"]
@@ -118,7 +123,7 @@ from tilebox.datasets import Client
from tilebox.storage import ASFStorageClient
# Creating clients
-client = Client(token="YOUR_TILEBOX_API_KEY")
+client = Client()
datasets = client.datasets()
storage_client = ASFStorageClient(
user="YOUR_ASF_USER",
@@ -191,7 +196,7 @@ from tilebox.datasets import Client
from tilebox.storage import UmbraStorageClient
# Creating clients
-client = Client(token="YOUR_TILEBOX_API_KEY")
+client = Client()
datasets = client.datasets()
storage_client = UmbraStorageClient(cache_directory=Path("./data"))
diff --git a/vale/styles/config/vocabularies/docs/accept.txt b/vale/styles/config/vocabularies/docs/accept.txt
index 79654fe..2e5547a 100644
--- a/vale/styles/config/vocabularies/docs/accept.txt
+++ b/vale/styles/config/vocabularies/docs/accept.txt
@@ -1,5 +1,9 @@
# (?i) makes the regex case-insensitive, see https://vale.sh/docs/topics/vocab/#case-sensitivity
+Assistance
+Custom Dataset
+Custom Datasets
+tilebox-generate
Tilebox
(?i)Xarray
NumPy
@@ -10,6 +14,8 @@ Datalore
Colab
Tropomi
georeferenced
+Geospatial
+Geopandas
Fortran
Zarr
datetime
@@ -17,7 +23,8 @@ accessor
Pipenv
Opendata
APIs
-datapoint
+(?i)datapoint
+(?i)datapoints
dataclass
subtask
subtasks
@@ -37,7 +44,7 @@ subtasks
subtask
parallelizable
deserializing
-idempotency
+(?i)idempotency
SDKs
SDK
coroutines
@@ -47,3 +54,12 @@ coroutines
(?i)pushbroom
rollout
(?i)automations
+(?i)protobuf
+UUIDs
+(?i)getters
+(?i)ingest
+(?i)Timeseries
+point in time
+bool
+boolean
+(?i)modis
diff --git a/workflows/caches.mdx b/workflows/caches.mdx
index 99c102b..87357e4 100644
--- a/workflows/caches.mdx
+++ b/workflows/caches.mdx
@@ -148,7 +148,7 @@ Caches are isolated per job, meaning that each job's cache data is only accessib
## Storing and Retrieving Data
-The job cache can be accessed via the `ExecutionContext` passed to a tasks `execute` function. This [`job_cache`](/api-reference/tilebox.workflows/ExecutionContext.job_cache) object provides methods to handle data storage and retrieval from the cache. The specifics of data storage depend on the chosen cache backend.
+The job cache can be accessed via the `ExecutionContext` passed to a task's `execute` function. This [`job_cache`](/api-reference/python/tilebox.workflows/ExecutionContext.job_cache) object provides methods to handle data storage and retrieval from the cache. The specifics of data storage depend on the chosen cache backend.
The cache API is designed to be simple and can handle all types of data, supporting binary data in the form of `bytes`, identified by `str` cache keys. This allows for storing many different data types, such as pickled Python objects, serialized JSON, UTF-8, or binary data.
diff --git a/workflows/concepts/clusters.mdx b/workflows/concepts/clusters.mdx
index 7fbfc30..dd9d831 100644
--- a/workflows/concepts/clusters.mdx
+++ b/workflows/concepts/clusters.mdx
@@ -36,6 +36,12 @@ To manage clusters, first instantiate a cluster client using the `clusters` meth
client = Client()
clusters = client.clusters()
```
+ ```go Go
+ import "github.com/tilebox/tilebox-go/workflows/v1"
+
+ client := workflows.NewClient()
+ clusterClient := client.Clusters
+ ```
### Creating a Cluster
@@ -47,11 +53,20 @@ To create a cluster, use the `create` method on the cluster client and provide a
cluster = clusters.create("testing")
print(cluster)
```
+ ```go Go
+ cluster := client.Clusters.Create("testing")
+ fmt.Println(cluster)
+ ```
-```plaintext Output
+
+```plaintext Python
Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing')
```
+```go Go
+&{testing-CvufcSxcC9SKfe testing}
+```
+
### Cluster Slug
@@ -66,12 +81,29 @@ To list all available clusters, use the `all` method:
all_clusters = clusters.all()
print(all_clusters)
```
+ ```go Go
+ clusters, err := client.Clusters.List(ctx)
+ if err != nil {
+ slog.Error("failed to list clusters", slog.Any("error", err))
+ return
+ }
+
+ for _, cluster := range clusters {
+ fmt.Println(cluster)
+ }
+ ```
-```plaintext Output
+
+```plaintext Python
[Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing'),
Cluster(slug='production-EifhUozDpwAJDL', display_name='Production')]
```
+```go Go
+&{testing-CvufcSxcC9SKfe testing}
+&{production-EifhUozDpwAJDL Production}
+```
+
### Fetching a Specific Cluster
@@ -82,11 +114,24 @@ To fetch a specific cluster, use the `find` method and pass the cluster's slug:
cluster = clusters.find("testing-CvufcSxcC9SKfe")
print(cluster)
```
+ ```go Go
+ cluster, err := client.Clusters.Get(ctx, "testing-CvufcSxcC9SKfe")
+ if err != nil {
+ slog.Error("failed to get cluster", slog.Any("error", err))
+ return
+ }
+ fmt.Println(cluster)
+ ```
-```plaintext Output
+
+```plaintext Python
Cluster(slug='testing-CvufcSxcC9SKfe', display_name='testing')
```
+```go Go
+&{testing-CvufcSxcC9SKfe testing}
+```
+
### Deleting a Cluster
@@ -96,6 +141,9 @@ To delete a cluster, use the `delete` method and pass the cluster's slug:
```python Python
clusters.delete("testing-CvufcSxcC9SKfe")
```
+ ```go Go
+ err := client.Clusters.Delete(ctx, "testing-CvufcSxcC9SKfe")
+ ```
## Jobs Across Different Clusters
@@ -127,10 +175,63 @@ class DummyTask(Task):
client = Client()
job_client = client.jobs()
job = job_client.submit(
+ "my-job",
MultiClusterWorkflow(),
cluster="testing-CvufcSxcC9SKfe",
)
```
+```go Go
+package main
+
+import (
+ "context"
+
+ "github.com/tilebox/tilebox-go/workflows/v1"
+ "github.com/tilebox/tilebox-go/workflows/v1/subtask"
+)
+
+type MultiClusterWorkflow struct{}
+
+func (t *MultiClusterWorkflow) Execute(ctx context.Context) error {
+ // this submits a task to the same cluster as the one currently executing this task
+ sameCluster, err := workflows.SubmitSubtask(ctx, &DummyTask{})
+ if err != nil {
+ return err
+ }
+
+ otherCluster, err := workflows.SubmitSubtask(
+ ctx,
+ &DummyTask{},
+ // this task runs only on a task runner in the "other-cluster" cluster
+ subtask.WithClusterSlug("other-cluster-As3dcSb3D9SAdK"),
+ // dependencies can be specified across clusters
+ subtask.WithDependencies(sameCluster),
+ )
+ if err != nil {
+ return err
+ }
+
+ _ = otherCluster
+ return nil
+}
+
+type DummyTask struct{}
+
+func main() {
+ ctx := context.Background()
+ client := workflows.NewClient()
+
+ // submit a job to the "testing" cluster
+ _, _ = client.Jobs.Submit(
+ ctx,
+ "my-job",
+ "testing-CvufcSxcC9SKfe",
+ []workflows.Task{
+ &MultiClusterWorkflow{},
+ },
+ )
+}
+```
This workflow requires at least two task runners to complete. One must be in the "testing" cluster, and the other must be in the "other-cluster" cluster. If no task runners are available in the "other-cluster," the task submitted to that cluster will remain queued until a task runner is available. It won't execute on a task runner in the "testing" cluster, even if the task runner has the `DummyTask` registered.
diff --git a/workflows/concepts/jobs.mdx b/workflows/concepts/jobs.mdx
index e23b5fc..639d4ff 100644
--- a/workflows/concepts/jobs.mdx
+++ b/workflows/concepts/jobs.mdx
@@ -22,9 +22,15 @@ from tilebox.workflows import Client
client = Client()
job_client = client.jobs()
```
+```go Go
+import "github.com/tilebox/tilebox-go/workflows/v1"
+
+client := workflows.NewClient()
+jobClient := client.Jobs
+```
-After obtaining a job client, submit a job using the [submit](/api-reference/tilebox.workflows/JobClient.submit) method. You need to provide a name for the job, an instance of the root [task](/workflows/concepts/tasks), and a [cluster](/workflows/concepts/clusters) to execute the root task on.
+After obtaining a job client, submit a job using the [submit](/api-reference/python/tilebox.workflows/JobClient.submit) method. You need to provide a name for the job, an instance of the root [task](/workflows/concepts/tasks), and a [cluster](/workflows/concepts/clusters) to execute the root task on.
```python Python
@@ -34,6 +40,25 @@ from my_workflow import MyTask
cluster = "dev-cluster"
job = job_client.submit('my-job', MyTask("some", "parameters"), cluster)
```
+```go Go
+cluster, err := client.Clusters.Get(ctx, "dev-cluster")
+if err != nil {
+ slog.Error("failed to get cluster", slog.Any("error", err))
+ return
+}
+
+job, err := client.Jobs.Submit(ctx, "my-job", cluster,
+ []workflows.Task{
+ &MyTask{
+ Some: "parameters",
+ },
+ },
+)
+if err != nil {
+ slog.Error("Failed to submit job", slog.Any("error", err))
+ return
+}
+```
Once a job is submitted, it's immediately scheduled for execution. The root task will be picked up and executed as soon as an [eligible task runner](/workflows/concepts/task-runners#task-selection) is available.
@@ -49,6 +74,14 @@ from my_workflow import MyFlakyTask
cluster = "dev-cluster"
job = job_client.submit('my-job', MyFlakyTask(), cluster, max_retries=5)
```
+```go Go
+myJob, err := client.Jobs.Submit(ctx, "my-job", cluster,
+ []workflows.Task{
+ &MyFlakyTask{},
+ },
+ job.WithMaxRetries(5),
+)
+```
In this example, if `MyFlakyTask` fails, it will be retried up to five times before being marked as failed.
@@ -67,6 +100,25 @@ print(job.id) # 018dd029-58ca-74e5-8b58-b4f99d610f9a
# Later, in another process or machine, retrieve job info
job = job_client.find("018dd029-58ca-74e5-8b58-b4f99d610f9a")
```
+```go Go
+myJob, err := client.Jobs.Submit(ctx, "my-job", cluster,
+ []workflows.Task{
+        &MyTask{
+ Some: "parameters",
+ },
+ },
+)
+if err != nil {
+ slog.Error("Failed to submit job", slog.Any("error", err))
+ return
+}
+
+// 018dd029-58ca-74e5-8b58-b4f99d610f9a
+slog.Info("Job submitted", slog.String("job_id", myJob.ID.String()))
+
+// Later, in another process or machine, retrieve job info
+job, err := client.Jobs.Get(ctx, uuid.MustParse("018dd029-58ca-74e5-8b58-b4f99d610f9a"))
+```
@@ -78,7 +130,11 @@ job = job_client.find("018dd029-58ca-74e5-8b58-b4f99d610f9a")
Visualizing the execution of a job can be helpful. The Tilebox workflow orchestrator tracks all tasks in a job, including [sub-tasks](/workflows/concepts/tasks#task-composition-and-subtasks) and [dependencies](/workflows/concepts/tasks#dependencies). This enables the visualization of the execution of a job as a graph diagram.
-`display` is designed for use in an [interactive environment](/sdks/python/sample-notebooks#interactive-environments) such as a Jupyter notebook. In non-interactive environments, use [visualize](/api-reference/tilebox.workflows/JobClient.visualize), which returns the rendered diagram as an SVG string.
+`display` is designed for use in an [interactive environment](/sdks/python/sample-notebooks#interactive-environments) such as a Jupyter notebook. In non-interactive environments, use [visualize](/api-reference/python/tilebox.workflows/JobClient.visualize), which returns the rendered diagram as an SVG string.
+
+
+
+ Visualization isn't supported in Go yet.
@@ -148,6 +204,45 @@ class SubTask(Task):
job = job_client.submit('custom-display-names', RootTask(3), "dev-cluster")
job_client.display(job)
```
+```go Go
+type RootTask struct {
+ NumSubtasks int
+}
+
+func (t *RootTask) Execute(ctx context.Context) error {
+ err := workflows.SetTaskDisplay(ctx, fmt.Sprintf("Root(%d)", t.NumSubtasks))
+ if err != nil {
+ return fmt.Errorf("failed to set task display: %w", err)
+ }
+
+ for i := range t.NumSubtasks {
+ _, err := workflows.SubmitSubtask(ctx, &SubTask{Index: i})
+ if err != nil {
+ return fmt.Errorf("failed to submit subtask: %w", err)
+ }
+ }
+ return nil
+}
+
+type SubTask struct {
+ Index int
+}
+
+func (t *SubTask) Execute(ctx context.Context) error {
+ err := workflows.SetTaskDisplay(ctx, fmt.Sprintf("Leaf Nr. %d", t.Index))
+ if err != nil {
+ return fmt.Errorf("failed to set task display: %w", err)
+ }
+ return nil
+}
+
+// in main
+job, err := client.Jobs.Submit(ctx, "custom-display-names", cluster,
+ []workflows.Task{&RootTask{
+ NumSubtasks: 3,
+ }},
+)
+```
@@ -167,6 +262,18 @@ job = job_client.submit('my-job', MyTask(), "dev-cluster")
# After a short while, the job gets canceled
job_client.cancel(job)
```
+```go Go
+job, err := client.Jobs.Submit(ctx, "my-job", cluster,
+ []workflows.Task{&MyTask{}},
+)
+if err != nil {
+ slog.Error("Failed to submit job", slog.Any("error", err))
+ return
+}
+
+// After a short while, the job gets canceled
+err = client.Jobs.Cancel(ctx, job.ID)
+```
@@ -209,6 +316,72 @@ class PrintMovieStats(Task):
context.current_task.display = response["Title"]
print(f"{response['Title']} was released on {response['Released']}")
```
+```go Go
+package movie
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+
+ "github.com/tilebox/tilebox-go/workflows/v1"
+)
+
+type MoviesStats struct {
+ Titles []string
+}
+
+func (t *MoviesStats) Execute(ctx context.Context) error {
+ for _, title := range t.Titles {
+ _, err := workflows.SubmitSubtask(ctx, &PrintMovieStats{Title: title})
+ if err != nil {
+ return fmt.Errorf("failed to submit subtask: %w", err)
+ }
+ }
+ return nil
+}
+
+type Movie struct {
+ Title *string `json:"Title"`
+ Released *string `json:"Released"`
+}
+
+type PrintMovieStats struct {
+ Title string
+}
+
+func (t *PrintMovieStats) Execute(ctx context.Context) error {
+ apiURL := fmt.Sprintf("http://www.omdbapi.com/?t=%s&apikey=%s", url.QueryEscape(t.Title), "")
+ response, err := http.Get(apiURL)
+ if err != nil {
+ return fmt.Errorf("failed to fetch movie: %w", err)
+ }
+
+ defer response.Body.Close()
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return fmt.Errorf("failed to read response: %w", err)
+ }
+
+ var movie Movie
+ err = json.Unmarshal(body, &movie)
+ if err != nil {
+ return fmt.Errorf("failed to unmarshal response: %w", err)
+ }
+
+ // set the display name of the task to the title of the movie:
+    err = workflows.SetTaskDisplay(ctx, *movie.Title)
+ if err != nil {
+ return fmt.Errorf("failed to set task display: %w", err)
+ }
+
+ fmt.Printf("%s was released on %s\n", *movie.Title, *movie.Released)
+ return nil
+}
+```
Submitting the workflow as a job reveals a bug in the `PrintMovieStats` task.
@@ -224,6 +397,18 @@ job = job_client.submit('movies-stats', MoviesStats([
job_client.display(job)
```
+```go Go
+job, err := client.Jobs.Submit(ctx, "movies-stats", cluster,
+ []workflows.Task{&MoviesStats{
+ Titles: []string{
+ "The Matrix",
+ "Shrek 2",
+ "Tilebox - The Movie",
+ "The Avengers",
+ },
+ }},
+)
+```
@@ -259,6 +444,47 @@ class PrintMovieStats(Task):
context.current_task.display = f"NotFound: {self.title}"
print(f"Could not find the release date for {self.title}")
```
+```go Go
+type PrintMovieStats struct {
+ Title string
+}
+
+func (t *PrintMovieStats) Execute(ctx context.Context) error {
+ url2 := fmt.Sprintf("http://www.omdbapi.com/?t=%s&apikey=%s", url.QueryEscape(t.Title), "")
+ response, err := http.Get(url2)
+ if err != nil {
+ return fmt.Errorf("failed to fetch movie: %w", err)
+ }
+
+ defer response.Body.Close()
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return fmt.Errorf("failed to read response: %w", err)
+ }
+
+ var movie Movie
+ err = json.Unmarshal(body, &movie)
+ if err != nil {
+ return fmt.Errorf("failed to unmarshal response: %w", err)
+ }
+
+ if movie.Released != nil && movie.Title != nil {
+ err := workflows.SetTaskDisplay(ctx, *movie.Title)
+ if err != nil {
+ return fmt.Errorf("failed to set task display: %w", err)
+ }
+ fmt.Printf("%s was released on %s\n", *movie.Title, *movie.Released)
+ } else {
+ err := workflows.SetTaskDisplay(ctx, fmt.Sprintf("NotFound: %s", t.Title))
+ if err != nil {
+ return fmt.Errorf("failed to set task display: %w", err)
+ }
+ fmt.Printf("Could not find the release date for %s\n", t.Title)
+ }
+
+ return nil
+}
+```
With this fix, and after redeploying the task runners with the updated `PrintMovieStats` implementation, you can retry the job:
@@ -268,6 +494,9 @@ With this fix, and after redeploying the task runners with the updated `PrintMov
job_client.retry(job)
job_client.display(job)
```
+```go Go
+err = client.Jobs.Retry(ctx, job.ID)
+```
diff --git a/workflows/concepts/task-runners.mdx b/workflows/concepts/task-runners.mdx
index da79c80..37de22c 100644
--- a/workflows/concepts/task-runners.mdx
+++ b/workflows/concepts/task-runners.mdx
@@ -50,13 +50,55 @@ def main():
if __name__ == "__main__":
main()
```
+```go Go
+package main
+
+import (
+ "context"
+ "log/slog"
+
+ "github.com/tilebox/tilebox-go/workflows/v1"
+ // your own workflow:
+ "github.com/my_org/myworkflow"
+)
+
+func main() {
+ // 1. connect to the Tilebox Workflows API
+ client := workflows.NewClient()
+
+ // 2. select a cluster to join
+ runner, err := client.NewTaskRunner("dev-cluster")
+ if err != nil {
+ slog.Error("failed to create task runner", slog.Any("error", err))
+ return
+ }
+
+ // 3. register tasks
+ err = runner.RegisterTasks(
+ &myworkflow.MyTask{},
+ &myworkflow.OtherTask{},
+ )
+ if err != nil {
+ slog.Error("failed to register task", slog.Any("error", err))
+ return
+ }
+
+ // 4. listen for new tasks to execute
+ runner.Run(context.Background())
+}
+```
-To start the task runner locally, run it as a Python script:
+To start the task runner locally, run it as a script:
-```bash
+
+```bash Python
> python task_runner.py
```
+```bash Go
+> go run .
+```
+
## Task Selection
@@ -146,12 +188,56 @@ Here's an example of a distributed workflow:
def execute(self, context: ExecutionContext) -> None:
pass
```
+```go Go
+package distributed
+
+import (
+ "context"
+ "fmt"
+ "github.com/tilebox/tilebox-go/workflows/v1"
+ "github.com/tilebox/tilebox-go/workflows/v1/subtask"
+)
+
+type DistributedWorkflow struct{}
+
+func (t *DistributedWorkflow) Execute(ctx context.Context) error {
+ downloadTask, err := workflows.SubmitSubtask(ctx, &DownloadData{})
+ if err != nil {
+ return fmt.Errorf("failed to submit download subtask: %w", err)
+ }
+
+ _, err = workflows.SubmitSubtask(ctx, &ProcessData{}, subtask.WithDependencies(downloadTask))
+ if err != nil {
+ return fmt.Errorf("failed to submit process subtask: %w", err)
+ }
+ return nil
+}
+
+// DownloadData downloads a dataset and stores it in a shared internal bucket.
+// Requires a good network connection for high download bandwidth.
+type DownloadData struct{}
+
+func (t *DownloadData) Execute(ctx context.Context) error {
+ return nil
+}
+
+// ProcessData performs compute-intensive processing of a dataset.
+// The dataset must be available in an internal bucket.
+// Requires access to a GPU for optimal performance.
+type ProcessData struct{}
+
+func (t *ProcessData) Execute(ctx context.Context) error {
+ return nil
+}
+```
To achieve distributed execution for this workflow, no single task runner capable of executing all three of the tasks is set up. Instead, two task runners, each capable of executing one of the tasks are set up: one in a high-speed network environment and the other with GPU access. When the distributed workflow runs, the first task runner picks up the `DownloadData` task, while the second picks up the `ProcessData` task. The `DistributedWorkflow` does not require specific hardware, so it can be registered with both runners and executed by either one.
-
-```python download_task_runner.py
+
+
+
+```python Python
from tilebox.workflows import Client
client = Client()
@@ -161,7 +247,43 @@ high_network_speed_runner = client.runner(
)
high_network_speed_runner.run_forever()
```
-```python gpu_task_runner.py
+```go Go
+package main
+
+import (
+ "context"
+ "log/slog"
+
+ "github.com/tilebox/tilebox-go/workflows/v1"
+)
+
+func main() {
+ client := workflows.NewClient()
+
+ highNetworkSpeedRunner, err := client.NewTaskRunner("dev-cluster")
+ if err != nil {
+ slog.Error("failed to create task runner", slog.Any("error", err))
+ return
+ }
+
+ err = highNetworkSpeedRunner.RegisterTasks(
+ &DownloadData{},
+ &DistributedWorkflow{},
+ )
+ if err != nil {
+ slog.Error("failed to register tasks", slog.Any("error", err))
+ return
+ }
+
+ highNetworkSpeedRunner.RunForever(context.Background())
+}
+```
+
+
+
+
+
+```python Python
from tilebox.workflows import Client
client = Client()
@@ -171,7 +293,40 @@ gpu_runner = client.runner(
)
gpu_runner.run_forever()
```
-
+```go Go
+package main
+
+import (
+ "context"
+ "log/slog"
+
+ "github.com/tilebox/tilebox-go/workflows/v1"
+)
+
+func main() {
+ client := workflows.NewClient()
+
+ gpuRunner, err := client.NewTaskRunner("dev-cluster")
+ if err != nil {
+ slog.Error("failed to create task runner", slog.Any("error", err))
+ return
+ }
+
+ err = gpuRunner.RegisterTasks(
+ &ProcessData{},
+ &DistributedWorkflow{},
+ )
+ if err != nil {
+ slog.Error("failed to register tasks", slog.Any("error", err))
+ return
+ }
+
+ gpuRunner.RunForever(context.Background())
+}
+```
+
+
+
Now, both `download_task_runner.py` and `gpu_task_runner.py` are started, in parallel, on different machines with the required hardware for each. When `DistributedWorkflow` is submitted, it executes on one of the two runners, and it's submitted sub-tasks are handled by the appropriate runner.
diff --git a/workflows/concepts/tasks.mdx b/workflows/concepts/tasks.mdx
index 84a095c..a8d9867 100644
--- a/workflows/concepts/tasks.mdx
+++ b/workflows/concepts/tasks.mdx
@@ -18,7 +18,15 @@ from tilebox.workflows import Task, ExecutionContext
class MyFirstTask(Task):
def execute(self, context: ExecutionContext):
- print(f"Hello World!")
+ print("Hello World!")
+```
+```go Go
+type MyFirstTask struct{}
+
+func (t *MyFirstTask) Execute(ctx context.Context) error {
+ slog.Info("Hello World!")
+ return nil
+}
```
@@ -32,7 +40,7 @@ This example demonstrates a simple task that prints "Hello World!" to the consol
The `execute` method is the entry point for executing the task. This is where the task's logic is defined. It's invoked by a [task runner](/workflows/concepts/task-runners) when the task runs and performs the task's operation.
- The `context` argument is an `ExecutionContext` instance that provides access to an [API for submitting new tasks](/api-reference/tilebox.workflows/ExecutionContext.submit_subtask) as part of the same job and features like [shared caching](/api-reference/tilebox.workflows/ExecutionContext.job_cache).
+ The `context` argument is an `ExecutionContext` instance that provides access to an [API for submitting new tasks](/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask) as part of the same job and features like [shared caching](/api-reference/python/tilebox.workflows/ExecutionContext.job_cache).
@@ -46,14 +54,18 @@ This example demonstrates a simple task that prints "Hello World!" to the consol
Tasks often require input parameters to operate. These inputs can range from simple values to complex data structures. By inheriting from the `Task` class, the task is treated as a Python `dataclass`, allowing input parameters to be defined as class attributes.
- Tasks must be **serializable to JSON** because they may be distributed across a cluster of [task runners](/workflows/concepts/task-runners).
+ Tasks must be **serializable to JSON or to protobuf** because they may be distributed across a cluster of [task runners](/workflows/concepts/task-runners).
+
+
+
+ In Go, task parameters must be exported fields of the task struct (starting with an uppercase letter), otherwise they will not be serialized to JSON.
Supported types for input parameters include:
- Basic types such as `str`, `int`, `float`, `bool`
- Lists and dictionaries of basic types
-- Nested data classes that are also JSON-serializable
+- Nested data classes that are also JSON-serializable or protobuf-serializable
```python Python
@@ -67,6 +79,24 @@ Supported types for input parameters include:
task = ParametrizableTask("Hello", 3, {"key": "value"})
```
+ ```go Go
+ type ParametrizableTask struct {
+ Message string
+ Number int
+ Data map[string]string
+ }
+
+ func (t *ParametrizableTask) Execute(context.Context) error {
+ slog.Info(strings.Repeat(t.Message, t.Number))
+ return nil
+ }
+
+ task := &ParametrizableTask{
+      Message: "Hello",
+      Number: 3,
+      Data: map[string]string{"key": "value"},
+ }
+ ```
## Task Composition and subtasks
@@ -92,6 +122,36 @@ class ChildTask(Task):
# which will result in 5 ChildTasks being submitted and executed as well
task = ParentTask(5)
```
+```go Go
+type ParentTask struct {
+ NumSubtasks int
+}
+
+func (t *ParentTask) Execute(ctx context.Context) error {
+ for i := range t.NumSubtasks {
+ _, err := workflows.SubmitSubtask(ctx, &ChildTask{Index: i})
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+type ChildTask struct {
+ Index int
+}
+
+func (t *ChildTask) Execute(context.Context) error {
+ slog.Info("Executing ChildTask", slog.Int("index", t.Index))
+
+ return nil
+}
+
+// after submitting this task, a task runner may pick it up and execute it
+// which will result in 5 ChildTasks being submitted and executed as well
+task := &ParentTask{NumSubtasks: 5}
+```
In this example, a `ParentTask` submits `ChildTask` tasks as subtasks. The number of subtasks to be submitted is based on the `num_subtasks` attribute of the `ParentTask`. The `submit_subtask` method takes an instance of a task as its argument, meaning the task to be submitted must be instantiated with concrete parameters first.
@@ -127,6 +187,87 @@ class DownloadImage(Task):
with file.open("wb") as file:
file.write(response.content)
```
+```go Go
+package dogs
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "strings"
+
+ "github.com/tilebox/tilebox-go/workflows/v1"
+)
+
+type DogImage struct {
+ ID string `json:"id"`
+ URL string `json:"url"`
+ Width *int `json:"width"`
+ Height *int `json:"height"`
+}
+
+type DownloadRandomDogImages struct {
+ NumImages int
+}
+
+func (t *DownloadRandomDogImages) Execute(ctx context.Context) error {
+ url := fmt.Sprintf("https://api.thedogapi.com/v1/images/search?limit=%d", t.NumImages)
+ response, err := http.Get(url)
+ if err != nil {
+ return fmt.Errorf("failed to download images: %w", err)
+ }
+
+ defer response.Body.Close()
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return fmt.Errorf("failed to read response: %w", err)
+ }
+
+ var dogImages []DogImage
+ err = json.Unmarshal(body, &dogImages)
+ if err != nil {
+ return err
+ }
+
+ for _, dogImage := range dogImages {
+ _, err := workflows.SubmitSubtask(ctx, &DownloadImage{URL: dogImage.URL})
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+type DownloadImage struct {
+ URL string
+}
+
+func (t *DownloadImage) Execute(context.Context) error {
+ response, err := http.Get(t.URL)
+ if err != nil {
+ return fmt.Errorf("failed to download image: %w", err)
+ }
+
+ defer response.Body.Close()
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return fmt.Errorf("failed to read response: %w", err)
+ }
+
+ err = os.MkdirAll("dogs", 0o755)
+ if err != nil {
+ return fmt.Errorf("failed to create dogs directory: %w", err)
+ }
+
+ elements := strings.Split(t.URL, "/")
+ file := fmt.Sprintf("dogs/%s", elements[len(elements)-1])
+
+ return os.WriteFile(file, body, 0o600)
+}
+```
This example consists of the following tasks:
@@ -160,6 +301,24 @@ job = jobs.submit(
jobs.display(job)
```
+```go Go
+ctx := context.Background()
+client := workflows.NewClient()
+
+job, err := client.Jobs.Submit(ctx, "download-dog-images", "dev-cluster",
+ []workflows.Task{
+        &dogs.DownloadRandomDogImages{
+ NumImages: 5,
+ },
+ },
+)
+if err != nil {
+ slog.Error("Failed to submit job", slog.Any("error", err))
+ return
+}
+
+// now our deployed task runners will pick up the task and execute it
+```
@@ -203,6 +362,22 @@ For example, the `RecursiveTask` below is a valid task that submits smaller inst
if self.num >= 2:
context.submit_subtask(RecursiveTask(self.num // 2))
```
+ ```go Go
+ type RecursiveTask struct {
+ Num int
+ }
+
+ func (t *RecursiveTask) Execute(ctx context.Context) error {
+ slog.Info("Executing RecursiveTask", slog.Int("num", t.Num))
+ if t.Num >= 2 {
+ _, err := workflows.SubmitSubtask(ctx, &RecursiveTask{Num: t.Num / 2})
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ ```
### Recursive subtask example
@@ -214,22 +389,68 @@ To improve this, recursive subtask submission decomposes a `DownloadRandomDogIma
An implementation of this recursive submission may look like this:
- ```python Python
- class DownloadRandomDogImages(Task):
- num_images: int
+```python Python
+class DownloadRandomDogImages(Task):
+ num_images: int
- def execute(self, context: ExecutionContext) -> None:
- if self.num_images > 4:
- half = self.num_images // 2
- remaining = self.num_images - half # account for odd numbers
- context.submit_subtask(DownloadRandomDogImages(half))
- context.submit_subtask(DownloadRandomDogImages(remaining))
- else:
- url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}"
- response = httpx.get(url)
- for dog_image in response.json()[:self.num_images]:
- context.submit_subtask(DownloadImage(dog_image["url"]))
- ```
+ def execute(self, context: ExecutionContext) -> None:
+ if self.num_images > 4:
+ half = self.num_images // 2
+ remaining = self.num_images - half # account for odd numbers
+ context.submit_subtask(DownloadRandomDogImages(half))
+ context.submit_subtask(DownloadRandomDogImages(remaining))
+ else:
+ url = f"https://api.thedogapi.com/v1/images/search?limit={self.num_images}"
+ response = httpx.get(url)
+ for dog_image in response.json()[:self.num_images]:
+ context.submit_subtask(DownloadImage(dog_image["url"]))
+```
+```go Go
+type DownloadRandomDogImages struct {
+ NumImages int
+}
+
+func (t *DownloadRandomDogImages) Execute(ctx context.Context) error {
+ if t.NumImages > 4 {
+ half := t.NumImages / 2
+ remaining := t.NumImages - half // account for odd numbers
+ _, err := workflows.SubmitSubtask(ctx, &DownloadRandomDogImages{NumImages: half})
+ if err != nil {
+ return err
+ }
+ _, err = workflows.SubmitSubtask(ctx, &DownloadRandomDogImages{NumImages: remaining})
+ if err != nil {
+ return err
+ }
+ } else {
+ url := fmt.Sprintf("https://api.thedogapi.com/v1/images/search?limit=%d", t.NumImages)
+ response, err := http.Get(url)
+ if err != nil {
+ return fmt.Errorf("failed to download images: %w", err)
+ }
+
+ defer response.Body.Close()
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return fmt.Errorf("failed to read response: %w", err)
+ }
+
+ var dogImages []DogImage
+ err = json.Unmarshal(body, &dogImages)
+ if err != nil {
+ return err
+ }
+
+ for _, dogImage := range dogImages {
+ _, err := workflows.SubmitSubtask(ctx, &DownloadImage{URL: dogImage.URL})
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+```
With this implementation, downloading a large number of images (for example, 9) results in the following tasks being executed:
@@ -274,11 +495,44 @@ class FlakyTask(Task):
if random.random() < 0.1:
raise Exception("FlakyTask failed randomly")
```
+```go Go
+package flaky
+
+import (
+ "context"
+ "errors"
+ "log/slog"
+ "math/rand/v2"
+
+ "github.com/tilebox/tilebox-go/workflows/v1"
+ "github.com/tilebox/tilebox-go/workflows/v1/subtask"
+)
+
+type RootTask struct{}
+
+func (t *RootTask) Execute(ctx context.Context) error {
+ _, err := workflows.SubmitSubtask(ctx, &FlakyTask{},
+ subtask.WithMaxRetries(5),
+ )
+ return err
+}
+
+type FlakyTask struct{}
+
+func (t *FlakyTask) Execute(context.Context) error {
+ slog.Info("Executing FlakyTask")
+
+ if rand.Float64() < 0.1 {
+ return errors.New("FlakyTask failed randomly")
+ }
+ return nil
+}
+```
## Dependencies
-Tasks often rely on other tasks. For example, a task that processes data might depend on a task that fetches that data. **Tasks can express their dependencies on other tasks** by using the `depends_on` argument of the [submit_subtask](/api-reference/tilebox.workflows/ExecutionContext.submit_subtask) method. This means that a dependent task will only execute after the task it relies on has successfully completed.
+Tasks often rely on other tasks. For example, a task that processes data might depend on a task that fetches that data. **Tasks can express their dependencies on other tasks** by using the `depends_on` argument of the [`submit_subtask`](/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask) method. This means that a dependent task will only execute after the task it relies on has successfully completed.
The `depends_on` argument accepts a list of tasks, enabling a task to depend on multiple other tasks.
@@ -308,6 +562,48 @@ class PrintTask(Task):
def execute(self, context: ExecutionContext) -> None:
print(self.message)
```
+```go Go
+type RootTask struct{}
+
+func (t *RootTask) Execute(ctx context.Context) error {
+ firstTask, err := workflows.SubmitSubtask(
+ ctx,
+ &PrintTask{Message: "Executing first"},
+ )
+ if err != nil {
+ return err
+ }
+
+ secondTask, err := workflows.SubmitSubtask(
+ ctx,
+ &PrintTask{Message: "Executing second"},
+ subtask.WithDependencies(firstTask),
+ )
+ if err != nil {
+ return err
+ }
+
+ _, err = workflows.SubmitSubtask(
+ ctx,
+ &PrintTask{Message: "Executing last"},
+ subtask.WithDependencies(secondTask),
+ )
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+type PrintTask struct {
+ Message string
+}
+
+func (t *PrintTask) Execute(context.Context) error {
+ slog.Info("PrintTask", slog.String("message", t.Message))
+ return nil
+}
+```
The `RootTask` submits three `PrintTask` tasks as subtasks. These tasks depend on each other, meaning the second task executes only after the first task has successfully completed, and the third only executes after the second completes. The tasks are executed sequentially.
@@ -366,6 +662,155 @@ A practical example is a workflow that fetches news articles from an API and pro
"dev-cluster"
)
```
+```go Go
+package news
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log/slog"
+ "net/http"
+ "os"
+ "time"
+
+ "github.com/tilebox/tilebox-go/workflows/v1"
+ "github.com/tilebox/tilebox-go/workflows/v1/subtask"
+)
+
+const newsAPIKey = "YOUR_API_KEY"
+
+type NewsWorkflow struct {
+ Category string
+ MaxArticles int
+}
+
+func (t *NewsWorkflow) Execute(ctx context.Context) error {
+ fetchTask, err := workflows.SubmitSubtask(ctx, &FetchNews{
+ Category: t.Category,
+ MaxArticles: t.MaxArticles,
+ })
+ if err != nil {
+ return err
+ }
+
+ _, err = workflows.SubmitSubtask(ctx, &PrintHeadlines{}, subtask.WithDependencies(fetchTask))
+ if err != nil {
+ return err
+ }
+
+ _, err = workflows.SubmitSubtask(ctx, &MostFrequentAuthors{}, subtask.WithDependencies(fetchTask))
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+type News struct {
+ Status string `json:"status"`
+ TotalResults int `json:"totalResults"`
+ Articles []struct {
+ Source struct {
+ ID *string `json:"id"`
+ Name string `json:"name"`
+ } `json:"source"`
+ Author *string `json:"author"`
+ Title string `json:"title"`
+ Description *string `json:"description"`
+ URL string `json:"url"`
+ URLToImage *string `json:"urlToImage"`
+ PublishedAt time.Time `json:"publishedAt"`
+ Content *string `json:"content"`
+ } `json:"articles"`
+}
+
+type FetchNews struct {
+ Category string
+ MaxArticles int
+}
+
+func (t *FetchNews) Execute(context.Context) error {
+ url := fmt.Sprintf("https://newsapi.org/v2/top-headlines?category=%s&pageSize=%d&country=us&apiKey=%s", t.Category, t.MaxArticles, newsAPIKey)
+ response, err := http.Get(url)
+ if err != nil {
+ return fmt.Errorf("failed to download news: %w", err)
+ }
+
+ defer response.Body.Close()
+ body, err := io.ReadAll(response.Body)
+ if err != nil {
+ return fmt.Errorf("failed to read response: %w", err)
+ }
+
+ // check out our documentation page on caches to learn
+ // about a better way of passing data between tasks
+ return os.WriteFile("news.json", body, 0o600)
+}
+
+type PrintHeadlines struct{}
+
+func (t *PrintHeadlines) Execute(context.Context) error {
+ newsBytes, err := os.ReadFile("news.json")
+ if err != nil {
+ return fmt.Errorf("failed to read news: %w", err)
+ }
+
+ var news News
+ err = json.Unmarshal(newsBytes, &news)
+ if err != nil {
+ return fmt.Errorf("failed to unmarshal news: %w", err)
+ }
+
+ for _, article := range news.Articles {
+ slog.Info("Article", slog.Time("published_at", article.PublishedAt), slog.String("title", article.Title))
+ }
+
+ return nil
+}
+
+type MostFrequentAuthors struct{}
+
+func (t *MostFrequentAuthors) Execute(context.Context) error {
+ newsBytes, err := os.ReadFile("news.json")
+ if err != nil {
+ return fmt.Errorf("failed to read news: %w", err)
+ }
+
+ var news News
+ err = json.Unmarshal(newsBytes, &news)
+ if err != nil {
+ return fmt.Errorf("failed to unmarshal news: %w", err)
+ }
+
+ authors := make(map[string]int)
+ for _, article := range news.Articles {
+ if article.Author == nil {
+ continue
+ }
+ authors[*article.Author]++
+ }
+
+ for author, count := range authors {
+ slog.Info("Author", slog.String("author", author), slog.Int("count", count))
+ }
+
+ return nil
+}
+
+// in main now submit a job, and then visualize it
+/*
+job, err := client.Jobs.Submit(ctx, "process-news", "dev-cluster",
+ []workflows.Task{
+ &NewsWorkflow{
+ Category: "science",
+ MaxArticles: 5,
+ },
+ },
+)
+*/
+```
```plaintext Output
@@ -414,27 +859,48 @@ If unspecified, the identifier of a task defaults to the class name. For instanc
To address this, Tilebox Workflows offers a way to explicitly specify the identifier of a task. This is done by overriding the `identifier` method of the `Task` class. This method should return a unique string identifying the task. This decouples the task's identifier from its class name, allowing you to change the identifier without renaming the class. It also allows tasks with the same class name to have different identifiers. The `identifier` method can also specify a version number for the task — see the section on [semantic versioning](#semantic-versioning) below for more details.
- ```python Python
- class MyTask(Task):
- def execute(self, context: ExecutionContext) -> None:
- pass
+```python Python
+class MyTask(Task):
+ def execute(self, context: ExecutionContext) -> None:
+ pass
- # MyTask has the identifier "MyTask" and the default version of "v0.0"
+# MyTask has the identifier "MyTask" and the default version of "v0.0"
- class MyTask2(Task):
- @staticmethod
- def identifier() -> tuple[str, str]:
- return "tilebox.com/example_workflow/MyTask", "v1.0"
+class MyTask2(Task):
+ @staticmethod
+ def identifier() -> tuple[str, str]:
+ return "tilebox.com/example_workflow/MyTask", "v1.0"
- def execute(self, context: ExecutionContext) -> None:
- pass
+ def execute(self, context: ExecutionContext) -> None:
+ pass
- # MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0"
- ```
+# MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0"
+```
+```go Go
+type MyTask struct{}
+
+func (t *MyTask) Execute(context.Context) error {
+ return nil
+}
+
+// MyTask has the identifier "MyTask" and the default version of "v0.0"
+
+type MyTask2 struct{}
+
+func (t *MyTask2) Identifier() workflows.TaskIdentifier {
+ return workflows.NewTaskIdentifier("tilebox.com/example_workflow/MyTask", "v1.0")
+}
+
+func (t *MyTask2) Execute(context.Context) error {
+ return nil
+}
+
+// MyTask2 has the identifier "tilebox.com/example_workflow/MyTask" and the version "v1.0"
+```
- The `identifier` method must be defined as either a `classmethod` or a `staticmethod`, meaning it can be called without instantiating the class.
+ In Python, the `identifier` method must be defined as either a `classmethod` or a `staticmethod`, meaning it can be called without instantiating the class.
## Semantic Versioning
@@ -448,15 +914,26 @@ You assign a version number by overriding the `identifier` method of the task cl
For example, this task has the identifier `"tilebox.com/example_workflow/MyTask"` and the version `"v1.3"`:
- ```python Python
- class MyTask(Task):
- @staticmethod
- def identifier() -> tuple[str, str]:
- return "tilebox.com/example_workflow/MyTask", "v1.3"
+```python Python
+class MyTask(Task):
+ @staticmethod
+ def identifier() -> tuple[str, str]:
+ return "tilebox.com/example_workflow/MyTask", "v1.3"
- def execute(self, context: ExecutionContext) -> None:
- pass
- ```
+ def execute(self, context: ExecutionContext) -> None:
+ pass
+```
+```go Go
+type MyTask struct{}
+
+func (t *MyTask) Identifier() workflows.TaskIdentifier {
+ return workflows.NewTaskIdentifier("tilebox.com/example_workflow/MyTask", "v1.3")
+}
+
+func (t *MyTask) Execute(context.Context) error {
+ return nil
+}
+```
When a task is submitted as part of a job, the version from which it's submitted is recorded and may differ from the version on the task runner executing the task.
diff --git a/workflows/near-real-time/automations.mdx b/workflows/near-real-time/automations.mdx
index 7a70484..52c39c9 100644
--- a/workflows/near-real-time/automations.mdx
+++ b/workflows/near-real-time/automations.mdx
@@ -4,6 +4,10 @@ description: Process data in near-real-time by triggering jobs based on external
icon: repeat
---
+
+ This feature is only available in the Python SDK.
+
+
## Introduction
Tilebox Workflows can execute jobs in two ways: a one-time execution triggered by a user, typically a batch processing, and near-real-time execution based on specific external events.
@@ -31,6 +35,10 @@ To create a trigger, define a special task that serves as a prototype. In respon
Each automation has a [task identifier](/workflows/concepts/tasks#task-identifiers), a [version](/workflows/concepts/tasks#semantic-versioning), and [input parameters](/workflows/concepts/tasks#input-parameters), just like regular tasks.
Automations also automatically provide a special `trigger` attribute that contains information about the event that initiated the task's execution.
+
+ Go doesn't support registering automations yet; please use Python or the console instead.
+
+
## Automation Client
The Tilebox Workflows client includes a sub-client for managing automations. You can create this sub-client by calling the `automations` method on the main client instance.
diff --git a/workflows/near-real-time/cron.mdx b/workflows/near-real-time/cron.mdx
index 493437c..8f07207 100644
--- a/workflows/near-real-time/cron.mdx
+++ b/workflows/near-real-time/cron.mdx
@@ -4,6 +4,10 @@ description: Trigger jobs based on a Cron schedule.
icon: clock
---
+
+ This feature is only available in the Python SDK.
+
+
## Creating Cron tasks
Cron tasks run repeatedly on a specified [cron](https://en.wikipedia.org/wiki/Cron) schedule.
diff --git a/workflows/near-real-time/storage-events.mdx b/workflows/near-real-time/storage-events.mdx
index 7b2759b..b227bee 100644
--- a/workflows/near-real-time/storage-events.mdx
+++ b/workflows/near-real-time/storage-events.mdx
@@ -4,6 +4,10 @@ description: Trigger jobs after objects are created or modified in a storage loc
icon: right-to-line
---
+
+ This feature is only available in the Python SDK.
+
+
## Creating a Storage Event Task
Storage Event Tasks are automations triggered when objects are created or modified in a [storage location](#storage-locations).
diff --git a/workflows/observability/logging.mdx b/workflows/observability/logging.mdx
index 847b442..8ee7984 100644
--- a/workflows/observability/logging.mdx
+++ b/workflows/observability/logging.mdx
@@ -21,6 +21,7 @@ The Tilebox workflow SDKs include support for exporting OpenTelemetry logs. To e
To configure logging with Axiom, you first need to create an [Axiom Dataset](https://axiom.co/docs/reference/datasets) to export your workflow logs to. You will also need an [Axiom API key](https://axiom.co/docs/reference/tokens) with the necessary write permissions for your Axiom dataset.
+
```python Python
from tilebox.workflows import Client, Task, ExecutionContext
from tilebox.workflows.observability.logging import configure_otel_logging_axiom
@@ -45,6 +46,65 @@ The Tilebox workflow SDKs include support for exporting OpenTelemetry logs. To e
if __name__ == "__main__":
main()
```
+```go Go
+package main
+
+import (
+ "context"
+ "log/slog"
+
+ "github.com/tilebox/tilebox-go/examples/workflows/axiom"
+ "github.com/tilebox/tilebox-go/observability"
+ "github.com/tilebox/tilebox-go/observability/logger"
+ "github.com/tilebox/tilebox-go/workflows/v1"
+)
+
+// specify a service name and version to identify the instrumenting application in traces and logs
+var service = &observability.Service{Name: "task-runner", Version: "dev"}
+
+func main() {
+ ctx := context.Background()
+
+ // Setup OpenTelemetry logging and slog
+ // It uses AXIOM_API_KEY and AXIOM_LOGS_DATASET from the environment
+ axiomHandler, shutdownLogger, err := logger.NewAxiomHandler(ctx, service,
+ logger.WithLevel(slog.LevelInfo), // export logs at info level and above as OTEL logs
+ )
+ defer shutdownLogger(ctx)
+ if err != nil {
+ slog.Error("failed to set up axiom log handler", slog.Any("error", err))
+ return
+ }
+ tileboxLogger := logger.New( // initialize a slog.Logger
+ axiomHandler, // export logs to the Axiom handler
+ logger.NewConsoleHandler(logger.WithLevel(slog.LevelWarn)), // and additionally, export WARN and ERROR logs to stdout
+ )
+ slog.SetDefault(tileboxLogger) // all future slog calls will be forwarded to the tilebox logger
+
+ client := workflows.NewClient()
+
+ cluster, err := client.Clusters.Get(ctx, "dev-cluster")
+ if err != nil {
+ slog.Error("failed to get cluster", slog.Any("error", err))
+ return
+ }
+
+ taskRunner, err := client.NewTaskRunner(cluster)
+ if err != nil {
+ slog.Error("failed to create task runner", slog.Any("error", err))
+ return
+ }
+
+ err = taskRunner.RegisterTasks(&MyTask{})
+ if err != nil {
+ slog.Error("failed to register tasks", slog.Any("error", err))
+ return
+ }
+
+ taskRunner.RunForever(ctx)
+}
+```
+
Setting the environment variables `AXIOM_API_KEY` and `AXIOM_LOGS_DATASET` allows you to omit these arguments in the `configure_otel_logging_axiom` function.
@@ -54,6 +114,7 @@ The Tilebox workflow SDKs include support for exporting OpenTelemetry logs. To e
If you are using another OpenTelemetry-compatible backend besides Axiom, such as OpenTelemetry Collector or Jaeger, you can configure logging by specifying the URL endpoint to export log messages to.
+
```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.logging import configure_otel_logging
@@ -79,6 +140,71 @@ The Tilebox workflow SDKs include support for exporting OpenTelemetry logs. To e
if __name__ == "__main__":
main()
```
+```go Go
+package main
+
+import (
+ "context"
+ "log/slog"
+
+ "github.com/tilebox/tilebox-go/examples/workflows/opentelemetry"
+ "github.com/tilebox/tilebox-go/observability"
+ "github.com/tilebox/tilebox-go/observability/logger"
+ "github.com/tilebox/tilebox-go/workflows/v1"
+)
+
+// specify a service name and version to identify the instrumenting application in traces and logs
+var service = &observability.Service{Name: "task-runner", Version: "dev"}
+
+func main() {
+ ctx := context.Background()
+
+ endpoint := "http://localhost:4318"
+ headers := map[string]string{
+ "Authorization": "Bearer ",
+ }
+
+ // Setup an OpenTelemetry log handler, exporting logs to an OTEL compatible log endpoint
+ otelHandler, shutdownLogger, err := logger.NewOtelHandler(ctx, service,
+ logger.WithEndpointURL(endpoint),
+ logger.WithHeaders(headers),
+ logger.WithLevel(slog.LevelInfo), // export logs at info level and above as OTEL logs
+ )
+ defer shutdownLogger(ctx)
+ if err != nil {
+ slog.Error("failed to set up otel log handler", slog.Any("error", err))
+ return
+ }
+ tileboxLogger := logger.New( // initialize a slog.Logger
+ otelHandler, // export logs to the OTEL handler
+ logger.NewConsoleHandler(logger.WithLevel(slog.LevelWarn)), // and additionally, export WARN and ERROR logs to stdout
+ )
+ slog.SetDefault(tileboxLogger) // all future slog calls will be forwarded to the tilebox logger
+
+ client := workflows.NewClient()
+
+ cluster, err := client.Clusters.Get(ctx, "dev-cluster")
+ if err != nil {
+ slog.Error("failed to get cluster", slog.Any("error", err))
+ return
+ }
+
+ taskRunner, err := client.NewTaskRunner(cluster)
+ if err != nil {
+ slog.Error("failed to create task runner", slog.Any("error", err))
+ return
+ }
+
+ err = taskRunner.RegisterTasks(&MyTask{})
+ if err != nil {
+ slog.Error("failed to register tasks", slog.Any("error", err))
+ return
+ }
+
+ taskRunner.RunForever(ctx)
+}
+```
+
If you set the environment variable `OTEL_LOGS_ENDPOINT`, you can omit that argument in the `configure_otel_logging` function.
@@ -87,6 +213,7 @@ The Tilebox workflow SDKs include support for exporting OpenTelemetry logs. To e
To log messages to the standard console output, use the `configure_console_logging` function.
+
```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.logging import configure_console_logging
@@ -106,6 +233,48 @@ The Tilebox workflow SDKs include support for exporting OpenTelemetry logs. To e
if __name__ == "__main__":
main()
```
+```go Go
+package main
+
+import (
+ "context"
+ "log/slog"
+
+ "github.com/tilebox/tilebox-go/examples/workflows/opentelemetry"
+ "github.com/tilebox/tilebox-go/observability/logger"
+ "github.com/tilebox/tilebox-go/workflows/v1"
+)
+
+func main() {
+ ctx := context.Background()
+
+ tileboxLogger := logger.New(logger.NewConsoleHandler(logger.WithLevel(slog.LevelWarn)))
+ slog.SetDefault(tileboxLogger) // all future slog calls will be forwarded to the tilebox logger
+
+ client := workflows.NewClient()
+
+ cluster, err := client.Clusters.Get(ctx, "dev-cluster")
+ if err != nil {
+ slog.Error("failed to get cluster", slog.Any("error", err))
+ return
+ }
+
+ taskRunner, err := client.NewTaskRunner(cluster)
+ if err != nil {
+ slog.Error("failed to create task runner", slog.Any("error", err))
+ return
+ }
+
+ err = taskRunner.RegisterTasks(&MyTask{})
+ if err != nil {
+ slog.Error("failed to register tasks", slog.Any("error", err))
+ return
+ }
+
+ taskRunner.RunForever(ctx)
+}
+```
+
The console logging backend is not recommended for production use. Log messages will be emitted to the standard output of each task runner rather than a centralized logging system. It is intended for local development and testing of workflows.
@@ -119,6 +288,7 @@ The Tilebox workflow SDKs include support for exporting OpenTelemetry logs. To e
Use the logger provided by the Tilebox SDK to emit log messages from your tasks. You can then use it to send log messages to the [configured logging backend](#configure-logging).
Log messages emitted within a task's `execute` method are also automatically recorded as span events for the current [job trace](/workflows/observability/tracing).
+
```python Python
import logging
from tilebox.workflows import Task, ExecutionContext
@@ -131,10 +301,27 @@ class MyTask(Task):
# emit a log message to the configured OpenTelemetry backend
logger.info("Hello world from configured logger!")
```
+```go Go
+package tasks
+
+import (
+ "context"
+ "log/slog"
+)
+
+type MyTask struct{}
+
+func (t *MyTask) Execute(context.Context) error {
+ // emit a log message to the configured OpenTelemetry backend
+ slog.Info("Hello world from configured logger!")
+ return nil
+}
+```
+
## Logging task runner internals
-Tilebox task runners also internally use a logger. By default, it's set to the WARNING level, but you can change it by explicitly configuring a logger for the workflows client when constructing the task runner.
+In Python, Tilebox task runners also internally use a logger. By default, it's set to the WARNING level, but you can change it by explicitly configuring a logger for the workflows client when constructing the task runner.
```python Python
from tilebox.workflows import Client
diff --git a/workflows/observability/tracing.mdx b/workflows/observability/tracing.mdx
index bd30a29..5d02bef 100644
--- a/workflows/observability/tracing.mdx
+++ b/workflows/observability/tracing.mdx
@@ -31,6 +31,7 @@ The Tilebox workflow SDKs have built-in support for exporting OpenTelemetry trac
To configure tracing with Axiom, you first need to create an [Axiom Dataset](https://axiom.co/docs/reference/datasets) to export your workflow traces to. You will also need an [Axiom API key](https://axiom.co/docs/reference/tokens) with the necessary write permissions for your Axiom dataset.
+
```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.tracing import configure_otel_tracing_axiom
@@ -55,6 +56,60 @@ The Tilebox workflow SDKs have built-in support for exporting OpenTelemetry trac
if __name__ == "__main__":
main()
```
+```go Go
+package main
+
+import (
+ "context"
+ "log/slog"
+
+ "github.com/tilebox/tilebox-go/examples/workflows/axiom"
+ "github.com/tilebox/tilebox-go/observability"
+ "github.com/tilebox/tilebox-go/observability/tracer"
+ "github.com/tilebox/tilebox-go/workflows/v1"
+ "go.opentelemetry.io/otel"
+)
+
+// specify a service name and version to identify the instrumenting application in traces and logs
+var service = &observability.Service{Name: "task-runner", Version: "dev"}
+
+func main() {
+ ctx := context.Background()
+
+ // Setup an OpenTelemetry trace span processor, exporting traces and spans to Axiom
+ // It uses AXIOM_API_KEY and AXIOM_TRACES_DATASET from the environment
+ tileboxTracerProvider, shutdown, err := tracer.NewAxiomProvider(ctx, service)
+ defer shutdown(ctx)
+ if err != nil {
+ slog.Error("failed to set up axiom tracer provider", slog.Any("error", err))
+ return
+ }
+ otel.SetTracerProvider(tileboxTracerProvider) // set the tilebox tracer provider as the global OTEL tracer provider
+
+ client := workflows.NewClient()
+
+ cluster, err := client.Clusters.Get(ctx, "dev-cluster")
+ if err != nil {
+ slog.Error("failed to get cluster", slog.Any("error", err))
+ return
+ }
+
+ taskRunner, err := client.NewTaskRunner(cluster)
+ if err != nil {
+ slog.Error("failed to create task runner", slog.Any("error", err))
+ return
+ }
+
+ err = taskRunner.RegisterTasks(&MyTask{})
+ if err != nil {
+ slog.Error("failed to register tasks", slog.Any("error", err))
+ return
+ }
+
+ taskRunner.RunForever(ctx)
+}
+```
+
Set the environment variables `AXIOM_API_KEY` and `AXIOM_TRACES_DATASET` to omit those arguments.
@@ -65,6 +120,7 @@ The Tilebox workflow SDKs have built-in support for exporting OpenTelemetry trac
If you are using another OpenTelemetry-compatible backend besides Axiom, like OpenTelemetry Collector or Jaeger, you can configure tracing by specifying the URL endpoint to export traces to.
+
```python Python
from tilebox.workflows import Client
from tilebox.workflows.observability.tracing import configure_otel_tracing
@@ -90,6 +146,67 @@ The Tilebox workflow SDKs have built-in support for exporting OpenTelemetry trac
if __name__ == "__main__":
main()
```
+```go Go
+package main
+
+import (
+ "context"
+ "log/slog"
+
+ "github.com/tilebox/tilebox-go/examples/workflows/opentelemetry"
+ "github.com/tilebox/tilebox-go/observability"
+ "github.com/tilebox/tilebox-go/observability/tracer"
+ "github.com/tilebox/tilebox-go/workflows/v1"
+ "go.opentelemetry.io/otel"
+)
+
+// specify a service name and version to identify the instrumenting application in traces and logs
+var service = &observability.Service{Name: "task-runner", Version: "dev"}
+
+func main() {
+ ctx := context.Background()
+
+ endpoint := "http://localhost:4318"
+ headers := map[string]string{
+ "Authorization": "Bearer ",
+ }
+
+ // Setup an OpenTelemetry trace span processor, exporting traces and spans to an OTEL compatible trace endpoint
+ tileboxTracerProvider, shutdown, err := tracer.NewOtelProvider(ctx, service,
+ tracer.WithEndpointURL(endpoint),
+ tracer.WithHeaders(headers),
+ )
+ defer shutdown(ctx)
+ if err != nil {
+ slog.Error("failed to set up otel span processor", slog.Any("error", err))
+ return
+ }
+ otel.SetTracerProvider(tileboxTracerProvider) // set the tilebox tracer provider as the global OTEL tracer provider
+
+ client := workflows.NewClient()
+
+ cluster, err := client.Clusters.Get(ctx, "dev-cluster")
+ if err != nil {
+ slog.Error("failed to get cluster", slog.Any("error", err))
+ return
+ }
+
+ taskRunner, err := client.NewTaskRunner(cluster)
+ if err != nil {
+ slog.Error("failed to create task runner", slog.Any("error", err))
+ return
+ }
+
+ err = taskRunner.RegisterTasks(&MyTask{})
+ if err != nil {
+ slog.Error("failed to register tasks", slog.Any("error", err))
+ return
+ }
+
+ taskRunner.RunForever(ctx)
+}
+```
+
Set the environment variable `OTEL_TRACES_ENDPOINT` to omit that argument.