diff --git a/configs/local.yaml b/configs/local.yaml
index b420f30..a61c8da 100644
--- a/configs/local.yaml
+++ b/configs/local.yaml
@@ -11,6 +11,9 @@ pool:
 # plugins location
 plugin_directory: ./plugins
 
+janitor:
+  finished_job_retention_days: 14
+
 # auth plugin
 auth:
   plugin: ./plugins/auth_header.so
diff --git a/internal/pkg/janitor/janitor.go b/internal/pkg/janitor/janitor.go
index 1d26612..a269888 100644
--- a/internal/pkg/janitor/janitor.go
+++ b/internal/pkg/janitor/janitor.go
@@ -3,6 +3,7 @@ package janitor
 import (
 	"time"
 
+	"github.com/hladush/go-telemetry/pkg/telemetry"
 	"github.com/patterninc/heimdall/internal/pkg/database"
 	"github.com/patterninc/heimdall/pkg/object/cluster"
 	"github.com/patterninc/heimdall/pkg/plugin"
@@ -12,11 +13,16 @@ const (
 	defaultJobLimit = 3
 )
 
+var (
+	startMethod = telemetry.NewMethod("Start", "Janitor")
+)
+
 type Janitor struct {
-	Keepalive       int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"`
-	StaleJob        int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"`
+	Keepalive                int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"`
+	StaleJob                 int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"`
+	FinishedJobRetentionDays int `yaml:"finished_job_retention_days,omitempty" json:"finished_job_retention_days,omitempty"`
 	CleanInterval   int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"`
-	db              *database.Database
+	db                       *database.Database
 	commandHandlers map[string]plugin.Handler
 	clusters        cluster.Clusters
 }
@@ -31,12 +37,15 @@ func (j *Janitor) Start(d *database.Database, commandHandlers map[string]plugin.
 	// kick off janitor worker in the background.
 	go func() {
 		for {
+			if err := j.cleanupFinishedJobs(); err != nil {
+				startMethod.LogAndCountError(err, "cleanup_finished_jobs")
+			}
 			jobsFound := j.worker()
-
 			// if no jobs are found, sleep before checking again
 			if !jobsFound {
 				time.Sleep(time.Duration(j.CleanInterval) * time.Second)
 			}
+
 		}
 	}()
 
diff --git a/internal/pkg/janitor/job.go b/internal/pkg/janitor/job.go
index f11b4ce..878e5a8 100644
--- a/internal/pkg/janitor/job.go
+++ b/internal/pkg/janitor/job.go
@@ -2,7 +2,10 @@ package janitor
 
 import (
 	"context"
+	"database/sql"
 	_ "embed"
+	"errors"
+	"fmt"
 	"sync"
 	"time"
 
@@ -31,8 +34,37 @@ var queryJobsSetCanceled string
 //go:embed queries/jobs_set_failed.sql
 var queryJobsSetFailed string
 
-func (j *Janitor) worker() bool {
+//go:embed queries/old_jobs_cluster_tags_delete.sql
+var queryOldJobsClusterTagsDelete string
+
+//go:embed queries/old_jobs_command_tags_delete.sql
+var queryOldJobsCommandTagsDelete string
+
+//go:embed queries/old_jobs_tags_delete.sql
+var queryOldJobsTagsDelete string
+
+//go:embed queries/old_jobs_delete.sql
+var queryOldJobsDelete string
+
+// queryOldJobsBiggestID selects the largest system_job_id that is safe to
+// purge for a given retention cutoff: every job at or below that ID was last
+// updated before the cutoff.
+//
+//go:embed queries/old_job_biggest_id.sql
+var queryOldJobsBiggestID string
+
+// queriesForOldJobsCleanup lists the batched delete statements in the order
+// cleanupFinishedJobs runs them: the tag tables first, the jobs table last.
+var (
+	queriesForOldJobsCleanup = []string{
+		queryOldJobsClusterTagsDelete,
+		queryOldJobsCommandTagsDelete,
+		queryOldJobsTagsDelete,
+		queryOldJobsDelete,
+	}
+)
 
+func (j *Janitor) worker() bool {
 	// track worker cycle
 	workerMethod.CountRequest()
 	defer workerMethod.RecordLatency(time.Now())
@@ -190,3 +222,62 @@ func (j *Janitor) updateJobs(sess *database.Session, jobs []*job.Job) error {
 
 	return nil
 }
+
+// cleanupFinishedJobs purges jobs, and their rows in the job tag tables, once
+// they fall outside the retention window of FinishedJobRetentionDays days.
+// A zero or negative FinishedJobRetentionDays disables the cleanup.
+//
+// Rows are deleted in batches (the batch size is set in the SQL files), and
+// each delete statement is repeated until it reports no affected rows.
+//
+// NOTE(review): the queries select jobs by updated_at only, not by status;
+// confirm that every job this old is in fact finished.
+func (j *Janitor) cleanupFinishedJobs() error {
+	if j.FinishedJobRetentionDays <= 0 {
+		return nil
+	}
+
+	// open session
+	sess, err := j.db.NewSession(false)
+	if err != nil {
+		return fmt.Errorf("failed to open session for old jobs cleanup: %w", err)
+	}
+	defer sess.Close()
+
+	retentionTimestamp := time.Now().AddDate(0, 0, -j.FinishedJobRetentionDays).Unix()
+
+	// get biggest ID of old jobs
+	row, err := sess.QueryRow(queryOldJobsBiggestID, retentionTimestamp)
+	if err != nil {
+		return fmt.Errorf("failed to get biggest ID of old jobs: %w", err)
+	}
+
+	var biggestID sql.NullInt64
+	if err := row.Scan(&biggestID); err != nil {
+		// errors.Is also recognizes a wrapped sql.ErrNoRows
+		if errors.Is(err, sql.ErrNoRows) {
+			return nil
+		}
+		return fmt.Errorf("failed to read biggest ID of old jobs: %w", err)
+	}
+
+	// NULL or a non-positive ID: nothing is old enough to be removed
+	if !biggestID.Valid || biggestID.Int64 <= 0 {
+		return nil
+	}
+
+	// remove old jobs data
+	for _, q := range queriesForOldJobsCleanup {
+		for {
+			affectedRows, err := sess.Exec(q, biggestID.Int64)
+			if err != nil {
+				return fmt.Errorf("failed to delete old jobs data: %w", err)
+			}
+			if affectedRows == 0 {
+				break
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/internal/pkg/janitor/queries/old_job_biggest_id.sql b/internal/pkg/janitor/queries/old_job_biggest_id.sql
new file mode 100644
index 0000000..a49bb73
--- /dev/null
+++ b/internal/pkg/janitor/queries/old_job_biggest_id.sql
@@ -0,0 +1,5 @@
+SELECT COALESCE(
+    (SELECT MIN(system_job_id) - 1 FROM jobs WHERE updated_at >= $1),
+    (SELECT MAX(system_job_id) FROM jobs)
+) AS system_job_id
+LIMIT 1
\ No newline at end of file
diff --git a/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql b/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql
new file mode 100644
index 0000000..bcd0305
--- /dev/null
+++ b/internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql
@@ -0,0 +1,7 @@
+DELETE FROM job_cluster_tags
+WHERE system_job_id IN (
+    SELECT system_job_id
+    FROM job_cluster_tags
+    WHERE system_job_id <= $1
+    LIMIT 100
+);
diff --git a/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql b/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql
new file mode 100644
index 0000000..5b2d570
--- /dev/null
+++ b/internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql
@@ -0,0 +1,8 @@
+
+DELETE FROM job_command_tags
+WHERE system_job_id IN (
+    SELECT system_job_id
+    FROM job_command_tags
+    WHERE system_job_id <= $1
+    LIMIT 100
+);
\ No newline at end of file
diff --git a/internal/pkg/janitor/queries/old_jobs_delete.sql b/internal/pkg/janitor/queries/old_jobs_delete.sql
new file mode 100644
index 0000000..d248e5b
--- /dev/null
+++ b/internal/pkg/janitor/queries/old_jobs_delete.sql
@@ -0,0 +1,8 @@
+
+DELETE FROM jobs
+WHERE system_job_id IN (
+    SELECT system_job_id
+    FROM jobs
+    WHERE system_job_id <= $1
+    LIMIT 100
+);
\ No newline at end of file
diff --git a/internal/pkg/janitor/queries/old_jobs_tags_delete.sql b/internal/pkg/janitor/queries/old_jobs_tags_delete.sql
new file mode 100644
index 0000000..110b778
--- /dev/null
+++ b/internal/pkg/janitor/queries/old_jobs_tags_delete.sql
@@ -0,0 +1,7 @@
+DELETE FROM job_tags
+WHERE system_job_id IN (
+    SELECT system_job_id
+    FROM job_tags
+    WHERE system_job_id <= $1
+    LIMIT 100
+);
\ No newline at end of file