Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdb/db_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (oDb *DB) PurgeNodeHBAsOutdated(ctx context.Context) error {
if count, err := oDb.execCountContext(ctx, request); err != nil {
return err
} else if count > 0 {
slog.Info(fmt.Sprintf("purged %d entries from table node_hba", count))
slog.Debug(fmt.Sprintf("purged %d entries from table node_hba", count))
oDb.SetChange("node_hba")
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions cmd/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func setDefaultRunnerConfig() {
viper.SetDefault(s+".pprof.ux.socket", "/var/run/oc3_runner_pprof.sock")
viper.SetDefault(s+".metrics.enable", false)
viper.SetDefault(s+".log.request.level", "none")
viper.SetDefault(s+".nb_workers", 0)
viper.SetDefault(s+".purge_timeout", 0)
viper.SetDefault(s+".notification_timeout", 0)
viper.SetDefault(s+".command_timeout", 0)
}

func setDefaultDBConfig() {
Expand Down
63 changes: 56 additions & 7 deletions messenger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"

"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type CmdComet struct {
Expand Down Expand Up @@ -40,12 +42,46 @@ var (
type (
Client struct {
conn *websocket.Conn
mu sync.Mutex
group string
token string
name string
}
)

var (
connectionTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "oc3",
Subsystem: "messenger",
Name: "connections_total",
Help: "Total number of connections",
},
[]string{"group"})
disconnectionTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "oc3",
Subsystem: "messenger",
Name: "disconnections_total",
Help: "Total number of disconnections",
},
[]string{"group"})
sendMessageTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "oc3",
Subsystem: "messenger",
Name: "send_message_total",
Help: "Total number of sent messages",
}, []string{"group"})
receiveMessageTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "oc3",
Subsystem: "messenger",
Name: "receive_message_total",
Help: "Total number of received messages",
}, []string{"group"})
)

func postHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
Expand All @@ -69,6 +105,7 @@ func postHandler(w http.ResponseWriter, r *http.Request) {
}

slog.Debug(fmt.Sprintf("MESSAGE to %s:%s", group, message))
sendMessageTotal.WithLabelValues(group).Inc()

if hmacKey != "" {
signature := r.FormValue("signature")
Expand All @@ -87,8 +124,11 @@ func postHandler(w http.ResponseWriter, r *http.Request) {
mu.RUnlock()

for _, client := range clients {
if err := client.conn.WriteMessage(websocket.TextMessage, []byte(message)); err != nil {
err := client.WriteMessage(websocket.TextMessage, []byte(message))
if err != nil {
slog.Warn(fmt.Sprintf("Error writing to client: %v", err))
} else {
receiveMessageTotal.WithLabelValues(group).Inc()
}
}

Expand Down Expand Up @@ -159,6 +199,7 @@ func distributeHandler(w http.ResponseWriter, r *http.Request) {
group: group,
token: token,
name: name,
mu: sync.Mutex{},
}

if useTokens {
Expand All @@ -182,7 +223,7 @@ func distributeHandler(w http.ResponseWriter, r *http.Request) {
}

for _, existingClient := range listeners[group] {
if err := existingClient.conn.WriteMessage(websocket.TextMessage, []byte("+"+name)); err != nil {
if err := existingClient.WriteMessage(websocket.TextMessage, []byte("+"+name)); err != nil {
slog.Warn(fmt.Sprintf("Error notifying client: %v", err))
}
}
Expand All @@ -193,7 +234,8 @@ func distributeHandler(w http.ResponseWriter, r *http.Request) {

userAgent := r.Header.Get("User-Agent")

slog.Info(fmt.Sprintf("CONNECT %s to %s", userAgent, group))
slog.Debug(fmt.Sprintf("CONNECT %s to %s", userAgent, group))
connectionTotal.WithLabelValues(group).Inc()

defer func() {
mu.Lock()
Expand All @@ -209,9 +251,10 @@ func distributeHandler(w http.ResponseWriter, r *http.Request) {
mu.Unlock()

conn.Close()
slog.Info(fmt.Sprintf("DISCONNECT %s from %s", group, userAgent))
slog.Debug(fmt.Sprintf("DISCONNECT %s from %s", group, userAgent))
disconnectionTotal.WithLabelValues(group).Inc()
for _, existingClient := range listeners[group] {
if err := existingClient.conn.WriteMessage(websocket.TextMessage, []byte("-"+name)); err != nil {
if err := existingClient.WriteMessage(websocket.TextMessage, []byte("-"+name)); err != nil {
slog.Warn(fmt.Sprintf("Error notifying client: %v", err))
}
}
Expand All @@ -226,6 +269,12 @@ func distributeHandler(w http.ResponseWriter, r *http.Request) {
}
}

func (c *Client) WriteMessage(messageType int, data []byte) error {
c.mu.Lock()
defer c.mu.Unlock()
return c.conn.WriteMessage(messageType, data)
}

func (c *CmdComet) Run() error {
hmacKey = c.Key
useTokens = c.RequireToken
Expand All @@ -237,7 +286,7 @@ func (c *CmdComet) Run() error {
addr := fmt.Sprintf("%s:%s", c.Address, c.Port)

if c.KeyFile != "" && c.CertFile != "" {
slog.Info(fmt.Sprintf("Starting HTTPS server on %s", addr))
slog.Debug(fmt.Sprintf("Starting HTTPS server on %s", addr))

cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
if err != nil {
Expand All @@ -257,7 +306,7 @@ func (c *CmdComet) Run() error {
return err
}
} else {
slog.Info(fmt.Sprintf("Starting HTTP server on %s", addr))
slog.Debug(fmt.Sprintf("Starting HTTP server on %s", addr))
if err := http.ListenAndServe(addr, nil); err != nil {
slog.Error(fmt.Sprintf("Error starting HTTP server: %s", err))
return err
Expand Down
Loading
Loading