Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions immermatch/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
profile_candidate,
search_all_queries,
)
from immermatch.search_provider import ( # noqa: E402
get_provider,
get_provider_fingerprint,
parse_provider_query, # noqa: E402
)

# ---------------------------------------------------------------------------
# Page configuration
Expand Down Expand Up @@ -726,21 +731,23 @@ def _run_pipeline() -> None:
return

cache = _get_cache()
provider = get_provider(location)
provider_fingerprint = get_provider_fingerprint(provider)
# Eagerly create the Gemini client so it's ready before the pipeline starts —
# avoids lazy-init delay between query generation and job evaluation.
client = create_client() if _keys_ok() else None

# ---- Step 1: Generate queries ----------------------------------------
with st.status("✨ Crafting search queries...", expanded=False) as status:
cached_queries = cache.load_queries(profile, location)
cached_queries = cache.load_queries(profile, location, provider_fingerprint)
if cached_queries is not None:
queries = cached_queries
status.update(label="✅ Queries generated (cached)", state="complete")
else:
if client is None:
client = create_client()
queries = generate_search_queries(client, profile, location)
cache.save_queries(profile, location, queries)
queries = generate_search_queries(client, profile, location, provider=provider)
cache.save_queries(profile, location, queries, provider_fingerprint)
status.update(label="✅ Queries generated", state="complete")
st.session_state.queries = queries

Expand Down Expand Up @@ -775,7 +782,7 @@ def _run_pipeline() -> None:
job = futures[future]
evaluation = future.result()
ej = EvaluatedJob(job=job, evaluation=evaluation)
key = f"{ej.job.title}|{ej.job.company_name}"
key = f"{ej.job.title}|{ej.job.company_name}|{ej.job.location}"
all_evals[key] = ej
progress_bar.progress(
i / len(new_jobs),
Expand All @@ -802,7 +809,7 @@ def _run_pipeline() -> None:
def _on_jobs_found(new_unique_jobs: list[JobListing]) -> None:
"""Submit newly found jobs for evaluation immediately."""
for job in new_unique_jobs:
key = f"{job.title}|{job.company_name}"
key = f"{job.title}|{job.company_name}|{job.location}"
if key in all_evals:
continue # already evaluated (from cache)
fut = eval_executor.submit(evaluate_job, client, profile, job)
Expand All @@ -825,6 +832,7 @@ def _search_progress(qi: int, total: int, unique: int) -> None:
location=location,
on_progress=_search_progress,
on_jobs_found=_on_jobs_found,
provider=provider,
)
cache.save_jobs(jobs, location)
search_status.update(label=f"✅ Found {len(jobs)} unique jobs", state="complete")
Expand All @@ -847,7 +855,7 @@ def _search_progress(qi: int, total: int, unique: int) -> None:
job = eval_futures[future]
evaluation = future.result()
ej = EvaluatedJob(job=job, evaluation=evaluation)
key = f"{ej.job.title}|{ej.job.company_name}"
key = f"{ej.job.title}|{ej.job.company_name}|{ej.job.location}"
all_evals[key] = ej
eval_progress.progress(
i / total_evals,
Expand Down Expand Up @@ -951,7 +959,8 @@ def _record_ip_rate_limit() -> None:
expanded=False,
):
for q in st.session_state.queries:
st.markdown(f"- {q}")
_, clean_query = parse_provider_query(q)
st.markdown(f"- {clean_query}")

