diff --git a/cmd/vmcp/app/commands.go b/cmd/vmcp/app/commands.go index c4ebc88845..d53f0b9b51 100644 --- a/cmd/vmcp/app/commands.go +++ b/cmd/vmcp/app/commands.go @@ -433,6 +433,19 @@ func runServe(cmd *cobra.Command, _ []string) error { Timeout: defaults.Timeout, DegradedThreshold: defaults.DegradedThreshold, } + + // Configure circuit breaker if enabled + if cfg.Operational.FailureHandling.CircuitBreaker != nil && cfg.Operational.FailureHandling.CircuitBreaker.Enabled { + healthMonitorConfig.CircuitBreaker = &health.CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: cfg.Operational.FailureHandling.CircuitBreaker.FailureThreshold, + Timeout: time.Duration(cfg.Operational.FailureHandling.CircuitBreaker.Timeout), + } + logger.Infof("Circuit breaker enabled (failure threshold: %d, timeout: %v)", + cfg.Operational.FailureHandling.CircuitBreaker.FailureThreshold, + time.Duration(cfg.Operational.FailureHandling.CircuitBreaker.Timeout)) + } + logger.Info("Health monitoring configured from operational settings") } diff --git a/pkg/vmcp/aggregator/default_aggregator.go b/pkg/vmcp/aggregator/default_aggregator.go index ca51d207d8..9e018e5c10 100644 --- a/pkg/vmcp/aggregator/default_aggregator.go +++ b/pkg/vmcp/aggregator/default_aggregator.go @@ -302,6 +302,21 @@ func (a *defaultAggregator) MergeCapabilities( // Convert resolved tools to final vmcp.Tool format tools := make([]vmcp.Tool, 0, len(resolved.Tools)) for _, resolvedTool := range resolved.Tools { + // Look up full backend information from registry + backend := registry.Get(ctx, resolvedTool.BackendID) + if backend == nil { + logger.Warnf("Backend %s not found in registry for tool %s, skipping", + resolvedTool.BackendID, resolvedTool.ResolvedName) + continue + } + + // Filter out tools from unhealthy backends + if !backend.HealthStatus.IsHealthyForRouting() { + logger.Debugf("Skipping tool %s from unhealthy backend %s (status: %s)", + resolvedTool.ResolvedName, backend.Name, backend.HealthStatus) + continue 
+ } + tools = append(tools, vmcp.Tool{ Name: resolvedTool.ResolvedName, Description: resolvedTool.Description, @@ -309,58 +324,61 @@ func (a *defaultAggregator) MergeCapabilities( BackendID: resolvedTool.BackendID, }) - // Look up full backend information from registry - backend := registry.Get(ctx, resolvedTool.BackendID) - if backend == nil { - logger.Warnf("Backend %s not found in registry for tool %s, creating minimal target", - resolvedTool.BackendID, resolvedTool.ResolvedName) - routingTable.Tools[resolvedTool.ResolvedName] = &vmcp.BackendTarget{ - WorkloadID: resolvedTool.BackendID, - OriginalCapabilityName: resolvedTool.OriginalName, - } - } else { - // Use the backendToTarget helper from registry package - target := vmcp.BackendToTarget(backend) - // Store the original tool name for forwarding to backend - target.OriginalCapabilityName = resolvedTool.OriginalName - routingTable.Tools[resolvedTool.ResolvedName] = target - } + // Use the backendToTarget helper from registry package + target := vmcp.BackendToTarget(backend) + // Store the original tool name for forwarding to backend + target.OriginalCapabilityName = resolvedTool.OriginalName + routingTable.Tools[resolvedTool.ResolvedName] = target } - // Add resources to routing table + // Add resources to routing table (with health filtering) + resources := make([]vmcp.Resource, 0, len(resolved.Resources)) for _, resource := range resolved.Resources { backend := registry.Get(ctx, resource.BackendID) if backend == nil { - logger.Warnf("Backend %s not found in registry for resource %s, creating minimal target", + logger.Warnf("Backend %s not found in registry for resource %s, skipping", resource.BackendID, resource.URI) - routingTable.Resources[resource.URI] = &vmcp.BackendTarget{ - WorkloadID: resource.BackendID, - OriginalCapabilityName: resource.URI, - } - } else { - target := vmcp.BackendToTarget(backend) - // Store the original resource URI for forwarding to backend - target.OriginalCapabilityName = 
resource.URI - routingTable.Resources[resource.URI] = target + continue } + + // Filter out resources from unhealthy backends + if !backend.HealthStatus.IsHealthyForRouting() { + logger.Debugf("Skipping resource %s from unhealthy backend %s (status: %s)", + resource.URI, backend.Name, backend.HealthStatus) + continue + } + + resources = append(resources, resource) + + target := vmcp.BackendToTarget(backend) + // Store the original resource URI for forwarding to backend + target.OriginalCapabilityName = resource.URI + routingTable.Resources[resource.URI] = target } - // Add prompts to routing table + // Add prompts to routing table (with health filtering) + prompts := make([]vmcp.Prompt, 0, len(resolved.Prompts)) for _, prompt := range resolved.Prompts { backend := registry.Get(ctx, prompt.BackendID) if backend == nil { - logger.Warnf("Backend %s not found in registry for prompt %s, creating minimal target", + logger.Warnf("Backend %s not found in registry for prompt %s, skipping", prompt.BackendID, prompt.Name) - routingTable.Prompts[prompt.Name] = &vmcp.BackendTarget{ - WorkloadID: prompt.BackendID, - OriginalCapabilityName: prompt.Name, - } - } else { - target := vmcp.BackendToTarget(backend) - // Store the original prompt name for forwarding to backend - target.OriginalCapabilityName = prompt.Name - routingTable.Prompts[prompt.Name] = target + continue + } + + // Filter out prompts from unhealthy backends + if !backend.HealthStatus.IsHealthyForRouting() { + logger.Debugf("Skipping prompt %s from unhealthy backend %s (status: %s)", + prompt.Name, backend.Name, backend.HealthStatus) + continue } + + prompts = append(prompts, prompt) + + target := vmcp.BackendToTarget(backend) + // Store the original prompt name for forwarding to backend + target.OriginalCapabilityName = prompt.Name + routingTable.Prompts[prompt.Name] = target } // Determine conflict strategy used @@ -376,16 +394,16 @@ func (a *defaultAggregator) MergeCapabilities( // Create final aggregated view 
aggregated := &AggregatedCapabilities{ Tools: tools, - Resources: resolved.Resources, - Prompts: resolved.Prompts, + Resources: resources, + Prompts: prompts, SupportsLogging: resolved.SupportsLogging, SupportsSampling: resolved.SupportsSampling, RoutingTable: routingTable, Metadata: &AggregationMetadata{ BackendCount: 0, // Will be set by caller ToolCount: len(tools), - ResourceCount: len(resolved.Resources), - PromptCount: len(resolved.Prompts), + ResourceCount: len(resources), + PromptCount: len(prompts), ConflictStrategy: conflictStrategy, }, } diff --git a/pkg/vmcp/health/circuit_breaker_integration_test.go b/pkg/vmcp/health/circuit_breaker_integration_test.go new file mode 100644 index 0000000000..81806624cb --- /dev/null +++ b/pkg/vmcp/health/circuit_breaker_integration_test.go @@ -0,0 +1,567 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package health_test + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/stacklok/toolhive/pkg/vmcp" + "github.com/stacklok/toolhive/pkg/vmcp/health" +) + +func TestCircuitBreakerIntegration(t *testing.T) { + t.Parallel() + RegisterFailHandler(Fail) + RunSpecs(t, "Circuit Breaker Integration Suite") +} + +// flakyBackendClient simulates a backend that can fail intermittently. +// It implements vmcp.BackendClient interface for use with health.NewMonitor. 
+type flakyBackendClient struct { + mu sync.Mutex + + // Failure simulation + consecutiveFails int // Number of consecutive failures to return + failCount int // Current count of consecutive failures returned + + // Call tracking + checkCount atomic.Int64 // Total number of health checks performed + + // Behavior control + shouldFail atomic.Bool // Explicit control over failure state + responseDelay time.Duration // Simulate slow responses +} + +func newFlakyBackendClient() *flakyBackendClient { + return &flakyBackendClient{ + responseDelay: 10 * time.Millisecond, + } +} + +// ListCapabilities implements vmcp.BackendClient for health check purposes. +func (f *flakyBackendClient) ListCapabilities(ctx context.Context, _ *vmcp.BackendTarget) (*vmcp.CapabilityList, error) { + f.checkCount.Add(1) + + // Simulate response delay + if f.responseDelay > 0 { + select { + case <-time.After(f.responseDelay): + case <-ctx.Done(): + return nil, ctx.Err() + } + } + + f.mu.Lock() + defer f.mu.Unlock() + + // Check explicit failure control + if f.shouldFail.Load() { + return nil, fmt.Errorf("backend unavailable (explicit)") + } + + // Check consecutive failure pattern + if f.consecutiveFails > 0 && f.failCount < f.consecutiveFails { + f.failCount++ + return nil, fmt.Errorf("backend unavailable (%d/%d)", f.failCount, f.consecutiveFails) + } + + // Reset fail count after pattern completes + if f.failCount >= f.consecutiveFails { + f.failCount = 0 + } + + // Return successful response + return &vmcp.CapabilityList{ + Tools: []vmcp.Tool{}, + Resources: []vmcp.Resource{}, + Prompts: []vmcp.Prompt{}, + }, nil +} + +// CallTool implements vmcp.BackendClient (not used in health checks). +func (*flakyBackendClient) CallTool(_ context.Context, _ *vmcp.BackendTarget, _ string, _ map[string]any, _ map[string]any) (*vmcp.ToolCallResult, error) { + return nil, fmt.Errorf("not implemented") +} + +// ReadResource implements vmcp.BackendClient (not used in health checks). 
+func (*flakyBackendClient) ReadResource(_ context.Context, _ *vmcp.BackendTarget, _ string) (*vmcp.ResourceReadResult, error) { + return nil, fmt.Errorf("not implemented") +} + +// GetPrompt implements vmcp.BackendClient (not used in health checks). +func (*flakyBackendClient) GetPrompt(_ context.Context, _ *vmcp.BackendTarget, _ string, _ map[string]any) (*vmcp.PromptGetResult, error) { + return nil, fmt.Errorf("not implemented") +} + +func (f *flakyBackendClient) setConsecutiveFails(count int) { + f.mu.Lock() + defer f.mu.Unlock() + f.consecutiveFails = count + f.failCount = 0 +} + +func (f *flakyBackendClient) setExplicitFailure(shouldFail bool) { + f.shouldFail.Store(shouldFail) +} + +func (f *flakyBackendClient) getCheckCount() int64 { + return f.checkCount.Load() +} + +var _ = Describe("Circuit Breaker Integration Tests", func() { + var ( + ctx context.Context + cancel context.CancelFunc + monitor *health.Monitor + client *flakyBackendClient + backend vmcp.Backend + + checkInterval = 500 * time.Millisecond + failureThreshold = 3 + cbTimeout = 2 * time.Second + unhealthyThreshold = 3 + ) + + BeforeEach(func() { + ctx, cancel = context.WithCancel(context.Background()) + client = newFlakyBackendClient() + + backend = vmcp.Backend{ + ID: "test-backend-1", + Name: "test-backend", + BaseURL: "http://localhost:8080", + TransportType: "streamable-http", + } + + config := health.MonitorConfig{ + CheckInterval: checkInterval, + UnhealthyThreshold: unhealthyThreshold, + Timeout: 5 * time.Second, + DegradedThreshold: 2 * time.Second, + CircuitBreaker: &health.CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: failureThreshold, + Timeout: cbTimeout, + }, + } + + var err error + monitor, err = health.NewMonitor(client, []vmcp.Backend{backend}, config) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + if monitor != nil { + monitor.Stop() + } + cancel() + }) + + Context("Circuit Opens on Consecutive Failures", func() { + It("should open circuit after 
threshold failures", func() { + By("Starting with healthy backend") + monitor.Start(ctx) + + // Wait for initial healthy check + Eventually(func() vmcp.BackendHealthStatus { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return vmcp.BackendUnknown + } + return state.Status + }, 3*time.Second, 100*time.Millisecond).Should(Equal(vmcp.BackendHealthy)) + + By("Simulating backend failures") + client.setExplicitFailure(true) + + By("Waiting for circuit to open") + // Should take approximately: failureThreshold * checkInterval = 3 * 500ms = ~1.5 seconds + Eventually(func() health.CircuitState { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return "" + } + return state.CircuitState + }, 5*time.Second, 100*time.Millisecond).Should(Equal(health.CircuitOpen)) + + By("Verifying circuit state and backend status") + state, err := monitor.GetBackendState(backend.ID) + Expect(err).ToNot(HaveOccurred()) + Expect(state.CircuitState).To(Equal(health.CircuitOpen)) + Expect(state.Status).To(Equal(vmcp.BackendUnhealthy)) + Expect(state.ConsecutiveFailures).To(BeNumerically(">=", failureThreshold)) + + GinkgoWriter.Printf("✓ Circuit opened after %d consecutive failures\n", state.ConsecutiveFailures) + }) + + It("should stop health checks while circuit is open", func() { + By("Starting monitor and waiting for healthy state") + monitor.Start(ctx) + Eventually(func() vmcp.BackendHealthStatus { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return vmcp.BackendUnknown + } + return state.Status + }, 3*time.Second, 100*time.Millisecond).Should(Equal(vmcp.BackendHealthy)) + + By("Causing circuit to open") + client.setExplicitFailure(true) + Eventually(func() health.CircuitState { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return "" + } + return state.CircuitState + }, 5*time.Second, 100*time.Millisecond).Should(Equal(health.CircuitOpen)) + + By("Recording check count when circuit opens") + 
checksWhenOpen := client.getCheckCount() + GinkgoWriter.Printf("Health checks when circuit opened: %d\n", checksWhenOpen) + + By("Waiting and verifying no new checks occur") + time.Sleep(1 * time.Second) + checksAfterWait := client.getCheckCount() + + // Should be same or very close (at most 1 additional check due to timing) + Expect(checksAfterWait).To(BeNumerically("<=", checksWhenOpen+1)) + GinkgoWriter.Printf("Health checks after 1s wait: %d (diff: %d)\n", + checksAfterWait, checksAfterWait-checksWhenOpen) + }) + }) + + Context("Circuit Recovery After Timeout", func() { + It("should recover after timeout when backend is fixed", func() { + By("Starting monitor and establishing healthy state") + monitor.Start(ctx) + Eventually(func() vmcp.BackendHealthStatus { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return vmcp.BackendUnknown + } + return state.Status + }, 3*time.Second, 100*time.Millisecond).Should(Equal(vmcp.BackendHealthy)) + + By("Opening circuit with failures") + client.setExplicitFailure(true) + Eventually(func() health.CircuitState { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return "" + } + return state.CircuitState + }, 5*time.Second, 100*time.Millisecond).Should(Equal(health.CircuitOpen)) + + openTime := time.Now() + GinkgoWriter.Printf("Circuit opened at: %s\n", openTime.Format(time.RFC3339)) + + By("Fixing backend while circuit is open") + client.setExplicitFailure(false) + + By("Waiting for circuit to recover (transitions through half-open and closes)") + // After cbTimeout (2s), circuit attempts recovery. + // If backend is healthy, it will transition half-open→closed quickly. 
+ Eventually(func() bool { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return false + } + // Circuit should eventually be closed and backend healthy + return state.CircuitState == health.CircuitClosed && state.Status == vmcp.BackendHealthy + }, cbTimeout+3*time.Second, 100*time.Millisecond).Should(BeTrue()) + + elapsed := time.Since(openTime) + GinkgoWriter.Printf("✓ Circuit recovered after: %s (timeout was %s)\n", elapsed, cbTimeout) + + // Recovery should take at least cbTimeout + Expect(elapsed).To(BeNumerically(">=", cbTimeout)) + }) + }) + + Context("Circuit Recovery Behavior", func() { + It("should fully recover when backend is fixed", func() { + By("Starting monitor and establishing healthy state") + monitor.Start(ctx) + Eventually(func() vmcp.BackendHealthStatus { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return vmcp.BackendUnknown + } + return state.Status + }, 3*time.Second, 100*time.Millisecond).Should(Equal(vmcp.BackendHealthy)) + + By("Opening circuit") + client.setExplicitFailure(true) + Eventually(func() health.CircuitState { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return "" + } + return state.CircuitState + }, 5*time.Second, 100*time.Millisecond).Should(Equal(health.CircuitOpen)) + + By("Fixing backend") + client.setExplicitFailure(false) + + By("Waiting for full recovery") + // After timeout, circuit tests recovery and should fully close + Eventually(func() bool { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return false + } + return state.CircuitState == health.CircuitClosed && + state.Status == vmcp.BackendHealthy && + state.ConsecutiveFailures == 0 + }, cbTimeout+3*time.Second, 100*time.Millisecond).Should(BeTrue()) + + GinkgoWriter.Println("✓ Circuit successfully recovered and closed") + }) + + It("should remain open if backend stays broken", func() { + By("Starting monitor and establishing healthy state") + monitor.Start(ctx) + 
Eventually(func() vmcp.BackendHealthStatus { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return vmcp.BackendUnknown + } + return state.Status + }, 3*time.Second, 100*time.Millisecond).Should(Equal(vmcp.BackendHealthy)) + + By("Opening circuit with failures") + client.setExplicitFailure(true) + Eventually(func() health.CircuitState { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return "" + } + return state.CircuitState + }, 5*time.Second, 100*time.Millisecond).Should(Equal(health.CircuitOpen)) + + By("Waiting past timeout period (backend still broken)") + time.Sleep(cbTimeout + 1*time.Second) + + By("Verifying circuit remains open after failed recovery attempt") + // Circuit will attempt recovery, fail, and reopen + Eventually(func() health.CircuitState { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return "" + } + return state.CircuitState + }, 2*time.Second, 100*time.Millisecond).Should(Equal(health.CircuitOpen)) + + state, err := monitor.GetBackendState(backend.ID) + Expect(err).ToNot(HaveOccurred()) + Expect(state.Status).To(Equal(vmcp.BackendUnhealthy)) + + GinkgoWriter.Println("✓ Circuit reopened after failed recovery attempt") + }) + }) + + Context("Intermittent Failures", func() { + It("should not open circuit if failures are below threshold", func() { + By("Starting monitor with pattern of 2 failures then success") + client.setConsecutiveFails(2) // Less than threshold (3) + monitor.Start(ctx) + + By("Waiting for multiple check cycles") + time.Sleep(3 * time.Second) + + By("Verifying circuit remains closed") + state, err := monitor.GetBackendState(backend.ID) + Expect(err).ToNot(HaveOccurred()) + Expect(state.CircuitState).To(Equal(health.CircuitClosed)) + + // Status should be healthy or degraded, but not unavailable + Expect(state.Status).To(Or( + Equal(vmcp.BackendHealthy), + Equal(vmcp.BackendDegraded), + )) + + GinkgoWriter.Printf("✓ Circuit remained closed with 
intermittent failures (consecutive: %d)\n", + state.ConsecutiveFailures) + }) + + It("should reset failure count after successful check", func() { + By("Starting monitor and establishing healthy state") + monitor.Start(ctx) + Eventually(func() vmcp.BackendHealthStatus { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return vmcp.BackendUnknown + } + return state.Status + }, 3*time.Second, 100*time.Millisecond).Should(Equal(vmcp.BackendHealthy)) + + By("Causing 2 failures (below threshold)") + client.setConsecutiveFails(2) + + Eventually(func() int { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return -1 + } + return state.ConsecutiveFailures + }, 3*time.Second, 100*time.Millisecond).Should(BeNumerically(">=", 2)) + + By("Allowing recovery (next check succeeds after pattern completes)") + Eventually(func() int { + state, err := monitor.GetBackendState(backend.ID) + if err != nil { + return -1 + } + return state.ConsecutiveFailures + }, 5*time.Second, 100*time.Millisecond).Should(Equal(0)) + + By("Verifying backend is healthy or degraded (recovering)") + state, err := monitor.GetBackendState(backend.ID) + Expect(err).ToNot(HaveOccurred()) + // After recovery, backend might be degraded before fully stabilizing + Expect(state.Status).To(Or( + Equal(vmcp.BackendHealthy), + Equal(vmcp.BackendDegraded), + )) + + GinkgoWriter.Printf("✓ Failure count reset after successful check (status: %s)\n", state.Status) + }) + }) + + Context("Configuration", func() { + It("should respect custom failure threshold", func() { + By("Creating monitor with failure threshold of 5") + customConfig := health.MonitorConfig{ + CheckInterval: checkInterval, + UnhealthyThreshold: unhealthyThreshold, + Timeout: 5 * time.Second, + DegradedThreshold: 2 * time.Second, + CircuitBreaker: &health.CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: 5, // Custom threshold + Timeout: cbTimeout, + }, + } + + customMonitor, err := 
health.NewMonitor(client, []vmcp.Backend{backend}, customConfig) + Expect(err).ToNot(HaveOccurred()) + defer customMonitor.Stop() + + By("Starting monitor and causing failures") + client.setExplicitFailure(true) + customMonitor.Start(ctx) + + By("Verifying circuit doesn't open after 3 failures") + time.Sleep(4 * checkInterval) + state, err := customMonitor.GetBackendState(backend.ID) + Expect(err).ToNot(HaveOccurred()) + Expect(state.CircuitState).To(Equal(health.CircuitClosed)) + + By("Waiting for 5+ failures") + Eventually(func() health.CircuitState { + state, err := customMonitor.GetBackendState(backend.ID) + if err != nil { + return "" + } + return state.CircuitState + }, 6*time.Second, 100*time.Millisecond).Should(Equal(health.CircuitOpen)) + + GinkgoWriter.Println("✓ Custom failure threshold respected") + }) + + It("should respect custom timeout duration", func() { + By("Creating monitor with short timeout") + shortTimeout := 1 * time.Second + customConfig := health.MonitorConfig{ + CheckInterval: checkInterval, + UnhealthyThreshold: unhealthyThreshold, + Timeout: 5 * time.Second, + DegradedThreshold: 2 * time.Second, + CircuitBreaker: &health.CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: failureThreshold, + Timeout: shortTimeout, + }, + } + + customMonitor, err := health.NewMonitor(client, []vmcp.Backend{backend}, customConfig) + Expect(err).ToNot(HaveOccurred()) + defer customMonitor.Stop() + + By("Opening circuit") + client.setExplicitFailure(true) + customMonitor.Start(ctx) + + Eventually(func() health.CircuitState { + state, err := customMonitor.GetBackendState(backend.ID) + if err != nil { + return "" + } + return state.CircuitState + }, 5*time.Second, 100*time.Millisecond).Should(Equal(health.CircuitOpen)) + + openTime := time.Now() + + By("Fixing backend and waiting for recovery") + client.setExplicitFailure(false) + + // After shortTimeout, circuit should attempt recovery and succeed + Eventually(func() bool { + state, err := 
customMonitor.GetBackendState(backend.ID) + if err != nil { + return false + } + return state.CircuitState == health.CircuitClosed && state.Status == vmcp.BackendHealthy + }, shortTimeout+2*time.Second, 100*time.Millisecond).Should(BeTrue()) + + elapsed := time.Since(openTime) + GinkgoWriter.Printf("✓ Custom timeout respected: recovered after %s (timeout was %s)\n", + elapsed, shortTimeout) + + // Should take at least the timeout duration + Expect(elapsed).To(BeNumerically(">=", shortTimeout)) + }) + }) + + Context("Circuit Breaker Disabled", func() { + It("should not open circuit when disabled", func() { + By("Creating monitor with circuit breaker disabled") + config := health.MonitorConfig{ + CheckInterval: checkInterval, + UnhealthyThreshold: unhealthyThreshold, + Timeout: 5 * time.Second, + DegradedThreshold: 2 * time.Second, + CircuitBreaker: nil, // Disabled + } + + disabledMonitor, err := health.NewMonitor(client, []vmcp.Backend{backend}, config) + Expect(err).ToNot(HaveOccurred()) + defer disabledMonitor.Stop() + + By("Causing many failures") + client.setExplicitFailure(true) + disabledMonitor.Start(ctx) + + By("Waiting for multiple check cycles") + time.Sleep(3 * time.Second) + + By("Verifying status becomes unhealthy but circuit doesn't open") + state, err := disabledMonitor.GetBackendState(backend.ID) + Expect(err).ToNot(HaveOccurred()) + Expect(state.Status).To(Equal(vmcp.BackendUnhealthy)) + // Circuit state should be empty/uninitialized when disabled + Expect(state.CircuitState).To(Equal(health.CircuitState(""))) + + GinkgoWriter.Println("✓ Circuit breaker disabled, status tracking still works") + }) + }) +}) diff --git a/pkg/vmcp/router/default_router.go b/pkg/vmcp/router/default_router.go index d486488821..a55fb3da53 100644 --- a/pkg/vmcp/router/default_router.go +++ b/pkg/vmcp/router/default_router.go @@ -71,6 +71,13 @@ func routeCapability( return nil, fmt.Errorf("%w: %s", notFoundErr, key) } + // Check if the backend is healthy before routing + 
if !target.HealthStatus.IsHealthyForRouting() { + logger.Warnf("%s %s found but backend %s is unavailable (status: %s)", + entityType, key, target.WorkloadName, target.HealthStatus) + return nil, fmt.Errorf("%w: backend %s is %s", ErrBackendUnavailable, target.WorkloadName, target.HealthStatus) + } + logger.Debugf("Routed %s %s to backend %s", entityType, key, target.WorkloadID) return target, nil } diff --git a/pkg/vmcp/router/default_router_test.go b/pkg/vmcp/router/default_router_test.go index 9292c944e3..f3d083490d 100644 --- a/pkg/vmcp/router/default_router_test.go +++ b/pkg/vmcp/router/default_router_test.go @@ -35,6 +35,7 @@ func TestDefaultRouter_RouteTool(t *testing.T) { WorkloadID: "backend1", WorkloadName: "Backend 1", BaseURL: "http://backend1:8080", + HealthStatus: vmcp.BackendHealthy, }, }, Resources: make(map[string]*vmcp.BackendTarget), @@ -73,6 +74,60 @@ func TestDefaultRouter_RouteTool(t *testing.T) { expectError: true, errorContains: "routing table tools map not initialized", }, + { + name: "backend is unhealthy", + setupTable: &vmcp.RoutingTable{ + Tools: map[string]*vmcp.BackendTarget{ + "test_tool": { + WorkloadID: "backend1", + WorkloadName: "Backend 1", + BaseURL: "http://backend1:8080", + HealthStatus: vmcp.BackendUnhealthy, + }, + }, + Resources: make(map[string]*vmcp.BackendTarget), + Prompts: make(map[string]*vmcp.BackendTarget), + }, + toolName: "test_tool", + expectError: true, + errorContains: "backend unavailable", + }, + { + name: "backend is unauthenticated", + setupTable: &vmcp.RoutingTable{ + Tools: map[string]*vmcp.BackendTarget{ + "test_tool": { + WorkloadID: "backend1", + WorkloadName: "Backend 1", + BaseURL: "http://backend1:8080", + HealthStatus: vmcp.BackendUnauthenticated, + }, + }, + Resources: make(map[string]*vmcp.BackendTarget), + Prompts: make(map[string]*vmcp.BackendTarget), + }, + toolName: "test_tool", + expectError: true, + errorContains: "backend unavailable", + }, + { + name: "backend is degraded but still 
works", + setupTable: &vmcp.RoutingTable{ + Tools: map[string]*vmcp.BackendTarget{ + "test_tool": { + WorkloadID: "backend1", + WorkloadName: "Backend 1", + BaseURL: "http://backend1:8080", + HealthStatus: vmcp.BackendDegraded, + }, + }, + Resources: make(map[string]*vmcp.BackendTarget), + Prompts: make(map[string]*vmcp.BackendTarget), + }, + toolName: "test_tool", + expectedID: "backend1", + expectError: false, + }, } for _, tt := range tests { @@ -126,6 +181,7 @@ func TestDefaultRouter_RouteResource(t *testing.T) { WorkloadID: "backend2", WorkloadName: "Backend 2", BaseURL: "http://backend2:8080", + HealthStatus: vmcp.BackendHealthy, }, }, Prompts: make(map[string]*vmcp.BackendTarget), @@ -163,6 +219,42 @@ func TestDefaultRouter_RouteResource(t *testing.T) { expectError: true, errorContains: "routing table resources map not initialized", }, + { + name: "backend is unhealthy", + setupTable: &vmcp.RoutingTable{ + Tools: make(map[string]*vmcp.BackendTarget), + Resources: map[string]*vmcp.BackendTarget{ + "file:///path/to/resource": { + WorkloadID: "backend2", + WorkloadName: "Backend 2", + BaseURL: "http://backend2:8080", + HealthStatus: vmcp.BackendUnhealthy, + }, + }, + Prompts: make(map[string]*vmcp.BackendTarget), + }, + uri: "file:///path/to/resource", + expectError: true, + errorContains: "backend unavailable", + }, + { + name: "backend is degraded but still works", + setupTable: &vmcp.RoutingTable{ + Tools: make(map[string]*vmcp.BackendTarget), + Resources: map[string]*vmcp.BackendTarget{ + "file:///path/to/resource": { + WorkloadID: "backend2", + WorkloadName: "Backend 2", + BaseURL: "http://backend2:8080", + HealthStatus: vmcp.BackendDegraded, + }, + }, + Prompts: make(map[string]*vmcp.BackendTarget), + }, + uri: "file:///path/to/resource", + expectedID: "backend2", + expectError: false, + }, } for _, tt := range tests { @@ -217,6 +309,7 @@ func TestDefaultRouter_RoutePrompt(t *testing.T) { WorkloadID: "backend3", WorkloadName: "Backend 3", BaseURL: 
"http://backend3:8080", + HealthStatus: vmcp.BackendHealthy, }, }, }, @@ -253,6 +346,42 @@ func TestDefaultRouter_RoutePrompt(t *testing.T) { expectError: true, errorContains: "routing table prompts map not initialized", }, + { + name: "backend is unhealthy", + setupTable: &vmcp.RoutingTable{ + Tools: make(map[string]*vmcp.BackendTarget), + Resources: make(map[string]*vmcp.BackendTarget), + Prompts: map[string]*vmcp.BackendTarget{ + "greeting": { + WorkloadID: "backend3", + WorkloadName: "Backend 3", + BaseURL: "http://backend3:8080", + HealthStatus: vmcp.BackendUnhealthy, + }, + }, + }, + promptName: "greeting", + expectError: true, + errorContains: "backend unavailable", + }, + { + name: "backend is degraded but still works", + setupTable: &vmcp.RoutingTable{ + Tools: make(map[string]*vmcp.BackendTarget), + Resources: make(map[string]*vmcp.BackendTarget), + Prompts: map[string]*vmcp.BackendTarget{ + "greeting": { + WorkloadID: "backend3", + WorkloadName: "Backend 3", + BaseURL: "http://backend3:8080", + HealthStatus: vmcp.BackendDegraded, + }, + }, + }, + promptName: "greeting", + expectedID: "backend3", + expectError: false, + }, } for _, tt := range tests { @@ -292,14 +421,14 @@ func TestDefaultRouter_ConcurrentAccess(t *testing.T) { // Setup routing table table := &vmcp.RoutingTable{ Tools: map[string]*vmcp.BackendTarget{ - "tool1": {WorkloadID: "backend1"}, - "tool2": {WorkloadID: "backend2"}, + "tool1": {WorkloadID: "backend1", HealthStatus: vmcp.BackendHealthy}, + "tool2": {WorkloadID: "backend2", HealthStatus: vmcp.BackendHealthy}, }, Resources: map[string]*vmcp.BackendTarget{ - "res1": {WorkloadID: "backend1"}, + "res1": {WorkloadID: "backend1", HealthStatus: vmcp.BackendHealthy}, }, Prompts: map[string]*vmcp.BackendTarget{ - "prompt1": {WorkloadID: "backend2"}, + "prompt1": {WorkloadID: "backend2", HealthStatus: vmcp.BackendHealthy}, }, } diff --git a/pkg/vmcp/server/integration_test.go b/pkg/vmcp/server/integration_test.go index 8bb5e40231..1e1d119465 
100644 --- a/pkg/vmcp/server/integration_test.go +++ b/pkg/vmcp/server/integration_test.go @@ -646,8 +646,9 @@ func TestIntegration_AuditLogging(t *testing.T) { // Create backends backends := []vmcp.Backend{ { - ID: "weather-service", - Name: "Weather Service", + ID: "weather-service", + Name: "Weather Service", + HealthStatus: vmcp.BackendHealthy, }, } diff --git a/pkg/vmcp/types.go b/pkg/vmcp/types.go index 5a492c2aaa..6a4d3d24b2 100644 --- a/pkg/vmcp/types.go +++ b/pkg/vmcp/types.go @@ -142,6 +142,30 @@ func (s BackendHealthStatus) ToCRDStatus() string { } } +// IsHealthyForRouting determines if a backend is healthy enough to accept requests +// and have its capabilities included in aggregation. +// +// Returns true for: +// - BackendHealthy: Backend is fully operational +// - BackendDegraded: Backend is slow but still functional +// +// Returns false for: +// - BackendUnhealthy: Backend is not responding +// - BackendUnknown: Backend health is unknown +// - BackendUnauthenticated: Backend authentication failed +// - Any other status: Err on the side of caution +func (s BackendHealthStatus) IsHealthyForRouting() bool { + switch s { + case BackendHealthy, BackendDegraded: + return true + case BackendUnhealthy, BackendUnknown, BackendUnauthenticated: + return false + default: + // Unknown status - err on the side of caution + return false + } +} + // Condition represents a specific aspect of vMCP server status. type Condition = metav1.Condition diff --git a/test/e2e/thv-operator/virtualmcp/helpers.go b/test/e2e/thv-operator/virtualmcp/helpers.go index 8330bc0b03..660f58064e 100644 --- a/test/e2e/thv-operator/virtualmcp/helpers.go +++ b/test/e2e/thv-operator/virtualmcp/helpers.go @@ -36,6 +36,13 @@ import ( "github.com/stacklok/toolhive/test/e2e/images" ) +const ( + // ConditionTypeReady represents the standard Kubernetes "Ready" condition type. + // This is defined here for test convenience to avoid magic strings. 
+ // It matches the condition type used in VirtualMCPServer.Status.Conditions. + ConditionTypeReady = "Ready" +) + // WaitForVirtualMCPServerReady waits for a VirtualMCPServer to reach Ready status // and ensures at least one associated pod is actually running and ready. // This is used when waiting for a single expected pod (e.g., one replica deployment). @@ -57,7 +64,7 @@ func WaitForVirtualMCPServerReady( } for _, condition := range vmcpServer.Status.Conditions { - if condition.Type == "Ready" { + if condition.Type == ConditionTypeReady { if condition.Status == "True" { // Also check that at least one pod is actually running and ready labels := map[string]string{ diff --git a/test/e2e/thv-operator/virtualmcp/virtualmcp_circuit_breaker_lifecycle_test.go b/test/e2e/thv-operator/virtualmcp/virtualmcp_circuit_breaker_lifecycle_test.go new file mode 100644 index 0000000000..3cd6c812a1 --- /dev/null +++ b/test/e2e/thv-operator/virtualmcp/virtualmcp_circuit_breaker_lifecycle_test.go @@ -0,0 +1,261 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package virtualmcp + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1" + vmcpconfig "github.com/stacklok/toolhive/pkg/vmcp/config" + "github.com/stacklok/toolhive/test/e2e/images" +) + +// Circuit breaker test configuration shared across tests +var ( + cbHealthCheckInterval = 5 * time.Second + cbFailureThreshold = 3 + cbTimeout = 20 * time.Second + cbUnhealthyThreshold = 3 +) + +var _ = Describe("VirtualMCPServer Circuit Breaker - Degradation Path", Ordered, func() { + var ( + testNamespace = "default" + mcpGroupName = "test-cb-degradation-group" + vmcpServerName = "test-vmcp-cb-degradation" + backendName = "backend-cb-degradation" + timeout = 5 * time.Minute + pollingInterval = 2 * time.Second + ) + + BeforeAll(func() { + By("Creating MCPGroup") + CreateMCPGroupAndWait(ctx, k8sClient, mcpGroupName, testNamespace, + "Test MCP Group for circuit breaker degradation test", timeout, pollingInterval) + + By("Creating backend MCPServer with valid image (healthy)") + backend := &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: backendName, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.MCPServerSpec{ + GroupRef: mcpGroupName, + Image: images.YardstickServerImage, + Transport: "streamable-http", + ProxyPort: 8080, + McpPort: 8080, + Env: []mcpv1alpha1.EnvVar{ + {Name: "TRANSPORT", Value: "streamable-http"}, + }, + }, + } + Expect(k8sClient.Create(ctx, backend)).To(Succeed()) + + By("Waiting for backend MCPServer to be running") + Eventually(func() error { + server := &mcpv1alpha1.MCPServer{} + if err := k8sClient.Get(ctx, types.NamespacedName{ + Name: backendName, + Namespace: testNamespace, + }, server); err != nil { + return err + } + if server.Status.Phase != mcpv1alpha1.MCPServerPhaseRunning { + return fmt.Errorf("backend not running, phase: %s", server.Status.Phase) + } + return nil + }, timeout, pollingInterval).Should(Succeed()) + + 
By("Creating VirtualMCPServer with circuit breaker enabled") + vmcpServer := &mcpv1alpha1.VirtualMCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: vmcpServerName, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.VirtualMCPServerSpec{ + IncomingAuth: &mcpv1alpha1.IncomingAuthConfig{ + Type: "anonymous", + }, + OutgoingAuth: &mcpv1alpha1.OutgoingAuthConfig{ + Source: "discovered", + }, + ServiceType: "NodePort", + Config: vmcpconfig.Config{ + Name: vmcpServerName, + Group: mcpGroupName, + Operational: &vmcpconfig.OperationalConfig{ + FailureHandling: &vmcpconfig.FailureHandlingConfig{ + HealthCheckInterval: vmcpconfig.Duration(cbHealthCheckInterval), + StatusReportingInterval: vmcpconfig.Duration(5 * time.Second), + UnhealthyThreshold: cbUnhealthyThreshold, + CircuitBreaker: &vmcpconfig.CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: cbFailureThreshold, + Timeout: vmcpconfig.Duration(cbTimeout), + }, + }, + }, + }, + }, + } + Expect(k8sClient.Create(ctx, vmcpServer)).To(Succeed()) + + By("Waiting for VirtualMCPServer to reach Ready phase") + Eventually(func() error { + server := &mcpv1alpha1.VirtualMCPServer{} + if err := k8sClient.Get(ctx, types.NamespacedName{ + Name: vmcpServerName, + Namespace: testNamespace, + }, server); err != nil { + return err + } + + if server.Status.Phase != mcpv1alpha1.VirtualMCPServerPhaseReady { + return fmt.Errorf("phase is %s, want Ready", server.Status.Phase) + } + + // Check for Ready condition + readyCondition := false + for _, cond := range server.Status.Conditions { + if cond.Type == ConditionTypeReady && cond.Status == metav1.ConditionTrue { + readyCondition = true + break + } + } + if !readyCondition { + return fmt.Errorf("ready condition not found or not true") + } + + return nil + }, timeout, pollingInterval).Should(Succeed()) + }) + + AfterAll(func() { + By("Cleaning up VirtualMCPServer") + vmcpServer := &mcpv1alpha1.VirtualMCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: vmcpServerName, + Namespace: 
testNamespace, + }, + } + Expect(k8sClient.Delete(ctx, vmcpServer)).To(Succeed()) + + By("Cleaning up backend MCPServer") + backend := &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: backendName, + Namespace: testNamespace, + }, + } + Expect(k8sClient.Delete(ctx, backend)).To(Succeed()) + + By("Cleaning up MCPGroup") + group := &mcpv1alpha1.MCPGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: mcpGroupName, + Namespace: testNamespace, + }, + } + Expect(k8sClient.Delete(ctx, group)).To(Succeed()) + }) + + It("should open circuit breaker when backend becomes unhealthy", func() { + By("Step 1: Verifying initial healthy state (circuit closed)") + Eventually(func() error { + server := &mcpv1alpha1.VirtualMCPServer{} + if err := k8sClient.Get(ctx, types.NamespacedName{ + Name: vmcpServerName, + Namespace: testNamespace, + }, server); err != nil { + return err + } + + if len(server.Status.DiscoveredBackends) == 0 { + return fmt.Errorf("no discovered backends") + } + + backend := server.Status.DiscoveredBackends[0] + if backend.Status != mcpv1alpha1.BackendStatusReady { + return fmt.Errorf("backend status is %s, want ready", backend.Status) + } + + GinkgoWriter.Printf("✓ Initial state: Backend %s is ready (circuit closed)\n", backend.Name) + return nil + }, timeout, pollingInterval).Should(Succeed()) + + By("Step 2: Breaking backend by changing to invalid image") + backend := &mcpv1alpha1.MCPServer{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: backendName, + Namespace: testNamespace, + }, backend) + Expect(err).ToNot(HaveOccurred()) + + backend.Spec.Image = "invalid-image-does-not-exist:v999" + Expect(k8sClient.Update(ctx, backend)).To(Succeed()) + + GinkgoWriter.Println("✓ Backend image changed to invalid") + + By("Step 3: Waiting for circuit breaker to open") + // Circuit should open after: + // - failureThreshold (3) consecutive failures + // - With healthCheckInterval (5s) between checks + // - Should take ~15-20 seconds + Eventually(func() 
error { + server := &mcpv1alpha1.VirtualMCPServer{} + if err := k8sClient.Get(ctx, types.NamespacedName{ + Name: vmcpServerName, + Namespace: testNamespace, + }, server); err != nil { + return err + } + + if len(server.Status.DiscoveredBackends) == 0 { + return fmt.Errorf("no discovered backends") + } + + backend := server.Status.DiscoveredBackends[0] + if backend.Status != mcpv1alpha1.BackendStatusUnavailable { + return fmt.Errorf("backend status is %s, want unavailable", backend.Status) + } + + GinkgoWriter.Printf("✓ Circuit breaker OPENED: Backend %s marked unavailable\n", backend.Name) + return nil + }, 1*time.Minute, pollingInterval).Should(Succeed()) + + By("Step 4: Verifying circuit remains open") + Consistently(func() error { + server := &mcpv1alpha1.VirtualMCPServer{} + if err := k8sClient.Get(ctx, types.NamespacedName{ + Name: vmcpServerName, + Namespace: testNamespace, + }, server); err != nil { + return err + } + + if len(server.Status.DiscoveredBackends) == 0 { + return fmt.Errorf("no discovered backends") + } + + backend := server.Status.DiscoveredBackends[0] + if backend.Status != mcpv1alpha1.BackendStatusUnavailable { + return fmt.Errorf("backend should remain unavailable, got %s", backend.Status) + } + + return nil + }, 10*time.Second, pollingInterval).Should(Succeed()) + + GinkgoWriter.Println("✅ Degradation test completed:") + GinkgoWriter.Println(" - Backend transitioned from healthy to unhealthy ✓") + GinkgoWriter.Println(" - Circuit breaker opened after threshold failures ✓") + GinkgoWriter.Println(" - Backend marked unavailable ✓") + }) +}) diff --git a/test/e2e/thv-operator/virtualmcp/virtualmcp_status_reporting_test.go b/test/e2e/thv-operator/virtualmcp/virtualmcp_status_reporting_test.go index 138125c58a..194babf141 100644 --- a/test/e2e/thv-operator/virtualmcp/virtualmcp_status_reporting_test.go +++ b/test/e2e/thv-operator/virtualmcp/virtualmcp_status_reporting_test.go @@ -153,7 +153,7 @@ var _ = Describe("VirtualMCPServer Status 
Reporting", Ordered, func() { // Check Ready condition readyCondition := false for _, cond := range server.Status.Conditions { - if cond.Type == "Ready" && cond.Status == metav1.ConditionTrue { + if cond.Type == ConditionTypeReady && cond.Status == metav1.ConditionTrue { readyCondition = true break }