From 8565eddac4f7701703c61c7c8189d52d286ba721 Mon Sep 17 00:00:00 2001 From: "ivan.hladush" Date: Tue, 4 Nov 2025 15:54:10 -0700 Subject: [PATCH 1/3] Add functionality for old jobs removal --- configs/local.yaml | 3 + internal/pkg/janitor/janitor.go | 26 +++++--- internal/pkg/janitor/job.go | 64 +++++++++++++++++++ .../queries/old_jobs_cluster_tags_delete.sql | 4 ++ .../queries/old_jobs_command_tags_delete.sql | 5 ++ .../pkg/janitor/queries/old_jobs_delete.sql | 3 + .../janitor/queries/old_jobs_tags_delete.sql | 4 ++ 7 files changed, 100 insertions(+), 9 deletions(-) create mode 100644 internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql create mode 100644 internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql create mode 100644 internal/pkg/janitor/queries/old_jobs_delete.sql create mode 100644 internal/pkg/janitor/queries/old_jobs_tags_delete.sql diff --git a/configs/local.yaml b/configs/local.yaml index b420f30..a61c8da 100644 --- a/configs/local.yaml +++ b/configs/local.yaml @@ -11,6 +11,9 @@ pool: # plugins location plugin_directory: ./plugins +janitor: + finished_job_retention_days: 14 + # auth plugin auth: plugin: ./plugins/auth_header.so diff --git a/internal/pkg/janitor/janitor.go b/internal/pkg/janitor/janitor.go index 1d26612..786b92d 100644 --- a/internal/pkg/janitor/janitor.go +++ b/internal/pkg/janitor/janitor.go @@ -3,6 +3,7 @@ package janitor import ( "time" + "github.com/hladush/go-telemetry/pkg/telemetry" "github.com/patterninc/heimdall/internal/pkg/database" "github.com/patterninc/heimdall/pkg/object/cluster" "github.com/patterninc/heimdall/pkg/plugin" @@ -12,13 +13,15 @@ const ( defaultJobLimit = 3 ) +var ( + startMethod = telemetry.NewMethod("Start", "janitor") +) + type Janitor struct { - Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"` - StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"` - CleanInterval int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"` - db *database.Database - commandHandlers map[string]plugin.Handler - clusters cluster.Clusters + Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"` + StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"` + FinishedJobRetentionDays int `yaml:"finished_job_retention_days,omitempty" json:"finished_job_retention_days,omitempty"` + db *database.Database } func (j *Janitor) Start(d *database.Database, commandHandlers map[string]plugin.Handler, clusters cluster.Clusters) error { @@ -33,10 +36,15 @@ func (j *Janitor) Start(d *database.Database, commandHandlers map[string]plugin. for { jobsFound := j.worker() - // if no jobs are found, sleep before checking again - if !jobsFound { - time.Sleep(time.Duration(j.CleanInterval) * time.Second) + if err := j.cleanupStaleJobs(); err != nil { + startMethod.LogAndCountError(err, "cleanup_stale_jobs") } + + if err := j.cleanupFinishedJobs(); err != nil { + startMethod.LogAndCountError(err, "cleanup_finished_jobs") + } + time.Sleep(60 * time.Second) + } }() diff --git a/internal/pkg/janitor/job.go b/internal/pkg/janitor/job.go index f11b4ce..c88a98d 100644 --- a/internal/pkg/janitor/job.go +++ b/internal/pkg/janitor/job.go @@ -3,8 +3,12 @@ package janitor import ( "context" _ "embed" +<<<<<<< HEAD "sync" "time" +======= + "fmt" +>>>>>>> 4dc6ffe (Add functionality for old jobs removal) "github.com/go-faster/errors" "github.com/hladush/go-telemetry/pkg/telemetry" @@ -33,9 +37,34 @@ var queryJobsSetFailed string func (j *Janitor) worker() bool { +<<<<<<< HEAD // track worker cycle workerMethod.CountRequest() defer workerMethod.RecordLatency(time.Now()) +======= +//go:embed queries/old_jobs_cluster_tags_delete.sql +var queryOldJobsClusterTagsDelete string + +//go:embed queries/old_jobs_command_tags_delete.sql +var queryOldJobsCommandTagsDelete string + +//go:embed queries/old_jobs_tags_delete.sql +var queryOldJobsTagsDelete string + +//go:embed queries/old_jobs_delete.sql +var queryOldJobsDelete string + +var ( + queriesForOldJobsCleanup = []string{ + queryOldJobsClusterTagsDelete, + queryOldJobsCommandTagsDelete, + queryOldJobsTagsDelete, + queryOldJobsDelete, + } +) + +func (j *Janitor) cleanupStaleJobs() error { +>>>>>>> 4dc6ffe (Add functionality for old jobs removal) // create database session with transaction sess, err := j.db.NewSession(true) @@ -190,3 +219,38 @@ func (j *Janitor) updateJobs(sess *database.Session, jobs []*job.Job) error { return nil } + +func (j *Janitor) cleanupFinishedJobs() error { + if j.FinishedJobRetentionDays == 0 { + return nil + } + // Start transactional session + sess, err := j.db.NewSession(true) + if err != nil { + return err + } + defer sess.Close() + + defer func() { + _ = sess.Rollback() + }() + + exec := func(query string, args ...any) error { + if _, err := sess.Exec(query, args...); err != nil { + return fmt.Errorf("failed to exec query %q: %w", query, err) + } + return nil + } + + for _, q := range queriesForOldJobsCleanup { + if err := exec(q, j.FinishedJobRetentionDays); err != nil { + return err + } + } + + if err := sess.Commit(); err != nil { + return fmt.Errorf("failed to commit cleanup transaction: %w", err) + } + + return nil +} diff --git a/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql b/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql new file mode 100644 index 0000000..8ce92ae --- /dev/null +++ b/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql @@ -0,0 +1,4 @@ +DELETE FROM job_cluster_tags +WHERE system_job_id IN ( + SELECT system_job_id FROM jobs WHERE updated_at < extract(epoch FROM now() - ($1 || ' days')::interval)::int +); diff --git a/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql b/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql new file mode 100644 index 0000000..73d02ce --- /dev/null +++ b/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql @@ -0,0 +1,5 @@ + +DELETE FROM job_command_tags +WHERE system_job_id IN ( + SELECT system_job_id FROM jobs WHERE updated_at < extract(epoch FROM now() - ($1 || ' days')::interval)::int +); diff --git a/internal/pkg/janitor/queries/old_jobs_delete.sql b/internal/pkg/janitor/queries/old_jobs_delete.sql new file mode 100644 index 0000000..d0614a5 --- /dev/null +++ b/internal/pkg/janitor/queries/old_jobs_delete.sql @@ -0,0 +1,3 @@ + +DELETE FROM jobs +WHERE updated_at < extract(epoch FROM now() - ($1 || ' days')::interval)::int; diff --git a/internal/pkg/janitor/queries/old_jobs_tags_delete.sql b/internal/pkg/janitor/queries/old_jobs_tags_delete.sql new file mode 100644 index 0000000..31c80dd --- /dev/null +++ b/internal/pkg/janitor/queries/old_jobs_tags_delete.sql @@ -0,0 +1,4 @@ +DELETE FROM job_tags +WHERE system_job_id IN ( + SELECT system_job_id FROM jobs WHERE updated_at < extract(epoch FROM now() - ($1 || ' days')::interval)::int +); From 6ade7dcef44d073e8cbedb33a0ecd0bc9683c8a4 Mon Sep 17 00:00:00 2001 From: "ivan.hladush" Date: Tue, 11 Nov 2025 12:20:17 -0700 Subject: [PATCH 2/3] Improve code remove transaction managment from it --- internal/pkg/janitor/janitor.go | 2 +- internal/pkg/janitor/job.go | 46 +++++++++---------- .../janitor/queries/old_job_biggest_id.sql | 1 + .../queries/old_jobs_cluster_tags_delete.sql | 5 +- .../queries/old_jobs_command_tags_delete.sql | 7 ++- .../pkg/janitor/queries/old_jobs_delete.sql | 7 ++- .../janitor/queries/old_jobs_tags_delete.sql | 7 ++- 7 files changed, 43 insertions(+), 32 deletions(-) create mode 100644 internal/pkg/janitor/queries/old_job_biggest_id.sql diff --git a/internal/pkg/janitor/janitor.go b/internal/pkg/janitor/janitor.go index 786b92d..514d6d3 100644 --- a/internal/pkg/janitor/janitor.go +++ b/internal/pkg/janitor/janitor.go @@ -14,7 +14,7 @@ const ( ) var ( - startMethod = telemetry.NewMethod("Start", "janitor") + startMethod = telemetry.NewMethod("Start", "Janitor") ) type Janitor struct { diff --git a/internal/pkg/janitor/job.go b/internal/pkg/janitor/job.go index c88a98d..f3802c8 100644 --- a/internal/pkg/janitor/job.go +++ b/internal/pkg/janitor/job.go @@ -1,14 +1,9 @@ package janitor import ( - "context" + "database/sql" _ "embed" -<<<<<<< HEAD - "sync" - "time" -======= "fmt" ->>>>>>> 4dc6ffe (Add functionality for old jobs removal) "github.com/go-faster/errors" "github.com/hladush/go-telemetry/pkg/telemetry" @@ -37,11 +32,6 @@ var queryJobsSetFailed string func (j *Janitor) worker() bool { -<<<<<<< HEAD - // track worker cycle - workerMethod.CountRequest() - defer workerMethod.RecordLatency(time.Now()) -======= //go:embed queries/old_jobs_cluster_tags_delete.sql var queryOldJobsClusterTagsDelete string @@ -54,6 +44,9 @@ var queryOldJobsTagsDelete string //go:embed queries/old_jobs_delete.sql var queryOldJobsDelete string +//go:embed queries/old_job_biggest_id.sql +var queryOldJobsBiggestID string + var ( queriesForOldJobsCleanup = []string{ queryOldJobsClusterTagsDelete, @@ -64,7 +57,6 @@ var ( ) func (j *Janitor) cleanupStaleJobs() error { ->>>>>>> 4dc6ffe (Add functionality for old jobs removal) // create database session with transaction sess, err := j.db.NewSession(true) @@ -224,33 +216,37 @@ func (j *Janitor) cleanupFinishedJobs() error { if j.FinishedJobRetentionDays == 0 { return nil } - // Start transactional session - sess, err := j.db.NewSession(true) + // open session + sess, err := j.db.NewSession(false) if err != nil { return err } defer sess.Close() - defer func() { - _ = sess.Rollback() - }() + // get biggest ID of old jobs + row, err := sess.QueryRow(queryOldJobsBiggestID, j.FinishedJobRetentionDays) + if err != nil { + return fmt.Errorf("failed to get biggest ID of old jobs: %w", err) + } - exec := func(query string, args ...any) error { - if _, err := sess.Exec(query, args...); err != nil { - return fmt.Errorf("failed to exec query %q: %w", query, err) + var biggestID sql.NullInt64 + if err := row.Scan(&biggestID); err != nil { + if err == sql.ErrNoRows { + return nil } + return fmt.Errorf("failed to get biggest ID of old jobs: %w", err) + } + + if !biggestID.Valid || biggestID.Int64 == 0 { return nil } + // remove old jobs data for _, q := range queriesForOldJobsCleanup { - if err := exec(q, j.FinishedJobRetentionDays); err != nil { + if _, err := sess.Exec(q, biggestID.Int64); err != nil { return err } } - if err := sess.Commit(); err != nil { - return fmt.Errorf("failed to commit cleanup transaction: %w", err) - } - return nil } diff --git a/internal/pkg/janitor/queries/old_job_biggest_id.sql b/internal/pkg/janitor/queries/old_job_biggest_id.sql new file mode 100644 index 0000000..ce949e4 --- /dev/null +++ b/internal/pkg/janitor/queries/old_job_biggest_id.sql @@ -0,0 +1 @@ +SELECT MAX(system_job_id) FROM jobs WHERE updated_at < extract(epoch FROM now() - ($1 || ' days')::interval)::int; \ No newline at end of file diff --git a/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql b/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql index 8ce92ae..dd67f40 100644 --- a/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql +++ b/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql @@ -1,4 +1,7 @@ DELETE FROM job_cluster_tags WHERE system_job_id IN ( - SELECT system_job_id FROM jobs WHERE updated_at < extract(epoch FROM now() - ($1 || ' days')::interval)::int + SELECT system_job_id + FROM job_cluster_tags + WHERE system_job_id <= $1 + LIMIT 1000 ); diff --git a/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql b/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql index 73d02ce..aa8a573 100644 --- a/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql +++ b/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql @@ -1,5 +1,8 @@ DELETE FROM job_command_tags WHERE system_job_id IN ( - SELECT system_job_id FROM jobs WHERE updated_at < extract(epoch FROM now() - ($1 || ' days')::interval)::int -); + SELECT system_job_id + FROM job_command_tags + WHERE system_job_id <= $1 + LIMIT 1000 +); \ No newline at end of file diff --git a/internal/pkg/janitor/queries/old_jobs_delete.sql b/internal/pkg/janitor/queries/old_jobs_delete.sql index d0614a5..ca76c50 100644 --- a/internal/pkg/janitor/queries/old_jobs_delete.sql +++ b/internal/pkg/janitor/queries/old_jobs_delete.sql @@ -1,3 +1,8 @@ DELETE FROM jobs -WHERE updated_at < extract(epoch FROM now() - ($1 || ' days')::interval)::int; +WHERE system_job_id IN ( + SELECT system_job_id + FROM jobs + WHERE system_job_id <= $1 + LIMIT 1000 +); \ No newline at end of file diff --git a/internal/pkg/janitor/queries/old_jobs_tags_delete.sql b/internal/pkg/janitor/queries/old_jobs_tags_delete.sql index 31c80dd..55be5b1 100644 --- a/internal/pkg/janitor/queries/old_jobs_tags_delete.sql +++ b/internal/pkg/janitor/queries/old_jobs_tags_delete.sql @@ -1,4 +1,7 @@ DELETE FROM job_tags WHERE system_job_id IN ( - SELECT system_job_id FROM jobs WHERE updated_at < extract(epoch FROM now() - ($1 || ' days')::interval)::int -); + SELECT system_job_id + FROM job_tags + WHERE system_job_id <= $1 + LIMIT 1000 +); \ No newline at end of file From 81e1faf7ea68eaaf5859f33bd35c4542fd862c1a Mon Sep 17 00:00:00 2001 From: "ivan.hladush" Date: Thu, 13 Nov 2025 12:27:59 -0700 Subject: [PATCH 3/3] Fix review comments --- internal/pkg/janitor/janitor.go | 15 +++++----- internal/pkg/janitor/job.go | 28 +++++++++++++------ .../janitor/queries/old_job_biggest_id.sql | 6 +++- .../queries/old_jobs_cluster_tags_delete.sql | 2 +- .../queries/old_jobs_command_tags_delete.sql | 2 +- .../pkg/janitor/queries/old_jobs_delete.sql | 2 +- .../janitor/queries/old_jobs_tags_delete.sql | 2 +- 7 files changed, 37 insertions(+), 20 deletions(-) diff --git a/internal/pkg/janitor/janitor.go b/internal/pkg/janitor/janitor.go index 514d6d3..a269888 100644 --- a/internal/pkg/janitor/janitor.go +++ b/internal/pkg/janitor/janitor.go @@ -21,7 +21,10 @@ type Janitor struct { Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"` StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"` FinishedJobRetentionDays int `yaml:"finished_job_retention_days,omitempty" json:"finished_job_retention_days,omitempty"` + CleanInterval int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"` db *database.Database + commandHandlers map[string]plugin.Handler + clusters cluster.Clusters } func (j *Janitor) Start(d *database.Database, commandHandlers map[string]plugin.Handler, clusters cluster.Clusters) error { @@ -34,16 +37,14 @@ func (j *Janitor) Start(d *database.Database, commandHandlers map[string]plugin. // kick off janitor worker in the background. go func() { for { - jobsFound := j.worker() - - if err := j.cleanupStaleJobs(); err != nil { - startMethod.LogAndCountError(err, "cleanup_stale_jobs") - } - if err := j.cleanupFinishedJobs(); err != nil { startMethod.LogAndCountError(err, "cleanup_finished_jobs") } - time.Sleep(60 * time.Second) + jobsFound := j.worker() + // if no jobs are found, sleep before checking again + if !jobsFound { + time.Sleep(time.Duration(j.CleanInterval) * time.Second) + } } }() diff --git a/internal/pkg/janitor/job.go b/internal/pkg/janitor/job.go index f3802c8..878e5a8 100644 --- a/internal/pkg/janitor/job.go +++ b/internal/pkg/janitor/job.go @@ -1,9 +1,12 @@ package janitor import ( + "context" "database/sql" _ "embed" "fmt" + "sync" + "time" "github.com/go-faster/errors" "github.com/hladush/go-telemetry/pkg/telemetry" @@ -30,8 +33,6 @@ var queryJobsSetCanceled string //go:embed queries/jobs_set_failed.sql var queryJobsSetFailed string -func (j *Janitor) worker() bool { - //go:embed queries/old_jobs_cluster_tags_delete.sql var queryOldJobsClusterTagsDelete string @@ -56,7 +57,10 @@ var ( } ) -func (j *Janitor) cleanupStaleJobs() error { +func (j *Janitor) worker() bool { + // track worker cycle + workerMethod.CountRequest() + defer workerMethod.RecordLatency(time.Now()) // create database session with transaction sess, err := j.db.NewSession(true) @@ -213,7 +217,7 @@ func (j *Janitor) updateJobs(sess *database.Session, jobs []*job.Job) error { } func (j *Janitor) cleanupFinishedJobs() error { - if j.FinishedJobRetentionDays == 0 { + if j.FinishedJobRetentionDays <= 0 { return nil } // open session @@ -223,8 +227,10 @@ func (j *Janitor) cleanupFinishedJobs() error { } defer sess.Close() + retentionTimestamp := time.Now().AddDate(0, 0, -j.FinishedJobRetentionDays).Unix() + // get biggest ID of old jobs - row, err := sess.QueryRow(queryOldJobsBiggestID, j.FinishedJobRetentionDays) + row, err := sess.QueryRow(queryOldJobsBiggestID, retentionTimestamp) if err != nil { return fmt.Errorf("failed to get biggest ID of old jobs: %w", err) } @@ -236,15 +242,21 @@ func (j *Janitor) cleanupFinishedJobs() error { } return fmt.Errorf("failed to get biggest ID of old jobs: %w", err) } - + if !biggestID.Valid || biggestID.Int64 == 0 { return nil } // remove old jobs data for _, q := range queriesForOldJobsCleanup { - if _, err := sess.Exec(q, biggestID.Int64); err != nil { - return err + for { + affectedRows, err := sess.Exec(q, biggestID.Int64) + if err != nil { + return err + } + if affectedRows == 0 { + break + } } } diff --git a/internal/pkg/janitor/queries/old_job_biggest_id.sql b/internal/pkg/janitor/queries/old_job_biggest_id.sql index ce949e4..a49bb73 100644 --- a/internal/pkg/janitor/queries/old_job_biggest_id.sql +++ b/internal/pkg/janitor/queries/old_job_biggest_id.sql @@ -1 +1,5 @@ -SELECT MAX(system_job_id) FROM jobs WHERE updated_at < extract(epoch FROM now() - ($1 || ' days')::interval)::int; \ No newline at end of file +SELECT system_job_id +FROM jobs +WHERE updated_at < $1 +ORDER BY updated_at desc +LIMIT 1 \ No newline at end of file diff --git a/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql b/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql index dd67f40..bcd0305 100644 --- a/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql +++ b/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql @@ -3,5 +3,5 @@ WHERE system_job_id IN ( SELECT system_job_id FROM job_cluster_tags WHERE system_job_id <= $1 - LIMIT 1000 + LIMIT 100 ); diff --git a/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql b/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql index aa8a573..5b2d570 100644 --- a/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql +++ b/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql @@ -4,5 +4,5 @@ WHERE system_job_id IN ( SELECT system_job_id FROM job_command_tags WHERE system_job_id <= $1 - LIMIT 1000 + LIMIT 100 ); \ No newline at end of file diff --git a/internal/pkg/janitor/queries/old_jobs_delete.sql b/internal/pkg/janitor/queries/old_jobs_delete.sql index ca76c50..d248e5b 100644 --- a/internal/pkg/janitor/queries/old_jobs_delete.sql +++ b/internal/pkg/janitor/queries/old_jobs_delete.sql @@ -4,5 +4,5 @@ WHERE system_job_id IN ( SELECT system_job_id FROM jobs WHERE system_job_id <= $1 - LIMIT 1000 + LIMIT 100 ); \ No newline at end of file diff --git a/internal/pkg/janitor/queries/old_jobs_tags_delete.sql b/internal/pkg/janitor/queries/old_jobs_tags_delete.sql index 55be5b1..110b778 100644 --- a/internal/pkg/janitor/queries/old_jobs_tags_delete.sql +++ b/internal/pkg/janitor/queries/old_jobs_tags_delete.sql @@ -3,5 +3,5 @@ WHERE system_job_id IN ( SELECT system_job_id FROM job_tags WHERE system_job_id <= $1 - LIMIT 1000 + LIMIT 100 ); \ No newline at end of file