Skip to content
Merged

dev #123

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion util/logkey/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const (
Error = "error"
Handler = "handler"
JobDetail = "job_detail"
JobName = "job_name"
JobOpName = "job_op_name"
Method = "method"
MSetID = "mset_id"
NodeID = "node_id"
Expand All @@ -20,5 +22,4 @@ const (
StatusCode = "status_code"
TagID = "tag_id"
URI = "uri"
WorkType = "work_type"
)
34 changes: 19 additions & 15 deletions worker/base_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type (
JobBase struct {
name string
detail string

logger *slog.Logger
}

operation struct {
Expand Down Expand Up @@ -44,31 +46,33 @@ type (

func RunJob(ctx context.Context, j JobRunner) error {
name := j.Name()
detail := j.Detail()
defer logDurationInfo(fmt.Sprintf("%s %s", name, detail), time.Now())
slog.Debug(fmt.Sprintf("%s starting %s", name, detail))
jlog := j.Logger().With(logkey.JobName, name)
defer func(begin time.Time) {
jlog.Debug(fmt.Sprintf("STAT: %s elapse: %s", name, time.Since(begin)))
}(time.Now())
jlog.Debug("starting job")

ops := j.Operations()

err := runOps(ctx, ops...)
err := runOps(ctx, jlog, ops...)
if err != nil {
if tx, ok := j.(cdb.DBTxer); ok {
slog.Debug(fmt.Sprintf("%s rollbacking on error %s", name, detail))
jlog.Debug("call rollback on error")
if err := tx.Rollback(); err != nil {
slog.Error(fmt.Sprintf("%s rollback on error failed %s: %s", name, detail, err))
jlog.Error("rollback on error failed", logkey.Error, err)
}
}
return err
} else if tx, ok := j.(cdb.DBTxer); ok {
slog.Debug(fmt.Sprintf("%s commiting %s", name, detail))
jlog.Debug("call commit")
if err := tx.Commit(); err != nil {
return fmt.Errorf("commit: %w", err)
}
}
if r, ok := j.(LogResulter); ok {
r.LogResult()
}
slog.Debug(fmt.Sprintf("%s done %s", name, detail))
jlog.Debug("job done")
return nil
}

Expand All @@ -80,7 +84,11 @@ func (j *JobBase) Detail() string {
return j.detail
}

func runOps(ctx context.Context, ops ...operation) error {
func (j *JobBase) Logger() *slog.Logger {
return j.logger
}

func runOps(ctx context.Context, jlog *slog.Logger, ops ...operation) error {
for _, op := range ops {
var err error
if op.condition != nil && !op.condition() {
Expand All @@ -97,17 +105,13 @@ func runOps(ctx context.Context, ops ...operation) error {
return err
}
// TODO: add metrics
slog.Warn(fmt.Sprintf("%s: non blocking error", op.desc), logkey.Error, err)
jlog.Warn(fmt.Sprintf("%s: non blocking error", op.desc), logkey.JobOpName, op.desc, logkey.Error, err)
continue
}
operationDuration.
With(prometheus.Labels{"desc": op.desc, "status": operationStatusOk}).
Observe(duration.Seconds())
slog.Debug(fmt.Sprintf("STAT: %s elapse: %s", op.desc, duration))
jlog.Debug(fmt.Sprintf("STAT: %s elapse: %s", op.desc, duration), logkey.JobOpName, op.desc)
}
return nil
}

func logDurationInfo(s string, begin time.Time) {
slog.Debug(fmt.Sprintf("STAT: %s elapse: %s", s, time.Since(begin)))
}
1 change: 1 addition & 0 deletions worker/job_feed_daemon_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func newDaemonPing(nodeID string) *jobFeedDaemonPing {
JobBase: JobBase{
name: "daemonPing",
detail: "nodeID: " + nodeID,
logger: slog.With(logkey.NodeID, nodeID, logkey.JobName, "daemonPing"),
},
JobRedis: JobRedis{
cachePendingH: cachekeys.FeedDaemonPingPendingH,
Expand Down
2 changes: 2 additions & 0 deletions worker/job_feed_daemon_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/opensvc/oc3/cachekeys"
"github.com/opensvc/oc3/cdb"
"github.com/opensvc/oc3/util/logkey"
)

type (
Expand Down Expand Up @@ -88,6 +89,7 @@ func newDaemonStatus(nodeID string) *jobFeedDaemonStatus {
JobBase: JobBase{
name: "daemonStatus",
detail: "nodeID: " + nodeID,
logger: slog.With(logkey.NodeID, nodeID, logkey.JobName, "daemonStatus"),
},
JobRedis: JobRedis{
cachePendingH: cachekeys.FeedDaemonStatusPendingH,
Expand Down
11 changes: 7 additions & 4 deletions worker/job_feed_instance_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/opensvc/oc3/cachekeys"
"github.com/opensvc/oc3/cdb"
"github.com/opensvc/oc3/feeder"
"github.com/opensvc/oc3/util/logkey"
)

type jobFeedInstanceAction struct {
Expand Down Expand Up @@ -43,6 +44,7 @@ func newAction(objectName, nodeID, clusterID, uuid string) *jobFeedInstanceActio
JobBase: JobBase{
name: "instanceAction",
detail: "ID: " + idX,
logger: slog.With(logkey.NodeID, nodeID, logkey.ClusterID, clusterID, logkey.Object, objectName, logkey.JobName, "instanceAction"),
},
JobRedis: JobRedis{
cachePendingH: cachekeys.FeedInstanceActionPendingH,
Expand Down Expand Up @@ -94,16 +96,17 @@ func (d *jobFeedInstanceAction) findNodeFromDb(ctx context.Context) error {
}

func (d *jobFeedInstanceAction) findObjectFromDb(ctx context.Context) error {
if isNew, objId, err := d.oDb.ObjectIDFindOrCreate(ctx, d.objectName, d.clusterID); err != nil {
isNew, objId, err := d.oDb.ObjectIDFindOrCreate(ctx, d.objectName, d.clusterID)
if err != nil {
return fmt.Errorf("find or create object ID failed for %s: %w", d.objectName, err)
} else if isNew {
}
if isNew {
// TODO: add metrics
slog.Debug(fmt.Sprintf("jobFeedInstanceAction has created new object id %s@%s %s", d.objectName, d.clusterID, objId))
} else {
d.objectID = objId
slog.Debug(fmt.Sprintf("jobFeedInstanceAction found object id %s@%s %s", d.objectName, d.clusterID, objId))
}

d.objectID = objId
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions worker/job_feed_instance_resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/opensvc/oc3/cachekeys"
"github.com/opensvc/oc3/feeder"
"github.com/opensvc/oc3/util/logkey"
)

type (
Expand Down Expand Up @@ -43,6 +44,7 @@ func newjobFeedInstanceResourceInfo(objectName, nodeID, clusterID string) *jobFe
JobBase: JobBase{
name: "instanceResourceInfo",
detail: "ID: " + idX,
logger: slog.With(logkey.NodeID, nodeID, logkey.ClusterID, clusterID, logkey.Object, objectName, logkey.JobName, "instanceResourceInfo"),
},
JobRedis: JobRedis{
cachePendingH: cachekeys.FeedInstanceResourceInfoPendingH,
Expand Down
2 changes: 2 additions & 0 deletions worker/job_feed_instance_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/opensvc/oc3/cachekeys"
"github.com/opensvc/oc3/cdb"
"github.com/opensvc/oc3/util/logkey"
)

type jobFeedInstanceStatus struct {
Expand Down Expand Up @@ -49,6 +50,7 @@ func newInstanceStatus(objectName, nodeID, clusterID string) *jobFeedInstanceSta
JobBase: JobBase{
name: "instanceStatus",
detail: "ID: " + idX,
logger: slog.With(logkey.NodeID, nodeID, logkey.ClusterID, clusterID, logkey.Object, objectName, logkey.JobName, "instanceStatus"),
},
JobRedis: JobRedis{
cachePendingH: cachekeys.FeedInstanceStatusPendingH,
Expand Down
2 changes: 2 additions & 0 deletions worker/job_feed_node_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/opensvc/oc3/cachekeys"
"github.com/opensvc/oc3/mariadb"
"github.com/opensvc/oc3/util/logkey"
)

type (
Expand All @@ -31,6 +32,7 @@ func newNodeDisk(nodename, nodeID, clusterID string) *jobFeedNodeDisk {
JobBase: JobBase{
name: "nodeDisk",
detail: "nodename: " + nodename + " nodeID: " + nodeID,
logger: slog.With(logkey.NodeID, nodeID, logkey.ClusterID, clusterID, logkey.Nodename, nodename, logkey.JobName, "nodeDisk"),
},
JobRedis: JobRedis{
cachePendingH: cachekeys.FeedNodeDiskPendingH,
Expand Down
2 changes: 2 additions & 0 deletions worker/job_feed_object_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/opensvc/oc3/cachekeys"
"github.com/opensvc/oc3/cdb"
"github.com/opensvc/oc3/util/logkey"
)

type (
Expand Down Expand Up @@ -43,6 +44,7 @@ func newFeedObjectConfig(objectName, nodeID, clusterID string) *jobFeedObjectCon
JobBase: JobBase{
name: "objectConfig",
detail: "ID: " + idX,
logger: slog.With(logkey.NodeID, nodeID, logkey.ClusterID, clusterID, logkey.Object, objectName, logkey.JobName, "objectConfig"),
},
JobRedis: JobRedis{
cachePendingH: cachekeys.FeedObjectConfigPendingH,
Expand Down
2 changes: 2 additions & 0 deletions worker/job_feed_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/opensvc/oc3/cachekeys"
"github.com/opensvc/oc3/mariadb"
"github.com/opensvc/oc3/util/logkey"
)

type (
Expand All @@ -28,6 +29,7 @@ func newDaemonSystem(nodeID string) *jobFeedSystem {
JobBase: JobBase{
name: "daemonSystem",
detail: "nodeID: " + nodeID,
logger: slog.With(logkey.NodeID, nodeID, logkey.JobName, "daemonSystem"),
},
JobRedis: JobRedis{
cachePendingH: cachekeys.FeedSystemPendingH,
Expand Down
18 changes: 10 additions & 8 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type (

Name() string
Detail() string
Logger() *slog.Logger
}
)

Expand Down Expand Up @@ -96,7 +97,7 @@ func (w *Worker) Run() error {
go func(j []string) {
defer func() { <-runSlots }()
if err := w.runJob(j); err != nil {
slog.Error(err.Error())
slog.Error("job failed", logkey.Error, err)
}
}(j)
case <-ctx.Done():
Expand Down Expand Up @@ -180,16 +181,17 @@ func (w *Worker) runJob(unqueuedJob []string) error {
slog.Debug(fmt.Sprintf("ignore queue '%s'", unqueuedJob[0]))
return nil
}
workType := j.Name()
jName := j.Name()
jlog := j.Logger()

if a, ok := j.(RedisSetter); ok {
a.SetRedis(w.Redis)
}

if a, ok := j.(PrepareDBer); ok {
if err := a.PrepareDB(ctx, w.DB, w.Ev, withTx); err != nil {
slog.Error("🔴PrepareDB failed", logkey.Error, err, logkey.WorkType, workType)
return fmt.Errorf("can't prepare db for %s: %w", workType, err)
jlog.Error("🔴PrepareDB failed", logkey.Error, err)
return fmt.Errorf("can't prepare db for %s: %w", jName, err)
}
}

Expand All @@ -205,11 +207,11 @@ func (w *Worker) runJob(unqueuedJob []string) error {
duration := time.Since(begin)
if err != nil {
status = operationStatusFailed
slog.Error("🔴job failure", logkey.WorkType, workType, logkey.Error, err, logkey.JobDetail, j.Detail())
jlog.Error("🔴job failure", logkey.Error, err, logkey.JobDetail, j.Detail())
}
processedOperationCounter.With(prometheus.Labels{"desc": workType, "status": status}).Inc()
operationDuration.With(prometheus.Labels{"desc": workType, "status": status}).Observe(duration.Seconds())
slog.Debug(fmt.Sprintf("BLPOP %s <- %s: %s", unqueuedJob[0], unqueuedJob[1], duration))
processedOperationCounter.With(prometheus.Labels{"desc": jName, "status": status}).Inc()
operationDuration.With(prometheus.Labels{"desc": jName, "status": status}).Observe(duration.Seconds())
jlog.Debug(fmt.Sprintf("BLPOP %s <- %s: %s", unqueuedJob[0], unqueuedJob[1], duration))
return nil
}

Expand Down
Loading