Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cdb/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package cdb

const (
TextMax = 65535
)
18 changes: 14 additions & 4 deletions cdb/db_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,15 @@ func (oDb *DB) GetUnfinishedActions(ctx context.Context) (lines []SvcAction, err

}

func (oDb *DB) InsertSvcAction(ctx context.Context, svcID, nodeID uuid.UUID, action string, begin time.Time, status_log string, sid string, cron bool, end time.Time, status string) (int64, error) {
func (oDb *DB) InsertSvcAction(ctx context.Context, svcID, nodeID uuid.UUID, action string, begin time.Time, statusLog string, sid string, cron bool, end time.Time, status string) (int64, error) {
query := "INSERT INTO svcactions (svc_id, node_id, action, begin, status_log, sid, cron"
placeholders := "?, ?, ?, ?, ?, ?, ?"
args := []any{svcID, nodeID, action, begin, status_log, sid, cron}
if len(statusLog) > TextMax {
// Fix end svc action failed: Error 1406 (22001): Data too long for column 'status_log' at row 1
// TODO: add metrics svcactions status_log truncated with len / TextMax
statusLog = statusLog[:TextMax]
}
args := []any{svcID, nodeID, action, begin, statusLog, sid, cron}

if !end.IsZero() {
query += ", end"
Expand All @@ -219,7 +224,7 @@ func (oDb *DB) InsertSvcAction(ctx context.Context, svcID, nodeID uuid.UUID, act

result, err := oDb.DB.ExecContext(ctx, query, args...)
if err != nil {
return 0, err
return 0, fmt.Errorf("insert svcactions failed, status_log length %d: %w", len(statusLog), err)
} else if result == nil {
return 0, errors.New("insert svcactions returns unexpected nil result")
}
Expand All @@ -240,9 +245,14 @@ func (oDb *DB) InsertSvcAction(ctx context.Context, svcID, nodeID uuid.UUID, act

func (oDb *DB) UpdateSvcAction(ctx context.Context, svcActionID int64, end time.Time, status, statusLog string) error {
const query = `UPDATE svcactions SET end = ?, status = ?, time = TIMESTAMPDIFF(SECOND, begin, ?), status_log = ? WHERE id = ?`
if len(statusLog) > TextMax {
// Fix end svc action failed: Error 1406 (22001): Data too long for column 'status_log' at row 1
// TODO: add metrics svcactions status_log truncated with len / TextMax
statusLog = statusLog[:TextMax]
}
result, err := oDb.DB.ExecContext(ctx, query, end, status, end, statusLog, svcActionID)
if err != nil {
return err
return fmt.Errorf("update svcactions failed status_log length %d: %w", len(statusLog), err)
} else if result == nil {
return errors.New("update svcactions returns unexpected nil result")
}
Expand Down
8 changes: 7 additions & 1 deletion cdb/db_instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,9 +563,15 @@ func (oDb *DB) InstanceResourceInfoUpdate(ctx context.Context, svcID, nodeID str
}
}()

var value string
for _, info := range data.Info {
for _, key := range info.Keys {
if result, err := stmt.ExecContext(ctx, svcID, nodeID, info.Rid, key.Key, topology, key.Value); err != nil {
if len(key.Value) > 255 {
value = key.Value[:255]
} else {
value = key.Value
}
if result, err := stmt.ExecContext(ctx, svcID, nodeID, info.Rid, key.Key, topology, value); err != nil {
return fmt.Errorf("db exec: %w", err)
} else if result != nil {
if count, err := result.RowsAffected(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion worker/job_feed_instance_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (d *jobFeedInstanceAction) updateDB(ctx context.Context) error {
} else {
// begin already processed, update record with end info
if err := d.oDb.UpdateSvcAction(ctx, actionID, endTime, d.data.Status, statusLog); err != nil {
return fmt.Errorf("end svc action failed: %w", err)
return fmt.Errorf("end svc action failed, initial statusLog length %d: %w", len(statusLog), err)
}
}

Expand Down
73 changes: 50 additions & 23 deletions worker/job_feed_node_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,29 +99,48 @@ func (d *jobFeedNodeDisk) getData(ctx context.Context) error {
// ]
// }
//
// CREATE TABLE `svcdisks` (
// `id` int(11) NOT NULL AUTO_INCREMENT,
// `disk_id` varchar(120) CHARACTER SET latin1 COLLATE latin1_swedish_ci DEFAULT NULL,
// `disk_size` int(11) NOT NULL DEFAULT 0,
// `disk_vendor` varchar(60) DEFAULT NULL,
// `disk_model` varchar(60) DEFAULT NULL,
// `disk_dg` varchar(60) DEFAULT '',
// `disk_devid` varchar(60) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
// `disk_arrayid` varchar(60) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
// `disk_updated` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp(),
// `disk_local` varchar(1) DEFAULT 'T',
// `disk_used` int(11) NOT NULL DEFAULT 0,
// `disk_region` varchar(32) DEFAULT '0',
// `node_id` char(36) CHARACTER SET ascii COLLATE ascii_general_ci DEFAULT '',
// `app_id` int(11) DEFAULT NULL,
// `svc_id` char(36) CHARACTER SET ascii COLLATE ascii_general_ci DEFAULT '',
// PRIMARY KEY (`id`),
// UNIQUE KEY `uk_svcdisks_1` (`disk_id`,`svc_id`,`node_id`,`disk_dg`),
// KEY `idx1` (`disk_id`,`node_id`,`disk_dg`),
// KEY `k_node_id` (`node_id`),
// KEY `k_svc_id` (`svc_id`),
// KEY `k_svcdisks_1` (`svc_id`,`node_id`)
// ) ENGINE=InnoDB AUTO_INCREMENT=4641237 DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci
// CREATE TABLE `svcdisks` (
//
// `id` int(11) NOT NULL AUTO_INCREMENT,
// `disk_id` varchar(120) DEFAULT NULL,
// `disk_size` int(11) NOT NULL DEFAULT 0,
// `disk_vendor` varchar(60) DEFAULT NULL,
// `disk_model` varchar(60) DEFAULT NULL,
// `disk_dg` varchar(60) DEFAULT NULL,
// `disk_updated` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp(),
// `disk_local` varchar(1) DEFAULT 'T',
// `disk_used` int(11) NOT NULL DEFAULT 0,
// `disk_region` varchar(32) DEFAULT '0',
// `app_id` int(11) DEFAULT NULL,
// `node_id` char(36) CHARACTER SET ascii COLLATE ascii_general_ci DEFAULT '',
// `svc_id` char(36) CHARACTER SET ascii COLLATE ascii_general_ci DEFAULT '',
// PRIMARY KEY (`id`),
// UNIQUE KEY `uk_svcdisks_1` (`disk_id`,`svc_id`,`node_id`,`disk_dg`),
// KEY `k_node_id` (`node_id`),
// KEY `k_svc_id` (`svc_id`),
// KEY `k_svcdisks_1` (`svc_id`,`node_id`)
//
// ) ENGINE=InnoDB AUTO_INCREMENT=47411 DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci COMMENT='disks used by services' |
//
// CREATE TABLE `diskinfo` (
//
// `id` int(11) NOT NULL AUTO_INCREMENT,
// `disk_id` varchar(120) DEFAULT NULL,
// `disk_devid` varchar(60) DEFAULT '',
// `disk_arrayid` varchar(300) DEFAULT NULL,
// `disk_updated` datetime DEFAULT NULL,
// `disk_raid` varchar(128) DEFAULT NULL,
// `disk_size` int(11) DEFAULT NULL,
// `disk_group` varchar(60) DEFAULT '',
// `disk_level` int(11) NOT NULL DEFAULT 0,
// `disk_controller` varchar(32) DEFAULT NULL,
// `disk_name` varchar(120) DEFAULT '',
// `disk_alloc` int(11) DEFAULT NULL,
// `disk_created` timestamp NOT NULL DEFAULT current_timestamp(),
// PRIMARY KEY (`id`),
// UNIQUE KEY `new_index` (`disk_id`,`disk_group`)
//
// ) ENGINE=InnoDB AUTO_INCREMENT=5024190 DEFAULT CHARSET=latin1 COLLATE=latin1_swedish_ci |
func (d *jobFeedNodeDisk) updateDB(ctx context.Context) error {
var (
// pathToObjectID is a map of an object path to object ID, to cache db results
Expand Down Expand Up @@ -196,7 +215,15 @@ func (d *jobFeedNodeDisk) updateDB(ctx context.Context) error {
}
if strings.HasPrefix(diskID, d.nodeID+".") && len(diskL) == 0 {
line["local"] = "T"
// disk_devid length is 60 we must reduce its size
// example: diskID value:
// initial <nodename>.disk!by-id!lvm-pv-uuid-yiIrKF-vEm1-I3fK-hEMj-AnNT-0ztt-JM8rbf
// -> <nodeID>.disk!by-id!lvm-pv-uuid-yiIrKF-vEm1-I3fK-hEMj-AnNT-0ztt-JM8rbf
// -> final value: by-id!lvm-pv-uuid-yiIrKF-vEm1-I3fK-hEMj-AnNT-0ztt-JM8rbf
devID := strings.TrimPrefix(diskID, d.nodeID+".")
if strings.HasPrefix(devID, "disk!by-id!") {
devID = strings.TrimPrefix(devID, "disk!")
}
if changed, err := d.oDb.UpdateDiskinfoArrayAndDevIDsAndSize(ctx, diskID, nodeID, devID, int32(line["size"].(float64))); err != nil {
return fmt.Errorf("updateDiskinfoArrayAndDevIDsAndSize: %w", err)
} else if changed {
Expand Down
3 changes: 2 additions & 1 deletion worker/job_feed_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func (d *jobFeedSystem) pkg(ctx context.Context) error {
mariadb.Mapping{To: "pkg_updated"},
mariadb.Mapping{To: "pkg_name", From: "name"},
mariadb.Mapping{To: "pkg_version", From: "version"},
mariadb.Mapping{To: "pkg_arch", From: "arch"},
// TODO: check for increase pkg_arch verify `pkg_arch` varchar(8) NOT NULL,
mariadb.Mapping{To: "pkg_arch", From: "arch", Modify: mariadb.ModifierMaxLen(8)},
mariadb.Mapping{To: "pkg_type", From: "type"},
mariadb.Mapping{To: "pkg_sig", From: "sig"},
mariadb.Mapping{To: "pkg_install_date", From: "installed_at", Modify: mariadb.ModifyFromRFC3339},
Expand Down
Loading