From f49c40b2a39bf493d2ae76fdfbad577c080edbfe Mon Sep 17 00:00:00 2001 From: Adrian Immer Date: Sun, 1 Mar 2026 04:28:06 +0100 Subject: [PATCH 01/11] feat: add SearchProvider protocol and source field to JobListing - Introduce SearchProvider protocol in search_provider.py with a get_provider() factory for future per-country routing - Add 'source' field to JobListing model to track which provider produced each listing - Update JobListing docstring to be provider-agnostic --- immermatch/models.py | 3 +- immermatch/search_provider.py | 60 +++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 immermatch/search_provider.py diff --git a/immermatch/models.py b/immermatch/models.py index 5fb0d42..2dd44fc 100644 --- a/immermatch/models.py +++ b/immermatch/models.py @@ -67,7 +67,7 @@ class ApplyOption(BaseModel): class JobListing(BaseModel): - """A job listing from SerpApi.""" + """A job listing returned by a search provider.""" title: str company_name: str @@ -75,6 +75,7 @@ class JobListing(BaseModel): description: str = "" link: str = "" posted_at: str = "" + source: str = Field(default="", description="Search provider that produced this listing (e.g. 'bundesagentur')") apply_options: list[ApplyOption] = Field( default_factory=list, description="List of direct application links (LinkedIn, company site, etc.)" ) diff --git a/immermatch/search_provider.py b/immermatch/search_provider.py new file mode 100644 index 0000000..4fa6745 --- /dev/null +++ b/immermatch/search_provider.py @@ -0,0 +1,60 @@ +"""Abstract search-provider interface and provider factory. + +Every job-search backend (Bundesagentur für Arbeit, SerpApi, …) implements +the ``SearchProvider`` protocol so the rest of the pipeline can be +search-engine-agnostic. 
+""" + +from __future__ import annotations + +import logging +from typing import Protocol, runtime_checkable + +from .models import JobListing + +logger = logging.getLogger(__name__) + + +@runtime_checkable +class SearchProvider(Protocol): + """Pluggable interface for job-search backends. + + Implementations must expose a ``name`` attribute and a ``search`` method + that translate a keyword + location into a list of ``JobListing`` objects. + """ + + name: str + """Human-readable provider name, e.g. ``"Bundesagentur für Arbeit"``.""" + + def search( + self, + query: str, + location: str, + max_results: int = 50, + ) -> list[JobListing]: + """Run a single search and return parsed job listings. + + Args: + query: Free-text keyword (job title, skill, …). + location: Free-text target location (city, region, country). + max_results: Upper bound on results to return. + + Returns: + De-duplicated list of ``JobListing`` objects. + """ + ... + + +def get_provider(location: str = "") -> SearchProvider: # noqa: ARG001 + """Return the appropriate ``SearchProvider`` for *location*. + + Currently always returns the Bundesagentur für Arbeit provider + (Germany-only). This factory is the single extension point for + future per-country routing — e.g. returning ``SerpApiProvider`` + for non-German locations. + """ + # Lazy import so the module can be loaded without pulling in httpx + # when only the protocol is needed (e.g. for type-checking). 
+ from .bundesagentur import BundesagenturProvider # noqa: PLC0415 + + return BundesagenturProvider() From b27a8707b59e9ffc43a03c117f02b5049fd99133 Mon Sep 17 00:00:00 2001 From: Adrian Immer Date: Sun, 1 Mar 2026 04:34:10 +0100 Subject: [PATCH 02/11] feat: implement BundesagenturProvider with full test coverage - BundesagenturProvider calls the free BA REST API (jobsuche.api.bund.dev) - Search endpoint (pc/v4/jobs) with pagination, detail-fetching (pc/v2/jobdetails) in parallel via ThreadPoolExecutor - veroeffentlichtseit=7 default (only recent listings, solves stale jobs) - Retry with exponential backoff on 429/5xx errors - 22 tests covering parsing, pagination, error handling, and protocol conformance --- Improving Job Search API Results.md | 290 +++++++++++++++++++++++++++ immermatch/bundesagentur.py | 283 ++++++++++++++++++++++++++ tests/test_bundesagentur.py | 301 ++++++++++++++++++++++++++++ 3 files changed, 874 insertions(+) create mode 100644 Improving Job Search API Results.md create mode 100644 immermatch/bundesagentur.py create mode 100644 tests/test_bundesagentur.py diff --git a/Improving Job Search API Results.md b/Improving Job Search API Results.md new file mode 100644 index 0000000..62d2523 --- /dev/null +++ b/Improving Job Search API Results.md @@ -0,0 +1,290 @@ +# **Architecting High-Fidelity Job Aggregation Pipelines: Mitigating Expired, Fraudulent, and Misaligned Listings in Recruitment Data** + +## **1\. Introduction to the Problem Space: The Degradation of SERP-Based Job Aggregation** + +The modern digital recruitment landscape is characterized by a high degree of fragmentation, necessitating the use of data aggregation systems to construct centralized, accessible job repositories. Platforms relying on Search Engine Results Pages (SERP) APIs, specifically Google Jobs accessed via proxy services such as SerpAPI, frequently encounter severe data quality degradation. 
In the context of the "immermatch" project, this degradation severely impacts the user experience and algorithmic matching efficacy. The system currently manifests three primary failure modalities: temporal decay (expired job links), adversarial monetization (routing to paywalled platforms or fraudulent scam networks), and semantic drift (links redirecting to misaligned job titles or entirely different corporate entities). + +The reliance on Google Jobs introduces a fundamental architectural vulnerability. Google Jobs does not function as a primary Applicant Tracking System (ATS); rather, it operates as a secondary aggregator that indexes JobPosting schema markup distributed across the open web.1 Because Google’s ranking algorithms heavily weight domain authority, structural formatting, and Search Engine Optimization (SEO) metrics, third-party job boards, recruitment agencies, and malicious scraping platforms frequently outrank the primary corporate ATS.2 Consequently, an aggregation pipeline fetching data from Google Jobs inherits the noise, duplication, and manipulative routing mechanisms prevalent in the SEO-driven employment market. + +To elevate the fidelity of an aggregation platform like immermatch, the underlying system architecture must transition from passive SERP consumption to active, programmatic validation. This necessitates the implementation of a multi-tiered data governance pipeline capable of pre-flight validation, semantic alignment verification, and direct-source API integration. The following report provides an exhaustive, expert-level analysis of the programmatic solutions required to rectify these data quality issues, with a specific emphasis on global applicability, computational natural language processing (NLP), and the unique regulatory and technological nuances of the German labor market. + +## **2\. 
The Architectural Mechanics and SEO Vulnerabilities of Google Jobs** + +To engineer effective solutions for the immermatch platform, it is critical to deconstruct why Google Jobs natively produces suboptimal search results. The phenomenon of misaligned links and aggregator dominance is not an anomaly; it is a direct consequence of how search engines process and rank structured data. + +### **2.1. Schema Markup and Indexation Bias** + +When a user searches for job-related keywords, Google Jobs aggregates listings by scraping websites that have implemented the JobPosting structured data schema.1 However, organic ranking factors heavily influence which version of a duplicated job posting is displayed to the end-user. Research into Google Jobs indexation reveals that third-party platforms, such as The Muse or Monster, often outrank the original hiring company's website because they provide a richer schema payload.2 + +Third-party aggregators systematically inject extensive metadata into their JobPosting JSON-LD structures, including specific experience requirements, HTML-formatted descriptions, unique URL structures per position, and highly optimized organization logos.2 Furthermore, large aggregators utilize the Google Indexing API to push real-time updates, whereas smaller corporate ATS platforms wait for passive algorithmic crawling.2 This creates a structural bias where the immermatch system, querying SerpAPI, will predominantly receive links to secondary job boards rather than direct employer applications. + +### **2.2. Exploiting Dynamic and Static Filter Parameters** + +If the immermatch platform must continue to leverage SerpAPI for specific geographic or niche queries, the query construction logic must be heavily optimized using Google's internal filtering syntax. 
The SerpAPI Google Jobs engine relies on a filtering mechanism separated into dynamic and static values, which are passed via the chips array.4 + +Static filter values apply universally across all search contexts. These include parameters such as the date a job was posted (date\_posted:week, date\_posted:today) or the employment type (employment\_type:FULLTIME, employment\_type:CONTRACTOR).4 Utilizing strict static filters, particularly narrowing the temporal window to date\_posted:3days, can marginally reduce the ingestion of expired ghost jobs, though it does not eliminate the root cause of temporal decay. + +Conversely, dynamic filter values adapt based on the specific search keyword, generating internal Google knowledge graph identifiers for specific corporate entities, represented as organization\_mid.4 For example, targeting Apple requires appending the parameter chips=organization\_mid:/m/0k8z. To mitigate the routing of candidates to incorrect companies (semantic drift), the immermatch aggregation platform must be refactored to extract the organization\_mid from initial broad queries and append it to subsequent deep-dive queries. This forces the Google API to strictly isolate jobs belonging to the cryptographically verified corporate entity, significantly reducing the noise caused by recruitment agencies bidding on identical job titles. + +## **3\. Strategic Alternatives to SERP Aggregation: Direct API Integrations** + +The most definitive and architecturally sound solution to the vulnerabilities inherent in Google Jobs scraping is the bypass of the SERP entirely in favor of direct Application Programming Interface (API) integrations. By sourcing data directly from specialized job data providers or governmental databases, aggregation platforms can guarantee higher data provenance, significantly reduce latency, and entirely eliminate the intermediary layer of SEO-optimized third-party boards that harbor scams and paywalls. + +### **3.1. 
Evaluating Commercial Job Data API Providers** + +A transition to commercial data feeds allows for the ingestion of structured, normalized data that has already undergone preliminary deduplication and validation. The landscape of job data APIs in 2025 and 2026 presents several highly capable alternatives, each with distinct operational and financial profiles: + +* **Coresignal:** Recognized for its immense historical breadth, Coresignal provides access to over 349 million global job records, heavily enriched by broad LinkedIn coverage.5 The platform excels in providing structured employee data alongside job data, which is highly beneficial for advanced predictive talent analytics.6 However, Coresignal charges a premium (up to 10 times what alternatives charge), which is justified only if the immermatch platform requires multi-source enrichment and recruiter contact data; for strictly raw job listings, it is cost-prohibitive.6 +* **Techmap:** Positioned as a highly scalable and cost-effective alternative to Coresignal, Techmap offers direct company sourcing capabilities across a broader range of countries.5 Its fixed-cost structure and hourly update frequency make it highly viable for platforms requiring high-volume daily ingestions without facing exponential cost scaling.5 +* **LinkUp:** LinkUp distinguishes itself by exclusively indexing jobs directly from primary employer websites, thereby entirely bypassing the third-party job board ecosystem.5 For the immermatch architecture, this design choice natively solves the issue of misaligned links and aggregator paywalls, providing the highest possible fidelity of direct-source data. 
However, its geographic coverage is less comprehensive than Techmap.5 +* **JobDataAPI and JobsPikr:** At a highly competitive price point of approximately $1 per 1,000 jobs, JobDataAPI provides access to roughly 6.7 million new global job postings per month.5 It supports modern data formats including JSON, Parquet, and JSON-LD, making it highly compatible with big data processing pipelines.6 Similarly, JobsPikr offers customizable crawling solutions tailored for large-scale database ingestion.5 + +| API Provider | Primary Data Source Architecture | Core Strategic Advantage | Pricing Paradigm | Geographic Optimization | +| :---- | :---- | :---- | :---- | :---- | +| **Coresignal** | Multi-source aggregation (LinkedIn heavy) | Vast historical data, employee enrichment integration | Premium / Volume-based | Global | +| **Techmap** | Direct ATS & Company sites | High country coverage, frequent hourly updates | Fixed Subscription | Global | +| **LinkUp** | Strictly Employer ATS websites | Zero aggregator noise, guaranteed direct applications | Enterprise Subscription | Global (US Heavy) | +| **JobDataAPI** | Aggregated web sources | High cost efficiency ($1/1000 jobs), vast format support | Pay-per-use | Global | + +### **3.2. Exploiting Federal Data Architectures: The Bundesagentur für Arbeit API** + +For the immermatch platform, particularly when operating within or targeting the German labor market, leveraging the official digital infrastructure of the German government provides an unparalleled strategic advantage. 
The Bundesagentur für Arbeit (BA), Germany's Federal Employment Agency, maintains the largest, most tightly regulated, and authoritative job database in the nation.7 + +The BA provides a highly structured RESTful API (jobsuche.api.bund.dev), which allows developers to programmatically query the national registry of open positions.7 Unlike SERP scraping, this data is strictly regulated by federal mandates, drastically reducing the probability of encountering fraudulent listings, ghost jobs, or paywalled entry points. Employers posting on the BA portal undergo verification, ensuring a baseline of corporate legitimacy.8 + +The technical integration requires querying the endpoint https://rest.arbeitsagentur.de/jobboerse/jobsuche-service.7 Authentication is managed via a static client ID header, specifically passing X-API-Key: jobboerse-jobsuche in GET requests.7 The API supports complex, type-safe querying parameters, allowing the immermatch system to filter by geographic location, contract duration (Befristung), employment type (Arbeitszeit), and specific professional classifications utilizing endpoints such as /pc/v4/jobs for standard searches and /ed/v1/arbeitgeberlogo/{hashID} for retrieving corporate branding assets.7 + +To abstract the pagination, rate-limiting, and payload deserialization complexities of the BA API, robust open-source clients have been developed. 
For Rust environments, the jobsuche crate provides strong typing with enums for all parameters and supports both synchronous and asynchronous operations.10 For Python-centric data pipelines, the de-jobsuche PyPI package provides equivalent wrapper functionality, allowing rapid integration via pip install deutschland\[jobsuche\].12 Furthermore, data science teams utilizing R can leverage the bunddev CRAN package, which provides native functions like jobsuche\_search() and jobsuche\_logo() alongside built-in throttling and caching mechanisms.9 + +By directly integrating with the BA API, the immermatch platform immediately bypasses the SEO-manipulated Google Jobs environment, securing direct links to verified employer portals or official federal application systems. + +## **4\. Navigating the Aggregator Ecosystem and Strategic Domain Blacklisting** + +While transitioning to direct APIs is the optimal long-term strategy, legacy systems relying on SerpAPI must implement intermediate defensive measures. The most immediate method to prevent routing to paywalled aggregators or misaligned search pages is the implementation of stringent Boolean exclusion logic within the search query.14 + +Because Google Jobs aggregates from thousands of domains, many of which are parasitic entities that simply re-host content to harvest applicant data or serve advertisements, these domains must be algorithmically blacklisted at the query level. + +### **4.1. Differentiating Marketplaces from Meta-Aggregators** + +To build an effective exclusion list, the data engineering team must distinguish between primary job marketplaces and meta-aggregators (search engines).15 Job marketplaces, such as StepStone, XING, Monster, and LinkedIn, host primary content; hiring companies pay these platforms directly to host their requisitions.15 While links to these platforms are not as ideal as a direct corporate ATS link, they generally represent valid, actionable job postings. 
+ +Conversely, meta-aggregators function similarly to Google Jobs itself—they scrape the internet for job ads and syndicate content from other boards.15 When a candidate clicks a Google Jobs link that routes to a meta-aggregator, they are dumped into a secondary search interface rather than a targeted job application page, resulting in a severely degraded user experience.8 + +### **4.2. Implementing Exclusionary Query Logic** + +Implementing search operators such as the uppercase NOT (e.g., software engineer NOT jobot NOT bravado) forces the SERP engine to drop listings originating from known spam networks or meta-aggregators before the payload is ever returned via SerpAPI.14 This logic must be applied dynamically, backed by an internal database of known adversarial domains. + +For the German market specifically, the immermatch pipeline must implement permanent exclusion filters against meta-aggregators that loop traffic without providing direct ATS links. Domains that should be rigorously blacklisted from the search parameters include: + +* **Jobrapido:** A high-traffic search engine that scrapes primary boards, frequently resulting in infinite redirect loops for the end-user.15 +* **Kimeta:** Functions entirely as a crawling technology, rarely hosting primary application infrastructure.15 +* **Jooble, Adzuna, and Talent.com:** Global aggregators that syndicate content, heavily diluting the provenance of the original job posting.18 +* **Zuhausejobs.com:** Often cited as a lower-tier platform prone to hosting generic or aggregated remote listings that lack strict verification.8 + +By injecting an exclusion array (e.g., NOT site:jobrapido.de NOT site:kimeta.de) into every SerpAPI request payload, the immermatch system can artificially elevate the quality of the returned SERP data, forcing Google to surface primary ATS links or verified marketplaces like StepStone and Arbeitnow.17 + +## **5\. 
Mitigating Temporal Decay: The Engineering of Expired Link Detection** + +The phenomenon of "ghost jobs"—listings that remain active on the internet despite the position being filled, the hiring initiative being canceled, or the requisition being a facade to collect resumes—is a systemic issue in digital recruitment.20 Google Jobs indexation algorithms inherently lag behind the real-time state of corporate ATS databases.1 Consequently, the SerpAPI payload will inevitably contain links that are temporally decayed. To maintain platform integrity, the immermatch architecture must implement an asynchronous, multi-stage URL validation pipeline. + +### **5.1. HTTP Protocol Analysis and Redirect Tracing** + +The foundational layer of expired job detection relies on automated HTTP status code analysis. This can be achieved using libraries such as Python's native urllib, the popular requests library, or asynchronous equivalents like aiohttp for high-throughput concurrency.22 When a requisition is removed from a corporate ATS, the server rarely serves a standard HTTP 200 OK response containing the original content. Instead, the server behavior typically falls into one of three distinct paradigms: + +1. **Hard Deletion (HTTP 404 / 410):** The ATS returns a 404 Not Found or a 410 Gone status code.25 This is the most deterministic indicator of expiration. A simple Python script executing a HEAD request (which is significantly faster than a GET request as it omits the response body) can instantly flag these URLs, permitting immediate purging of the listing from the immermatch database.23 +2. **Access Revocation (HTTP 403 / 401):** Less commonly, the system may return a 403 Forbidden or 401 Unauthorized error, indicating that the listing has been transitioned from the public career site to an internal, authenticated tier accessible only to current employees.25 +3. 
**Soft Redirection (HTTP 301 / 302 / 307):** To preserve SEO authority and prevent users from encountering dead pages, many enterprise platforms implement a 301 Moved Permanently or a 302/307 Temporary Redirect.25 Instead of showing an error, the ATS redirects the user to the company’s main career homepage or a generic search interface. + +To programmatically identify these soft redirections, the validation script must execute requests with automatic redirection tracking explicitly disabled (e.g., setting allow\_redirects=False in the Python requests.get() method).27 By capturing the Location header in the 3xx response, the system can mathematically compare the destination URL against the original target. If the Uniform Resource Identifier (URI) path depth decreases significantly—for example, redirecting from a highly specific path like company.com/careers/engineering/req-1234 to a generic company.com/careers—the system can reliably infer that the specific requisition has been terminated and flag the job as expired.22 + +### **5.2. DOM Parsing and Lexical Heuristics for "Zombie" Pages** + +The most complex expiration scenario occurs when an ATS returns a valid HTTP 200 OK status code but dynamically replaces the job description with a generic expiration message.25 These "zombie" pages completely bypass HTTP-level status code detection. 
For example, enterprise systems like Oracle Taleo will frequently maintain the active URL structure but inject the text "Job is no longer available" or "This position has been filled" into the application container.28 + +Addressing this requires a secondary validation layer utilizing headless browser automation paired with robust HTML parsing frameworks.29 Because modern ATS platforms heavily utilize single-page application (SPA) frameworks like React, Angular, or Vue.js, the actual Document Object Model (DOM) content is rendered client-side via JavaScript.30 Attempting to scrape these pages using standard HTTP GET requests paired with BeautifulSoup will fail, returning only an empty HTML shell or loading scripts.30 + +To overcome JavaScript rendering, the validation pipeline must instantiate a headless browser. While legacy options like Scrapy coupled with Splash or Selenium exist, modern architectures favor Playwright or Pyppeteer due to their superior performance, native asynchronous support, and modern JavaScript engine compatibility.30 Tools like Crawl4AI can also be leveraged for extracting structured data from live, dynamic web pages without the overhead of manually managing browser contexts.32 + +A Playwright-based pipeline must instantiate a Chromium instance, navigate to the target URL, await network idle states to ensure all asynchronous API calls within the ATS have resolved, and then extract the fully rendered textual payload.30 Subsequently, a lexical analysis engine must scan the extracted text for predefined semantic markers of expiration. An array of regular expression (Regex) patterns targeting phrases such as (?i)(no longer available|position closed|role filled|not accepting applications) must be executed against the DOM text.28 If a match exceeds a predefined confidence threshold, the job is classified as a zombie page and removed from the immermatch index. 
+ +| Expiration Paradigm | Technical Indicator | Required Validation Protocol | Computational Cost | +| :---- | :---- | :---- | :---- | +| **Hard Deletion** | HTTP 404 / 410 | Asynchronous HTTP HEAD/GET request | Low | +| **Soft Redirection** | HTTP 301 / 302 / 307 | HTTP Request with allow\_redirects=False | Low | +| **Zombie Page (Static)** | HTTP 200 \+ Expiration Text | BeautifulSoup DOM Parsing \+ Regex | Medium | +| **Zombie Page (Dynamic)** | HTTP 200 \+ JS Rendered Text | Playwright / Crawl4AI \+ Regex | High | + +## **6\. Eradicating Adversarial Monetization: Scam and Paywall Defense Architectures** + +The proliferation of fraudulent job listings and paywalled aggregator traps represents a critical threat to user trust and platform viability. By 2025 and 2026, scammers have evolved beyond simple phishing emails, utilizing sophisticated techniques including AI-generated job descriptions, cloned corporate sites, and "task scams" to harvest personally identifiable information (PII) or extort upfront fees from candidates.33 Simultaneously, parasitic job boards institute strict paywalls, demanding subscription fees for access to listings that are freely available on primary corporate sites.37 The immermatch pipeline must implement autonomous defense mechanisms against these dual adversarial vectors. + +### **6.1. 
Programmatic Detection of Paywalls via Semantic Web Standards** + +The most elegant and deterministic method for detecting paywalls without requiring complex, site-specific web scraping relies on the semantic web standards established by Schema.org, specifically the application/ld+json structured data specifications.39 To comply with Google’s strict indexing guidelines and prevent algorithmic penalties for cloaking (where content served to Googlebot differs from content served to users), websites implementing paywalls are required to explicitly declare the hidden nature of their content.42 + +This declaration is achieved using the isAccessibleForFree property within their JSON-LD payload.39 An example implementation provided by search engine guidelines mandates the inclusion of this boolean flag, alongside a hasPart array that explicitly maps CSS selectors (e.g., .meteredContent, .paywall) to the gated content regions.39 + +A Python-based extraction module can be deployed to intercept and evaluate this metadata. 
Using the requests library to fetch the HTML document and BeautifulSoup to parse the DOM, the pipeline isolates all \', + re.DOTALL, +) + -def _build_ba_link(hash_id: str) -> str: +def _build_ba_link(refnr: str) -> str: """Construct the public Arbeitsagentur URL for a listing.""" - return f"https://www.arbeitsagentur.de/jobsuche/suche?id={hash_id}" + return f"https://www.arbeitsagentur.de/jobsuche/jobdetail/{refnr}" def _parse_location(arbeitsort: dict) -> str: @@ -53,101 +68,125 @@ def _parse_location(arbeitsort: dict) -> str: return ", ".join(parts) if parts else "Germany" -def _parse_search_results(data: dict) -> list[_JobStub]: - """Parse the search endpoint response into lightweight stubs.""" - stubs: list[_JobStub] = [] - for item in data.get("stellenangebote", []): - hash_id = item.get("hashId", "") - if not hash_id: - continue - arbeitsort = item.get("arbeitsort", {}) - stubs.append( - _JobStub( - hash_id=hash_id, - title=item.get("beruf", item.get("titel", "Unknown")), - company_name=item.get("arbeitgeber", "Unknown"), - location=_parse_location(arbeitsort), - posted_at=item.get("aktuelleVeroeffentlichungsdatum", ""), - refnr=item.get("refnr", ""), - ) - ) - return stubs - - -class _JobStub: - """Minimal data from the search endpoint before detail-fetching.""" - - __slots__ = ("hash_id", "title", "company_name", "location", "posted_at", "refnr") +def _clean_html(raw: str) -> str: + """Strip HTML tags and decode entities, collapse whitespace.""" + text = html_mod.unescape(raw) + text = re.sub(r"<[^>]+>", " ", text) + return re.sub(r"\s+", " ", text).strip() - def __init__( - self, - hash_id: str, - title: str, - company_name: str, - location: str, - posted_at: str, - refnr: str, - ) -> None: - self.hash_id = hash_id - self.title = title - self.company_name = company_name - self.location = location - self.posted_at = posted_at - self.refnr = refnr +# ------------------------------------------------------------------ +# Detail page scraping +# 
------------------------------------------------------------------ -def _fetch_job_details(client: httpx.Client, hash_id: str) -> dict: - """Fetch full job details for a single listing (with retry).""" - url = f"{_BASE_URL}/pc/v2/jobdetails/{hash_id}" + +def _fetch_detail(client: httpx.Client, refnr: str) -> dict: + """Fetch the public detail page and extract the ng-state JSON. + + Returns the ``jobdetail`` dict on success, or ``{}`` on any failure. + """ + url = _build_ba_link(refnr) last_exc: Exception | None = None for attempt in range(_MAX_RETRIES): try: resp = client.get(url) if resp.status_code == 200: - return resp.json() # type: ignore[no-any-return] + match = _NG_STATE_RE.search(resp.text) + if match: + state = json.loads(match.group(1)) + return state.get("jobdetail", {}) # type: ignore[no-any-return] + logger.debug("BA detail %s: ng-state not found in HTML", refnr) + return {} if resp.status_code in {429, 500, 502, 503}: delay = _BASE_DELAY * (2**attempt) - logger.warning("BA detail %s returned %s, retrying in %ss", hash_id, resp.status_code, delay) + logger.warning( + "BA detail page %s returned %s, retrying in %ss", + refnr, + resp.status_code, + delay, + ) time.sleep(delay) continue - # 404 or other client error → give up immediately - logger.debug("BA detail %s returned %s, skipping", hash_id, resp.status_code) + logger.debug("BA detail page %s returned %s, skipping", refnr, resp.status_code) return {} except httpx.HTTPError as exc: last_exc = exc delay = _BASE_DELAY * (2**attempt) - logger.warning("BA detail %s network error: %s, retrying in %ss", hash_id, exc, delay) + logger.warning("BA detail %s network error: %s, retrying in %ss", refnr, exc, delay) time.sleep(delay) if last_exc: - logger.error("BA detail %s failed after %d retries: %s", hash_id, _MAX_RETRIES, last_exc) + logger.error("BA detail %s failed after %d retries: %s", refnr, _MAX_RETRIES, last_exc) return {} -def _stub_to_listing(stub: _JobStub, details: dict) -> JobListing: - """Merge 
a search stub with its full details into a ``JobListing``.""" - description = details.get("stellenbeschreibung", "") - link = _build_ba_link(stub.hash_id) +# ------------------------------------------------------------------ +# Search result parsing +# ------------------------------------------------------------------ - apply_options = [ApplyOption(source="Arbeitsagentur", url=link)] - if external_url := details.get("allianzPartnerUrl"): - apply_options.append(ApplyOption(source="Company Website", url=external_url)) - # Prefer the more specific title from details when available - title = details.get("titel", stub.title) or stub.title - company = details.get("arbeitgeber", stub.company_name) or stub.company_name +def _parse_listing(item: dict, detail: dict | None = None) -> JobListing | None: + """Convert a search-result item (+ optional detail) into a :class:`JobListing`. + + Returns ``None`` when the item lacks a ``refnr`` (the unique job ID). + """ + refnr = item.get("refnr", "") + if not refnr: + return None + + arbeitsort = item.get("arbeitsort", {}) + link = _build_ba_link(refnr) + + titel = item.get("titel", "") + beruf = item.get("beruf", "") + arbeitgeber = item.get("arbeitgeber", "") + ort = _parse_location(arbeitsort) + + # Prefer the rich description from the detail page when available. + description = "" + if detail: + raw_desc = detail.get("stellenangebotsBeschreibung", "") + if raw_desc: + description = _clean_html(raw_desc) + + # Fallback: build a minimal description from search fields. + if not description: + parts: list[str] = [] + if beruf and beruf != titel: + parts.append(f"Beruf: {beruf}") + if arbeitgeber: + parts.append(f"Arbeitgeber: {arbeitgeber}") + if ort: + parts.append(f"Standort: {ort}") + description = "\n".join(parts) + + # Build apply options — always include the Arbeitsagentur page link, + # plus an external career-site link when available in the detail data. 
+ apply_options = [ApplyOption(source="Arbeitsagentur", url=link)] + if detail: + ext_url = detail.get("allianzpartnerUrl", "") + if ext_url: + if not ext_url.startswith("http"): + ext_url = f"https://{ext_url}" + ext_name = detail.get("allianzpartnerName", "Company Website") + apply_options.append(ApplyOption(source=ext_name, url=ext_url)) return JobListing( - title=title, - company_name=company, - location=stub.location, + title=titel or beruf or "Unknown", + company_name=arbeitgeber or "Unknown", + location=ort, description=description, link=link, - posted_at=stub.posted_at, + posted_at=item.get("aktuelleVeroeffentlichungsdatum", ""), source="bundesagentur", apply_options=apply_options, ) +def _parse_search_results(data: dict) -> list[dict]: + """Return the raw search-result items (dicts) that have a ``refnr``.""" + return [item for item in data.get("stellenangebote", []) if item.get("refnr")] + + class BundesagenturProvider: """Job-search provider backed by the Bundesagentur für Arbeit API. @@ -159,7 +198,7 @@ class BundesagenturProvider: def __init__( self, days_published: int = _DEFAULT_DAYS_PUBLISHED, - detail_workers: int = 10, + detail_workers: int = 5, ) -> None: self._days_published = days_published self._detail_workers = detail_workers @@ -182,31 +221,36 @@ def search( max_results: Upper bound on total results. Returns: - List of ``JobListing`` objects with descriptions fetched from - the detail endpoint. + List of ``JobListing`` objects. When possible, descriptions + are scraped from the public detail pages; otherwise a minimal + fallback description is built from the search data. 
""" - stubs = self._search_stubs(query, location, max_results) - if not stubs: + if not query or not query.strip(): + logger.debug("Skipping BA search: empty query") + return [] + + items = self._search_items(query, location, max_results) + if not items: return [] - return self._enrich_stubs(stubs) + return self._enrich(items) # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ - def _search_stubs( + def _search_items( self, query: str, location: str, max_results: int, - ) -> list[_JobStub]: - """Paginate through the search endpoint and collect stubs.""" + ) -> list[dict]: + """Paginate through the search endpoint and collect raw items.""" page_size = min(max_results, 50) # BA allows up to 100, 50 is safe - stubs: list[_JobStub] = [] - page = 0 + items: list[dict] = [] + page = 1 # BA API pages are 1-indexed with httpx.Client(headers=_DEFAULT_HEADERS, timeout=30) as client: - while len(stubs) < max_results: + while len(items) < max_results: params: dict[str, str | int] = { "was": query, "size": page_size, @@ -222,35 +266,47 @@ def _search_stubs( break data = resp.json() - page_stubs = _parse_search_results(data) - if not page_stubs: + page_items = _parse_search_results(data) + if not page_items: break - stubs.extend(page_stubs) + items.extend(page_items) total = int(data.get("maxErgebnisse", 0)) - if len(stubs) >= total or len(stubs) >= max_results: + if len(items) >= total or len(items) >= max_results: break page += 1 - return stubs[:max_results] - - def _enrich_stubs(self, stubs: list[_JobStub]) -> list[JobListing]: - """Batch-fetch full details and convert stubs to listings.""" - listings: list[JobListing] = [] - - with httpx.Client(headers=_DEFAULT_HEADERS, timeout=30) as client: + return items[:max_results] + + def _enrich(self, items: list[dict]) -> list[JobListing]: + """Fetch detail pages in parallel and build ``JobListing`` objects.""" + # Map refnr → 
detail dict (fetched in parallel). + details: dict[str, dict] = {} + with httpx.Client( + timeout=30, + headers={ + "User-Agent": "Mozilla/5.0 (compatible; Immermatch/1.0)", + "Accept": "text/html", + }, + follow_redirects=True, + ) as client: with ThreadPoolExecutor(max_workers=self._detail_workers) as pool: - future_to_stub = {pool.submit(_fetch_job_details, client, stub.hash_id): stub for stub in stubs} - for future in as_completed(future_to_stub): - stub = future_to_stub[future] + future_to_refnr = {pool.submit(_fetch_detail, client, item["refnr"]): item["refnr"] for item in items} + for future in as_completed(future_to_refnr): + refnr = future_to_refnr[future] try: - details = future.result() + details[refnr] = future.result() except Exception: - logger.exception("Failed to fetch details for %s", stub.hash_id) - details = {} - listings.append(_stub_to_listing(stub, details)) + logger.exception("Failed to fetch detail for %s", refnr) + details[refnr] = {} + listings: list[JobListing] = [] + for item in items: + refnr = item["refnr"] + listing = _parse_listing(item, detail=details.get(refnr)) + if listing is not None: + listings.append(listing) return listings @staticmethod diff --git a/tests/test_bundesagentur.py b/tests/test_bundesagentur.py index b88fd9d..c2849bb 100644 --- a/tests/test_bundesagentur.py +++ b/tests/test_bundesagentur.py @@ -2,15 +2,19 @@ from __future__ import annotations +import json from unittest.mock import MagicMock, patch +import httpx + from immermatch.bundesagentur import ( BundesagenturProvider, _build_ba_link, - _JobStub, + _clean_html, + _fetch_detail, + _parse_listing, _parse_location, _parse_search_results, - _stub_to_listing, ) # --------------------------------------------------------------------------- @@ -19,18 +23,18 @@ def _make_stellenangebot( - hash_id: str = "abc123", + refnr: str = "10000-1234567890-S", + titel: str = "Python Entwickler (m/w/d)", beruf: str = "Python Entwickler", arbeitgeber: str = "ACME GmbH", ort: str 
= "Berlin", region: str = "Berlin", land: str = "Deutschland", - refnr: str = "10000-1234567890-S", posted: str = "2026-02-25", ) -> dict: return { - "hashId": hash_id, "beruf": beruf, + "titel": titel, "arbeitgeber": arbeitgeber, "refnr": refnr, "aktuelleVeroeffentlichungsdatum": posted, @@ -46,24 +50,27 @@ def _make_search_response( return { "stellenangebote": items, "maxErgebnisse": str(total if total is not None else len(items)), - "page": "0", + "page": "1", "size": "50", } -def _make_detail_response( - stellenbeschreibung: str = "Full job description here.", - titel: str = "Python Entwickler (m/w/d)", - arbeitgeber: str = "ACME GmbH", - allianz_url: str | None = None, +def _make_ng_state_html(jobdetail: dict) -> str: + """Wrap a jobdetail dict in the Angular SSR ng-state script tag.""" + state = {"jobdetail": jobdetail} + return f'' + + +def _make_detail( + description: str = "Great job & benefits", + partner_url: str = "", + partner_name: str = "", ) -> dict: - d: dict = { - "stellenbeschreibung": stellenbeschreibung, - "titel": titel, - "arbeitgeber": arbeitgeber, - } - if allianz_url: - d["allianzPartnerUrl"] = allianz_url + d: dict = {"stellenangebotsBeschreibung": description} + if partner_url: + d["allianzpartnerUrl"] = partner_url + if partner_name: + d["allianzpartnerName"] = partner_name return d @@ -74,7 +81,7 @@ def _make_detail_response( class TestBuildBaLink: def test_simple(self) -> None: - assert _build_ba_link("abc123") == "https://www.arbeitsagentur.de/jobsuche/suche?id=abc123" + assert _build_ba_link("10000-123-S") == "https://www.arbeitsagentur.de/jobsuche/jobdetail/10000-123-S" class TestParseLocation: @@ -93,21 +100,118 @@ def test_city_only(self) -> None: assert _parse_location({"ort": "Hamburg"}) == "Hamburg" +class TestCleanHtml: + def test_strips_tags(self) -> None: + assert _clean_html("bold text") == "bold text" + + def test_decodes_entities(self) -> None: + # & → & (plain entities are decoded) + assert _clean_html("AT&T rocks") == 
"AT&T rocks" + + def test_collapses_whitespace(self) -> None: + assert _clean_html("a b\n\nc") == "a b c" + + def test_combined(self) -> None: + assert _clean_html("

<p>Hello &amp; world</p><br>
ok") == "Hello & world ok" + + def test_empty_string(self) -> None: + assert _clean_html("") == "" + + +class TestParseListing: + def test_valid_item(self) -> None: + item = _make_stellenangebot(refnr="REF1", titel="Dev (m/w/d)", beruf="Entwickler", arbeitgeber="Co") + listing = _parse_listing(item) + assert listing is not None + assert listing.title == "Dev (m/w/d)" + assert listing.company_name == "Co" + assert listing.source == "bundesagentur" + assert "REF1" in listing.link + assert len(listing.apply_options) == 1 + assert listing.apply_options[0].source == "Arbeitsagentur" + + def test_description_includes_beruf_when_different_from_title(self) -> None: + item = _make_stellenangebot(titel="Senior Dev", beruf="Softwareentwickler") + listing = _parse_listing(item) + assert listing is not None + assert "Beruf: Softwareentwickler" in listing.description + + def test_description_omits_beruf_when_equal_to_title(self) -> None: + item = _make_stellenangebot(titel="Python Dev", beruf="Python Dev") + listing = _parse_listing(item) + assert listing is not None + assert "Beruf:" not in listing.description + + def test_missing_refnr_returns_none(self) -> None: + item = {"beruf": "Dev", "arbeitgeber": "Co", "arbeitsort": {}} + assert _parse_listing(item) is None + + def test_fallback_title_from_beruf(self) -> None: + item = _make_stellenangebot(titel="", beruf="QA Engineer") + listing = _parse_listing(item) + assert listing is not None + assert listing.title == "QA Engineer" + + def test_with_detail_description(self) -> None: + item = _make_stellenangebot(refnr="REF1") + detail = _make_detail(description="

<p>Full desc</p>
& more") + listing = _parse_listing(item, detail=detail) + assert listing is not None + assert listing.description == "Full desc & more" + + def test_with_detail_empty_description_falls_back(self) -> None: + item = _make_stellenangebot(refnr="REF1", beruf="QA", arbeitgeber="Corp") + detail = {"stellenangebotsBeschreibung": ""} + listing = _parse_listing(item, detail=detail) + assert listing is not None + # Falls back to search-field description + assert "Arbeitgeber: Corp" in listing.description + + def test_with_detail_external_apply_url(self) -> None: + item = _make_stellenangebot(refnr="REF1") + detail = _make_detail(partner_url="https://careers.acme.com/apply", partner_name="ACME Careers") + listing = _parse_listing(item, detail=detail) + assert listing is not None + assert len(listing.apply_options) == 2 + assert listing.apply_options[1].source == "ACME Careers" + assert listing.apply_options[1].url == "https://careers.acme.com/apply" + + def test_with_detail_external_url_adds_https_prefix(self) -> None: + item = _make_stellenangebot(refnr="REF1") + detail = _make_detail(partner_url="careers.acme.com") + listing = _parse_listing(item, detail=detail) + assert listing is not None + assert listing.apply_options[1].url == "https://careers.acme.com" + + def test_with_detail_external_url_default_name(self) -> None: + item = _make_stellenangebot(refnr="REF1") + detail = _make_detail(partner_url="https://example.com") + listing = _parse_listing(item, detail=detail) + assert listing is not None + assert listing.apply_options[1].source == "Company Website" + + def test_with_no_detail(self) -> None: + item = _make_stellenangebot(refnr="REF1") + listing = _parse_listing(item, detail=None) + assert listing is not None + assert len(listing.apply_options) == 1 # Only Arbeitsagentur + + class TestParseSearchResults: def test_valid_items(self) -> None: data = _make_search_response( [ - _make_stellenangebot(hash_id="a1", beruf="Dev", arbeitgeber="Co"), - 
_make_stellenangebot(hash_id="a2", beruf="QA", arbeitgeber="Co2"), + _make_stellenangebot(refnr="r1", beruf="Dev", arbeitgeber="Co"), + _make_stellenangebot(refnr="r2", beruf="QA", arbeitgeber="Co2"), ] ) - stubs = _parse_search_results(data) - assert len(stubs) == 2 - assert stubs[0].hash_id == "a1" - assert stubs[0].title == "Dev" - assert stubs[1].hash_id == "a2" + results = _parse_search_results(data) + assert len(results) == 2 + # Returns raw dicts, not JobListing objects + assert results[0]["arbeitgeber"] == "Co" + assert results[1]["arbeitgeber"] == "Co2" - def test_skips_missing_hash(self) -> None: + def test_skips_missing_refnr(self) -> None: data = {"stellenangebote": [{"beruf": "Dev"}]} assert _parse_search_results(data) == [] @@ -116,45 +220,78 @@ def test_empty_response(self) -> None: assert _parse_search_results({"stellenangebote": []}) == [] -class TestStubToListing: - def test_basic_merge(self) -> None: - stub = _JobStub( - hash_id="abc", - title="Dev", - company_name="Co", - location="Berlin", - posted_at="2026-02-25", - refnr="REF", - ) - details = _make_detail_response( - stellenbeschreibung="Desc", - titel="Developer (m/w/d)", - arbeitgeber="Co", - allianz_url="https://company.de/apply", - ) - listing = _stub_to_listing(stub, details) +# =========================================================================== +# Tests for _fetch_detail +# =========================================================================== - assert listing.title == "Developer (m/w/d)" - assert listing.company_name == "Co" # prefers detail value - assert listing.description == "Desc" - assert listing.source == "bundesagentur" - assert listing.link == _build_ba_link("abc") - assert len(listing.apply_options) == 2 - assert listing.apply_options[0].source == "Arbeitsagentur" - assert listing.apply_options[1].source == "Company Website" - assert listing.apply_options[1].url == "https://company.de/apply" - def test_no_external_url(self) -> None: - stub = _JobStub("h1", "T", 
"C", "Loc", "2026-01-01", "R") - listing = _stub_to_listing(stub, _make_detail_response()) - assert len(listing.apply_options) == 1 +class TestFetchDetail: + def test_extracts_ng_state(self) -> None: + detail = {"stellenangebotsBeschreibung": "

<p>Hello</p>
", "firma": "ACME"} + html = _make_ng_state_html(detail) + + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.text = html + client = MagicMock(spec=httpx.Client) + client.get.return_value = mock_resp + + result = _fetch_detail(client, "REF-123") + assert result == detail + + def test_missing_ng_state_returns_empty(self) -> None: + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.text = "No state here" + client = MagicMock(spec=httpx.Client) + client.get.return_value = mock_resp + + assert _fetch_detail(client, "REF-123") == {} + + def test_non_200_returns_empty(self) -> None: + mock_resp = MagicMock() + mock_resp.status_code = 404 + client = MagicMock(spec=httpx.Client) + client.get.return_value = mock_resp + + assert _fetch_detail(client, "REF-123") == {} + + def test_retries_on_server_error(self) -> None: + error_resp = MagicMock() + error_resp.status_code = 503 - def test_empty_details_fallback(self) -> None: - stub = _JobStub("h1", "Title", "Company", "Loc", "2026-01-01", "R") - listing = _stub_to_listing(stub, {}) - assert listing.title == "Title" - assert listing.company_name == "Company" - assert listing.description == "" + detail = {"stellenangebotsBeschreibung": "ok"} + ok_resp = MagicMock() + ok_resp.status_code = 200 + ok_resp.text = _make_ng_state_html(detail) + + client = MagicMock(spec=httpx.Client) + client.get.side_effect = [error_resp, ok_resp] + + with patch("immermatch.bundesagentur.time.sleep"): + result = _fetch_detail(client, "REF-123") + assert result == detail + + def test_retries_on_network_error(self) -> None: + detail = {"stellenangebotsBeschreibung": "recovered"} + ok_resp = MagicMock() + ok_resp.status_code = 200 + ok_resp.text = _make_ng_state_html(detail) + + client = MagicMock(spec=httpx.Client) + client.get.side_effect = [httpx.ConnectError("timeout"), ok_resp] + + with patch("immermatch.bundesagentur.time.sleep"): + result = _fetch_detail(client, "REF-123") + assert result == detail + + def 
test_all_retries_fail(self) -> None: + client = MagicMock(spec=httpx.Client) + client.get.side_effect = httpx.ConnectError("down") + + with patch("immermatch.bundesagentur.time.sleep"): + result = _fetch_detail(client, "REF-123") + assert result == {} # =========================================================================== @@ -163,44 +300,64 @@ def test_empty_details_fallback(self) -> None: class TestBundesagenturProviderSearch: - """Test the full search → enrich pipeline with mocked internals.""" + """Test the full search pipeline with mocked HTTP.""" def test_search_returns_listings(self) -> None: - stubs = [ - _JobStub("h1", "Dev A", "Co A", "Berlin", "2026-02-25", "REF1"), - _JobStub("h2", "Dev B", "Co B", "München", "2026-02-24", "REF2"), + items = [ + _make_stellenangebot(refnr="r1", titel="Dev A", arbeitgeber="Co A"), + _make_stellenangebot(refnr="r2", titel="Dev B", arbeitgeber="Co B"), ] - detail = _make_detail_response(stellenbeschreibung="Full desc") + resp_data = _make_search_response(items, total=2) + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = resp_data provider = BundesagenturProvider(days_published=7) with ( - patch.object(provider, "_search_stubs", return_value=stubs), - patch("immermatch.bundesagentur._fetch_job_details", return_value=detail), + patch.object(provider, "_get_with_retry", return_value=mock_resp), + patch.object(provider, "_enrich", side_effect=lambda it: [_parse_listing(i) for i in it]), ): jobs = provider.search("Python", "Berlin", max_results=10) assert len(jobs) == 2 assert all(j.source == "bundesagentur" for j in jobs) - assert all(j.description == "Full desc" for j in jobs) + assert jobs[0].title == "Dev A" + assert jobs[1].title == "Dev B" def test_search_empty_results(self) -> None: + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = _make_search_response([], total=0) + provider = BundesagenturProvider() - with patch.object(provider, 
"_search_stubs", return_value=[]): + with ( + patch.object(provider, "_get_with_retry", return_value=mock_resp), + patch.object(provider, "_enrich", side_effect=lambda it: [_parse_listing(i) for i in it]), + ): jobs = provider.search("Niche Job", "Berlin") assert jobs == [] + def test_search_empty_query_returns_empty(self) -> None: + """Empty or whitespace-only queries are rejected before hitting the API.""" + provider = BundesagenturProvider() + assert provider.search("", "Berlin") == [] + assert provider.search(" ", "Berlin") == [] + def test_search_respects_max_results(self) -> None: - """max_results is enforced via _search_stubs truncation.""" - stubs = [_JobStub(f"h{i}", "Dev", "Co", "Berlin", "2026-01-01", f"R{i}") for i in range(3)] - detail = _make_detail_response() + resp_data = _make_search_response( + [_make_stellenangebot(refnr=f"r{i}") for i in range(5)], + total=5, + ) + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = resp_data provider = BundesagenturProvider() with ( - patch.object(provider, "_search_stubs", return_value=stubs) as mock_stubs, - patch("immermatch.bundesagentur._fetch_job_details", return_value=detail), + patch.object(provider, "_get_with_retry", return_value=mock_resp), + patch.object(provider, "_enrich", side_effect=lambda it: [_parse_listing(i) for i in it]), ): jobs = provider.search("Dev", "Berlin", max_results=3) - mock_stubs.assert_called_once_with("Dev", "Berlin", 3) assert len(jobs) == 3 @@ -214,11 +371,11 @@ def test_veroeffentlichtseit_custom(self) -> None: class TestBundesagenturProviderPagination: - """Test the search-stub pagination logic with mocked HTTP.""" + """Test pagination logic via _search_items.""" def test_single_page(self) -> None: resp_data = _make_search_response( - [_make_stellenangebot(hash_id=f"h{i}") for i in range(5)], + [_make_stellenangebot(refnr=f"r{i}") for i in range(5)], total=5, ) mock_resp = MagicMock() @@ -226,19 +383,21 @@ def test_single_page(self) -> 
None: mock_resp.json.return_value = resp_data provider = BundesagenturProvider() - with patch.object(provider, "_get_with_retry", return_value=mock_resp): - with patch("immermatch.bundesagentur.httpx.Client"): - stubs = provider._search_stubs("Dev", "Berlin", max_results=50) + with ( + patch.object(provider, "_get_with_retry", return_value=mock_resp), + patch("immermatch.bundesagentur.httpx.Client"), + ): + items = provider._search_items("Dev", "Berlin", max_results=50) - assert len(stubs) == 5 + assert len(items) == 5 def test_multi_page(self) -> None: - page_0 = _make_search_response( - [_make_stellenangebot(hash_id=f"p0_{i}") for i in range(50)], + page_1 = _make_search_response( + [_make_stellenangebot(refnr=f"p1_{i}") for i in range(50)], total=60, ) - page_1 = _make_search_response( - [_make_stellenangebot(hash_id=f"p1_{i}") for i in range(10)], + page_2 = _make_search_response( + [_make_stellenangebot(refnr=f"p2_{i}") for i in range(10)], total=60, ) call_count = 0 @@ -248,43 +407,80 @@ def mock_get(client, url, params): call_count += 1 mock_resp = MagicMock() mock_resp.status_code = 200 - mock_resp.json.return_value = page_1 if params.get("page", 0) >= 1 else page_0 + mock_resp.json.return_value = page_2 if params.get("page", 1) >= 2 else page_1 return mock_resp provider = BundesagenturProvider() - with patch.object(provider, "_get_with_retry", side_effect=mock_get): - with patch("immermatch.bundesagentur.httpx.Client"): - stubs = provider._search_stubs("Dev", "Berlin", max_results=100) + with ( + patch.object(provider, "_get_with_retry", side_effect=mock_get), + patch("immermatch.bundesagentur.httpx.Client"), + ): + items = provider._search_items("Dev", "Berlin", max_results=100) - assert len(stubs) == 60 + assert len(items) == 60 assert call_count == 2 class TestBundesagenturProviderErrors: """Test error handling in the provider.""" - def test_search_server_error_returns_empty(self) -> None: + def test_search_items_server_error_returns_empty(self) -> 
None: """A persistent failure from the search endpoint returns an empty list.""" provider = BundesagenturProvider() - with patch.object(provider, "_get_with_retry", return_value=None): + with ( + patch.object(provider, "_get_with_retry", return_value=None), + patch("immermatch.bundesagentur.httpx.Client"), + ): + items = provider._search_items("Dev", "Berlin", max_results=50) + assert items == [] + + +class TestEnrich: + """Test the _enrich detail-fetching pipeline.""" + + def test_enriches_items_with_details(self) -> None: + items = [ + _make_stellenangebot(refnr="r1", titel="Dev A"), + _make_stellenangebot(refnr="r2", titel="Dev B"), + ] + details = { + "r1": _make_detail(description="Desc A"), + "r2": _make_detail(description="

<p>Desc B</p>
"), + } + + provider = BundesagenturProvider() + with patch("immermatch.bundesagentur._fetch_detail", side_effect=lambda _c, refnr: details.get(refnr, {})): with patch("immermatch.bundesagentur.httpx.Client"): - stubs = provider._search_stubs("Dev", "Berlin", max_results=50) - assert stubs == [] + listings = provider._enrich(items) - def test_detail_failure_still_returns_listing(self) -> None: - """If detail-fetching fails, listing still appears with empty description.""" - stubs = [_JobStub("h1", "Python Dev", "Co", "Berlin", "2026-02-25", "REF")] + assert len(listings) == 2 + assert listings[0].description == "Desc A" + assert listings[1].description == "Desc B" + + def test_enrich_falls_back_on_failed_detail(self) -> None: + items = [_make_stellenangebot(refnr="r1", titel="Dev", arbeitgeber="Corp")] provider = BundesagenturProvider() - with ( - patch.object(provider, "_search_stubs", return_value=stubs), - patch("immermatch.bundesagentur._fetch_job_details", return_value={}), - ): - jobs = provider.search("Dev", "Berlin") + with patch("immermatch.bundesagentur._fetch_detail", return_value={}): + with patch("immermatch.bundesagentur.httpx.Client"): + listings = provider._enrich(items) + + assert len(listings) == 1 + # Uses fallback description from search fields + assert "Arbeitgeber: Corp" in listings[0].description + + def test_enrich_with_external_apply_url(self) -> None: + items = [_make_stellenangebot(refnr="r1")] + detail = _make_detail(partner_url="https://jobs.example.com", partner_name="Example") + + provider = BundesagenturProvider() + with patch("immermatch.bundesagentur._fetch_detail", return_value=detail): + with patch("immermatch.bundesagentur.httpx.Client"): + listings = provider._enrich(items) - assert len(jobs) == 1 - assert jobs[0].description == "" - assert jobs[0].title == "Python Dev" # falls back to stub title + assert len(listings[0].apply_options) == 2 + assert listings[0].apply_options[1].source == "Example" + assert 
listings[0].apply_options[1].url == "https://jobs.example.com" class TestSearchProviderProtocol: From 07c8fad30c8574565db6afc227927d598a84fa12 Mon Sep 17 00:00:00 2001 From: Adrian Immer Date: Sun, 1 Mar 2026 16:31:06 +0100 Subject: [PATCH 08/11] feat(ba): enhance Bundesagentur provider with configurable detail-fetch strategy and increase job query limit --- README.md | 10 ++++ immermatch/app.py | 2 +- immermatch/bundesagentur.py | 101 +++++++++++++++++++++++++++++------- tests/test_bundesagentur.py | 92 ++++++++++++++++++++++++++++++++ 4 files changed, 185 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index abc76e7..07cb19a 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,16 @@ The app uses four AI agent personas powered by Gemini: Jobs are fetched from Google Jobs via SerpApi, deduplicated, and scored in parallel. +## Bundesagentur Provider Tuning + +The Bundesagentur provider in `immermatch/bundesagentur.py` supports a configurable detail-fetch strategy: + +- `api_then_html` (default): first tries `/pc/v4/jobdetails/{refnr}`, then falls back to scraping the public job-detail page if needed +- `api_only`: uses only the API detail endpoint +- `html_only`: uses only the public detail page parsing path + +This helps keep job descriptions available even when one upstream detail path is unstable. + ## Environment Variables Copy `.env.example` to `.env` and fill in your keys. The app also supports `.streamlit/secrets.toml` for Streamlit Cloud deployments. 
diff --git a/immermatch/app.py b/immermatch/app.py index 231fb41..c4b54b7 100644 --- a/immermatch/app.py +++ b/immermatch/app.py @@ -15,7 +15,7 @@ import streamlit as st -jobs_per_query = 10 # default value +jobs_per_query = 20 # default value # --------------------------------------------------------------------------- # Inject API keys from Streamlit secrets into env vars diff --git a/immermatch/bundesagentur.py b/immermatch/bundesagentur.py index ea9b2fc..8c9f7f6 100644 --- a/immermatch/bundesagentur.py +++ b/immermatch/bundesagentur.py @@ -20,6 +20,7 @@ import re import time from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Literal import httpx @@ -35,11 +36,13 @@ } # How many days back to accept listings (keeps results fresh). -_DEFAULT_DAYS_PUBLISHED = 7 +_DEFAULT_DAYS_PUBLISHED = 28 # Retry settings for transient server errors. _MAX_RETRIES = 3 _BASE_DELAY = 2 # seconds +_MAX_PAGES = 100 +_BACKOFF_JITTER = 0.5 # Regex to extract the Angular SSR state from the detail page. 
_NG_STATE_RE = re.compile( @@ -97,8 +100,8 @@ def _fetch_detail(client: httpx.Client, refnr: str) -> dict: return state.get("jobdetail", {}) # type: ignore[no-any-return] logger.debug("BA detail %s: ng-state not found in HTML", refnr) return {} - if resp.status_code in {429, 500, 502, 503}: - delay = _BASE_DELAY * (2**attempt) + if resp.status_code in {403, 429, 500, 502, 503}: + delay = _BASE_DELAY * (2**attempt) + _BACKOFF_JITTER logger.warning( "BA detail page %s returned %s, retrying in %ss", refnr, @@ -111,7 +114,7 @@ def _fetch_detail(client: httpx.Client, refnr: str) -> dict: return {} except httpx.HTTPError as exc: last_exc = exc - delay = _BASE_DELAY * (2**attempt) + delay = _BASE_DELAY * (2**attempt) + _BACKOFF_JITTER logger.warning("BA detail %s network error: %s, retrying in %ss", refnr, exc, delay) time.sleep(delay) if last_exc: @@ -119,6 +122,41 @@ def _fetch_detail(client: httpx.Client, refnr: str) -> dict: return {} +def _fetch_detail_api(client: httpx.Client, refnr: str) -> dict: + """Fetch structured job detail JSON from the BA API using plain ``refnr``. + + Returns the detail dict on success, or ``{}`` on any failure. 
+ """ + url = f"{_BASE_URL}/pc/v4/jobdetails/{refnr}" + last_exc: Exception | None = None + for attempt in range(_MAX_RETRIES): + try: + resp = client.get(url) + if resp.status_code == 200: + data = resp.json() + return data if isinstance(data, dict) else {} + if resp.status_code in {403, 429, 500, 502, 503}: + delay = _BASE_DELAY * (2**attempt) + _BACKOFF_JITTER + logger.warning( + "BA API detail %s returned %s, retrying in %ss", + refnr, + resp.status_code, + delay, + ) + time.sleep(delay) + continue + logger.debug("BA API detail %s returned %s, skipping", refnr, resp.status_code) + return {} + except (httpx.HTTPError, ValueError) as exc: + last_exc = exc + delay = _BASE_DELAY * (2**attempt) + _BACKOFF_JITTER + logger.warning("BA API detail %s error: %s, retrying in %ss", refnr, exc, delay) + time.sleep(delay) + if last_exc: + logger.error("BA API detail %s failed after %d retries: %s", refnr, _MAX_RETRIES, last_exc) + return {} + + # ------------------------------------------------------------------ # Search result parsing # ------------------------------------------------------------------ @@ -163,9 +201,11 @@ def _parse_listing(item: dict, detail: dict | None = None) -> JobListing | None: # plus an external career-site link when available in the detail data. 
apply_options = [ApplyOption(source="Arbeitsagentur", url=link)] if detail: - ext_url = detail.get("allianzpartnerUrl", "") + ext_url = str(detail.get("allianzpartnerUrl", "")).strip() if ext_url: - if not ext_url.startswith("http"): + if ext_url.startswith("//"): + ext_url = f"https:{ext_url}" + elif not re.match(r"^[a-zA-Z][a-zA-Z0-9+.-]*://", ext_url): ext_url = f"https://{ext_url}" ext_name = detail.get("allianzpartnerName", "Company Website") apply_options.append(ApplyOption(source=ext_name, url=ext_url)) @@ -199,9 +239,11 @@ def __init__( self, days_published: int = _DEFAULT_DAYS_PUBLISHED, detail_workers: int = 5, + detail_strategy: Literal["api_then_html", "api_only", "html_only"] = "api_then_html", ) -> None: self._days_published = days_published self._detail_workers = detail_workers + self._detail_strategy = detail_strategy # ------------------------------------------------------------------ # Public API (SearchProvider protocol) @@ -250,7 +292,7 @@ def _search_items( page = 1 # BA API pages are 1-indexed with httpx.Client(headers=_DEFAULT_HEADERS, timeout=30) as client: - while len(items) < max_results: + while len(items) < max_results and page <= _MAX_PAGES: params: dict[str, str | int] = { "was": query, "size": page_size, @@ -277,22 +319,31 @@ def _search_items( page += 1 + if page > _MAX_PAGES and len(items) < max_results: + logger.warning("Reached BA page cap (%s) while searching query=%r", _MAX_PAGES, query) + return items[:max_results] def _enrich(self, items: list[dict]) -> list[JobListing]: """Fetch detail pages in parallel and build ``JobListing`` objects.""" # Map refnr → detail dict (fetched in parallel). 
details: dict[str, dict] = {} - with httpx.Client( - timeout=30, - headers={ - "User-Agent": "Mozilla/5.0 (compatible; Immermatch/1.0)", - "Accept": "text/html", - }, - follow_redirects=True, - ) as client: + with ( + httpx.Client(headers=_DEFAULT_HEADERS, timeout=30) as api_client, + httpx.Client( + timeout=30, + headers={ + "User-Agent": "Mozilla/5.0 (compatible; Immermatch/1.0)", + "Accept": "text/html", + }, + follow_redirects=True, + ) as html_client, + ): with ThreadPoolExecutor(max_workers=self._detail_workers) as pool: - future_to_refnr = {pool.submit(_fetch_detail, client, item["refnr"]): item["refnr"] for item in items} + future_to_refnr = { + pool.submit(self._get_detail, api_client, html_client, item["refnr"]): item["refnr"] + for item in items + } for future in as_completed(future_to_refnr): refnr = future_to_refnr[future] try: @@ -309,6 +360,18 @@ def _enrich(self, items: list[dict]) -> list[JobListing]: listings.append(listing) return listings + def _get_detail(self, api_client: httpx.Client, html_client: httpx.Client, refnr: str) -> dict: + """Resolve job detail using the configured endpoint strategy.""" + if self._detail_strategy == "api_only": + return _fetch_detail_api(api_client, refnr) + if self._detail_strategy == "html_only": + return _fetch_detail(html_client, refnr) + + detail = _fetch_detail_api(api_client, refnr) + if detail: + return detail + return _fetch_detail(html_client, refnr) + @staticmethod def _get_with_retry( client: httpx.Client, @@ -322,8 +385,8 @@ def _get_with_retry( resp = client.get(url, params=params) if resp.status_code == 200: return resp - if resp.status_code in {429, 500, 502, 503}: - delay = _BASE_DELAY * (2**attempt) + if resp.status_code in {403, 429, 500, 502, 503}: + delay = _BASE_DELAY * (2**attempt) + _BACKOFF_JITTER logger.warning("BA search %s returned %s, retry in %ss", url, resp.status_code, delay) time.sleep(delay) continue @@ -331,7 +394,7 @@ def _get_with_retry( return None except httpx.HTTPError as 
exc: last_exc = exc - delay = _BASE_DELAY * (2**attempt) + delay = _BASE_DELAY * (2**attempt) + _BACKOFF_JITTER logger.warning("BA search network error: %s, retry in %ss", exc, delay) time.sleep(delay) if last_exc: diff --git a/tests/test_bundesagentur.py b/tests/test_bundesagentur.py index c2849bb..899531c 100644 --- a/tests/test_bundesagentur.py +++ b/tests/test_bundesagentur.py @@ -12,6 +12,7 @@ _build_ba_link, _clean_html, _fetch_detail, + _fetch_detail_api, _parse_listing, _parse_location, _parse_search_results, @@ -272,6 +273,22 @@ def test_retries_on_server_error(self) -> None: result = _fetch_detail(client, "REF-123") assert result == detail + def test_retries_on_403_then_succeeds(self) -> None: + blocked_resp = MagicMock() + blocked_resp.status_code = 403 + + detail = {"stellenangebotsBeschreibung": "ok"} + ok_resp = MagicMock() + ok_resp.status_code = 200 + ok_resp.text = _make_ng_state_html(detail) + + client = MagicMock(spec=httpx.Client) + client.get.side_effect = [blocked_resp, ok_resp] + + with patch("immermatch.bundesagentur.time.sleep"): + result = _fetch_detail(client, "REF-123") + assert result == detail + def test_retries_on_network_error(self) -> None: detail = {"stellenangebotsBeschreibung": "recovered"} ok_resp = MagicMock() @@ -434,6 +451,51 @@ def test_search_items_server_error_returns_empty(self) -> None: items = provider._search_items("Dev", "Berlin", max_results=50) assert items == [] + def test_get_with_retry_retries_on_403(self) -> None: + blocked_resp = MagicMock() + blocked_resp.status_code = 403 + + ok_resp = MagicMock() + ok_resp.status_code = 200 + + client = MagicMock(spec=httpx.Client) + client.get.side_effect = [blocked_resp, ok_resp] + + with patch("immermatch.bundesagentur.time.sleep"): + result = BundesagenturProvider._get_with_retry(client, "https://example.com", {}) + + assert result is ok_resp + + +class TestFetchDetailApi: + def test_fetches_json_detail(self) -> None: + detail = {"stellenangebotsBeschreibung": "API 
detail"} + ok_resp = MagicMock() + ok_resp.status_code = 200 + ok_resp.json.return_value = detail + + client = MagicMock(spec=httpx.Client) + client.get.return_value = ok_resp + + assert _fetch_detail_api(client, "REF-123") == detail + + def test_retries_on_403_then_succeeds(self) -> None: + blocked_resp = MagicMock() + blocked_resp.status_code = 403 + + detail = {"stellenangebotsBeschreibung": "API detail"} + ok_resp = MagicMock() + ok_resp.status_code = 200 + ok_resp.json.return_value = detail + + client = MagicMock(spec=httpx.Client) + client.get.side_effect = [blocked_resp, ok_resp] + + with patch("immermatch.bundesagentur.time.sleep"): + result = _fetch_detail_api(client, "REF-123") + + assert result == detail + class TestEnrich: """Test the _enrich detail-fetching pipeline.""" @@ -482,6 +544,36 @@ def test_enrich_with_external_apply_url(self) -> None: assert listings[0].apply_options[1].source == "Example" assert listings[0].apply_options[1].url == "https://jobs.example.com" + def test_api_then_html_strategy_falls_back_to_html(self) -> None: + items = [_make_stellenangebot(refnr="r1", titel="Dev", arbeitgeber="Corp")] + html_detail = _make_detail(description="HTML fallback") + + provider = BundesagenturProvider(detail_strategy="api_then_html") + with ( + patch("immermatch.bundesagentur._fetch_detail_api", return_value={}), + patch("immermatch.bundesagentur._fetch_detail", return_value=html_detail), + patch("immermatch.bundesagentur.httpx.Client"), + ): + listings = provider._enrich(items) + + assert len(listings) == 1 + assert listings[0].description == "HTML fallback" + + def test_api_only_strategy_uses_api_detail(self) -> None: + items = [_make_stellenangebot(refnr="r1", titel="Dev", arbeitgeber="Corp")] + api_detail = {"stellenangebotsBeschreibung": "API detail"} + + provider = BundesagenturProvider(detail_strategy="api_only") + with ( + patch("immermatch.bundesagentur._fetch_detail_api", return_value=api_detail), + 
patch("immermatch.bundesagentur._fetch_detail", return_value={}), + patch("immermatch.bundesagentur.httpx.Client"), + ): + listings = provider._enrich(items) + + assert len(listings) == 1 + assert listings[0].description == "API detail" + class TestSearchProviderProtocol: """Verify BundesagenturProvider satisfies the SearchProvider protocol.""" From bc4368bac6dd2577804d4c4a5a568cb6a3d92cad Mon Sep 17 00:00:00 2001 From: Adrian Immer Date: Sun, 1 Mar 2026 23:41:09 +0100 Subject: [PATCH 09/11] feat: combined search provider Now both Bundesagentur and SerpAPI are used to find each 30 jobs --- immermatch/app.py | 10 ++- immermatch/cache.py | 6 +- immermatch/search_agent.py | 95 +++++++++++++++++++++-- immermatch/search_provider.py | 81 ++++++++++++++++++-- tests/test_bundesagentur.py | 4 - tests/test_cache.py | 12 +-- tests/test_integration.py | 16 ++-- tests/test_search_agent.py | 140 ++++++++++++++++++++++++++++++++-- tests/test_search_provider.py | 15 ++++ 9 files changed, 339 insertions(+), 40 deletions(-) create mode 100644 tests/test_search_provider.py diff --git a/immermatch/app.py b/immermatch/app.py index c4b54b7..6de3e58 100644 --- a/immermatch/app.py +++ b/immermatch/app.py @@ -52,6 +52,7 @@ profile_candidate, search_all_queries, ) +from immermatch.search_provider import parse_provider_query # noqa: E402 # --------------------------------------------------------------------------- # Page configuration @@ -775,7 +776,7 @@ def _run_pipeline() -> None: job = futures[future] evaluation = future.result() ej = EvaluatedJob(job=job, evaluation=evaluation) - key = f"{ej.job.title}|{ej.job.company_name}" + key = f"{ej.job.title}|{ej.job.company_name}|{ej.job.location}" all_evals[key] = ej progress_bar.progress( i / len(new_jobs), @@ -802,7 +803,7 @@ def _run_pipeline() -> None: def _on_jobs_found(new_unique_jobs: list[JobListing]) -> None: """Submit newly found jobs for evaluation immediately.""" for job in new_unique_jobs: - key = f"{job.title}|{job.company_name}" + 
key = f"{job.title}|{job.company_name}|{job.location}" if key in all_evals: continue # already evaluated (from cache) fut = eval_executor.submit(evaluate_job, client, profile, job) @@ -847,7 +848,7 @@ def _search_progress(qi: int, total: int, unique: int) -> None: job = eval_futures[future] evaluation = future.result() ej = EvaluatedJob(job=job, evaluation=evaluation) - key = f"{ej.job.title}|{ej.job.company_name}" + key = f"{ej.job.title}|{ej.job.company_name}|{ej.job.location}" all_evals[key] = ej eval_progress.progress( i / total_evals, @@ -951,7 +952,8 @@ def _record_ip_rate_limit() -> None: expanded=False, ): for q in st.session_state.queries: - st.markdown(f"- {q}") + _, clean_query = parse_provider_query(q) + st.markdown(f"- {clean_query}") # -- Profile (collapsed) ----------------------------------------------- if st.session_state.profile is not None: diff --git a/immermatch/cache.py b/immermatch/cache.py index f94c0a4..f2cc149 100644 --- a/immermatch/cache.py +++ b/immermatch/cache.py @@ -130,7 +130,7 @@ def save_jobs(self, jobs: list[JobListing], location: str = "") -> None: existing = data.get("jobs", {}) for job in jobs: - key = f"{job.title}|{job.company_name}" + key = f"{job.title}|{job.company_name}|{job.location}" existing[key] = job.model_dump() self._save( @@ -143,7 +143,7 @@ def save_jobs(self, jobs: list[JobListing], location: str = "") -> None: ) # ------------------------------------------------------------------ - # 4. Evaluations (append-only, keyed by title|company) + # 4. Evaluations (append-only, keyed by title|company|location) # ------------------------------------------------------------------ def load_evaluations(self, profile: CandidateProfile) -> dict[str, EvaluatedJob]: @@ -188,5 +188,5 @@ def get_unevaluated_jobs( Jobs already in the evaluation cache are skipped. 
""" cached = self.load_evaluations(profile) - new_jobs = [job for job in jobs if f"{job.title}|{job.company_name}" not in cached] + new_jobs = [job for job in jobs if f"{job.title}|{job.company_name}|{job.location}" not in cached] return new_jobs, cached diff --git a/immermatch/search_agent.py b/immermatch/search_agent.py index e233d9c..a86285a 100644 --- a/immermatch/search_agent.py +++ b/immermatch/search_agent.py @@ -7,6 +7,7 @@ from __future__ import annotations +import logging import threading from collections.abc import Callable from concurrent.futures import ThreadPoolExecutor, as_completed @@ -16,7 +17,7 @@ from .llm import call_gemini, parse_json from .models import CandidateProfile, JobListing -from .search_provider import SearchProvider, get_provider +from .search_provider import CombinedSearchProvider, SearchProvider, get_provider, parse_provider_query # Re-export SerpApi helpers so existing imports keep working. from .serpapi_provider import BLOCKED_PORTALS as _BLOCKED_PORTALS # noqa: F401 @@ -29,6 +30,9 @@ from .serpapi_provider import parse_job_results as _parse_job_results # noqa: F401 from .serpapi_provider import search_jobs # noqa: F401 +logger = logging.getLogger(__name__) +_MIN_JOBS_PER_PROVIDER = 30 + # System prompt for the Profiler agent PROFILER_SYSTEM_PROMPT = """You are an expert technical recruiter with deep knowledge of European job markets. You will be given the raw text of a candidate's CV. Extract a comprehensive profile. 
@@ -197,6 +201,49 @@ def generate_search_queries( if provider is None: provider = get_provider(location) + if isinstance(provider, CombinedSearchProvider): + provider_count = len(provider.providers) + if provider_count == 0: + return [] + + per_provider = num_queries // provider_count + remainder = num_queries % provider_count + merged_queries: list[str] = [] + + for index, child_provider in enumerate(provider.providers): + child_count = per_provider + (1 if index < remainder else 0) + if child_count <= 0: + continue + child_queries = _generate_search_queries_for_provider( + client, + profile, + location, + child_count, + child_provider, + ) + merged_queries.extend([f"provider={child_provider.name}::{query}" for query in child_queries]) + + seen: set[str] = set() + unique_queries: list[str] = [] + for query in merged_queries: + if query in seen: + continue + seen.add(query) + unique_queries.append(query) + if len(unique_queries) >= num_queries: + break + return unique_queries + + return _generate_search_queries_for_provider(client, profile, location, num_queries, provider) + + +def _generate_search_queries_for_provider( + client: genai.Client, + profile: CandidateProfile, + location: str, + num_queries: int, + provider: SearchProvider, +) -> list[str]: # Select system prompt based on active provider if provider.name == "Bundesagentur für Arbeit": system_prompt = BA_HEADHUNTER_SYSTEM_PROMPT @@ -268,7 +315,16 @@ def search_all_queries( if provider is None: provider = get_provider(location) - all_jobs: dict[str, JobListing] = {} # Use title+company as key for dedup + quota_sources: set[str] = set() + if isinstance(provider, CombinedSearchProvider): + quota_sources = { + "bundesagentur" if p.name == "Bundesagentur für Arbeit" else "serpapi" for p in provider.providers + } + if quota_sources: + min_unique_jobs = max(min_unique_jobs, _MIN_JOBS_PER_PROVIDER * len(quota_sources)) + + all_jobs: dict[str, JobListing] = {} # Use title+company+location as key for dedup + 
source_counts: dict[str, int] = {} lock = threading.Lock() completed = 0 early_stop = threading.Event() @@ -276,22 +332,34 @@ def search_all_queries( def _search_one(query: str) -> list[JobListing]: if early_stop.is_set(): return [] - return provider.search(query, location, max_results=jobs_per_query) + clean_query = query + if not isinstance(provider, CombinedSearchProvider): + _, clean_query = parse_provider_query(query) + return provider.search(clean_query, location, max_results=jobs_per_query) with ThreadPoolExecutor(max_workers=min(5, max(1, len(queries)))) as executor: futures = [executor.submit(_search_one, q) for q in queries] for future in as_completed(futures): - jobs = future.result() + jobs: list[JobListing] = [] + try: + jobs = future.result() + except Exception: + logger.exception("A search query failed") batch_new: list[JobListing] = [] with lock: for job in jobs: - key = f"{job.title}|{job.company_name}" + key = f"{job.title}|{job.company_name}|{job.location}" if key not in all_jobs: all_jobs[key] = job batch_new.append(job) + source = (job.source or "unknown").lower() + source_counts[source] = source_counts.get(source, 0) + 1 completed += 1 progress_args = (completed, len(queries), len(all_jobs)) - if min_unique_jobs and len(all_jobs) >= min_unique_jobs: + quota_met = True + if quota_sources: + quota_met = all(source_counts.get(source, 0) >= _MIN_JOBS_PER_PROVIDER for source in quota_sources) + if min_unique_jobs and len(all_jobs) >= min_unique_jobs and quota_met: early_stop.set() # Callbacks outside the lock to avoid blocking other threads if on_progress is not None: @@ -305,4 +373,19 @@ def _search_one(query: str) -> list[JobListing]: f.cancel() break + if source_counts: + counts_text = ", ".join(f"{source}={count}" for source, count in sorted(source_counts.items())) + logger.info("Search source counts for location '%s': %s", location or "(none)", counts_text) + if quota_sources: + missing = [ + source for source in sorted(quota_sources) if 
source_counts.get(source, 0) < _MIN_JOBS_PER_PROVIDER + ] + if missing: + logger.warning( + "Provider quota not reached for location '%s': %s (required >= %d each)", + location or "(none)", + ", ".join(missing), + _MIN_JOBS_PER_PROVIDER, + ) + return list(all_jobs.values()) diff --git a/immermatch/search_provider.py b/immermatch/search_provider.py index 4fa6745..157226e 100644 --- a/immermatch/search_provider.py +++ b/immermatch/search_provider.py @@ -8,12 +8,33 @@ from __future__ import annotations import logging +import os from typing import Protocol, runtime_checkable from .models import JobListing logger = logging.getLogger(__name__) +_PROVIDER_QUERY_PREFIX = "provider=" +_PROVIDER_QUERY_SEPARATOR = "::" + + +def parse_provider_query(query: str) -> tuple[str | None, str]: + """Parse an optionally provider-targeted query. + + Query format: + provider=:: + + Returns: + (target_provider_name, clean_query) + """ + if query.startswith(_PROVIDER_QUERY_PREFIX) and _PROVIDER_QUERY_SEPARATOR in query: + meta, clean_query = query.split(_PROVIDER_QUERY_SEPARATOR, 1) + target_provider = meta.removeprefix(_PROVIDER_QUERY_PREFIX).strip() + if target_provider and clean_query.strip(): + return target_provider, clean_query.strip() + return None, query + @runtime_checkable class SearchProvider(Protocol): @@ -45,16 +66,66 @@ def search( ... 
+class CombinedSearchProvider: + """Run multiple providers for each query and merge their results.""" + + name: str = "Bundesagentur + SerpApi" + + def __init__(self, providers: list[SearchProvider]) -> None: + self.providers = providers + + def search( + self, + query: str, + location: str, + max_results: int = 50, + ) -> list[JobListing]: + if not self.providers: + return [] + + target_provider, clean_query = parse_provider_query(query) + providers = self.providers + if target_provider is not None: + providers = [provider for provider in self.providers if provider.name == target_provider] + if not providers: + logger.warning( + "Unknown targeted provider '%s' in query, falling back to all providers", target_provider + ) + providers = self.providers + + merged: dict[str, JobListing] = {} + per_provider = max(1, max_results) + for provider in providers: + try: + jobs = provider.search(clean_query, location, max_results=per_provider) + except Exception: + logger.exception("Provider '%s' failed for query '%s'", provider.name, clean_query) + continue + + for job in jobs: + key = f"{job.title}|{job.company_name}|{job.location}" + if key not in merged: + merged[key] = job + + return list(merged.values())[:max_results] + + def get_provider(location: str = "") -> SearchProvider: # noqa: ARG001 """Return the appropriate ``SearchProvider`` for *location*. - Currently always returns the Bundesagentur für Arbeit provider - (Germany-only). This factory is the single extension point for - future per-country routing — e.g. returning ``SerpApiProvider`` - for non-German locations. + Returns a combined provider that merges Bundesagentur and SerpApi + results when ``SERPAPI_KEY`` is available. If SerpApi is not + configured, falls back to Bundesagentur only. """ # Lazy import so the module can be loaded without pulling in httpx # when only the protocol is needed (e.g. for type-checking). 
from .bundesagentur import BundesagenturProvider # noqa: PLC0415 + from .serpapi_provider import SerpApiProvider # noqa: PLC0415 + + providers: list[SearchProvider] = [BundesagenturProvider()] + if os.getenv("SERPAPI_KEY"): + providers.append(SerpApiProvider()) - return BundesagenturProvider() + if len(providers) == 1: + return providers[0] + return CombinedSearchProvider(providers) diff --git a/tests/test_bundesagentur.py b/tests/test_bundesagentur.py index 899531c..0ee25d4 100644 --- a/tests/test_bundesagentur.py +++ b/tests/test_bundesagentur.py @@ -378,10 +378,6 @@ def test_search_respects_max_results(self) -> None: assert len(jobs) == 3 - def test_veroeffentlichtseit_default(self) -> None: - provider = BundesagenturProvider() - assert provider._days_published == 7 - def test_veroeffentlichtseit_custom(self) -> None: provider = BundesagenturProvider(days_published=3) assert provider._days_published == 3 diff --git a/tests/test_cache.py b/tests/test_cache.py index 87eccde..19bf4f2 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -132,17 +132,17 @@ class TestEvaluationsCache: def test_round_trip(self, cache: ResultCache, profile: CandidateProfile): job = JobListing(title="Dev", company_name="Corp", location="Berlin") ev = JobEvaluation(score=80, reasoning="Good match.") - evaluated = {"Dev|Corp": EvaluatedJob(job=job, evaluation=ev)} + evaluated = {"Dev|Corp|Berlin": EvaluatedJob(job=job, evaluation=ev)} cache.save_evaluations(profile, evaluated) loaded = cache.load_evaluations(profile) - assert "Dev|Corp" in loaded - assert loaded["Dev|Corp"].evaluation.score == 80 + assert "Dev|Corp|Berlin" in loaded + assert loaded["Dev|Corp|Berlin"].evaluation.score == 80 def test_miss_on_different_profile(self, cache: ResultCache, profile: CandidateProfile): job = JobListing(title="Dev", company_name="Corp", location="Berlin") ev = JobEvaluation(score=80, reasoning="Good match.") - cache.save_evaluations(profile, {"Dev|Corp": EvaluatedJob(job=job, 
evaluation=ev)}) + cache.save_evaluations(profile, {"Dev|Corp|Berlin": EvaluatedJob(job=job, evaluation=ev)}) other = CandidateProfile( skills=["Java"], @@ -159,9 +159,9 @@ def test_filters_already_evaluated(self, cache: ResultCache, profile: CandidateP job1 = JobListing(title="Dev", company_name="Corp", location="Berlin") job2 = JobListing(title="PM", company_name="Corp", location="Berlin") ev = JobEvaluation(score=80, reasoning="Good.") - cache.save_evaluations(profile, {"Dev|Corp": EvaluatedJob(job=job1, evaluation=ev)}) + cache.save_evaluations(profile, {"Dev|Corp|Berlin": EvaluatedJob(job=job1, evaluation=ev)}) new_jobs, cached = cache.get_unevaluated_jobs([job1, job2], profile) assert len(new_jobs) == 1 assert new_jobs[0].title == "PM" - assert "Dev|Corp" in cached + assert "Dev|Corp|Berlin" in cached diff --git a/tests/test_integration.py b/tests/test_integration.py index 8108e1b..bf6e0a6 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -322,6 +322,12 @@ def mock_client() -> MagicMock: return MagicMock() +def _query_provider() -> MagicMock: + provider = MagicMock() + provider.name = "Bundesagentur für Arbeit" + return provider + + # --------------------------------------------------------------------------- # Integration tests # --------------------------------------------------------------------------- @@ -368,7 +374,7 @@ def test_full_pipeline_happy_path( assert len(profile.work_history) == 2 # --- Act: Stage 2 — Queries --- - queries = generate_search_queries(mock_client, profile, "Munich, Germany") + queries = generate_search_queries(mock_client, profile, "Munich, Germany", provider=_query_provider()) assert isinstance(queries, list) assert len(queries) == 20 @@ -452,7 +458,7 @@ def test_full_pipeline_non_tech_cv( assert profile.experience_level == "Mid" assert any("Sustainability" in r for r in profile.roles) - queries = generate_search_queries(mock_client, profile, "Munich, Germany") + queries = 
generate_search_queries(mock_client, profile, "Munich, Germany", provider=_query_provider()) assert len(queries) == 20 jobs = search_all_queries( @@ -534,7 +540,7 @@ def test_queries_are_strings_and_correct_count( mock_gemini.side_effect = [TECH_PROFILE_JSON, QUERIES_JSON] profile = profile_candidate(mock_client, tech_cv_text) - queries = generate_search_queries(mock_client, profile, "Munich, Germany") + queries = generate_search_queries(mock_client, profile, "Munich, Germany", provider=_query_provider()) assert len(queries) == 20 assert all(isinstance(q, str) for q in queries) @@ -689,7 +695,7 @@ def test_empty_search_produces_empty_evaluations( mock_provider.search.return_value = [] profile = profile_candidate(mock_client, tech_cv_text) - queries = generate_search_queries(mock_client, profile, "Munich, Germany") + queries = generate_search_queries(mock_client, profile, "Munich, Germany", provider=_query_provider()) jobs = search_all_queries( queries, jobs_per_query=10, @@ -739,7 +745,7 @@ def test_cv_data_flows_through_all_stages( assert "TechCorp" in profile_prompt or "John Doe" in profile_prompt # Stage 2: Queries — verify profile data was sent to Gemini - queries = generate_search_queries(mock_client, profile, "Munich, Germany") + queries = generate_search_queries(mock_client, profile, "Munich, Germany", provider=_query_provider()) query_prompt = mock_search_gemini.call_args_list[1][0][1] assert "Senior Software Engineer" in query_prompt # from profile.roles assert "Python" in query_prompt # from profile.skills diff --git a/tests/test_search_agent.py b/tests/test_search_agent.py index ba047af..a7c5a86 100644 --- a/tests/test_search_agent.py +++ b/tests/test_search_agent.py @@ -15,6 +15,7 @@ profile_candidate, search_all_queries, ) +from immermatch.search_provider import CombinedSearchProvider class TestIsRemoteOnly: @@ -178,11 +179,11 @@ def test_highlights_in_description(self): class TestSearchAllQueries: """Tests for search_all_queries() — mock provider to 
test orchestration logic.""" - def _make_job(self, title: str, company: str = "Co") -> JobListing: + def _make_job(self, title: str, company: str = "Co", location: str = "Berlin") -> JobListing: return JobListing( title=title, company_name=company, - location="Berlin", + location=location, apply_options=[ApplyOption(source="LinkedIn", url="https://linkedin.com/1")], ) @@ -208,8 +209,14 @@ def test_passes_query_and_location_to_provider(self): max_results=10, ) - def test_deduplicates_by_title_and_company(self): - provider = self._make_provider([self._make_job("Dev"), self._make_job("Dev")]) + def test_deduplicates_by_title_company_and_location(self): + provider = self._make_provider( + [ + self._make_job("Dev", location="Berlin"), + self._make_job("Dev", location="Berlin"), + self._make_job("Dev", location="Munich"), + ] + ) results = search_all_queries( queries=["query1", "query2"], @@ -218,7 +225,7 @@ def test_deduplicates_by_title_and_company(self): provider=provider, ) - assert len(results) == 1 + assert len(results) == 2 def test_stops_early_when_min_unique_jobs_reached(self): provider = self._make_provider([self._make_job("Unique Job")]) @@ -276,6 +283,82 @@ def test_defaults_to_get_provider(self, mock_gp: MagicMock): mock_gp.assert_called_once_with("Berlin") + def test_combined_provider_hard_quota_requires_30_each_before_stop(self): + ba_provider = MagicMock() + ba_provider.name = "Bundesagentur für Arbeit" + ba_jobs = [self._make_job(f"BA {i}", company=f"BA Co {i}", location="Berlin") for i in range(30)] + for job in ba_jobs: + job.source = "bundesagentur" + ba_provider.search.return_value = ba_jobs + + serp_provider = MagicMock() + serp_provider.name = "SerpApi (Google Jobs)" + serp_jobs = [self._make_job(f"SERP {i}", company=f"SERP Co {i}", location="Berlin") for i in range(30)] + for job in serp_jobs: + job.source = "serpapi" + serp_provider.search.return_value = serp_jobs + + combined = CombinedSearchProvider([ba_provider, serp_provider]) + results = 
search_all_queries( + queries=[ + "provider=Bundesagentur für Arbeit::Softwareentwickler", + "provider=SerpApi (Google Jobs)::Python Developer Berlin", + ], + jobs_per_query=30, + location="Berlin", + min_unique_jobs=50, + provider=combined, + ) + + assert len(results) == 60 + ba_count = len([job for job in results if job.source == "bundesagentur"]) + serp_count = len([job for job in results if job.source == "serpapi"]) + assert ba_count >= 30 + assert serp_count >= 30 + + @patch("immermatch.search_agent.logger") + def test_logs_source_counts(self, mock_logger: MagicMock): + provider = self._make_provider( + [ + self._make_job("BA Job", location="Berlin"), + self._make_job("SERP Job", location="Munich"), + ] + ) + provider.search.return_value[0].source = "bundesagentur" + provider.search.return_value[1].source = "serpapi" + + search_all_queries( + queries=["query1"], + location="Berlin", + min_unique_jobs=0, + provider=provider, + ) + + assert mock_logger.info.called + logged_texts = " ".join(str(call.args) for call in mock_logger.info.call_args_list) + assert "bundesagentur" in logged_texts + assert "serpapi" in logged_texts + + def test_combined_provider_routes_query_to_target_provider(self): + ba_provider = MagicMock() + ba_provider.name = "Bundesagentur für Arbeit" + ba_provider.search.return_value = [] + + serp_provider = MagicMock() + serp_provider.name = "SerpApi (Google Jobs)" + serp_provider.search.return_value = [self._make_job("Dev", location="Berlin")] + + combined = CombinedSearchProvider([ba_provider, serp_provider]) + search_all_queries( + queries=["provider=SerpApi (Google Jobs)::Python Developer Berlin"], + location="Berlin", + min_unique_jobs=0, + provider=combined, + ) + + ba_provider.search.assert_not_called() + serp_provider.search.assert_called_once_with("Python Developer Berlin", "Berlin", max_results=10) + class TestLlmJsonRecovery: @patch("immermatch.search_agent.call_gemini") @@ -325,8 +408,16 @@ def 
test_generate_search_queries_retries_after_invalid_json(self, mock_call_gemi education_history=[], ) mock_call_gemini.side_effect = ["not json", '["python developer berlin", "backend berlin"]'] + provider = MagicMock() + provider.name = "SerpApi (Google Jobs)" - queries = generate_search_queries(MagicMock(), profile, location="Berlin, Germany", num_queries=2) + queries = generate_search_queries( + MagicMock(), + profile, + location="Berlin, Germany", + num_queries=2, + provider=provider, + ) assert queries == ["python developer berlin", "backend berlin"] assert mock_call_gemini.call_count == 2 @@ -356,8 +447,16 @@ def test_generate_search_queries_returns_empty_list_after_all_retries_fail(self, education_history=[], ) mock_call_gemini.side_effect = ["not json", "still not json"] + provider = MagicMock() + provider.name = "SerpApi (Google Jobs)" - queries = generate_search_queries(MagicMock(), profile, location="Berlin, Germany", num_queries=2) + queries = generate_search_queries( + MagicMock(), + profile, + location="Berlin, Germany", + num_queries=2, + provider=provider, + ) assert queries == [] assert mock_call_gemini.call_count == 2 @@ -480,3 +579,30 @@ def test_other_provider_uses_default_prompt(self, mock_call_gemini: MagicMock): prompt_sent = mock_call_gemini.call_args[0][1] assert "Google Jobs" in prompt_sent assert "LOCAL names" in prompt_sent + + @patch("immermatch.search_agent.call_gemini") + def test_combined_provider_generates_queries_per_child_provider(self, mock_call_gemini: MagicMock): + mock_call_gemini.side_effect = [ + '["Softwareentwickler", "Datenanalyst"]', + '["Python Developer Berlin", "Data Engineer Berlin"]', + ] + + ba_provider = MagicMock() + ba_provider.name = "Bundesagentur für Arbeit" + serp_provider = MagicMock() + serp_provider.name = "SerpApi (Google Jobs)" + combined = CombinedSearchProvider([ba_provider, serp_provider]) + + queries = generate_search_queries( + MagicMock(), + self._PROFILE, + location="Berlin", + num_queries=4, + 
provider=combined, + ) + + assert len(queries) == 4 + assert all(query.startswith("provider=") for query in queries) + prompts_sent = [call.args[1] for call in mock_call_gemini.call_args_list] + assert any("Bundesagentur" in prompt for prompt in prompts_sent) + assert any("Google Jobs" in prompt for prompt in prompts_sent) diff --git a/tests/test_search_provider.py b/tests/test_search_provider.py new file mode 100644 index 0000000..d8ef755 --- /dev/null +++ b/tests/test_search_provider.py @@ -0,0 +1,15 @@ +"""Tests for search provider helpers and combined provider behavior.""" + +from immermatch.search_provider import parse_provider_query + + +class TestParseProviderQuery: + def test_parses_targeted_query(self): + target, query = parse_provider_query("provider=SerpApi (Google Jobs)::Python Developer Berlin") + assert target == "SerpApi (Google Jobs)" + assert query == "Python Developer Berlin" + + def test_returns_original_when_not_targeted(self): + target, query = parse_provider_query("Softwareentwickler") + assert target is None + assert query == "Softwareentwickler" From 3b46333480c1a3d65d010e002311328b5c231e4c Mon Sep 17 00:00:00 2001 From: Adrian Immer Date: Mon, 2 Mar 2026 00:31:25 +0100 Subject: [PATCH 10/11] Address valid PR review comments for provider quotas and budgeting --- immermatch/search_agent.py | 17 ++++++++++--- immermatch/search_provider.py | 6 ++++- tests/test_search_agent.py | 48 +++++++++++++++++++++++++++++++++++ tests/test_search_provider.py | 43 ++++++++++++++++++++++++++++++- 4 files changed, 108 insertions(+), 6 deletions(-) diff --git a/immermatch/search_agent.py b/immermatch/search_agent.py index a86285a..8657b50 100644 --- a/immermatch/search_agent.py +++ b/immermatch/search_agent.py @@ -33,6 +33,17 @@ logger = logging.getLogger(__name__) _MIN_JOBS_PER_PROVIDER = 30 + +def _provider_quota_source_key(provider: SearchProvider) -> str: + """Return a stable source key for per-provider quota accounting.""" + source_id = getattr(provider, 
"source_id", None) + if isinstance(source_id, str) and source_id.strip(): + return source_id.strip().lower() + if getattr(provider, "name", None) == "Bundesagentur für Arbeit": + return "bundesagentur" + return type(provider).__name__.lower() + + # System prompt for the Profiler agent PROFILER_SYSTEM_PROMPT = """You are an expert technical recruiter with deep knowledge of European job markets. You will be given the raw text of a candidate's CV. Extract a comprehensive profile. @@ -317,10 +328,8 @@ def search_all_queries( quota_sources: set[str] = set() if isinstance(provider, CombinedSearchProvider): - quota_sources = { - "bundesagentur" if p.name == "Bundesagentur für Arbeit" else "serpapi" for p in provider.providers - } - if quota_sources: + quota_sources = {_provider_quota_source_key(p) for p in provider.providers} + if quota_sources and min_unique_jobs > 0: min_unique_jobs = max(min_unique_jobs, _MIN_JOBS_PER_PROVIDER * len(quota_sources)) all_jobs: dict[str, JobListing] = {} # Use title+company+location as key for dedup diff --git a/immermatch/search_provider.py b/immermatch/search_provider.py index 157226e..0cdab6d 100644 --- a/immermatch/search_provider.py +++ b/immermatch/search_provider.py @@ -8,6 +8,7 @@ from __future__ import annotations import logging +import math import os from typing import Protocol, runtime_checkable @@ -93,8 +94,11 @@ def search( ) providers = self.providers + if max_results <= 0: + return [] + merged: dict[str, JobListing] = {} - per_provider = max(1, max_results) + per_provider = max(1, math.ceil(max_results / len(providers))) for provider in providers: try: jobs = provider.search(clean_query, location, max_results=per_provider) diff --git a/tests/test_search_agent.py b/tests/test_search_agent.py index a7c5a86..c96808a 100644 --- a/tests/test_search_agent.py +++ b/tests/test_search_agent.py @@ -1,6 +1,7 @@ """Tests for immermatch.search_agent — pure helper functions and search_all_queries orchestration.""" import json +from 
typing import ClassVar from unittest.mock import MagicMock, patch import pytest @@ -11,6 +12,7 @@ _is_remote_only, _localise_query, _parse_job_results, + _provider_quota_source_key, generate_search_queries, profile_candidate, search_all_queries, @@ -359,6 +361,52 @@ def test_combined_provider_routes_query_to_target_provider(self): ba_provider.search.assert_not_called() serp_provider.search.assert_called_once_with("Python Developer Berlin", "Berlin", max_results=10) + def test_provider_quota_source_key_prefers_source_id(self): + class ThirdProvider: + name: ClassVar[str] = "Third Provider" + source_id: ClassVar[str] = "third-source" + + ba_provider = MagicMock() + ba_provider.name = "Bundesagentur für Arbeit" + + serp_provider = MagicMock() + serp_provider.name = "SerpApi (Google Jobs)" + serp_provider.source_id = "serpapi" + + third_provider = ThirdProvider() + + assert _provider_quota_source_key(ba_provider) == "bundesagentur" + assert _provider_quota_source_key(serp_provider) == "serpapi" + assert _provider_quota_source_key(third_provider) == "third-source" + + def test_min_unique_zero_does_not_enable_combined_quota(self): + ba_provider = MagicMock() + ba_provider.name = "Bundesagentur für Arbeit" + ba_provider.source_id = "bundesagentur" + ba_jobs = [self._make_job(f"BA {i}", company=f"BA Co {i}", location="Berlin") for i in range(10)] + for job in ba_jobs: + job.source = "bundesagentur" + ba_provider.search.return_value = ba_jobs + + serp_provider = MagicMock() + serp_provider.name = "SerpApi (Google Jobs)" + serp_provider.source_id = "serpapi" + serp_jobs = [self._make_job(f"SERP {i}", company=f"SERP Co {i}", location="Berlin") for i in range(10)] + for job in serp_jobs: + job.source = "serpapi" + serp_provider.search.return_value = serp_jobs + + combined = CombinedSearchProvider([ba_provider, serp_provider]) + results = search_all_queries( + queries=["q1", "q2"], + jobs_per_query=10, + location="Berlin", + min_unique_jobs=0, + provider=combined, + ) + + 
assert len(results) == 10 + class TestLlmJsonRecovery: @patch("immermatch.search_agent.call_gemini") diff --git a/tests/test_search_provider.py b/tests/test_search_provider.py index d8ef755..f17e9f2 100644 --- a/tests/test_search_provider.py +++ b/tests/test_search_provider.py @@ -1,6 +1,20 @@ """Tests for search provider helpers and combined provider behavior.""" -from immermatch.search_provider import parse_provider_query +from __future__ import annotations + +from unittest.mock import MagicMock + +from immermatch.models import ApplyOption, JobListing +from immermatch.search_provider import CombinedSearchProvider, parse_provider_query + + +def _make_job(title: str, company: str, location: str = "Berlin") -> JobListing: + return JobListing( + title=title, + company_name=company, + location=location, + apply_options=[ApplyOption(source="Company Website", url="https://example.com")], + ) class TestParseProviderQuery: @@ -13,3 +27,30 @@ def test_returns_original_when_not_targeted(self): target, query = parse_provider_query("Softwareentwickler") assert target is None assert query == "Softwareentwickler" + + +class TestCombinedSearchProvider: + def test_splits_max_results_budget_across_providers(self): + p1 = MagicMock() + p1.name = "Bundesagentur für Arbeit" + p1.search.return_value = [_make_job(f"BA {i}", f"BA Co {i}") for i in range(3)] + + p2 = MagicMock() + p2.name = "SerpApi (Google Jobs)" + p2.search.return_value = [_make_job(f"SERP {i}", f"SERP Co {i}") for i in range(3)] + + provider = CombinedSearchProvider([p1, p2]) + results = provider.search("Developer", "Berlin", max_results=5) + + p1.search.assert_called_once_with("Developer", "Berlin", max_results=3) + p2.search.assert_called_once_with("Developer", "Berlin", max_results=3) + assert len(results) == 5 + + def test_returns_empty_when_max_results_non_positive(self): + p1 = MagicMock() + p1.name = "Bundesagentur für Arbeit" + p1.search.return_value = [_make_job("BA", "BA Co")] + + provider = 
CombinedSearchProvider([p1]) + assert provider.search("Developer", "Berlin", max_results=0) == [] + p1.search.assert_not_called() From 49bb462cb4c2b622204da5d3658d2b3c6d1be3c2 Mon Sep 17 00:00:00 2001 From: Adrian Immer Date: Mon, 2 Mar 2026 00:58:03 +0100 Subject: [PATCH 11/11] Address latest PR review comments and cache/provider safety --- immermatch/app.py | 15 +++++++++++---- immermatch/bundesagentur.py | 1 + immermatch/cache.py | 18 ++++++++++++++++-- immermatch/search_agent.py | 17 ++++++++++++++--- immermatch/search_provider.py | 28 ++++++++++++++++++++++++++++ immermatch/serpapi_provider.py | 1 + tests/test_bundesagentur.py | 3 ++- tests/test_cache.py | 4 ++++ tests/test_search_agent.py | 2 ++ 9 files changed, 79 insertions(+), 10 deletions(-) diff --git a/immermatch/app.py b/immermatch/app.py index 6de3e58..f8a8acf 100644 --- a/immermatch/app.py +++ b/immermatch/app.py @@ -52,7 +52,11 @@ profile_candidate, search_all_queries, ) -from immermatch.search_provider import parse_provider_query # noqa: E402 +from immermatch.search_provider import ( # noqa: E402 + get_provider, + get_provider_fingerprint, + parse_provider_query, # noqa: E402 +) # --------------------------------------------------------------------------- # Page configuration @@ -727,21 +731,23 @@ def _run_pipeline() -> None: return cache = _get_cache() + provider = get_provider(location) + provider_fingerprint = get_provider_fingerprint(provider) # Eagerly create the Gemini client so it's ready before the pipeline starts — # avoids lazy-init delay between query generation and job evaluation. 
client = create_client() if _keys_ok() else None # ---- Step 1: Generate queries ---------------------------------------- with st.status("✨ Crafting search queries...", expanded=False) as status: - cached_queries = cache.load_queries(profile, location) + cached_queries = cache.load_queries(profile, location, provider_fingerprint) if cached_queries is not None: queries = cached_queries status.update(label="✅ Queries generated (cached)", state="complete") else: if client is None: client = create_client() - queries = generate_search_queries(client, profile, location) - cache.save_queries(profile, location, queries) + queries = generate_search_queries(client, profile, location, provider=provider) + cache.save_queries(profile, location, queries, provider_fingerprint) status.update(label="✅ Queries generated", state="complete") st.session_state.queries = queries @@ -826,6 +832,7 @@ def _search_progress(qi: int, total: int, unique: int) -> None: location=location, on_progress=_search_progress, on_jobs_found=_on_jobs_found, + provider=provider, ) cache.save_jobs(jobs, location) search_status.update(label=f"✅ Found {len(jobs)} unique jobs", state="complete") diff --git a/immermatch/bundesagentur.py b/immermatch/bundesagentur.py index 8c9f7f6..57774e7 100644 --- a/immermatch/bundesagentur.py +++ b/immermatch/bundesagentur.py @@ -234,6 +234,7 @@ class BundesagenturProvider: """ name: str = "Bundesagentur für Arbeit" + source_id: str = "bundesagentur" def __init__( self, diff --git a/immermatch/cache.py b/immermatch/cache.py index f2cc149..798ee3b 100644 --- a/immermatch/cache.py +++ b/immermatch/cache.py @@ -77,7 +77,12 @@ def save_profile(self, cv_text: str, profile: CandidateProfile) -> None: # 2. 
Queries (keyed by profile hash + location) # ------------------------------------------------------------------ - def load_queries(self, profile: CandidateProfile, location: str) -> list[str] | None: + def load_queries( + self, + profile: CandidateProfile, + location: str, + provider_fingerprint: str = "", + ) -> list[str] | None: data = self._load("queries.json") if data is None: return None @@ -85,17 +90,26 @@ def load_queries(self, profile: CandidateProfile, location: str) -> list[str] | return None if data.get("location") != location: return None + if data.get("provider_fingerprint", "") != provider_fingerprint: + return None queries = data.get("queries") if not isinstance(queries, list): return None return queries - def save_queries(self, profile: CandidateProfile, location: str, queries: list[str]) -> None: + def save_queries( + self, + profile: CandidateProfile, + location: str, + queries: list[str], + provider_fingerprint: str = "", + ) -> None: self._save( "queries.json", { "profile_hash": _profile_hash(profile), "location": location, + "provider_fingerprint": provider_fingerprint, "queries": queries, }, ) diff --git a/immermatch/search_agent.py b/immermatch/search_agent.py index 8657b50..8471d68 100644 --- a/immermatch/search_agent.py +++ b/immermatch/search_agent.py @@ -17,7 +17,13 @@ from .llm import call_gemini, parse_json from .models import CandidateProfile, JobListing -from .search_provider import CombinedSearchProvider, SearchProvider, get_provider, parse_provider_query +from .search_provider import ( + CombinedSearchProvider, + SearchProvider, + format_provider_query, + get_provider, + parse_provider_query, +) # Re-export SerpApi helpers so existing imports keep working. 
from .serpapi_provider import BLOCKED_PORTALS as _BLOCKED_PORTALS # noqa: F401 @@ -39,8 +45,13 @@ def _provider_quota_source_key(provider: SearchProvider) -> str: source_id = getattr(provider, "source_id", None) if isinstance(source_id, str) and source_id.strip(): return source_id.strip().lower() - if getattr(provider, "name", None) == "Bundesagentur für Arbeit": + name = getattr(provider, "name", None) + if isinstance(name, str) and name == "Bundesagentur für Arbeit": return "bundesagentur" + if isinstance(name, str) and "serpapi" in name.lower(): + return "serpapi" + if type(provider).__name__ == "SerpApiProvider": + return "serpapi" return type(provider).__name__.lower() @@ -232,7 +243,7 @@ def generate_search_queries( child_count, child_provider, ) - merged_queries.extend([f"provider={child_provider.name}::{query}" for query in child_queries]) + merged_queries.extend([format_provider_query(child_provider.name, query) for query in child_queries]) seen: set[str] = set() unique_queries: list[str] = [] diff --git a/immermatch/search_provider.py b/immermatch/search_provider.py index 0cdab6d..0c81c8e 100644 --- a/immermatch/search_provider.py +++ b/immermatch/search_provider.py @@ -20,6 +20,11 @@ _PROVIDER_QUERY_SEPARATOR = "::" +def format_provider_query(provider_name: str, query: str) -> str: + """Format a query with explicit provider routing metadata.""" + return f"{_PROVIDER_QUERY_PREFIX}{provider_name}{_PROVIDER_QUERY_SEPARATOR}{query}" + + def parse_provider_query(query: str) -> tuple[str | None, str]: """Parse an optionally provider-targeted query. @@ -37,6 +42,26 @@ def parse_provider_query(query: str) -> tuple[str | None, str]: return None, query +def get_provider_fingerprint(provider: SearchProvider) -> str: + """Return a stable fingerprint for the active provider configuration. + + Used by query cache to avoid reusing provider-targeted query sets when + provider configuration changes (e.g. SerpApi enabled/disabled). 
+ """ + + def _provider_key(p: SearchProvider) -> str: + source_id = getattr(p, "source_id", None) + if isinstance(source_id, str) and source_id.strip(): + return source_id.strip().lower() + name = getattr(p, "name", "") + if isinstance(name, str) and name.strip(): + return name.strip().lower() + return type(p).__name__.lower() + + providers = provider.providers if isinstance(provider, CombinedSearchProvider) else [provider] + return "|".join(sorted({_provider_key(p) for p in providers})) + + @runtime_checkable class SearchProvider(Protocol): """Pluggable interface for job-search backends. @@ -94,6 +119,9 @@ def search( ) providers = self.providers + if not providers: + return [] + if max_results <= 0: return [] diff --git a/immermatch/serpapi_provider.py b/immermatch/serpapi_provider.py index cf9bdbf..7a21a0a 100644 --- a/immermatch/serpapi_provider.py +++ b/immermatch/serpapi_provider.py @@ -348,6 +348,7 @@ class SerpApiProvider: """ name: str = "SerpApi (Google Jobs)" + source_id: str = "serpapi" def search( self, diff --git a/tests/test_bundesagentur.py b/tests/test_bundesagentur.py index c947589..e220bac 100644 --- a/tests/test_bundesagentur.py +++ b/tests/test_bundesagentur.py @@ -586,13 +586,14 @@ def test_html_only_strategy_uses_html_detail(self) -> None: provider = BundesagenturProvider(detail_strategy="html_only") with ( patch("immermatch.bundesagentur._fetch_detail", return_value=html_detail), - patch("immermatch.bundesagentur._fetch_detail_api"), + patch("immermatch.bundesagentur._fetch_detail_api") as mock_api, patch("immermatch.bundesagentur.httpx.Client"), ): listings = provider._enrich(items) assert len(listings) == 1 assert listings[0].description == "HTML only detail" + mock_api.assert_not_called() class TestSearchProviderProtocol: diff --git a/tests/test_cache.py b/tests/test_cache.py index 19bf4f2..06a4223 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -70,6 +70,10 @@ def test_miss_on_different_profile(self, cache: ResultCache, 
profile: CandidateP ) assert cache.load_queries(other, "Munich") is None + def test_miss_on_different_provider_fingerprint(self, cache: ResultCache, profile: CandidateProfile): + cache.save_queries(profile, "Munich", ["q1"], provider_fingerprint="bundesagentur|serpapi") + assert cache.load_queries(profile, "Munich", provider_fingerprint="bundesagentur") is None + class TestJobsCache: @freeze_time("2026-02-20") diff --git a/tests/test_search_agent.py b/tests/test_search_agent.py index c96808a..085c7e4 100644 --- a/tests/test_search_agent.py +++ b/tests/test_search_agent.py @@ -288,6 +288,7 @@ def test_defaults_to_get_provider(self, mock_gp: MagicMock): def test_combined_provider_hard_quota_requires_30_each_before_stop(self): ba_provider = MagicMock() ba_provider.name = "Bundesagentur für Arbeit" + ba_provider.source_id = "bundesagentur" ba_jobs = [self._make_job(f"BA {i}", company=f"BA Co {i}", location="Berlin") for i in range(30)] for job in ba_jobs: job.source = "bundesagentur" @@ -295,6 +296,7 @@ def test_combined_provider_hard_quota_requires_30_each_before_stop(self): serp_provider = MagicMock() serp_provider.name = "SerpApi (Google Jobs)" + serp_provider.source_id = "serpapi" serp_jobs = [self._make_job(f"SERP {i}", company=f"SERP Co {i}", location="Berlin") for i in range(30)] for job in serp_jobs: job.source = "serpapi"