diff --git a/util/logkey/main.go b/util/logkey/main.go index 41ba525..d50a937 100644 --- a/util/logkey/main.go +++ b/util/logkey/main.go @@ -7,6 +7,8 @@ const ( Error = "error" Handler = "handler" JobDetail = "job_detail" + JobName = "job_name" + JobOpName = "job_op_name" Method = "method" MSetID = "mset_id" NodeID = "node_id" @@ -20,5 +22,4 @@ const ( StatusCode = "status_code" TagID = "tag_id" URI = "uri" - WorkType = "work_type" ) diff --git a/worker/base_job.go b/worker/base_job.go index f6b1818..e69ae6b 100644 --- a/worker/base_job.go +++ b/worker/base_job.go @@ -16,6 +16,8 @@ type ( JobBase struct { name string detail string + + logger *slog.Logger } operation struct { @@ -44,23 +46,25 @@ type ( func RunJob(ctx context.Context, j JobRunner) error { name := j.Name() - detail := j.Detail() - defer logDurationInfo(fmt.Sprintf("%s %s", name, detail), time.Now()) - slog.Debug(fmt.Sprintf("%s starting %s", name, detail)) + jlog := j.Logger().With(logkey.JobName, name) + defer func(begin time.Time) { + jlog.Debug(fmt.Sprintf("STAT: %s elapse: %s", name, time.Since(begin))) + }(time.Now()) + jlog.Debug("starting job") ops := j.Operations() - err := runOps(ctx, ops...) + err := runOps(ctx, jlog, ops...) if err != nil { if tx, ok := j.(cdb.DBTxer); ok { - slog.Debug(fmt.Sprintf("%s rollbacking on error %s", name, detail)) + jlog.Debug("call rollback on error") if err := tx.Rollback(); err != nil { - slog.Error(fmt.Sprintf("%s rollback on error failed %s: %s", name, detail, err)) + jlog.Error("rollback on error failed", logkey.Error, err) } } return err } else if tx, ok := j.(cdb.DBTxer); ok { - slog.Debug(fmt.Sprintf("%s commiting %s", name, detail)) + jlog.Debug("call commit") if err := tx.Commit(); err != nil { return fmt.Errorf("commit: %w", err) } @@ -68,7 +72,7 @@ func RunJob(ctx context.Context, j JobRunner) error { if r, ok := j.(LogResulter); ok { r.LogResult() } - slog.Debug(fmt.Sprintf("%s done %s", name, detail)) + jlog.Debug("job done") return nil } @@ -80,7 +84,11 @@ func (j *JobBase) Detail() string { return j.detail } -func runOps(ctx context.Context, ops ...operation) error { +func (j *JobBase) Logger() *slog.Logger { + return j.logger +} + +func runOps(ctx context.Context, jlog *slog.Logger, ops ...operation) error { for _, op := range ops { var err error if op.condition != nil && !op.condition() { @@ -97,17 +105,13 @@ func runOps(ctx context.Context, ops ...operation) error { return err } // TODO: add metrics - slog.Warn(fmt.Sprintf("%s: non blocking error", op.desc), logkey.Error, err) + jlog.Warn(fmt.Sprintf("%s: non blocking error", op.desc), logkey.JobOpName, op.desc, logkey.Error, err) continue } operationDuration. With(prometheus.Labels{"desc": op.desc, "status": operationStatusOk}). Observe(duration.Seconds()) - slog.Debug(fmt.Sprintf("STAT: %s elapse: %s", op.desc, duration)) + jlog.Debug(fmt.Sprintf("STAT: %s elapse: %s", op.desc, duration), logkey.JobOpName, op.desc) } return nil } - -func logDurationInfo(s string, begin time.Time) { - slog.Debug(fmt.Sprintf("STAT: %s elapse: %s", s, time.Since(begin))) -} diff --git a/worker/job_feed_daemon_ping.go b/worker/job_feed_daemon_ping.go index 5deba8b..70cbc98 100644 --- a/worker/job_feed_daemon_ping.go +++ b/worker/job_feed_daemon_ping.go @@ -38,6 +38,7 @@ func newDaemonPing(nodeID string) *jobFeedDaemonPing { JobBase: JobBase{ name: "daemonPing", detail: "nodeID: " + nodeID, + logger: slog.With(logkey.NodeID, nodeID, logkey.JobName, "daemonPing"), }, JobRedis: JobRedis{ cachePendingH: cachekeys.FeedDaemonPingPendingH, diff --git a/worker/job_feed_daemon_status.go b/worker/job_feed_daemon_status.go index 61c801a..69267df 100644 --- a/worker/job_feed_daemon_status.go +++ b/worker/job_feed_daemon_status.go @@ -14,6 +14,7 @@ import ( "github.com/opensvc/oc3/cachekeys" "github.com/opensvc/oc3/cdb" + "github.com/opensvc/oc3/util/logkey" ) type ( @@ -88,6 +89,7 @@ func newDaemonStatus(nodeID string) *jobFeedDaemonStatus { JobBase: JobBase{ name: "daemonStatus", detail: "nodeID: " + nodeID, + logger: slog.With(logkey.NodeID, nodeID, logkey.JobName, "daemonStatus"), }, JobRedis: JobRedis{ cachePendingH: cachekeys.FeedDaemonStatusPendingH, diff --git a/worker/job_feed_instance_action.go b/worker/job_feed_instance_action.go index 59b5adc..ec8faa4 100644 --- a/worker/job_feed_instance_action.go +++ b/worker/job_feed_instance_action.go @@ -13,6 +13,7 @@ import ( "github.com/opensvc/oc3/cachekeys" "github.com/opensvc/oc3/cdb" "github.com/opensvc/oc3/feeder" + "github.com/opensvc/oc3/util/logkey" ) type jobFeedInstanceAction struct { @@ -43,6 +44,7 @@ func newAction(objectName, nodeID, clusterID, uuid string) *jobFeedInstanceActio JobBase: JobBase{ name: "instanceAction", detail: "ID: " + idX, + logger: slog.With(logkey.NodeID, nodeID, logkey.ClusterID, clusterID, logkey.Object, objectName, logkey.JobName, "instanceAction"), }, JobRedis: JobRedis{ cachePendingH: cachekeys.FeedInstanceActionPendingH, @@ -94,16 +96,17 @@ func (d *jobFeedInstanceAction) findNodeFromDb(ctx context.Context) error { } func (d *jobFeedInstanceAction) findObjectFromDb(ctx context.Context) error { - if isNew, objId, err := d.oDb.ObjectIDFindOrCreate(ctx, d.objectName, d.clusterID); err != nil { + isNew, objId, err := d.oDb.ObjectIDFindOrCreate(ctx, d.objectName, d.clusterID) + if err != nil { return fmt.Errorf("find or create object ID failed for %s: %w", d.objectName, err) - } else if isNew { + } + if isNew { // TODO: add metrics slog.Debug(fmt.Sprintf("jobFeedInstanceAction has created new object id %s@%s %s", d.objectName, d.clusterID, objId)) } else { - d.objectID = objId slog.Debug(fmt.Sprintf("jobFeedInstanceAction found object id %s@%s %s", d.objectName, d.clusterID, objId)) } - + d.objectID = objId return nil } diff --git a/worker/job_feed_instance_resource_info.go b/worker/job_feed_instance_resource_info.go index cab0d3b..8407417 100644 --- a/worker/job_feed_instance_resource_info.go +++ b/worker/job_feed_instance_resource_info.go @@ -10,6 +10,7 @@ import ( "github.com/opensvc/oc3/cachekeys" "github.com/opensvc/oc3/feeder" + "github.com/opensvc/oc3/util/logkey" ) type ( @@ -43,6 +44,7 @@ func newjobFeedInstanceResourceInfo(objectName, nodeID, clusterID string) *jobFe JobBase: JobBase{ name: "instanceResourceInfo", detail: "ID: " + idX, + logger: slog.With(logkey.NodeID, nodeID, logkey.ClusterID, clusterID, logkey.Object, objectName, logkey.JobName, "instanceResourceInfo"), }, JobRedis: JobRedis{ cachePendingH: cachekeys.FeedInstanceResourceInfoPendingH, diff --git a/worker/job_feed_instance_status.go b/worker/job_feed_instance_status.go index e946370..f15ac94 100644 --- a/worker/job_feed_instance_status.go +++ b/worker/job_feed_instance_status.go @@ -10,6 +10,7 @@ import ( "github.com/opensvc/oc3/cachekeys" "github.com/opensvc/oc3/cdb" + "github.com/opensvc/oc3/util/logkey" ) type jobFeedInstanceStatus struct { @@ -49,6 +50,7 @@ func newInstanceStatus(objectName, nodeID, clusterID string) *jobFeedInstanceSta JobBase: JobBase{ name: "instanceStatus", detail: "ID: " + idX, + logger: slog.With(logkey.NodeID, nodeID, logkey.ClusterID, clusterID, logkey.Object, objectName, logkey.JobName, "instanceStatus"), }, JobRedis: JobRedis{ cachePendingH: cachekeys.FeedInstanceStatusPendingH, diff --git a/worker/job_feed_node_disk.go b/worker/job_feed_node_disk.go index 6a9bc07..ed6e093 100644 --- a/worker/job_feed_node_disk.go +++ b/worker/job_feed_node_disk.go @@ -11,6 +11,7 @@ import ( "github.com/opensvc/oc3/cachekeys" "github.com/opensvc/oc3/mariadb" + "github.com/opensvc/oc3/util/logkey" ) type ( @@ -31,6 +32,7 @@ func newNodeDisk(nodename, nodeID, clusterID string) *jobFeedNodeDisk { JobBase: JobBase{ name: "nodeDisk", detail: "nodename: " + nodename + " nodeID: " + nodeID, + logger: slog.With(logkey.NodeID, nodeID, logkey.ClusterID, clusterID, logkey.Nodename, nodename, logkey.JobName, "nodeDisk"), }, JobRedis: JobRedis{ cachePendingH: cachekeys.FeedNodeDiskPendingH, diff --git a/worker/job_feed_object_config.go b/worker/job_feed_object_config.go index 75cc626..e70e981 100644 --- a/worker/job_feed_object_config.go +++ b/worker/job_feed_object_config.go @@ -13,6 +13,7 @@ import ( "github.com/opensvc/oc3/cachekeys" "github.com/opensvc/oc3/cdb" + "github.com/opensvc/oc3/util/logkey" ) type ( @@ -43,6 +44,7 @@ func newFeedObjectConfig(objectName, nodeID, clusterID string) *jobFeedObjectCon JobBase: JobBase{ name: "objectConfig", detail: "ID: " + idX, + logger: slog.With(logkey.NodeID, nodeID, logkey.ClusterID, clusterID, logkey.Object, objectName, logkey.JobName, "objectConfig"), }, JobRedis: JobRedis{ cachePendingH: cachekeys.FeedObjectConfigPendingH, diff --git a/worker/job_feed_system.go b/worker/job_feed_system.go index e4d49fd..ee24a0c 100644 --- a/worker/job_feed_system.go +++ b/worker/job_feed_system.go @@ -10,6 +10,7 @@ import ( "github.com/opensvc/oc3/cachekeys" "github.com/opensvc/oc3/mariadb" + "github.com/opensvc/oc3/util/logkey" ) type ( @@ -28,6 +29,7 @@ func newDaemonSystem(nodeID string) *jobFeedSystem { JobBase: JobBase{ name: "daemonSystem", detail: "nodeID: " + nodeID, + logger: slog.With(logkey.NodeID, nodeID, logkey.JobName, "daemonSystem"), }, JobRedis: JobRedis{ cachePendingH: cachekeys.FeedSystemPendingH, diff --git a/worker/worker.go b/worker/worker.go index da2ba6c..8b694fb 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -50,6 +50,7 @@ type ( Name() string Detail() string + Logger() *slog.Logger } ) @@ -96,7 +97,7 @@ func (w *Worker) Run() error { go func(j []string) { defer func() { <-runSlots }() if err := w.runJob(j); err != nil { - slog.Error(err.Error()) + slog.Error("job failed", logkey.Error, err) } }(j) case <-ctx.Done(): @@ -180,7 +181,8 @@ func (w *Worker) runJob(unqueuedJob []string) error { slog.Debug(fmt.Sprintf("ignore queue '%s'", unqueuedJob[0])) return nil } - workType := j.Name() + jName := j.Name() + jlog := j.Logger() if a, ok := j.(RedisSetter); ok { a.SetRedis(w.Redis) @@ -188,8 +190,8 @@ func (w *Worker) runJob(unqueuedJob []string) error { if a, ok := j.(PrepareDBer); ok { if err := a.PrepareDB(ctx, w.DB, w.Ev, withTx); err != nil { - slog.Error("🔴PrepareDB failed", logkey.Error, err, logkey.WorkType, workType) - return fmt.Errorf("can't prepare db for %s: %w", workType, err) + jlog.Error("🔴PrepareDB failed", logkey.Error, err) + return fmt.Errorf("can't prepare db for %s: %w", jName, err) } } @@ -205,11 +207,11 @@ func (w *Worker) runJob(unqueuedJob []string) error { duration := time.Since(begin) if err != nil { status = operationStatusFailed - slog.Error("🔴job failure", logkey.WorkType, workType, logkey.Error, err, logkey.JobDetail, j.Detail()) + jlog.Error("🔴job failure", logkey.Error, err, logkey.JobDetail, j.Detail()) } - processedOperationCounter.With(prometheus.Labels{"desc": workType, "status": status}).Inc() - operationDuration.With(prometheus.Labels{"desc": workType, "status": status}).Observe(duration.Seconds()) - slog.Debug(fmt.Sprintf("BLPOP %s <- %s: %s", unqueuedJob[0], unqueuedJob[1], duration)) + processedOperationCounter.With(prometheus.Labels{"desc": jName, "status": status}).Inc() + operationDuration.With(prometheus.Labels{"desc": jName, "status": status}).Observe(duration.Seconds()) + jlog.Debug(fmt.Sprintf("BLPOP %s <- %s: %s", unqueuedJob[0], unqueuedJob[1], duration)) return nil }