diff --git a/lib/builds/manager.go b/lib/builds/manager.go
index 416ffc3a..9d71734c 100644
--- a/lib/builds/manager.go
+++ b/lib/builds/manager.go
@@ -948,42 +948,20 @@ func (m *manager) updateBuildComplete(id string, status string, digest *string,
 	m.notifyStatusChange(id, status)
 }
 
-// waitForImageReady polls the image manager until the build's image is ready.
+// waitForImageReady blocks until the build's image reaches a terminal state.
 // imageRef should be the short repo name (e.g., "builds/abc123" or "myapp")
 // matching what triggerConversion stores in the image manager.
 // This ensures that when a build reports "ready", the image is actually usable
 // for instance creation (fixes KERNEL-863 race condition).
 func (m *manager) waitForImageReady(ctx context.Context, imageRef string) error {
-	// Poll for up to 60 seconds (image conversion is typically fast)
-	const maxAttempts = 120
-	const pollInterval = 500 * time.Millisecond
-
 	m.logger.Debug("waiting for image to be ready", "image_ref", imageRef)
 
-	for attempt := 0; attempt < maxAttempts; attempt++ {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-		}
-
-		img, err := m.imageManager.GetImage(ctx, imageRef)
-		if err == nil {
-			switch img.Status {
-			case images.StatusReady:
-				m.logger.Debug("image is ready", "image_ref", imageRef, "attempts", attempt+1)
-				return nil
-			case images.StatusFailed:
-				return fmt.Errorf("image conversion failed")
-			case images.StatusPending, images.StatusPulling, images.StatusConverting:
-				// Still processing, continue polling
-			}
-		}
-
-		// Image not found or still processing, wait and retry
-		time.Sleep(pollInterval)
+	if err := m.imageManager.WaitForReady(ctx, imageRef); err != nil {
+		return err
 	}
 
-	return fmt.Errorf("timeout waiting for image to be ready after %v", time.Duration(maxAttempts)*pollInterval)
+	m.logger.Debug("image is ready", "image_ref", imageRef)
+	return nil
 }
 
 // subscribeToStatus adds a subscriber channel for status updates on a build
diff --git a/lib/builds/manager_test.go b/lib/builds/manager_test.go
index af5b4686..9ea0c7c2 100644
--- a/lib/builds/manager_test.go
+++ b/lib/builds/manager_test.go
@@ -3,10 +3,12 @@ package builds
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"io"
 	"log/slog"
 	"os"
 	"path/filepath"
+	"sync"
 	"testing"
 	"time"
 
@@ -237,6 +239,7 @@ func (m *mockSecretProvider) GetSecrets(ctx context.Context, secretIDs []string)
 
 // mockImageManager implements images.Manager for testing
 type mockImageManager struct {
+	mu          sync.RWMutex
 	images      map[string]*images.Image
 	getImageErr error
 }
@@ -274,11 +277,15 @@ func (m *mockImageManager) ImportLocalImage(ctx context.Context, repo, reference
 }
 
 func (m *mockImageManager) GetImage(ctx context.Context, name string) (*images.Image, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
 	if m.getImageErr != nil {
 		return nil, m.getImageErr
 	}
 	if img, ok := m.images[name]; ok {
-		return img, nil
+		// Return a copy to avoid races on the Status field
+		imgCopy := *img
+		return &imgCopy, nil
 	}
 	return nil, images.ErrNotFound
 }
@@ -298,14 +305,49 @@ func (m *mockImageManager) TotalOCICacheBytes(ctx context.Context) (int64, error
 	return 0, nil
 }
 
+func (m *mockImageManager) WaitForReady(ctx context.Context, name string) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		m.mu.RLock()
+		img, ok := m.images[name]
+		var status string
+		if ok {
+			status = img.Status
+		}
+		m.mu.RUnlock()
+		switch status {
+		case images.StatusReady:
+			return nil
+		case images.StatusFailed:
+			return fmt.Errorf("image conversion failed")
+		}
+		time.Sleep(50 * time.Millisecond)
+	}
+}
+
 // SetImageReady sets an image to ready status for testing
 func (m *mockImageManager) SetImageReady(name string) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	m.images[name] = &images.Image{
 		Name:   name,
 		Status: images.StatusReady,
 	}
 }
 
+// SetImageStatus sets an image's status in a thread-safe way for testing
+func (m *mockImageManager) SetImageStatus(name, status string) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if img, ok := m.images[name]; ok {
+		img.Status = status
+	}
+}
+
 // Test helper to create a manager with test paths and mocks
 func setupTestManager(t *testing.T) (*manager, *mockInstanceManager, *mockVolumeManager, string) {
 	mgr, instanceMgr, volumeMgr, _, tempDir := setupTestManagerWithImageMgr(t)
diff --git a/lib/builds/race_test.go b/lib/builds/race_test.go
index 748a9699..d8103b81 100644
--- a/lib/builds/race_test.go
+++ b/lib/builds/race_test.go
@@ -97,17 +97,19 @@ func TestWaitForImageReady_WaitsForConversion(t *testing.T) {
 	imageRef := "builds/" + buildID
 
 	// Start with image in pending status
+	imageMgr.mu.Lock()
 	imageMgr.images[imageRef] = &images.Image{
 		Name:   imageRef,
 		Status: images.StatusPending,
 	}
+	imageMgr.mu.Unlock()
 
 	// Simulate conversion completing after a short delay
 	go func() {
 		time.Sleep(100 * time.Millisecond)
-		imageMgr.images[imageRef].Status = images.StatusConverting
+		imageMgr.SetImageStatus(imageRef, images.StatusConverting)
 		time.Sleep(100 * time.Millisecond)
-		imageMgr.images[imageRef].Status = images.StatusReady
+		imageMgr.SetImageStatus(imageRef, images.StatusReady)
 	}()
 
 	// waitForImageReady should poll and eventually succeed
@@ -131,10 +133,12 @@ func TestWaitForImageReady_ContextCancelled(t *testing.T) {
 	imageRef := "builds/" + buildID
 
 	// Image stays in pending status forever
+	imageMgr.mu.Lock()
 	imageMgr.images[imageRef] = &images.Image{
 		Name:   imageRef,
 		Status: images.StatusPending,
 	}
+	imageMgr.mu.Unlock()
 
 	// waitForImageReady should return context error
 	err := mgr.waitForImageReady(ctx, imageRef)
@@ -152,10 +156,12 @@ func TestWaitForImageReady_Failed(t *testing.T) {
 	imageRef := "builds/" + buildID
 
 	// Image is in failed status
+	imageMgr.mu.Lock()
 	imageMgr.images[imageRef] = &images.Image{
 		Name:   imageRef,
 		Status: images.StatusFailed,
 	}
+	imageMgr.mu.Unlock()
 
 	// waitForImageReady should return error immediately
 	err := mgr.waitForImageReady(ctx, imageRef)
diff --git a/lib/images/disk.go b/lib/images/disk.go
index c7be831a..438497ab 100644
--- a/lib/images/disk.go
+++ b/lib/images/disk.go
@@ -5,6 +5,7 @@ import (
 	"os"
 	"os/exec"
 	"path/filepath"
+	"runtime"
 
 	"github.com/u-root/u-root/pkg/cpio"
 )
@@ -13,13 +14,20 @@
 type ExportFormat string
 
 const (
-	FormatExt4  ExportFormat = "ext4"  // Read-only ext4 (app images, default)
-	FormatErofs ExportFormat = "erofs" // Read-only compressed (future: when kernel supports it)
+	FormatExt4  ExportFormat = "ext4"  // Read-only ext4 (legacy, used on Darwin)
+	FormatErofs ExportFormat = "erofs" // Read-only compressed with LZ4 (default on Linux)
 	FormatCpio  ExportFormat = "cpio"  // Uncompressed archive (initrd, fast boot)
 )
 
-// DefaultImageFormat is the default export format for OCI images
-const DefaultImageFormat = FormatExt4
+// DefaultImageFormat is the default export format for OCI images.
+// On Linux, we use erofs (compressed, read-only) for smaller images.
+// On Darwin, we use ext4 because the VZ kernel doesn't have erofs support.
+var DefaultImageFormat = func() ExportFormat {
+	if runtime.GOOS == "darwin" {
+		return FormatExt4
+	}
+	return FormatErofs
+}()
 
 // ExportRootfs exports rootfs directory in specified format (public for system manager)
 func ExportRootfs(rootfsDir, outputPath string, format ExportFormat) (int64, error) {
diff --git a/lib/images/manager.go b/lib/images/manager.go
index 1ccfadb0..f5e5468d 100644
--- a/lib/images/manager.go
+++ b/lib/images/manager.go
@@ -23,6 +23,12 @@ const (
 	StatusFailed = "failed"
 )
 
+// StatusEvent represents a terminal status change for image readiness notifications.
+type StatusEvent struct {
+	Status string
+	Err    error
+}
+
 type Manager interface {
 	ListImages(ctx context.Context) ([]Image, error)
 	CreateImage(ctx context.Context, req CreateImageRequest) (*Image, error)
@@ -38,14 +44,19 @@ type Manager interface {
 	// TotalOCICacheBytes returns the total size of the OCI layer cache.
 	// Used by the resource manager for disk capacity tracking.
 	TotalOCICacheBytes(ctx context.Context) (int64, error)
+	// WaitForReady blocks until the image identified by name reaches a terminal
+	// state (ready or failed) or the context is cancelled.
+	WaitForReady(ctx context.Context, name string) error
 }
 
 type manager struct {
-	paths     *paths.Paths
-	ociClient *ociClient
-	queue     *BuildQueue
-	createMu  sync.Mutex
-	metrics   *Metrics
+	paths            *paths.Paths
+	ociClient        *ociClient
+	queue            *BuildQueue
+	createMu         sync.Mutex
+	metrics          *Metrics
+	readySubscribers map[string][]chan StatusEvent // keyed by digestHex
+	subscriberMu     sync.RWMutex
 }
 
 // NewManager creates a new image manager.
@@ -59,9 +70,10 @@ func NewManager(p *paths.Paths, maxConcurrentBuilds int, meter metric.Meter) (Ma
 	}
 
 	m := &manager{
-		paths:     p,
-		ociClient: ociClient,
-		queue:     NewBuildQueue(maxConcurrentBuilds),
+		paths:            p,
+		ociClient:        ociClient,
+		queue:            NewBuildQueue(maxConcurrentBuilds),
+		readySubscribers: make(map[string][]chan StatusEvent),
 	}
 
 	// Initialize metrics if meter is provided
@@ -254,7 +266,7 @@ func (m *manager) buildImage(ctx context.Context, ref *ResolvedRef) {
 	m.updateStatusByDigest(ref, StatusConverting, nil)
 
 	diskPath := digestPath(m.paths, ref.Repository(), ref.DigestHex())
-	// Use default image format (ext4 for now, easy to switch to erofs later)
+	// Use default image format (erofs on Linux, ext4 on Darwin)
 	diskSize, err := ExportRootfs(tempDir, diskPath, DefaultImageFormat)
 	if err != nil {
 		m.updateStatusByDigest(ref, StatusFailed, fmt.Errorf("convert to %s: %w", DefaultImageFormat, err))
@@ -286,6 +298,9 @@ func (m *manager) buildImage(ctx context.Context, ref *ResolvedRef) {
 		return
 	}
 
+	// Notify subscribers that image is ready
+	m.notifyReady(ref.DigestHex(), StatusReady, nil)
+
 	// Only create/update tag symlink on successful completion
 	if ref.Tag() != "" {
 		if err := createTagSymlink(m.paths, ref.Repository(), ref.Tag(), ref.DigestHex()); err != nil {
@@ -317,6 +332,11 @@ func (m *manager) updateStatusByDigest(ref *ResolvedRef, status string, err erro
 	}
 
 	writeMetadata(m.paths, ref.Repository(), ref.DigestHex(), meta)
+
+	// Notify subscribers of terminal status
+	if status == StatusReady || status == StatusFailed {
+		m.notifyReady(ref.DigestHex(), status, err)
+	}
 }
 
 func (m *manager) RecoverInterruptedBuilds() {
@@ -476,3 +496,112 @@ func (m *manager) TotalOCICacheBytes(ctx context.Context) (int64, error) {
 	}
 	return total, nil
 }
+
+// WaitForReady blocks until the image reaches a terminal state (ready or failed)
+// or the context is cancelled.
+//
+// The image may not exist yet when this is called (e.g., the registry's
+// triggerConversion goroutine hasn't called ImportLocalImage yet), so we
+// poll briefly for the image to appear before subscribing for notifications.
+func (m *manager) WaitForReady(ctx context.Context, name string) error {
+	// Wait for the image to appear in the store. In the build flow, the
+	// registry triggers ImportLocalImage asynchronously after a push, so the
+	// image may not exist when the build manager calls WaitForReady.
+	const maxWaitForExist = 30 * time.Second
+	const pollInterval = 100 * time.Millisecond
+
+	var img *Image
+	deadline := time.Now().Add(maxWaitForExist)
+	for {
+		got, err := m.GetImage(ctx, name)
+		if err == nil {
+			img = got
+			break
+		}
+		if time.Now().After(deadline) {
+			return fmt.Errorf("get image: %w", err)
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(pollInterval):
+		}
+	}
+
+	// Check if already in terminal state
+	switch img.Status {
+	case StatusReady:
+		return nil
+	case StatusFailed:
+		return fmt.Errorf("image conversion failed")
+	}
+
+	digestHex := strings.TrimPrefix(img.Digest, "sha256:")
+
+	// Subscribe BEFORE re-checking to avoid TOCTOU race
+	ch := make(chan StatusEvent, 1)
+	m.subscribeToReady(digestHex, ch)
+	defer m.unsubscribeFromReady(digestHex, ch)
+
+	// Re-check after subscribing to close the race window
+	img, err := m.GetImage(ctx, name)
+	if err == nil {
+		switch img.Status {
+		case StatusReady:
+			return nil
+		case StatusFailed:
+			return fmt.Errorf("image conversion failed")
+		}
+	}
+
+	// Wait for notification or context cancellation
+	select {
+	case event := <-ch:
+		if event.Status == StatusReady {
+			return nil
+		}
+		return fmt.Errorf("image conversion failed")
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+// subscribeToReady registers a channel for terminal status notifications on a digest.
+func (m *manager) subscribeToReady(digestHex string, ch chan StatusEvent) {
+	m.subscriberMu.Lock()
+	defer m.subscriberMu.Unlock()
+	m.readySubscribers[digestHex] = append(m.readySubscribers[digestHex], ch)
+}
+
+// unsubscribeFromReady removes a subscriber channel.
+func (m *manager) unsubscribeFromReady(digestHex string, ch chan StatusEvent) {
+	m.subscriberMu.Lock()
+	defer m.subscriberMu.Unlock()
+
+	subscribers := m.readySubscribers[digestHex]
+	for i, sub := range subscribers {
+		if sub == ch {
+			m.readySubscribers[digestHex] = append(subscribers[:i], subscribers[i+1:]...)
+			break
+		}
+	}
+
+	if len(m.readySubscribers[digestHex]) == 0 {
+		delete(m.readySubscribers, digestHex)
+	}
+}
+
+// notifyReady broadcasts a terminal status event to all subscribers for a digest.
+func (m *manager) notifyReady(digestHex string, status string, err error) {
+	m.subscriberMu.RLock()
+	defer m.subscriberMu.RUnlock()
+
+	event := StatusEvent{Status: status, Err: err}
+	for _, ch := range m.readySubscribers[digestHex] {
+		// Non-blocking send — drop if channel is full
+		select {
+		case ch <- event:
+		default:
+		}
+	}
+}
diff --git a/lib/images/manager_test.go b/lib/images/manager_test.go
index 65445676..b3b74efd 100644
--- a/lib/images/manager_test.go
+++ b/lib/images/manager_test.go
@@ -44,14 +44,14 @@ func TestCreateImage(t *testing.T) {
 	require.NoError(t, err)
 	digestHex := strings.SplitN(img.Digest, ":", 2)[1]
 
-	// Check erofs disk file
+	// Check rootfs disk file (erofs on Linux, ext4 on Darwin)
 	diskPath := digestPath(paths.New(dataDir), ref.Repository(), digestHex)
 	diskStat, err := os.Stat(diskPath)
 	require.NoError(t, err)
 	require.False(t, diskStat.IsDir(), "disk path should be a file")
-	require.Greater(t, diskStat.Size(), int64(1000000), "erofs disk should be at least 1MB")
+	require.Greater(t, diskStat.Size(), int64(1000000), "rootfs disk should be at least 1MB")
 	require.Equal(t, diskStat.Size(), *img.SizeBytes, "disk size should match metadata")
-	t.Logf("EROFS disk: path=%s, size=%d bytes", diskPath, diskStat.Size())
+	t.Logf("Rootfs disk (%s): path=%s, size=%d bytes", DefaultImageFormat, diskPath, diskStat.Size())
 
 	// Check metadata file
 	metadataPath := metadataPath(paths.New(dataDir), ref.Repository(), digestHex)
diff --git a/lib/images/storage.go b/lib/images/storage.go
index 4f4c10c5..eaeeb2a6 100644
--- a/lib/images/storage.go
+++ b/lib/images/storage.go
@@ -62,12 +62,12 @@ func digestDir(p *paths.Paths, repository, digestHex string) string {
 }
 
 // digestPath returns the path to the rootfs disk file for a digest
-// Currently uses .ext4 extension (can change to .erofs when kernel supports it)
+// Uses .erofs on Linux (compressed) or .ext4 on Darwin (VZ kernel lacks erofs support)
 func digestPath(p *paths.Paths, repository, digestHex string) string {
 	return p.ImageDigestPath(repository, digestHex)
 }
 
-// GetDiskPath returns the filesystem path to an image's rootfs.erofs file (public for instances manager)
+// GetDiskPath returns the filesystem path to an image's rootfs disk file (public for instances manager)
 func GetDiskPath(p *paths.Paths, imageName string, digest string) (string, error) {
 	// Parse image name to get repository
 	ref, err := ParseNormalizedRef(imageName)
diff --git a/lib/paths/paths.go b/lib/paths/paths.go
index 06aaf3e9..eb84102b 100644
--- a/lib/paths/paths.go
+++ b/lib/paths/paths.go
@@ -1,7 +1,10 @@
 // Package paths provides centralized path construction for hypeman data directory.
 package paths
 
-import "path/filepath"
+import (
+	"path/filepath"
+	"runtime"
+)
 
 // Paths provides typed path construction for the hypeman data directory.
 type Paths struct {
@@ -88,8 +91,13 @@ func (p *Paths) ImageDigestDir(repository, digestHex string) string {
 }
 
 // ImageDigestPath returns the path to the rootfs disk file for a digest.
+// Uses .erofs on Linux (compressed) and .ext4 on Darwin (VZ kernel lacks erofs support).
 func (p *Paths) ImageDigestPath(repository, digestHex string) string {
-	return filepath.Join(p.ImageDigestDir(repository, digestHex), "rootfs.ext4")
+	ext := "erofs"
+	if runtime.GOOS == "darwin" {
+		ext = "ext4"
+	}
+	return filepath.Join(p.ImageDigestDir(repository, digestHex), "rootfs."+ext)
 }
 
 // ImageMetadata returns the path to metadata.json for a digest.
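Note: the new images.Manager.WaitForReady carries no overall timeout of its own (only the 30-second wait for the image record to appear), so callers are expected to bound the wait through the context they pass in, as the build manager does above. The sketch below shows one hypothetical caller; the waitForBuildImage name, the two-minute budget, and the example.com import path are illustrative assumptions, while the WaitForReady(ctx, name) signature and the images.Manager interface come from this diff.

package buildwait

import (
	"context"
	"fmt"
	"time"

	// Hypothetical import path; substitute the project's real module path for lib/images.
	"example.com/hypeman/lib/images"
)

// waitForBuildImage bounds the wait so a stuck conversion cannot block the
// caller forever. WaitForReady returns nil once the image is StatusReady, an
// error if conversion failed, or ctx.Err() when the timeout expires first.
func waitForBuildImage(imageMgr images.Manager, ref string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	if err := imageMgr.WaitForReady(ctx, ref); err != nil {
		return fmt.Errorf("image %q not ready: %w", ref, err)
	}
	return nil
}
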
diff --git a/lib/system/init/mount.go b/lib/system/init/mount.go
index faf65d4d..f39462bc 100644
--- a/lib/system/init/mount.go
+++ b/lib/system/init/mount.go
@@ -85,7 +85,7 @@ func waitForDevice(device string, timeout time.Duration) error {
 }
 
 // setupOverlay sets up the overlay filesystem:
-// - /dev/vda: readonly rootfs (ext4)
+// - /dev/vda: readonly rootfs (erofs or ext4)
 // - /dev/vdb: writable overlay disk (ext4)
 // - /overlay/newroot: merged overlay filesystem
 func setupOverlay(log *Logger) error {
@@ -105,11 +105,17 @@ func setupOverlay(log *Logger) error {
 		}
 	}
 
-	// Mount readonly rootfs from /dev/vda (ext4 filesystem)
-	if err := mount("/dev/vda", "/lower", "ext4", "ro"); err != nil {
-		return fmt.Errorf("mount rootfs: %w", err)
+	// Mount readonly rootfs from /dev/vda
+	// Try erofs first (default on Linux), fall back to ext4 (Darwin or legacy images)
+	if err := mount("/dev/vda", "/lower", "erofs", "ro"); err != nil {
+		log.Info("hypeman-init:overlay", "erofs mount failed, trying ext4: "+err.Error())
+		if err := mount("/dev/vda", "/lower", "ext4", "ro"); err != nil {
+			return fmt.Errorf("mount rootfs: %w", err)
+		}
+		log.Info("hypeman-init:overlay", "mounted rootfs from /dev/vda (ext4)")
+	} else {
+		log.Info("hypeman-init:overlay", "mounted rootfs from /dev/vda (erofs)")
 	}
-	log.Info("hypeman-init:overlay", "mounted rootfs from /dev/vda")
 
 	// Mount writable overlay disk from /dev/vdb
 	if err := mount("/dev/vdb", "/overlay", "ext4", ""); err != nil {
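For reference, here is a minimal, Linux-only standalone sketch of the erofs-then-ext4 fallback that setupOverlay now performs. It is written against golang.org/x/sys/unix rather than the init package's own mount() helper and logger; the device and mount point mirror the diff, everything else is illustrative.

package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

// mountRootfsReadOnly tries erofs first (the Linux default image format) and
// falls back to ext4 (Darwin-built or legacy images), returning the fstype used.
func mountRootfsReadOnly(device, target string) (string, error) {
	if err := unix.Mount(device, target, "erofs", unix.MS_RDONLY, ""); err == nil {
		return "erofs", nil
	}
	if err := unix.Mount(device, target, "ext4", unix.MS_RDONLY, ""); err != nil {
		return "", fmt.Errorf("mount rootfs: %w", err)
	}
	return "ext4", nil
}

func main() {
	fstype, err := mountRootfsReadOnly("/dev/vda", "/lower")
	if err != nil {
		fmt.Println("mount failed:", err)
		return
	}
	fmt.Println("mounted rootfs as", fstype)
}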