Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cmd/vmcp/app/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,19 @@ func runServe(cmd *cobra.Command, _ []string) error {
Timeout: defaults.Timeout,
DegradedThreshold: defaults.DegradedThreshold,
}

// Configure circuit breaker if enabled
if cfg.Operational.FailureHandling.CircuitBreaker != nil && cfg.Operational.FailureHandling.CircuitBreaker.Enabled {
healthMonitorConfig.CircuitBreaker = &health.CircuitBreakerConfig{
Enabled: true,
FailureThreshold: cfg.Operational.FailureHandling.CircuitBreaker.FailureThreshold,
Timeout: time.Duration(cfg.Operational.FailureHandling.CircuitBreaker.Timeout),
}
logger.Infof("Circuit breaker enabled (failure threshold: %d, timeout: %v)",
cfg.Operational.FailureHandling.CircuitBreaker.FailureThreshold,
time.Duration(cfg.Operational.FailureHandling.CircuitBreaker.Timeout))
}

logger.Info("Health monitoring configured from operational settings")
}

Expand Down
102 changes: 60 additions & 42 deletions pkg/vmcp/aggregator/default_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,65 +302,83 @@ func (a *defaultAggregator) MergeCapabilities(
// Convert resolved tools to final vmcp.Tool format
tools := make([]vmcp.Tool, 0, len(resolved.Tools))
for _, resolvedTool := range resolved.Tools {
// Look up full backend information from registry
backend := registry.Get(ctx, resolvedTool.BackendID)
if backend == nil {
logger.Warnf("Backend %s not found in registry for tool %s, skipping",
resolvedTool.BackendID, resolvedTool.ResolvedName)
continue
}

// Filter out tools from unhealthy backends
if !backend.HealthStatus.IsHealthyForRouting() {
logger.Debugf("Skipping tool %s from unhealthy backend %s (status: %s)",
resolvedTool.ResolvedName, backend.Name, backend.HealthStatus)
continue
}

tools = append(tools, vmcp.Tool{
Name: resolvedTool.ResolvedName,
Description: resolvedTool.Description,
InputSchema: resolvedTool.InputSchema,
BackendID: resolvedTool.BackendID,
})

// Look up full backend information from registry
backend := registry.Get(ctx, resolvedTool.BackendID)
if backend == nil {
logger.Warnf("Backend %s not found in registry for tool %s, creating minimal target",
resolvedTool.BackendID, resolvedTool.ResolvedName)
routingTable.Tools[resolvedTool.ResolvedName] = &vmcp.BackendTarget{
WorkloadID: resolvedTool.BackendID,
OriginalCapabilityName: resolvedTool.OriginalName,
}
} else {
// Use the backendToTarget helper from registry package
target := vmcp.BackendToTarget(backend)
// Store the original tool name for forwarding to backend
target.OriginalCapabilityName = resolvedTool.OriginalName
routingTable.Tools[resolvedTool.ResolvedName] = target
}
// Use the backendToTarget helper from registry package
target := vmcp.BackendToTarget(backend)
// Store the original tool name for forwarding to backend
target.OriginalCapabilityName = resolvedTool.OriginalName
routingTable.Tools[resolvedTool.ResolvedName] = target
}

// Add resources to routing table
// Add resources to routing table (with health filtering)
resources := make([]vmcp.Resource, 0, len(resolved.Resources))
for _, resource := range resolved.Resources {
backend := registry.Get(ctx, resource.BackendID)
if backend == nil {
logger.Warnf("Backend %s not found in registry for resource %s, creating minimal target",
logger.Warnf("Backend %s not found in registry for resource %s, skipping",
resource.BackendID, resource.URI)
routingTable.Resources[resource.URI] = &vmcp.BackendTarget{
WorkloadID: resource.BackendID,
OriginalCapabilityName: resource.URI,
}
} else {
target := vmcp.BackendToTarget(backend)
// Store the original resource URI for forwarding to backend
target.OriginalCapabilityName = resource.URI
routingTable.Resources[resource.URI] = target
continue
}

// Filter out resources from unhealthy backends
if !backend.HealthStatus.IsHealthyForRouting() {
logger.Debugf("Skipping resource %s from unhealthy backend %s (status: %s)",
resource.URI, backend.Name, backend.HealthStatus)
continue
}

resources = append(resources, resource)

target := vmcp.BackendToTarget(backend)
// Store the original resource URI for forwarding to backend
target.OriginalCapabilityName = resource.URI
routingTable.Resources[resource.URI] = target
}

// Add prompts to routing table
// Add prompts to routing table (with health filtering)
prompts := make([]vmcp.Prompt, 0, len(resolved.Prompts))
for _, prompt := range resolved.Prompts {
backend := registry.Get(ctx, prompt.BackendID)
if backend == nil {
logger.Warnf("Backend %s not found in registry for prompt %s, creating minimal target",
logger.Warnf("Backend %s not found in registry for prompt %s, skipping",
prompt.BackendID, prompt.Name)
routingTable.Prompts[prompt.Name] = &vmcp.BackendTarget{
WorkloadID: prompt.BackendID,
OriginalCapabilityName: prompt.Name,
}
} else {
target := vmcp.BackendToTarget(backend)
// Store the original prompt name for forwarding to backend
target.OriginalCapabilityName = prompt.Name
routingTable.Prompts[prompt.Name] = target
continue
}

// Filter out prompts from unhealthy backends
if !backend.HealthStatus.IsHealthyForRouting() {
logger.Debugf("Skipping prompt %s from unhealthy backend %s (status: %s)",
prompt.Name, backend.Name, backend.HealthStatus)
continue
}

prompts = append(prompts, prompt)

target := vmcp.BackendToTarget(backend)
// Store the original prompt name for forwarding to backend
target.OriginalCapabilityName = prompt.Name
routingTable.Prompts[prompt.Name] = target
}

// Determine conflict strategy used
Expand All @@ -376,16 +394,16 @@ func (a *defaultAggregator) MergeCapabilities(
// Create final aggregated view
aggregated := &AggregatedCapabilities{
Tools: tools,
Resources: resolved.Resources,
Prompts: resolved.Prompts,
Resources: resources,
Prompts: prompts,
SupportsLogging: resolved.SupportsLogging,
SupportsSampling: resolved.SupportsSampling,
RoutingTable: routingTable,
Metadata: &AggregationMetadata{
BackendCount: 0, // Will be set by caller
ToolCount: len(tools),
ResourceCount: len(resolved.Resources),
PromptCount: len(resolved.Prompts),
ResourceCount: len(resources),
PromptCount: len(prompts),
ConflictStrategy: conflictStrategy,
},
}
Expand Down
Loading
Loading