3 changes: 3 additions & 0 deletions .env.example
@@ -81,3 +81,6 @@ CALLBACK_READ_TIMEOUT = 10

# require as a env if you want to use doc transformation
OPENAI_API_KEY=""

KAAPI_GUARDRAILS_AUTH=""
KAAPI_GUARDRAILS_URL=""
5 changes: 5 additions & 0 deletions .env.test.example
@@ -32,3 +32,8 @@ AWS_S3_BUCKET_PREFIX="bucket-prefix-name"
# Callback Timeouts (in seconds)
CALLBACK_CONNECT_TIMEOUT = 3
CALLBACK_READ_TIMEOUT = 10

OPENAI_API_KEY=""

KAAPI_GUARDRAILS_AUTH=""
KAAPI_GUARDRAILS_URL=""
2 changes: 2 additions & 0 deletions backend/app/core/config.py
@@ -49,6 +49,8 @@ class Settings(BaseSettings):
POSTGRES_USER: str
POSTGRES_PASSWORD: str = ""
POSTGRES_DB: str = ""
KAAPI_GUARDRAILS_AUTH: str = ""
KAAPI_GUARDRAILS_URL: str = ""

@computed_field # type: ignore[prop-decorator]
@property
14 changes: 14 additions & 0 deletions backend/app/models/llm/request.py
@@ -208,6 +208,20 @@ class LLMCallRequest(SQLModel):
"in production, always use the id + version."
),
)
input_guardrails: list[dict[str, Any]] | None = Field(
default=None,
description=(
"Optional guardrails configuration to apply input validation. "
"If not provided, no guardrails will be applied."
),
)
output_guardrails: list[dict[str, Any]] | None = Field(
default=None,
description=(
"Optional guardrails configuration to apply output validation. "
"If not provided, no guardrails will be applied."
),
)
callback_url: HttpUrl | None = Field(
default=None, description="Webhook URL for async response delivery"
)
60 changes: 60 additions & 0 deletions backend/app/services/llm/guardrails.py
@@ -0,0 +1,60 @@
from typing import Any
from uuid import UUID
import logging

import httpx

from app.core.config import settings

logger = logging.getLogger(__name__)


def call_guardrails(
input_text: str, guardrail_config: list[dict], job_id: UUID
) -> dict[str, Any]:
"""
Call the Kaapi guardrails service to validate and process input text.

Args:
input_text: Text to validate and process.
guardrail_config: List of validator configurations to apply.
job_id: Unique identifier for the request.

Returns:
JSON response from the guardrails service with validation results.
"""
payload = {
"request_id": str(job_id),
"input": input_text,
"validators": guardrail_config,
}

headers = {
"accept": "application/json",
"Authorization": f"Bearer {settings.KAAPI_GUARDRAILS_AUTH}",
"Content-Type": "application/json",
}

try:
with httpx.Client(timeout=10.0) as client:
response = client.post(
settings.KAAPI_GUARDRAILS_URL,
json=payload,
headers=headers,
)

response.raise_for_status()
return response.json()
except Exception as e:
logger.warning(
f"[call_guardrails] Service unavailable. Bypassing guardrails. job_id={job_id}. error={e}"
)

return {
"success": False,
"bypassed": True,
"data": {
"safe_text": input_text,
"rephrase_needed": False,
},
}
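
For orientation, a hedged usage sketch of this helper. The validator name and input text are placeholders; only the function signature and the bypass/failure response shapes come from the code above and from how jobs.py consumes the result:

from uuid import uuid4

from app.services.llm.guardrails import call_guardrails

result = call_guardrails(
    input_text="My phone number is 555-0100",  # placeholder input
    guardrail_config=[{"type": "pii_remover"}],  # illustrative validator config
    job_id=uuid4(),
)

if result.get("bypassed"):
    # Guardrails service unreachable: the original text is passed through unchanged.
    text = result["data"]["safe_text"]
elif result["success"]:
    text = result["data"]["safe_text"]
else:
    # Failure shape assumed from how execute_job reads safe_input["error"].
    text = result["error"]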
70 changes: 70 additions & 0 deletions backend/app/services/llm/jobs.py
@@ -13,6 +13,7 @@
from app.crud.jobs import JobCrud
from app.models import JobStatus, JobType, JobUpdate, LLMCallRequest
from app.models.llm.request import ConfigBlob, LLMCallConfig, KaapiCompletionConfig
from app.services.llm.guardrails import call_guardrails
from app.services.llm.providers.registry import get_llm_provider
from app.services.llm.mappers import transform_kaapi_config_to_native
from app.utils import APIResponse, send_callback
@@ -134,6 +135,9 @@ def execute_job(

# one of (id, version) or blob is guaranteed to be present due to prior validation
config = request.config
input_query = request.query.input
input_guardrails = request.input_guardrails
output_guardrails = request.output_guardrails
callback_response = None
config_blob: ConfigBlob | None = None

@@ -142,6 +146,36 @@
)

try:
if input_guardrails:
safe_input = call_guardrails(input_query, input_guardrails, job_id)

logger.info(
f"[execute_job] Input guardrail validation | success={safe_input['success']}."
)

if safe_input.get("bypassed"):
logger.info("[execute_job] Guardrails bypassed (service unavailable)")

elif safe_input["success"]:
request.query.input = safe_input["data"]["safe_text"]

if safe_input["data"]["rephrase_needed"]:
callback_response = APIResponse.failure_response(
error=request.query.input,
metadata=request.request_metadata,
)
return handle_job_error(
job_id, request.callback_url, callback_response
)
else:
request.query.input = safe_input["error"]

callback_response = APIResponse.failure_response(
error=safe_input["error"],
metadata=request.request_metadata,
)
return handle_job_error(job_id, request.callback_url, callback_response)

with Session(engine) as session:
# Update job status to PROCESSING
job_crud = JobCrud(session=session)
@@ -226,6 +260,42 @@
)

if response:
if output_guardrails:
output_text = response.response.output.text
safe_output = call_guardrails(output_text, output_guardrails, job_id)

logger.info(
f"[execute_job] Output guardrail validation | success={safe_output['success']}."
)

if safe_output.get("bypassed"):
logger.info(
"[execute_job] Guardrails bypassed (service unavailable)"
)

elif safe_output["success"]:
response.response.output.text = safe_output["data"]["safe_text"]

if safe_output["data"]["rephrase_needed"] == True:
callback_response = APIResponse.failure_response(
error=request.query.input,
metadata=request.request_metadata,
)
return handle_job_error(
job_id, request.callback_url, callback_response
)

else:
response.response.output.text = safe_output["error"]

callback_response = APIResponse.failure_response(
error=safe_output["error"],
metadata=request.request_metadata,
)
return handle_job_error(
job_id, request.callback_url, callback_response
)

callback_response = APIResponse.success_response(
data=response, metadata=request.request_metadata
)
110 changes: 109 additions & 1 deletion backend/app/tests/api/routes/test_llm.py
@@ -6,7 +6,6 @@
from app.models.llm.request import (
QueryParams,
LLMCallConfig,
CompletionConfig,
ConfigBlob,
KaapiLLMParams,
KaapiCompletionConfig,
@@ -164,3 +163,112 @@ def test_llm_call_invalid_provider(
)

assert response.status_code == 422


def test_llm_call_success_with_guardrails(
client: TestClient,
user_api_key_header: dict[str, str],
) -> None:
"""Test successful LLM call when guardrails are enabled (no validators)."""

with (
patch("app.services.llm.jobs.start_high_priority_job") as mock_start_job,
patch("app.services.llm.guardrails.call_guardrails") as mock_guardrails,
):
mock_start_job.return_value = "test-task-id"

mock_guardrails.return_value = {
"success": True,
"bypassed": False,
"data": {
"safe_text": "What is the capital of France?",
"rephrase_needed": False,
},
}

payload = LLMCallRequest(
query=QueryParams(input="What is the capital of France?"),
config=LLMCallConfig(
blob=ConfigBlob(
completion=NativeCompletionConfig(
provider="openai-native",
params={
"model": "gpt-4o",
"temperature": 0.7,
},
)
)
),
input_guardrails=[],
output_guardrails=[],
callback_url="https://example.com/callback",
)

response = client.post(
"/api/v1/llm/call",
json=payload.model_dump(mode="json"),
headers=user_api_key_header,
)

assert response.status_code == 200

body = response.json()
assert body["success"] is True
assert "response is being generated" in body["data"]["message"]

mock_start_job.assert_called_once()
mock_guardrails.assert_not_called()
Comment on lines +174 to +220

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
echo "=== Searching for call_guardrails invocations in non-test code ==="
rg -n "call_guardrails" --type=py -g '!**/test*' -B 2 -A 2

echo ""
echo "=== Searching for the llm route handler ==="
fd -e py -path "*/routes/*llm*" --type f | head -20

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 2325


🏁 Script executed:

#!/bin/bash
echo "=== Finding llm route file ==="
find . -path "*/routes*" -name "*.py" | grep -i llm

echo ""
echo "=== Looking at route handler structure ==="
fd llm -e py -path "*routes*" 

echo ""
echo "=== Check app structure ==="
find backend/app -type d -name routes

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 491


🏁 Script executed:

#!/bin/bash
echo "=== Reading the LLM route handler ==="
wc -l backend/app/api/routes/llm.py

echo ""
echo "=== Finding the call endpoint in route handler ==="
rg -n "def.*call" backend/app/api/routes/llm.py -A 30

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 1648


🏁 Script executed:

#!/bin/bash
echo "=== Full content of llm.py route handler ==="
cat -n backend/app/api/routes/llm.py

echo ""
echo "=== Finding start_job function ==="
rg -n "def start_job" --type py -g '!**/test*' -A 10

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 6825


🏁 Script executed:

#!/bin/bash
echo "=== Checking URL paths used in existing tests ==="
rg -n 'client.post.*llm/call' backend/app/tests/api/routes/test_llm.py

echo ""
echo "=== Verify: does TestClient care about leading slash? ==="
rg -n 'client.post' backend/app/tests/api/routes/test_llm.py | head -10

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 425


🏁 Script executed:

#!/bin/bash
echo "=== Get the exact POST URLs from test file ==="
rg -n 'client.post\(' backend/app/tests/api/routes/test_llm.py -A 1 | grep -E '(client.post|"/)' | head -20

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 435


🏁 Script executed:

#!/bin/bash
echo "=== Get exact URLs from all test POST calls ==="
sed -n '39,41p; 77,79p; 113,115p; 134,136p; 159,161p; 207,209p; 262,264p' backend/app/tests/api/routes/test_llm.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 843


🏁 Script executed:

#!/bin/bash
echo "=== Check the second test function (lines 223-274) ==="
sed -n '223,274p' backend/app/tests/api/routes/test_llm.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 1878


Remove unreachable mock_guardrails patches — they never execute when start_high_priority_job is mocked.

call_guardrails is invoked inside execute_job (backend/app/services/llm/jobs.py lines 150, 265), which runs asynchronously via start_high_priority_job. When start_high_priority_job is mocked in the test, the job never executes, making any mock_guardrails setup and assertions trivially true/false and misleading.

First test (lines 174–220): The mock_guardrails.return_value setup (lines 180–187) is dead code. Remove the call_guardrails patch and assertion.

Second test (lines 223–274): Similar dead mock setup. Additionally, this test is missing mock_guardrails.assert_called_once() — if the intent is to verify that guardrails are invoked when input_guardrails is non-empty, add the assertion; otherwise, remove the patch entirely.

Minor: New tests use /api/v1/llm/call (leading slash) while existing tests use api/v1/llm/call (no slash). Align with existing convention.

🤖 Prompt for AI Agents
In `@backend/app/tests/api/routes/test_llm.py` around lines 174 - 220, The test
patches of call_guardrails are unreachable because start_high_priority_job is
mocked and the job (execute_job) never runs; remove the patch/return_value and
the mock_guardrails.assert_not_called() in the first test (references:
start_high_priority_job, call_guardrails, execute_job) so you don't set dead
expectations; for the second test either remove the call_guardrails patch
altogether or (if you intend to assert guardrails were invoked) stop mocking
start_high_priority_job and instead let execute_job run or explicitly call the
job helper, then add mock_guardrails.assert_called_once(); finally, normalize
the endpoint path to match existing tests (use the same "api/v1/llm/call"
convention without a leading slash).
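
A minimal sketch of what the cleaned-up first test could look like under this suggestion: the dead call_guardrails patch is dropped and the path follows the existing convention. Fixture and model names are taken from this test file; the payload mirrors the one in the diff above.

def test_llm_call_success_with_guardrails(
    client: TestClient,
    user_api_key_header: dict[str, str],
) -> None:
    """Guardrails fields are accepted by the route and the job is enqueued."""
    with patch("app.services.llm.jobs.start_high_priority_job") as mock_start_job:
        mock_start_job.return_value = "test-task-id"

        payload = LLMCallRequest(
            query=QueryParams(input="What is the capital of France?"),
            config=LLMCallConfig(
                blob=ConfigBlob(
                    completion=NativeCompletionConfig(
                        provider="openai-native",
                        params={"model": "gpt-4o", "temperature": 0.7},
                    )
                )
            ),
            input_guardrails=[],
            output_guardrails=[],
            callback_url="https://example.com/callback",
        )

        response = client.post(
            "api/v1/llm/call",  # no leading slash, matching the existing tests
            json=payload.model_dump(mode="json"),
            headers=user_api_key_header,
        )

    assert response.status_code == 200
    assert response.json()["success"] is True
    mock_start_job.assert_called_once()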



def test_llm_call_guardrails_bypassed_still_succeeds(
client: TestClient,
user_api_key_header: dict[str, str],
) -> None:
"""If guardrails service is unavailable (bypassed), request should still succeed."""

with (
patch("app.services.llm.jobs.start_high_priority_job") as mock_start_job,
patch("app.services.llm.guardrails.call_guardrails") as mock_guardrails,
):
mock_start_job.return_value = "test-task-id"

mock_guardrails.return_value = {
"success": True,
"bypassed": True,
"data": {
"safe_text": "What is the capital of France?",
"rephrase_needed": False,
},
}

payload = LLMCallRequest(
query=QueryParams(input="What is the capital of France?"),
config=LLMCallConfig(
blob=ConfigBlob(
completion=NativeCompletionConfig(
provider="openai-native",
params={
"model": "gpt-4",
"temperature": 0.7,
},
)
)
),
input_guardrails=[{"type": "pii_remover"}],
output_guardrails=[],
callback_url="https://example.com/callback",
)

response = client.post(
"/api/v1/llm/call",
json=payload.model_dump(mode="json"),
headers=user_api_key_header,
)

assert response.status_code == 200

body = response.json()
assert body["success"] is True
assert "response is being generated" in body["data"]["message"]

mock_start_job.assert_called_once()
Comment on lines +223 to +274

⚠️ Potential issue | 🟡 Minor

Missing assertion on mock_guardrails — test doesn't verify the "bypassed" behavior it claims to test.

The docstring says the test validates that a bypassed guardrails response still succeeds, and input_guardrails is non-empty, yet there is no assertion on mock_guardrails (e.g., assert_called_once, checking the arguments, or verifying the bypassed flag propagation). The same root issue applies as in the previous test: with start_high_priority_job mocked, call_guardrails is never reached, so the bypass mock is entirely unused.

As-is, this test is functionally identical to test_llm_call_success — it only proves the route accepts guardrails fields and returns 200.

🤖 Prompt for AI Agents
In `@backend/app/tests/api/routes/test_llm.py` around lines 223 - 274, The test
claims to verify guardrails-bypassed behavior but never asserts that
call_guardrails was invoked or that its bypassed=True was propagated; update
test_llm_call_guardrails_bypassed_still_succeeds to assert
mock_guardrails.assert_called_once() and inspect mock_guardrails.call_args to
confirm the request input (e.g., "What is the capital of France?") and the
provided input_guardrails were passed; if mocking start_high_priority_job
prevents call_guardrails from running, remove or adjust that mock so
call_guardrails is exercised (or explicitly assert call order: mock_guardrails
called before start_high_priority_job) and add an assertion that the response
handling used the bypassed flag (for example by confirming
start_high_priority_job was still called after a bypass).
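
If the goal is to cover the bypass behavior itself, one option (a sketch, assuming pytest and unittest.mock as already used in this suite) is a unit test against call_guardrails that simulates an unreachable service, rather than mocking at the route level:

from unittest.mock import patch
from uuid import uuid4

import httpx

from app.services.llm.guardrails import call_guardrails


def test_call_guardrails_bypasses_when_service_unavailable() -> None:
    job_id = uuid4()
    with patch("app.services.llm.guardrails.httpx.Client") as mock_client:
        # Simulate the guardrails service being unreachable.
        mock_client.return_value.__enter__.return_value.post.side_effect = (
            httpx.ConnectError("connection refused")
        )
        result = call_guardrails("hello", [{"type": "pii_remover"}], job_id)

    # The helper falls back to a bypass payload that echoes the input text.
    assert result["success"] is False
    assert result["bypassed"] is True
    assert result["data"]["safe_text"] == "hello"
    assert result["data"]["rephrase_needed"] is False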
