diff --git a/cdb/db_nodes.go b/cdb/db_nodes.go index d0f8532..e1655f3 100644 --- a/cdb/db_nodes.go +++ b/cdb/db_nodes.go @@ -309,7 +309,7 @@ func (oDb *DB) PurgeNodeHBAsOutdated(ctx context.Context) error { if count, err := oDb.execCountContext(ctx, request); err != nil { return err } else if count > 0 { - slog.Info(fmt.Sprintf("purged %d entries from table node_hba", count)) + slog.Debug(fmt.Sprintf("purged %d entries from table node_hba", count)) oDb.SetChange("node_hba") } return nil diff --git a/cmd/conf.go b/cmd/conf.go index b75a04a..a372c15 100644 --- a/cmd/conf.go +++ b/cmd/conf.go @@ -102,6 +102,10 @@ func setDefaultRunnerConfig() { viper.SetDefault(s+".pprof.ux.socket", "/var/run/oc3_runner_pprof.sock") viper.SetDefault(s+".metrics.enable", false) viper.SetDefault(s+".log.request.level", "none") + viper.SetDefault(s+".nb_workers", 0) + viper.SetDefault(s+".purge_timeout", 0) + viper.SetDefault(s+".notification_timeout", 0) + viper.SetDefault(s+".command_timeout", 0) } func setDefaultDBConfig() { diff --git a/messenger/main.go b/messenger/main.go index 8e14a5e..58412d6 100644 --- a/messenger/main.go +++ b/messenger/main.go @@ -12,6 +12,8 @@ import ( "sync" "github.com/gorilla/websocket" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) type CmdComet struct { @@ -40,12 +42,46 @@ var ( type ( Client struct { conn *websocket.Conn + mu sync.Mutex group string token string name string } ) +var ( + connectionTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "oc3", + Subsystem: "messenger", + Name: "connections_total", + Help: "Total number of connections", + }, + []string{"group"}) + disconnectionTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "oc3", + Subsystem: "messenger", + Name: "disconnections_total", + Help: "Total number of disconnections", + }, + []string{"group"}) + sendMessageTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "oc3", + Subsystem: "messenger", + Name: "send_message_total", + Help: "Total number of sent messages", + }, []string{"group"}) + receiveMessageTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "oc3", + Subsystem: "messenger", + Name: "receive_message_total", + Help: "Total number of received messages", + }, []string{"group"}) +) + func postHandler(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) @@ -69,6 +105,7 @@ func postHandler(w http.ResponseWriter, r *http.Request) { } slog.Debug(fmt.Sprintf("MESSAGE to %s:%s", group, message)) + sendMessageTotal.WithLabelValues(group).Inc() if hmacKey != "" { signature := r.FormValue("signature") @@ -87,8 +124,11 @@ func postHandler(w http.ResponseWriter, r *http.Request) { mu.RUnlock() for _, client := range clients { - if err := client.conn.WriteMessage(websocket.TextMessage, []byte(message)); err != nil { + err := client.WriteMessage(websocket.TextMessage, []byte(message)) + if err != nil { slog.Warn(fmt.Sprintf("Error writing to client: %v", err)) + } else { + receiveMessageTotal.WithLabelValues(group).Inc() } } @@ -159,6 +199,7 @@ func distributeHandler(w http.ResponseWriter, r *http.Request) { group: group, token: token, name: name, + mu: sync.Mutex{}, } if useTokens { @@ -182,7 +223,7 @@ func distributeHandler(w http.ResponseWriter, r *http.Request) { } for _, existingClient := range listeners[group] { - if err := existingClient.conn.WriteMessage(websocket.TextMessage, []byte("+"+name)); err != nil { + if err := existingClient.WriteMessage(websocket.TextMessage, []byte("+"+name)); err != nil { slog.Warn(fmt.Sprintf("Error notifying client: %v", err)) } } @@ -193,7 +234,8 @@ func distributeHandler(w http.ResponseWriter, r *http.Request) { userAgent := r.Header.Get("User-Agent") - slog.Info(fmt.Sprintf("CONNECT %s to %s", userAgent, group)) + slog.Debug(fmt.Sprintf("CONNECT %s to %s", userAgent, group)) + connectionTotal.WithLabelValues(group).Inc() defer func() { mu.Lock() @@ -209,9 +251,10 @@ func distributeHandler(w http.ResponseWriter, r *http.Request) { mu.Unlock() conn.Close() - slog.Info(fmt.Sprintf("DISCONNECT %s from %s", group, userAgent)) + slog.Debug(fmt.Sprintf("DISCONNECT %s from %s", group, userAgent)) + disconnectionTotal.WithLabelValues(group).Inc() for _, existingClient := range listeners[group] { - if err := existingClient.conn.WriteMessage(websocket.TextMessage, []byte("-"+name)); err != nil { + if err := existingClient.WriteMessage(websocket.TextMessage, []byte("-"+name)); err != nil { slog.Warn(fmt.Sprintf("Error notifying client: %v", err)) } } @@ -226,6 +269,12 @@ func distributeHandler(w http.ResponseWriter, r *http.Request) { } } +func (c *Client) WriteMessage(messageType int, data []byte) error { + c.mu.Lock() + defer c.mu.Unlock() + return c.conn.WriteMessage(messageType, data) +} + func (c *CmdComet) Run() error { hmacKey = c.Key useTokens = c.RequireToken @@ -237,7 +286,7 @@ func (c *CmdComet) Run() error { addr := fmt.Sprintf("%s:%s", c.Address, c.Port) if c.KeyFile != "" && c.CertFile != "" { - slog.Info(fmt.Sprintf("Starting HTTPS server on %s", addr)) + slog.Debug(fmt.Sprintf("Starting HTTPS server on %s", addr)) cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile) if err != nil { @@ -257,7 +306,7 @@ func (c *CmdComet) Run() error { return err } } else { - slog.Info(fmt.Sprintf("Starting HTTP server on %s", addr)) + slog.Debug(fmt.Sprintf("Starting HTTP server on %s", addr)) if err := http.ListenAndServe(addr, nil); err != nil { slog.Error(fmt.Sprintf("Error starting HTTP server: %s", err)) return err diff --git a/runner/main.go b/runner/main.go index 17ca42b..869ff87 100644 --- a/runner/main.go +++ b/runner/main.go @@ -12,6 +12,8 @@ import ( "strings" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/spf13/viper" "github.com/opensvc/oc3/cdb" @@ -40,15 +42,18 @@ type ( } cmdSetUnreachable struct { - id int + id int + actionType string } cmdSetNotified struct { - id int + id int + actionType string } cmdSetInvalid struct { - id int + id int + actionType string } cmdSetRunning struct { @@ -56,10 +61,11 @@ type ( } cmdSetDone struct { - id int - ret int - stdout string - stderr string + id int + ret int + stdout string + stderr string + actionType string } dedupLog struct { @@ -78,9 +84,54 @@ const ( DefaultPurgeTimeout = 24 * time.Hour ) +var ( + queueQueued = promauto.NewCounter( + prometheus.CounterOpts{ + Namespace: "oc3", + Subsystem: "runner", + Name: "queue_queued_total", + Help: "Total number of actions queued", + }) + actionInProgress = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "oc3", + Subsystem: "runner", + Name: "action_in_progress_total", + Help: "Total number of actions in progress by type", + }, []string{"action_type"}) + actionProcessed = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "oc3", + Subsystem: "runner", + Name: "action_processed_total", + Help: "Total number of actions processed by type and result", + }, []string{"action_type", "result"}) + actionPullReturnCode = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "oc3", + Subsystem: "runner", + Name: "action_pull_return_code_total", + Help: "Total number of pull action return codes", + }, []string{"ret"}) + dbErrors = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "oc3", + Subsystem: "runner", + Name: "db_errors_total", + Help: "Total number of database errors by operation", + }, []string{"op"}) + dbRequests = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "oc3", + Subsystem: "runner", + Name: "db_requests_total", + Help: "Total number of database requests by operation", + }, []string{"op"}) +) + func (d *ActionDaemon) Run() error { - nbWorkers := getOptionInt("actiond.nb_workers", DefaultNbWorkers) - purgeTimeout := getOptionDuration("actiond.purge_timeout", DefaultPurgeTimeout) + nbWorkers := getOptionInt("runner.nb_workers", DefaultNbWorkers) + purgeTimeout := getOptionDuration("runner.purge_timeout", DefaultPurgeTimeout) odb := cdb.New(d.DB) dispatchC := make(chan cdb.ActionQueueEntry) cmdC := make(chan any) @@ -107,26 +158,33 @@ func (d *ActionDaemon) Run() error { pollWaitingActions := func() { // define SQL @now as the time we start processing the queue err := odb.ActionQSetNow(d.Ctx) + dbRequests.WithLabelValues("set_now").Inc() if err != nil { nowErrorLogger.warnf("set now: %s", err) + dbErrors.WithLabelValues("set_now").Inc() return } nowErrorLogger.reset() // mark all waiting actions as dequeued at @now err = odb.ActionQSetDequeuedToNow(d.Ctx) + dbRequests.WithLabelValues("set_dequeued_to_now").Inc() if err != nil { setDequeuedToNowErrorLogger.warnf("set dequeued to now: %s", err) + dbErrors.WithLabelValues("set_dequeued_to_now").Inc() return } setDequeuedToNowErrorLogger.reset() // fetch all actions marked as dequeued at @now lines, err := odb.ActionQGetQueued(d.Ctx) + dbRequests.WithLabelValues("get_queued").Inc() if err != nil { getQueuedErrorLogger.warnf("get queued: %s", err) + dbErrors.WithLabelValues("get_queued").Inc() return } + queueQueued.Add(float64(len(lines))) getQueuedErrorLogger.reset() // dispatch each action to a worker @@ -143,80 +201,102 @@ func (d *ActionDaemon) Run() error { case cmdSetUnreachable: c := cmd.(cmdSetUnreachable) d.unreachableIds = append(d.unreachableIds, c.id) + actionProcessed.WithLabelValues(c.actionType, "unreachable").Inc() case cmdSetNotified: c := cmd.(cmdSetNotified) d.nIds = append(d.nIds, c.id) + actionProcessed.WithLabelValues(c.actionType, "notified").Inc() case cmdSetInvalid: c := cmd.(cmdSetInvalid) d.invalidIds = append(d.invalidIds, c.id) + actionProcessed.WithLabelValues(c.actionType, "invalid").Inc() case cmdSetRunning: c := cmd.(cmdSetRunning) d.runningIds = append(d.runningIds, c.id) case cmdSetDone: c := cmd.(cmdSetDone) d.doneEntries = append(d.doneEntries, c) + result := "success" + if c.ret != 0 { + result = "failure" + } + actionProcessed.WithLabelValues(c.actionType, result).Inc() } case <-updateTicker.C: if len(d.unreachableIds) > 0 { err := odb.ActionQSetUnreachable(d.Ctx, d.unreachableIds) + dbRequests.WithLabelValues("set_unreachable").Inc() if err != nil { slog.Warn(fmt.Sprintf("set unreachable: %s", err)) + dbErrors.WithLabelValues("set_unreachable").Inc() } else { - slog.Info(fmt.Sprintf("set unreachable: %v", d.unreachableIds)) + slog.Debug(fmt.Sprintf("set unreachable: %v", d.unreachableIds)) d.unreachableIds = []int{} } } if len(d.invalidIds) > 0 { err := odb.ActionQSetInvalid(d.Ctx, d.invalidIds) + dbRequests.WithLabelValues("set_invalid").Inc() if err != nil { slog.Warn(fmt.Sprintf("set invalid: %s", err)) + dbErrors.WithLabelValues("set_invalid").Inc() } else { - slog.Info(fmt.Sprintf("set invalid: %v", d.invalidIds)) + slog.Debug(fmt.Sprintf("set invalid: %v", d.invalidIds)) d.invalidIds = []int{} } } if len(d.nIds) > 0 { err := odb.ActionQSetNotified(d.Ctx, d.nIds) + dbRequests.WithLabelValues("set_notified").Inc() if err != nil { slog.Warn(fmt.Sprintf("set notified: %s", err)) + dbErrors.WithLabelValues("set_notified").Inc() } else { - slog.Info(fmt.Sprintf("set notified: %v", d.nIds)) + slog.Debug(fmt.Sprintf("set notified: %v", d.nIds)) d.nIds = []int{} } } if len(d.ids) > 0 { err := odb.ActionQSetQueued(d.Ctx, d.ids) + dbRequests.WithLabelValues("set_queued").Inc() if err != nil { slog.Warn(fmt.Sprintf("set queued: %s", err)) + dbErrors.WithLabelValues("set_queued").Inc() } else { - slog.Info(fmt.Sprintf("set queued: %v", d.ids)) + slog.Debug(fmt.Sprintf("set queued: %v", d.ids)) d.ids = []int{} } } if len(d.runningIds) > 0 { err := odb.ActionQSetRunning(d.Ctx, d.runningIds) + dbRequests.WithLabelValues("set_running").Inc() if err != nil { slog.Warn(fmt.Sprintf("set running: %s", err)) + dbErrors.WithLabelValues("set_running").Inc() } else { - slog.Info(fmt.Sprintf("set running: %v", d.runningIds)) + slog.Debug(fmt.Sprintf("set running: %v", d.runningIds)) d.runningIds = []int{} } } if len(d.doneEntries) > 0 { for _, entry := range d.doneEntries { err := odb.ActionQSetDone(d.Ctx, entry.id, entry.ret, entry.stdout, entry.stderr) + dbRequests.WithLabelValues("set_done").Inc() if err != nil { slog.Warn(fmt.Sprintf("set done: %s", err)) + dbErrors.WithLabelValues("set_done").Inc() } else { - slog.Info(fmt.Sprintf("set done: id %d ret %d stdout %d stderr %d", entry.id, entry.ret, len(entry.stdout), len(entry.stderr))) + slog.Debug(fmt.Sprintf("set done: id %d ret %d stdout %d stderr %d", entry.id, entry.ret, len(entry.stdout), len(entry.stderr))) } } d.doneEntries = []cmdSetDone{} } if len(d.ids) > 0 || len(d.nIds) > 0 || len(d.invalidIds) > 0 || len(d.unreachableIds) > 0 || len(d.runningIds) > 0 || len(d.doneEntries) > 0 { data, err := odb.ActionQEventData(d.Ctx) + dbRequests.WithLabelValues("action_queue_event_data").Inc() if err != nil { slog.Warn(fmt.Sprintf("get action queue event data: %s", err)) + dbErrors.WithLabelValues("action_queue_event_data").Inc() } if err := odb.Session.NotifyTableChangeWithData(d.Ctx, "action_queue", data); err != nil { slog.Warn(fmt.Sprintf("notify changes: %s", err)) @@ -225,9 +305,11 @@ func (d *ActionDaemon) Run() error { case <-purgeTicker.C: if err := odb.ActionQPurge(d.Ctx); err != nil { slog.Warn(fmt.Sprintf("purge action queue: %s", err)) + dbErrors.WithLabelValues("purge").Inc() } else { - slog.Info("purge action queue: done") + slog.Debug("purge action queue: done") } + dbRequests.WithLabelValues("purge").Inc() case <-d.Ctx.Done(): return nil } @@ -250,7 +332,8 @@ func (w *Worker) Run() { func (w *Worker) work(e cdb.ActionQueueEntry) error { if err := w.validateCommand(e.Command); err != nil { w.cmdC <- cmdSetInvalid{ - id: e.ID, + id: e.ID, + actionType: e.ActionType, } return fmt.Errorf("invalid command: %s", err) } @@ -283,7 +366,9 @@ func notifyNode(nodename string, port int) error { func (w *Worker) workPull(e cdb.ActionQueueEntry) { - notifTimeout := getOptionDuration("actiond.notification_timeout", DefaultNotificationTimeout) + notifTimeout := getOptionDuration("runner.notification_timeout", DefaultNotificationTimeout) + + actionInProgress.WithLabelValues("pull").Inc() ctx, cancel := context.WithTimeout(w.ctx, notifTimeout) defer cancel() @@ -295,7 +380,8 @@ func (w *Worker) workPull(e cdb.ActionQueueEntry) { select { case <-ctx.Done(): w.cmdC <- cmdSetUnreachable{ - id: e.ID, + id: e.ID, + actionType: e.ActionType, } return case <-ticker.C: @@ -306,7 +392,8 @@ func (w *Worker) workPull(e cdb.ActionQueueEntry) { continue } w.cmdC <- cmdSetNotified{ - id: e.ID, + id: e.ID, + actionType: e.ActionType, } return } @@ -314,6 +401,7 @@ func (w *Worker) workPull(e cdb.ActionQueueEntry) { } func (w *Worker) workPush(e cdb.ActionQueueEntry) { + actionInProgress.WithLabelValues("push").Inc() w.cmdC <- cmdSetRunning{ id: e.ID, } @@ -328,11 +416,14 @@ func (w *Worker) workPush(e cdb.ActionQueueEntry) { ) w.cmdC <- cmdSetDone{ - id: e.ID, - ret: returnCode, - stdout: strings.TrimSpace(stdout), - stderr: strings.TrimSpace(stderr), + id: e.ID, + ret: returnCode, + stdout: strings.TrimSpace(stdout), + stderr: strings.TrimSpace(stderr), + actionType: e.ActionType, } + + actionPullReturnCode.WithLabelValues(fmt.Sprintf("%d", returnCode)).Inc() } func (w *Worker) validateCommand(cmd string) error { @@ -355,7 +446,7 @@ func (w *Worker) validateCommand(cmd string) error { } func executeCommand(ctx context.Context, cmd string) (string, string, int) { - cmdTimeout := getOptionDuration("actiond.command_timeout", DefaultCommandTimeout) + cmdTimeout := getOptionDuration("runner.command_timeout", DefaultCommandTimeout) ctx, cancel := context.WithTimeout(ctx, cmdTimeout) defer cancel() diff --git a/scheduler/main.go b/scheduler/main.go index 745df12..e3fd3c5 100644 --- a/scheduler/main.go +++ b/scheduler/main.go @@ -62,7 +62,7 @@ func (t *Scheduler) toggleTasks(ctx context.Context, states map[string]State) { cancel, hasCancel := t.cancels[name] switch { case storedState.IsDisabled && hasCancel: - task.Infof("stop") + task.Debugf("stop") cancel() delete(t.cancels, name) case !storedState.IsDisabled && !hasCancel: diff --git a/scheduler/task.go b/scheduler/task.go index 7807db3..6ad2c66 100644 --- a/scheduler/task.go +++ b/scheduler/task.go @@ -236,7 +236,7 @@ func (t *Task) Start(ctx context.Context) { } func (t *Task) Exec(ctx context.Context) (err error) { - t.Infof("run") + t.Debugf("run") status := taskExecStatusOk begin := time.Now()