# -- Profile (collapsed) -----------------------------------------------
if st.session_state.profile is not None:
Expand Down
1 change: 1 addition & 0 deletions immermatch/bundesagentur.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class BundesagenturProvider:
"""

name: str = "Bundesagentur für Arbeit"
source_id: str = "bundesagentur"

def __init__(
self,
Expand Down
24 changes: 19 additions & 5 deletions immermatch/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,39 @@ def save_profile(self, cv_text: str, profile: CandidateProfile) -> None:
# 2. Queries (keyed by profile hash + location)
# ------------------------------------------------------------------

def load_queries(self, profile: CandidateProfile, location: str) -> list[str] | None:
def load_queries(
    self,
    profile: CandidateProfile,
    location: str,
    provider_fingerprint: str = "",
) -> list[str] | None:
    """Return the cached query list for this profile/location/provider combo.

    Returns ``None`` when no cache entry exists, when any component of the
    cache key (profile hash, location, provider fingerprint) differs from
    the stored one, or when the stored payload is malformed.
    """
    data = self._load("queries.json")
    if data is None:
        return None
    # Every key component must match exactly, otherwise the entry is stale.
    key_matches = (
        data.get("profile_hash") == _profile_hash(profile)
        and data.get("location") == location
        and data.get("provider_fingerprint", "") == provider_fingerprint
    )
    if not key_matches:
        return None
    queries = data.get("queries")
    # Guard against a corrupted payload: only a real list is acceptable.
    return queries if isinstance(queries, list) else None

def save_queries(self, profile: CandidateProfile, location: str, queries: list[str]) -> None:
def save_queries(
    self,
    profile: CandidateProfile,
    location: str,
    queries: list[str],
    provider_fingerprint: str = "",
) -> None:
    """Persist *queries* together with the cache-key metadata.

    The profile hash, location, and provider fingerprint are stored
    alongside the queries so ``load_queries`` can detect stale entries.
    """
    payload = {
        "profile_hash": _profile_hash(profile),
        "location": location,
        "provider_fingerprint": provider_fingerprint,
        "queries": queries,
    }
    self._save("queries.json", payload)
Expand Down Expand Up @@ -130,7 +144,7 @@ def save_jobs(self, jobs: list[JobListing], location: str = "") -> None:
existing = data.get("jobs", {})

for job in jobs:
key = f"{job.title}|{job.company_name}"
key = f"{job.title}|{job.company_name}|{job.location}"
existing[key] = job.model_dump()

self._save(
Expand All @@ -143,7 +157,7 @@ def save_jobs(self, jobs: list[JobListing], location: str = "") -> None:
)

# ------------------------------------------------------------------
# 4. Evaluations (append-only, keyed by title|company)
# 4. Evaluations (append-only, keyed by title|company|location)
# ------------------------------------------------------------------

def load_evaluations(self, profile: CandidateProfile) -> dict[str, EvaluatedJob]:
Expand Down Expand Up @@ -188,5 +202,5 @@ def get_unevaluated_jobs(
Jobs already in the evaluation cache are skipped.
"""
cached = self.load_evaluations(profile)
new_jobs = [job for job in jobs if f"{job.title}|{job.company_name}" not in cached]
new_jobs = [job for job in jobs if f"{job.title}|{job.company_name}|{job.location}" not in cached]
return new_jobs, cached
115 changes: 109 additions & 6 deletions immermatch/search_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from __future__ import annotations

import logging
import threading
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
Expand All @@ -16,7 +17,13 @@

from .llm import call_gemini, parse_json
from .models import CandidateProfile, JobListing
from .search_provider import SearchProvider, get_provider
from .search_provider import (
CombinedSearchProvider,
SearchProvider,
format_provider_query,
get_provider,
parse_provider_query,
)

# Re-export SerpApi helpers so existing imports keep working.
from .serpapi_provider import BLOCKED_PORTALS as _BLOCKED_PORTALS # noqa: F401
Expand All @@ -29,6 +36,25 @@
from .serpapi_provider import parse_job_results as _parse_job_results # noqa: F401
from .serpapi_provider import search_jobs # noqa: F401

logger = logging.getLogger(__name__)
_MIN_JOBS_PER_PROVIDER = 30


def _provider_quota_source_key(provider: SearchProvider) -> str:
"""Return a stable source key for per-provider quota accounting."""
source_id = getattr(provider, "source_id", None)
if isinstance(source_id, str) and source_id.strip():
return source_id.strip().lower()
name = getattr(provider, "name", None)
if isinstance(name, str) and name == "Bundesagentur für Arbeit":
return "bundesagentur"
if isinstance(name, str) and "serpapi" in name.lower():
return "serpapi"
if type(provider).__name__ == "SerpApiProvider":
return "serpapi"
return type(provider).__name__.lower()


Comment on lines +45 to +57
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_provider_quota_source_key() returns a key based on provider.source_id or the provider type name, but SerpApiProvider doesn’t define source_id and its JobListing.source values are "serpapi". This means quota_sources will include "serpapiprovider" (or "magicmock" in tests) and quota_met will never be satisfied, causing early-stop/quota logic and the "Provider quota not reached" warning to behave incorrectly. Consider mapping known provider names to their JobListing.source values (e.g., "SerpApi (Google Jobs)" -> "serpapi") or standardizing by adding a required provider.source_id that matches JobListing.source for all providers.

Suggested change
source_id = getattr(provider, "source_id", None)
if isinstance(source_id, str) and source_id.strip():
return source_id.strip().lower()
if getattr(provider, "name", None) == "Bundesagentur für Arbeit":
return "bundesagentur"
return type(provider).__name__.lower()
# Prefer an explicit source_id when provided by the provider implementation.
source_id = getattr(provider, "source_id", None)
if isinstance(source_id, str) and source_id.strip():
return source_id.strip().lower()
# Fall back to well-known provider names that should match JobListing.source.
name = getattr(provider, "name", None)
if isinstance(name, str):
if name == "Bundesagentur für Arbeit":
return "bundesagentur"
# SerpApi provider: align quota key with JobListing.source == "serpapi".
if "serpapi" in name.lower():
return "serpapi"
# As an additional safeguard, handle the SerpApi provider by class name.
cls_name = type(provider).__name__
if cls_name == "SerpApiProvider":
return "serpapi"
# Generic fallback: use the provider class name.
return cls_name.lower()

