diff --git a/core/node/config.go b/core/node/config.go index 08fa52601..7c975de10 100644 --- a/core/node/config.go +++ b/core/node/config.go @@ -1,6 +1,7 @@ package node import ( + "slices" "time" "github.com/opensvc/om3/v3/core/schedule" @@ -20,6 +21,15 @@ type ( SplitAction string `json:"split_action"` SSHKey string `json:"sshkey"` PRKey string `json:"prkey"` + Hooks []Hook `json:"hooks"` + } + + Hook struct { + Name string `json:"name"` + Events []string `json:"events"` + Command []string `json:"command"` + + sig string } ) @@ -55,5 +65,36 @@ func (c Config) Equals(other Config) bool { } } + // Compare Hook slice + if len(c.Hooks) != len(other.Hooks) { + return false + } + for i := range c.Hooks { + if !c.Hooks[i].Equal(&other.Hooks[i]) { + return false + } + } + + return true +} + +func (t *Hook) Sig() string { + return t.sig +} + +func (t *Hook) SetSig(sig string) { + t.sig = sig +} + +func (t *Hook) Equal(o *Hook) bool { + if t.Name != o.Name { + return false + } else if t.sig != o.sig { + return false + } else if !slices.Equal(t.Events, o.Events) { + return false + } else if !slices.Equal(t.Command, o.Command) { + return false + } return true } diff --git a/daemon/hook/main.go b/daemon/hook/main.go index c2304bd4f..fdce9f3a3 100644 --- a/daemon/hook/main.go +++ b/daemon/hook/main.go @@ -6,19 +6,17 @@ import ( "fmt" "os/exec" "slices" - "strings" "sync" "sync/atomic" "syscall" "time" "github.com/opensvc/om3/v3/core/event" + "github.com/opensvc/om3/v3/core/node" "github.com/opensvc/om3/v3/core/object" "github.com/opensvc/om3/v3/core/xconfig" "github.com/opensvc/om3/v3/daemon/msgbus" - "github.com/opensvc/om3/v3/util/command" "github.com/opensvc/om3/v3/util/hostname" - "github.com/opensvc/om3/v3/util/key" "github.com/opensvc/om3/v3/util/plog" "github.com/opensvc/om3/v3/util/pubsub" "github.com/opensvc/om3/v3/util/xmap" @@ -86,7 +84,11 @@ func NewManager(drainDuration time.Duration, subQS pubsub.QueueSizer) *Manager { func (t *Manager) Start(parent context.Context) error { t.log.Infof("starting") t.ctx, t.cancel = context.WithCancel(parent) - t.update() + initialNodeConfig := node.ConfigData.GetByNode(t.localhost) + if initialNodeConfig == nil { + return fmt.Errorf("node config not found for localhost: %s", t.localhost) + } + t.update(initialNodeConfig.Hooks) t.startSubscriptions() t.startUpdateLoop() t.log.Infof("started") @@ -108,8 +110,11 @@ func (t *Manager) startUpdateLoop() { select { case <-t.ctx.Done(): return - case <-t.sub.C: - t.update() + case i := <-t.sub.C: + switch ev := i.(type) { + case *msgbus.NodeConfigUpdated: + t.update(ev.Value.Hooks) + } } } }() @@ -130,28 +135,26 @@ func (t *Manager) Stop() error { return nil } -func (t *Manager) update() { +func (t *Manager) update(hooks []node.Hook) { if err := t.loadConfig(); err != nil { t.log.Warnf("%s", err) return } currentHookNames := xmap.Keys(t.hooks) var hooksToStop, scannedHooks []string - hooksToStart := make(map[string]string) - for _, name := range t.config.SectionStrings() { - if !strings.HasPrefix(name, "hook#") { - continue - } + hooksToStart := make(map[string]node.Hook) + for _, h := range hooks { + name := h.Name + scannedHooks = append(scannedHooks, name) currentHook, ok := t.hooks[name] if !ok { - hooksToStart[name] = "" + hooksToStart[name] = h continue } - sig := t.config.SectionSig(name) - if sig != currentHook.sig { + if sig := h.Sig(); sig != currentHook.sig { hooksToStop = append(hooksToStop, name) - hooksToStart[name] = sig + hooksToStart[name] = h } } @@ -166,23 +169,16 @@ func (t *Manager) update() { } delete(t.hooks, name) } - for name, sig := range hooksToStart { - kinds := t.config.GetStrings(key.New(name, "events")) + for name, hookToStart := range hooksToStart { + kinds := hookToStart.Events h := hook{ - sig: sig, - } - t.hooks[name] = h - s := t.config.Get(key.New(name, "command")) - args, err := command.CmdArgsFromString(s) - if err != nil { - t.log.Warnf("%s: failed to split command: %s", name, err) - continue + sig: hookToStart.Sig(), } - if len(args) < 1 { + if len(hookToStart.Command) < 1 { t.log.Warnf("%s: empty command", name) continue } - h.cancel = t.startHook(name, kinds, args) + h.cancel = t.startHook(name, kinds, hookToStart.Command) t.hooks[name] = h } } diff --git a/daemon/nmon/config.go b/daemon/nmon/config.go new file mode 100644 index 000000000..f4bdf80c0 --- /dev/null +++ b/daemon/nmon/config.go @@ -0,0 +1,97 @@ +package nmon + +import ( + "runtime" + "strings" + + "github.com/opensvc/om3/v3/core/node" + "github.com/opensvc/om3/v3/core/object" + "github.com/opensvc/om3/v3/util/key" +) + +func (t *Manager) getNodeConfig() node.Config { + var ( + keyMaintenanceGracePeriod = key.New("node", "maintenance_grace_period") + keyMaxParallel = key.New("node", "max_parallel") + keyMaxKeySize = key.New("node", "max_key_size") + keyReadyPeriod = key.New("node", "ready_period") + keyRejoinGracePeriod = key.New("node", "rejoin_grace_period") + keyEnv = key.New("node", "env") + keySplitAction = key.New("node", "split_action") + keySSHKey = key.New("node", "sshkey") + keyPRKey = key.New("node", "prkey") + keyMinAvailMemPct = key.New("node", "min_avail_mem_pct") + keyMinAvailSwapPct = key.New("node", "min_avail_swap_pct") + ) + cfg := node.Config{} + if d := t.config.GetDuration(keyMaintenanceGracePeriod); d != nil { + cfg.MaintenanceGracePeriod = *d + } + if d := t.config.GetDuration(keyReadyPeriod); d != nil { + cfg.ReadyPeriod = *d + } + if d := t.config.GetDuration(keyRejoinGracePeriod); d != nil { + cfg.RejoinGracePeriod = *d + } + if d := t.config.GetSize(keyMaxKeySize); d != nil { + cfg.MaxKeySize = *d + } + cfg.MinAvailMemPct = t.config.GetInt(keyMinAvailMemPct) + cfg.MinAvailSwapPct = t.config.GetInt(keyMinAvailSwapPct) + cfg.MaxParallel = t.config.GetInt(keyMaxParallel) + cfg.Env = t.config.GetString(keyEnv) + cfg.SplitAction = t.config.GetString(keySplitAction) + cfg.SSHKey = t.config.GetString(keySSHKey) + cfg.PRKey = t.config.GetString(keyPRKey) + + if cfg.MaxParallel == 0 { + cfg.MaxParallel = runtime.NumCPU() + } + if cfg.MaxParallel < MinMaxParallel { + cfg.MaxParallel = MinMaxParallel + } + + nodeObj, err := object.NewNode(object.WithVolatile(true)) + if err != nil { + t.log.Warnf("load node config: %s", err) + } else { + mergedCfg := nodeObj.MergedConfig() + hooks := make(map[string]node.Hook) + for _, s := range t.config.SectionStrings() { + if !strings.HasPrefix(s, "hook#") { + continue + } + t.log.Warnf("analyse config: %s", s) + hook := node.Hook{Name: s[5:]} + if hook.Name == "" { + t.log.Debugf("skip empty hook name for %s", s) + continue + } + + hook.Events = mergedCfg.GetStrings(key.New(s, "events")) + if len(hook.Events) == 0 { + t.log.Debugf("skip empty hook events for %s", s) + continue + } + hook.Command = mergedCfg.GetStrings(key.New(s, "command")) + if len(hook.Command) == 0 { + t.log.Debugf("skip empty hook command for %s", s) + continue + } + hook.SetSig(mergedCfg.SectionSig(s)) + + hooks[hook.Name] = hook + t.log.Infof("hook %s: %#v, %s", hook.Name, hook, hook.Sig()) + } + cfg.Hooks = make([]node.Hook, 0, len(hooks)) + for _, h := range hooks { + cfg.Hooks = append(cfg.Hooks, h) + } + } + + for _, e := range nodeObj.Schedules() { + cfg.Schedules = append(cfg.Schedules, e.Config) + } + + return cfg +} diff --git a/daemon/nmon/main_cmd.go b/daemon/nmon/main_cmd.go index 46c70a80c..a31545a7f 100644 --- a/daemon/nmon/main_cmd.go +++ b/daemon/nmon/main_cmd.go @@ -4,19 +4,16 @@ import ( "errors" "fmt" "os" - "runtime" "slices" "strings" "time" "github.com/opensvc/om3/v3/core/clusternode" "github.com/opensvc/om3/v3/core/node" - "github.com/opensvc/om3/v3/core/object" "github.com/opensvc/om3/v3/core/rawconfig" "github.com/opensvc/om3/v3/daemon/msgbus" "github.com/opensvc/om3/v3/util/errcontext" "github.com/opensvc/om3/v3/util/file" - "github.com/opensvc/om3/v3/util/key" "github.com/opensvc/om3/v3/util/toc" ) @@ -67,56 +64,6 @@ func (t *Manager) onConfigFileUpdated(_ *msgbus.ConfigFileUpdated) { t.checkRejoinTicker() } -func (t *Manager) getNodeConfig() node.Config { - var ( - keyMaintenanceGracePeriod = key.New("node", "maintenance_grace_period") - keyMaxParallel = key.New("node", "max_parallel") - keyMaxKeySize = key.New("node", "max_key_size") - keyReadyPeriod = key.New("node", "ready_period") - keyRejoinGracePeriod = key.New("node", "rejoin_grace_period") - keyEnv = key.New("node", "env") - keySplitAction = key.New("node", "split_action") - keySSHKey = key.New("node", "sshkey") - keyPRKey = key.New("node", "prkey") - keyMinAvailMemPct = key.New("node", "min_avail_mem_pct") - keyMinAvailSwapPct = key.New("node", "min_avail_swap_pct") - ) - cfg := node.Config{} - if d := t.config.GetDuration(keyMaintenanceGracePeriod); d != nil { - cfg.MaintenanceGracePeriod = *d - } - if d := t.config.GetDuration(keyReadyPeriod); d != nil { - cfg.ReadyPeriod = *d - } - if d := t.config.GetDuration(keyRejoinGracePeriod); d != nil { - cfg.RejoinGracePeriod = *d - } - if d := t.config.GetSize(keyMaxKeySize); d != nil { - cfg.MaxKeySize = *d - } - cfg.MinAvailMemPct = t.config.GetInt(keyMinAvailMemPct) - cfg.MinAvailSwapPct = t.config.GetInt(keyMinAvailSwapPct) - cfg.MaxParallel = t.config.GetInt(keyMaxParallel) - cfg.Env = t.config.GetString(keyEnv) - cfg.SplitAction = t.config.GetString(keySplitAction) - cfg.SSHKey = t.config.GetString(keySSHKey) - cfg.PRKey = t.config.GetString(keyPRKey) - - if cfg.MaxParallel == 0 { - cfg.MaxParallel = runtime.NumCPU() - } - if cfg.MaxParallel < MinMaxParallel { - cfg.MaxParallel = MinMaxParallel - } - - node, _ := object.NewNode(object.WithVolatile(true)) - for _, e := range node.Schedules() { - cfg.Schedules = append(cfg.Schedules, e.Config) - } - - return cfg -} - func (t *Manager) checkRejoinTicker() { if t.state.State != node.MonitorStateRejoin { return