Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions configs/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pool:
# plugins location
plugin_directory: ./plugins

# janitor settings
# finished_job_retention_days: jobs whose last update is older than this many
# days are eligible for deletion by the janitor; a value of 0 or less disables
# the cleanup.
janitor:
finished_job_retention_days: 14

# auth plugin
auth:
plugin: ./plugins/auth_header.so
Expand Down
17 changes: 13 additions & 4 deletions internal/pkg/janitor/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package janitor
import (
"time"

"github.com/hladush/go-telemetry/pkg/telemetry"
"github.com/patterninc/heimdall/internal/pkg/database"
"github.com/patterninc/heimdall/pkg/object/cluster"
"github.com/patterninc/heimdall/pkg/plugin"
Expand All @@ -12,11 +13,16 @@ const (
defaultJobLimit = 3
)

// startMethod is the telemetry handle registered under the names "Start" and
// "Janitor"; the background loop uses it to log and count cleanup errors.
var startMethod = telemetry.NewMethod("Start", "Janitor")

type Janitor struct {
Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"`
StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"`
Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"`
StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"`
FinishedJobRetentionDays int `yaml:"finished_job_retention_days,omitempty" json:"finished_job_retention_days,omitempty"`
CleanInterval int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"`
db *database.Database
db *database.Database
commandHandlers map[string]plugin.Handler
clusters cluster.Clusters
}
Expand All @@ -31,12 +37,15 @@ func (j *Janitor) Start(d *database.Database, commandHandlers map[string]plugin.
// kick off janitor worker in the background.
go func() {
for {
if err := j.cleanupFinishedJobs(); err != nil {
startMethod.LogAndCountError(err, "cleanup_finished_jobs")
}
jobsFound := j.worker()

// if no jobs are found, sleep before checking again
if !jobsFound {
time.Sleep(time.Duration(j.CleanInterval) * time.Second)
}

}
}()

Expand Down
74 changes: 73 additions & 1 deletion internal/pkg/janitor/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package janitor

import (
	"context"
	"database/sql"
	_ "embed"
	"errors"
	"fmt"
	"sync"
	"time"

Expand Down Expand Up @@ -31,8 +33,31 @@ var queryJobsSetCanceled string
// queryJobsSetFailed holds the SQL text embedded at build time from
// queries/jobs_set_failed.sql.
//
//go:embed queries/jobs_set_failed.sql
var queryJobsSetFailed string

func (j *Janitor) worker() bool {
// queryOldJobsClusterTagsDelete holds the SQL text embedded from
// queries/old_jobs_cluster_tags_delete.sql (batched delete on job_cluster_tags).
//
//go:embed queries/old_jobs_cluster_tags_delete.sql
var queryOldJobsClusterTagsDelete string

// queryOldJobsCommandTagsDelete holds the SQL text embedded from
// queries/old_jobs_command_tags_delete.sql (batched delete on job_command_tags).
//
//go:embed queries/old_jobs_command_tags_delete.sql
var queryOldJobsCommandTagsDelete string

// queryOldJobsTagsDelete holds the SQL text embedded from
// queries/old_jobs_tags_delete.sql (batched delete on job_tags).
//
//go:embed queries/old_jobs_tags_delete.sql
var queryOldJobsTagsDelete string

// queryOldJobsDelete holds the SQL text embedded from
// queries/old_jobs_delete.sql (batched delete on jobs).
//
//go:embed queries/old_jobs_delete.sql
var queryOldJobsDelete string

// queryOldJobsBiggestID holds the SQL text embedded from
// queries/old_job_biggest_id.sql; cleanupFinishedJobs uses its result as the
// upper system_job_id bound for the delete statements.
//
//go:embed queries/old_job_biggest_id.sql
var queryOldJobsBiggestID string

// queriesForOldJobsCleanup lists the delete statements cleanupFinishedJobs
// runs, in execution order: the three tag tables first, the jobs table last
// (presumably so tag rows are gone before the jobs they reference).
var queriesForOldJobsCleanup = []string{
	queryOldJobsClusterTagsDelete,
	queryOldJobsCommandTagsDelete,
	queryOldJobsTagsDelete,
	queryOldJobsDelete,
}

func (j *Janitor) worker() bool {
// track worker cycle
workerMethod.CountRequest()
defer workerMethod.RecordLatency(time.Now())
Expand Down Expand Up @@ -190,3 +215,50 @@ func (j *Janitor) updateJobs(sess *database.Session, jobs []*job.Job) error {
return nil

}

// cleanupFinishedJobs deletes jobs, and their rows in the tag tables, that
// fall outside the retention window set by FinishedJobRetentionDays.
//
// It is a no-op when FinishedJobRetentionDays is zero or negative. Otherwise
// it computes the Unix timestamp of "now minus the retention days", asks
// queryOldJobsBiggestID for a cutoff system_job_id, and then runs each
// statement in queriesForOldJobsCleanup with that ID, repeating a statement
// until it reports zero affected rows before moving on to the next one.
//
// It returns nil when there is nothing to delete, and a wrapped error when the
// session cannot be opened, the lookup fails, or a delete statement fails.
//
// NOTE(review): the delete statements are bounded by system_job_id only, so a
// job with a smaller ID than the cutoff is removed even if it was updated
// recently; confirm that IDs grow monotonically with updated_at.
func (j *Janitor) cleanupFinishedJobs() error {
	// retention disabled
	if j.FinishedJobRetentionDays <= 0 {
		return nil
	}

	// open session
	sess, err := j.db.NewSession(false)
	if err != nil {
		return fmt.Errorf("failed to open session for old jobs cleanup: %w", err)
	}
	defer sess.Close()

	retentionTimestamp := time.Now().AddDate(0, 0, -j.FinishedJobRetentionDays).Unix()

	// get biggest ID of old jobs
	row, err := sess.QueryRow(queryOldJobsBiggestID, retentionTimestamp)
	if err != nil {
		return fmt.Errorf("failed to get biggest ID of old jobs: %w", err)
	}

	// NullInt64 tolerates a NULL result as well as a missing row.
	var biggestID sql.NullInt64
	if err := row.Scan(&biggestID); err != nil {
		// errors.Is also matches a wrapped sentinel.
		if errors.Is(err, sql.ErrNoRows) {
			return nil
		}
		return fmt.Errorf("failed to scan biggest ID of old jobs: %w", err)
	}

	// nothing old enough to delete
	if !biggestID.Valid || biggestID.Int64 == 0 {
		return nil
	}

	// remove old jobs data; each statement deletes a limited batch, so it is
	// repeated until it no longer affects any rows
	for i, q := range queriesForOldJobsCleanup {
		for {
			affectedRows, err := sess.Exec(q, biggestID.Int64)
			if err != nil {
				return fmt.Errorf("failed to delete old jobs data (cleanup query %d): %w", i, err)
			}
			if affectedRows == 0 {
				break
			}
		}
	}

	return nil
}
5 changes: 5 additions & 0 deletions internal/pkg/janitor/queries/old_job_biggest_id.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Returns the system_job_id of the most recently updated job whose updated_at
-- is older than the cutoff passed as the first parameter.
-- NOTE(review): this picks the newest old job by updated_at, which is not
-- necessarily the largest system_job_id; confirm that IDs grow monotonically
-- with updated_at, because the caller deletes every row at or below this ID.
SELECT system_job_id
FROM jobs
WHERE updated_at < $1
ORDER BY updated_at desc
LIMIT 1
7 changes: 7 additions & 0 deletions internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Batched delete on job_cluster_tags: the subquery picks at most 100 rows whose
-- system_job_id is at or below the bound given as the first parameter, and
-- every row sharing those IDs is deleted. The caller re-runs the statement
-- until it affects no rows.
DELETE FROM job_cluster_tags
WHERE system_job_id IN (
SELECT system_job_id
FROM job_cluster_tags
WHERE system_job_id <= $1
LIMIT 100
);
8 changes: 8 additions & 0 deletions internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

-- Batched delete on job_command_tags: the subquery picks at most 100 rows whose
-- system_job_id is at or below the bound given as the first parameter, and
-- every row sharing those IDs is deleted. The caller re-runs the statement
-- until it affects no rows.
DELETE FROM job_command_tags
WHERE system_job_id IN (
SELECT system_job_id
FROM job_command_tags
WHERE system_job_id <= $1
LIMIT 100
);
8 changes: 8 additions & 0 deletions internal/pkg/janitor/queries/old_jobs_delete.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

-- Batched delete on jobs: removes at most 100 jobs per execution whose
-- system_job_id is at or below the bound given as the first parameter
-- (assuming system_job_id is unique in jobs). The caller re-runs the statement
-- until it affects no rows.
-- NOTE(review): the filter is on system_job_id only, not on updated_at or on
-- job status; confirm that no recently updated or unfinished job can have an
-- ID at or below the bound.
DELETE FROM jobs
WHERE system_job_id IN (
SELECT system_job_id
FROM jobs
WHERE system_job_id <= $1
LIMIT 100
);
7 changes: 7 additions & 0 deletions internal/pkg/janitor/queries/old_jobs_tags_delete.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Batched delete on job_tags: the subquery picks at most 100 rows whose
-- system_job_id is at or below the bound given as the first parameter, and
-- every row sharing those IDs is deleted. The caller re-runs the statement
-- until it affects no rows.
DELETE FROM job_tags
WHERE system_job_id IN (
SELECT system_job_id
FROM job_tags
WHERE system_job_id <= $1
LIMIT 100
);