Copilot uses AI. Check for mistakes.
# System prompt for the Profiler agent
PROFILER_SYSTEM_PROMPT = """You are an expert technical recruiter with deep knowledge of European job markets.
You will be given the raw text of a candidate's CV. Extract a comprehensive profile.
Expand Down Expand Up @@ -197,6 +223,49 @@ def generate_search_queries(
if provider is None:
provider = get_provider(location)

if isinstance(provider, CombinedSearchProvider):
provider_count = len(provider.providers)
if provider_count == 0:
return []

per_provider = num_queries // provider_count
remainder = num_queries % provider_count
merged_queries: list[str] = []

for index, child_provider in enumerate(provider.providers):
child_count = per_provider + (1 if index < remainder else 0)
if child_count <= 0:
continue
child_queries = _generate_search_queries_for_provider(
client,
profile,
location,
child_count,
child_provider,
)
merged_queries.extend([format_provider_query(child_provider.name, query) for query in child_queries])

seen: set[str] = set()
unique_queries: list[str] = []
for query in merged_queries:
if query in seen:
continue
seen.add(query)
unique_queries.append(query)
if len(unique_queries) >= num_queries:
break
return unique_queries
Comment on lines +235 to +257
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generate_search_queries() now prefixes queries with "provider=::" for CombinedSearchProvider, but query caching (ResultCache.save_queries/load_queries) is keyed only by profile_hash+location. If SERPAPI_KEY is toggled between runs, cached provider-targeted queries can be reused with a different provider set (e.g., BA-only), and the UI/search path will silently strip the prefix and run BA searches with SerpApi-oriented queries. Consider including an explicit provider/config fingerprint in the query cache key (or storing provider metadata in queries.json and invalidating when it changes), and centralizing the provider prefix/separator strings instead of duplicating literals here.

Copilot uses AI. Check for mistakes.

return _generate_search_queries_for_provider(client, profile, location, num_queries, provider)


def _generate_search_queries_for_provider(
client: genai.Client,
profile: CandidateProfile,
location: str,
num_queries: int,
provider: SearchProvider,
) -> list[str]:
# Select system prompt based on active provider
if provider.name == "Bundesagentur für Arbeit":
system_prompt = BA_HEADHUNTER_SYSTEM_PROMPT
Expand Down Expand Up @@ -268,30 +337,49 @@ def search_all_queries(
if provider is None:
provider = get_provider(location)

all_jobs: dict[str, JobListing] = {} # Use title+company as key for dedup
quota_sources: set[str] = set()
if isinstance(provider, CombinedSearchProvider):
quota_sources = {_provider_quota_source_key(p) for p in provider.providers}
if quota_sources and min_unique_jobs > 0:
min_unique_jobs = max(min_unique_jobs, _MIN_JOBS_PER_PROVIDER * len(quota_sources))

all_jobs: dict[str, JobListing] = {} # Use title+company+location as key for dedup
source_counts: dict[str, int] = {}
lock = threading.Lock()
completed = 0
early_stop = threading.Event()

def _search_one(query: str) -> list[JobListing]:
if early_stop.is_set():
return []
return provider.search(query, location, max_results=jobs_per_query)
clean_query = query
if not isinstance(provider, CombinedSearchProvider):
_, clean_query = parse_provider_query(query)
return provider.search(clean_query, location, max_results=jobs_per_query)

with ThreadPoolExecutor(max_workers=min(5, max(1, len(queries)))) as executor:
futures = [executor.submit(_search_one, q) for q in queries]
for future in as_completed(futures):
jobs = future.result()
jobs: list[JobListing] = []
try:
jobs = future.result()
except Exception:
logger.exception("A search query failed")
batch_new: list[JobListing] = []
with lock:
for job in jobs:
key = f"{job.title}|{job.company_name}"
key = f"{job.title}|{job.company_name}|{job.location}"
if key not in all_jobs:
all_jobs[key] = job
batch_new.append(job)
source = (job.source or "unknown").lower()
source_counts[source] = source_counts.get(source, 0) + 1
completed += 1
progress_args = (completed, len(queries), len(all_jobs))
if min_unique_jobs and len(all_jobs) >= min_unique_jobs:
quota_met = True
if quota_sources:
quota_met = all(source_counts.get(source, 0) >= _MIN_JOBS_PER_PROVIDER for source in quota_sources)
if min_unique_jobs and len(all_jobs) >= min_unique_jobs and quota_met:
early_stop.set()
# Callbacks outside the lock to avoid blocking other threads
if on_progress is not None:
Expand All @@ -305,4 +393,19 @@ def _search_one(query: str) -> list[JobListing]:
f.cancel()
break

if source_counts:
counts_text = ", ".join(f"{source}={count}" for source, count in sorted(source_counts.items()))
logger.info("Search source counts for location '%s': %s", location or "(none)", counts_text)
if quota_sources:
missing = [
source for source in sorted(quota_sources) if source_counts.get(source, 0) < _MIN_JOBS_PER_PROVIDER
]
if missing:
logger.warning(
"Provider quota not reached for location '%s': %s (required >= %d each)",
location or "(none)",
", ".join(missing),
_MIN_JOBS_PER_PROVIDER,
)

return list(all_jobs.values())
Loading