diff --git a/.env.example b/.env.example index 0f45cea..bdc401d 100644 --- a/.env.example +++ b/.env.example @@ -5,10 +5,6 @@ # Get yours at: https://aistudio.google.com/app/apikey GOOGLE_API_KEY=your_google_api_key_here -# SerpApi Key (required) -# Get yours at: https://serpapi.com/ -SERPAPI_KEY=your_serpapi_key_here - # Supabase (required for production / daily digest) # Get yours at: https://supabase.com/dashboard SUPABASE_URL=https://your-project.supabase.co diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 07fd87b..c915300 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -11,7 +11,7 @@ Run the full check suite without asking — just do it: ```bash -source .venv/bin/activate && pytest tests/ -x -q && ruff check . && mypy . +source .venv/bin/activate && pytest tests/ -x -q && ruff check --fix . && mypy . ``` ## Testing conventions diff --git a/.github/workflows/daily-digest.yml b/.github/workflows/daily-digest.yml index cc8f53e..0ade6dd 100644 --- a/.github/workflows/daily-digest.yml +++ b/.github/workflows/daily-digest.yml @@ -24,7 +24,6 @@ jobs: - name: Run daily digest env: GOOGLE_API_KEY: ${{ secrets.GOOGLE_API_KEY }} - SERPAPI_KEY: ${{ secrets.SERPAPI_KEY }} SUPABASE_URL: ${{ secrets.SUPABASE_URL }} SUPABASE_KEY: ${{ secrets.SUPABASE_KEY }} SUPABASE_SERVICE_KEY: ${{ secrets.SUPABASE_SERVICE_KEY }} diff --git a/AGENTS.md b/AGENTS.md index 232c366..bffb26e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -65,54 +65,68 @@ This document defines the persona, context, and instruction sets for the AI agen **Role:** Search Engine Optimization Specialist **Input:** The "Profiler" JSON summary + User's desired location (e.g., "Munich, Germany"). -**Output:** A JSON array of 20 search query strings optimized for Google Jobs. +**Output:** A JSON array of 20 search query strings. + +The system prompt is selected based on the active **SearchProvider**: + +### 2a. 
BA Headhunter Prompt (Bundesagentur für Arbeit — default) + +Used when `provider.name == "Bundesagentur für Arbeit"`. Generates keyword-only queries (no location tokens) because the BA API has a dedicated `wo` parameter for location filtering. **System Prompt:** -> You are a Search Specialist. Based on the candidate's profile and location, generate 20 distinct search queries to find relevant job openings. +> You are a Search Specialist generating keyword queries for the German Federal Employment Agency job search API (Bundesagentur für Arbeit). > -> IMPORTANT: Keep queries SHORT and SIMPLE (1-3 words). Google Jobs works best with simple, broad queries. +> Based on the candidate's profile, generate distinct keyword queries to find relevant job openings. The API searches across German job listings and handles location filtering separately. > -> CRITICAL: Always use LOCAL names, not English ones. For example use "München" not "Munich", "Köln" not "Cologne", "Wien" not "Vienna", "Zürich" not "Zurich", "Praha" not "Prague", "Deutschland" not "Germany". +> IMPORTANT RULES: +> - Queries must be SHORT: 1-3 words ONLY +> - Do NOT include any city, region, or country names — location is handled by the API +> - Do NOT include "remote", "hybrid", or similar work-mode keywords +> - Include BOTH German and English job titles (the API indexes both) +> - Use different synonyms for the same role > -> **Adapt your strategy to the SCOPE of the Target Location:** -> -> A) If the location is a CITY (e.g. "München", "Amsterdam"): -> 1. Queries 1-5: Exact role titles + local city name -> 2. Queries 6-10: Broader role synonyms + city -> 3. Queries 11-15: Industry/domain keywords without city or with "remote" -> 4. Queries 16-20: Very broad industry terms +> Strategy: +> 1. First third: Exact role titles in German (e.g., "Softwareentwickler", "Datenanalyst", "Projektleiter") +> 2. 
Second third: Exact role titles in English (e.g., "Software Developer", "Data Analyst", "Project Manager") +> 3. Final third: Technology + role combinations and broader terms (e.g., "Python Entwickler", "Machine Learning", "DevOps Engineer") + +### 2b. SerpApi Headhunter Prompt (Google Jobs — retained for future use) + +Used when `provider.name != "Bundesagentur für Arbeit"` (e.g., SerpApiProvider for non-German markets). Generates location-enriched queries optimised for Google Jobs. + +**System Prompt:** +> You are a Search Specialist. Based on the candidate's profile and location, generate 20 distinct search queries to find relevant job openings. > -> B) If the location is a COUNTRY (e.g. "Germany", "Netherlands"): -> 1. Queries 1-5: Exact role titles + local country name (e.g. "Data Engineer Deutschland") -> 2. Queries 6-10: Same roles + major cities in that country (e.g. "Backend Developer München", "Backend Developer Berlin") -> 3. Queries 11-15: Broader role synonyms + country or "remote" -> 4. Queries 16-20: Very broad industry terms +> IMPORTANT: Keep queries SHORT and SIMPLE (1-3 words). Google Jobs works best with simple, broad queries. > -> C) If the location is "remote", "worldwide", or similar: -> 1. Queries 1-10: Exact role titles + "remote" -> 2. Queries 11-15: Broader role synonyms + "remote" -> 3. Queries 16-20: Very broad industry terms without any location +> CRITICAL: Always use LOCAL names, not English ones. For example use "München" not "Munich", "Köln" not "Cologne", "Wien" not "Vienna", "Zürich" not "Zurich", "Praha" not "Prague", "Deutschland" not "Germany". 
> -> Additional strategy: -> - Include BOTH English and local-language job titles for the target country -> - Use different synonyms for the same role (e.g., "Manager", "Lead", "Specialist", "Analyst") +> *(Full location-strategy sections A/B/C as before)* -**Generation Config:** +**Generation Config (both prompts):** - Temperature: 0.5 - Max tokens: 8192 -**Post-processing:** -- Queries missing a location keyword (words from the user's location string, or "remote") get the target location auto-appended before searching. -- English city names are automatically translated to local names (e.g., Munich → München, Vienna → Wien, Cologne → Köln) via a regex-based replacement in `search_agent.py:_localise_query()`. -- English country names are also translated to local names (e.g., Germany → Deutschland, Austria → Österreich, Switzerland → Schweiz) via `_COUNTRY_LOCALISE` and `_COUNTRY_LOCALISE_PATTERN`. -- Both city and country localisation are applied to the query itself *and* to the auto-appended location suffix. +**Search Provider Architecture:** + +The search pipeline uses a pluggable `SearchProvider` protocol (defined in `search_provider.py`): -**Search behaviour:** -- Google `gl=` country code is inferred from the location string via `_infer_gl()` (maps ~60 country/city names to 2-letter codes, defaults to `"de"` for non-remote locations). -- For purely remote/global searches (location contains only tokens like "remote", "worldwide", "global", "anywhere", "weltweit"), `_is_remote_only()` returns `True` and `_infer_gl()` returns `None` — the `gl` param is omitted from SerpApi so results aren't country-biased. -- SerpApi's `location` parameter is passed for non-remote searches (the raw user-supplied string, e.g. "Munich, Germany") for geographic filtering. Omitted for remote searches. -- Searches stop early once 50 unique jobs have been collected, saving SerpAPI quota. -- Listings from questionable job portals (BeBee, Jooble, Adzuna, etc.) 
are filtered out at parse time. Jobs with no remaining apply links after filtering are discarded entirely. +```python +class SearchProvider(Protocol): + name: str + def search(self, query: str, location: str, max_results: int = 50) -> list[JobListing]: ... +``` + +- **`BundesagenturProvider`** (default) — queries the free Bundesagentur für Arbeit REST API (`rest.arbeitsagentur.de`). Handles pagination, parallel detail-fetching, and retry logic internally. +- **`SerpApiProvider`** — wraps Google Jobs via SerpApi. Handles localisation, `gl` code inference, and blocked portal filtering internally. +- **`get_provider(location)`** factory — currently always returns `BundesagenturProvider`. Future: route by country. + +**Search orchestration (`search_all_queries()`):** +- Iterates queries in parallel (`ThreadPoolExecutor`, max 5 workers) +- Each query is forwarded to `provider.search(query, location, max_results=jobs_per_query)` +- Deduplicates by `title|company_name` +- Stops early once 50 unique jobs are collected +- Supports `on_progress` and `on_jobs_found` callbacks for streaming results --- @@ -192,7 +206,14 @@ MODEL = "gemini-3-flash-preview" MAX_RETRIES = 5 BASE_DELAY = 3 # seconds, exponential backoff with jitter -# SerpApi Google Jobs parameters +# Bundesagentur für Arbeit API (default provider) +BA_BASE_URL = "https://rest.arbeitsagentur.de/jobboerse/jobsuche-service" +BA_API_KEY = "jobboerse-jobsuche" # pragma: allowlist secret # public API key, not a secret +# Search endpoint: /pc/v4/jobs (params: was, wo, veroeffentlichtseit, size, page, angebotsart) +# Detail endpoint: /pc/v4/jobdetails/{refnr} +# Default: veroeffentlichtseit=28 (last 28 days only) + +# SerpApi Google Jobs parameters (retained for future non-German markets) SERPAPI_PARAMS = { "engine": "google_jobs", "hl": "en", # Language: English (for broader results) @@ -203,11 +224,12 @@ SERPAPI_PARAMS = { ### Rate Limiting & Retry -- Exponential backoff with jitter for `429 RESOURCE_EXHAUSTED` and 
`503 UNAVAILABLE` errors +- Exponential backoff with jitter for `429 RESOURCE_EXHAUSTED` and `503 UNAVAILABLE` errors (Gemini) - Centralized in `llm.py:call_gemini()` — 5 retries with `3 * 2^attempt + random(0,1)` second delays -- SerpApi: 100 searches/month on free tier +- Bundesagentur API: retry up to 3 times on 5xx errors with exponential backoff +- SerpApi: 100 searches/month on free tier (not currently used) -### Blocked Job Portals +### Blocked Job Portals (SerpApi only) Jobs from the following portals are discarded during search result parsing (see `search_agent.py:_BLOCKED_PORTALS`): @@ -262,6 +284,7 @@ class JobListing(BaseModel): description: str = "" link: str = "" posted_at: str = "" + source: str = "" # search provider that produced this listing apply_options: list[ApplyOption] = [] # direct apply links (LinkedIn, career page, etc.) class JobEvaluation(BaseModel): @@ -379,7 +402,7 @@ Per-subscriber pipeline, designed to run in GitHub Actions (or any cron schedule **Privacy:** All log messages reference subscribers by UUID (`sub=`), never by email address. Email addresses are only used in the `send_daily_digest()` call. -Required env vars: `GOOGLE_API_KEY`, `SERPAPI_KEY`, `SUPABASE_URL`, `SUPABASE_SERVICE_KEY`, `RESEND_API_KEY`, `RESEND_FROM`, `APP_URL`. +Required env vars: `GOOGLE_API_KEY`, `SUPABASE_URL`, `SUPABASE_SERVICE_KEY`, `RESEND_API_KEY`, `RESEND_FROM`, `APP_URL`. ### Email Templates (`emailer.py`) - `send_daily_digest(user_email, jobs, unsubscribe_url, target_location)` — card-style job listings with score pill badges, location pins, "View Job" CTA buttons, match summary stats (excellent/good counts), and target location in header @@ -479,7 +502,8 @@ Schema setup: run `python setup_db.py` to check tables and print migration SQL. 
|---|---|---| | `test_llm.py` (12 tests) | `llm.py` | `parse_json()` (8 cases: raw, fenced, embedded, nested, errors) + `call_gemini()` retry logic (4 cases: success, ServerError retry, 429 retry, non-429 immediate raise) | | `test_evaluator_agent.py` (8 tests) | `evaluator_agent.py` | `evaluate_job()` (4 cases: happy path, API error fallback, parse error fallback, non-dict fallback) + `evaluate_all_jobs()` (3 cases: sorted output, progress callback, empty list) + `generate_summary()` (2 cases: score distribution in prompt, missing skills in prompt) | -| `test_search_agent.py` (32 tests) | `search_agent.py` | `_is_remote_only()` (remote tokens, non-remote) + `_infer_gl()` (known locations, unknown default, remote returns None, case insensitive) + `_localise_query()` (city names, country names, case insensitive, multiple cities) + `_parse_job_results()` (valid, blocked portals, mixed, empty, no-apply-links) + `search_all_queries()` (location auto-append with localisation, no double-append, early stopping) + `TestLlmJsonRecovery` (profile_candidate and generate_search_queries retry/recovery) | +| `test_search_agent.py` (35 tests) | `search_agent.py` | `_is_remote_only()` (remote tokens, non-remote) + `_infer_gl()` (known locations, unknown default, remote returns None, case insensitive) + `_localise_query()` (city names, country names, case insensitive, multiple cities) + `_parse_job_results()` (valid, blocked portals, mixed, empty, no-apply-links) + `search_all_queries()` (provider delegation, dedup, early stopping, callbacks, default provider) + `generate_search_queries()` prompt selection (BA vs SerpApi) + `TestLlmJsonRecovery` (profile_candidate and generate_search_queries retry/recovery) | +| `test_bundesagentur.py` (22 tests) | `bundesagentur.py` | `_build_ba_link()`, `_parse_location()`, `_parse_search_results()`, `_parse_listing()`, `BundesagenturProvider.search()` (basic merge, pagination, HTTP errors, empty results, detail fetch failures), `SearchProvider` 
protocol conformance | | `test_cache.py` (17 tests) | `cache.py` | All cache operations: profile, queries, jobs (merge/dedup), evaluations, unevaluated job filtering | | `test_cv_parser.py` (6 tests) | `cv_parser.py` | `_clean_text()` + `extract_text()` for .txt/.md, error cases | | `test_models.py` (23 tests) | `models.py` | All Pydantic models: validation, defaults, round-trip serialization | @@ -538,15 +562,15 @@ This section documents the development process and conventions for both human an source .venv/bin/activate # Test: pytest tests/ -x -q -# Lint: ruff check . && ruff format --check . +# Lint: ruff check --fix . && ruff format --check . # Types: mypy . # Run app: streamlit run immermatch/app.py -# All: ruff check . && mypy . && pytest tests/ -x -q +# All: ruff check --fix . && mypy . && pytest tests/ -x -q ``` **IMPORTANT:** After every code change, run the check suite **without asking for permission** — just do it: ```bash -source .venv/bin/activate && pytest tests/ -x -q && ruff check . && mypy . +source .venv/bin/activate && pytest tests/ -x -q && ruff check --fix . && mypy . ``` Do not ask the user "Shall I run the tests?" — always run them automatically. @@ -564,6 +588,7 @@ Do not ask the user "Shall I run the tests?" — always run them automatically. - All `st.error()` calls must show generic messages; real exceptions go to `logger.exception()` - Follow the test file naming convention: `tests/test_<module>.py` for `immermatch/<module>.py` - After implementing changes, always run `pytest tests/ -x -q` to verify nothing is broken +- Use external libraries and built-in functions as much as possible instead of writing custom code (e.g., for date parsing, string manipulation, etc.) — this increases reliability and reduces bugs ### Development workflow diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e6e001b..4216f19 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -31,7 +31,7 @@ No API keys are needed for running tests — all external services are mocked. 2.
Make your changes and run quality checks: ```bash - ruff check . && ruff format --check . + ruff check --fix . && ruff format --check . mypy immermatch/ daily_task.py pytest tests/ -x -q ``` diff --git a/Improving Job Search API Results.md b/Improving Job Search API Results.md new file mode 100644 index 0000000..62d2523 --- /dev/null +++ b/Improving Job Search API Results.md @@ -0,0 +1,290 @@ +# **Architecting High-Fidelity Job Aggregation Pipelines: Mitigating Expired, Fraudulent, and Misaligned Listings in Recruitment Data** + +## **1\. Introduction to the Problem Space: The Degradation of SERP-Based Job Aggregation** + +The modern digital recruitment landscape is characterized by a high degree of fragmentation, necessitating the use of data aggregation systems to construct centralized, accessible job repositories. Platforms relying on Search Engine Results Pages (SERP) APIs, specifically Google Jobs accessed via proxy services such as SerpAPI, frequently encounter severe data quality degradation. In the context of the "immermatch" project, this degradation severely impacts the user experience and algorithmic matching efficacy. The system currently manifests three primary failure modalities: temporal decay (expired job links), adversarial monetization (routing to paywalled platforms or fraudulent scam networks), and semantic drift (links redirecting to misaligned job titles or entirely different corporate entities). + +The reliance on Google Jobs introduces a fundamental architectural vulnerability. 
Google Jobs does not function as a primary Applicant Tracking System (ATS); rather, it operates as a secondary aggregator that indexes JobPosting schema markup distributed across the open web.1 Because Google’s ranking algorithms heavily weight domain authority, structural formatting, and Search Engine Optimization (SEO) metrics, third-party job boards, recruitment agencies, and malicious scraping platforms frequently outrank the primary corporate ATS.2 Consequently, an aggregation pipeline fetching data from Google Jobs inherits the noise, duplication, and manipulative routing mechanisms prevalent in the SEO-driven employment market. + +To elevate the fidelity of an aggregation platform like immermatch, the underlying system architecture must transition from passive SERP consumption to active, programmatic validation. This necessitates the implementation of a multi-tiered data governance pipeline capable of pre-flight validation, semantic alignment verification, and direct-source API integration. The following report provides an exhaustive, expert-level analysis of the programmatic solutions required to rectify these data quality issues, with a specific emphasis on global applicability, computational natural language processing (NLP), and the unique regulatory and technological nuances of the German labor market. + +## **2\. The Architectural Mechanics and SEO Vulnerabilities of Google Jobs** + +To engineer effective solutions for the immermatch platform, it is critical to deconstruct why Google Jobs natively produces suboptimal search results. The phenomenon of misaligned links and aggregator dominance is not an anomaly; it is a direct consequence of how search engines process and rank structured data. + +### **2.1. 
Schema Markup and Indexation Bias** + +When a user searches for job-related keywords, Google Jobs aggregates listings by scraping websites that have implemented the JobPosting structured data schema.1 However, organic ranking factors heavily influence which version of a duplicated job posting is displayed to the end-user. Research into Google Jobs indexation reveals that third-party platforms, such as The Muse or Monster, often outrank the original hiring company's website because they provide a richer schema payload.2 + +Third-party aggregators systematically inject extensive metadata into their JobPosting JSON-LD structures, including specific experience requirements, HTML-formatted descriptions, unique URL structures per position, and highly optimized organization logos.2 Furthermore, large aggregators utilize the Google Indexing API to push real-time updates, whereas smaller corporate ATS platforms wait for passive algorithmic crawling.2 This creates a structural bias where the immermatch system, querying SerpAPI, will predominantly receive links to secondary job boards rather than direct employer applications. + +### **2.2. Exploiting Dynamic and Static Filter Parameters** + +If the immermatch platform must continue to leverage SerpAPI for specific geographic or niche queries, the query construction logic must be heavily optimized using Google's internal filtering syntax. The SerpAPI Google Jobs engine relies on a filtering mechanism separated into dynamic and static values, which are passed via the chips array.4 + +Static filter values apply universally across all search contexts. 
These include parameters such as the date a job was posted (date\_posted:week, date\_posted:today) or the employment type (employment\_type:FULLTIME, employment\_type:CONTRACTOR).4 Utilizing strict static filters, particularly narrowing the temporal window to date\_posted:3days, can marginally reduce the ingestion of expired ghost jobs, though it does not eliminate the root cause of temporal decay. + +Conversely, dynamic filter values adapt based on the specific search keyword, generating internal Google knowledge graph identifiers for specific corporate entities, represented as organization\_mid.4 For example, targeting Apple requires appending the parameter chips=organization\_mid:/m/0k8z. To mitigate the routing of candidates to incorrect companies (semantic drift), the immermatch aggregation platform must be refactored to extract the organization\_mid from initial broad queries and append it to subsequent deep-dive queries. This forces the Google API to strictly isolate jobs belonging to the cryptographically verified corporate entity, significantly reducing the noise caused by recruitment agencies bidding on identical job titles. + +## **3\. Strategic Alternatives to SERP Aggregation: Direct API Integrations** + +The most definitive and architecturally sound solution to the vulnerabilities inherent in Google Jobs scraping is the bypass of the SERP entirely in favor of direct Application Programming Interface (API) integrations. By sourcing data directly from specialized job data providers or governmental databases, aggregation platforms can guarantee higher data provenance, significantly reduce latency, and entirely eliminate the intermediary layer of SEO-optimized third-party boards that harbor scams and paywalls. + +### **3.1. Evaluating Commercial Job Data API Providers** + +A transition to commercial data feeds allows for the ingestion of structured, normalized data that has already undergone preliminary deduplication and validation. 
The landscape of job data APIs in 2025 and 2026 presents several highly capable alternatives, each with distinct operational and financial profiles: + +* **Coresignal:** Recognized for its immense historical breadth, Coresignal provides access to over 349 million global job records, heavily enriched by broad LinkedIn coverage.5 The platform excels in providing structured employee data alongside job data, which is highly beneficial for advanced predictive talent analytics.6 However, Coresignal charges a premium (up to 10 times what alternatives charge), which is justified only if the immermatch platform requires multi-source enrichment and recruiter contact data; for strictly raw job listings, it is cost-prohibitive.6 +* **Techmap:** Positioned as a highly scalable and cost-effective alternative to Coresignal, Techmap offers direct company sourcing capabilities across a broader range of countries.5 Its fixed-cost structure and hourly update frequency make it highly viable for platforms requiring high-volume daily ingestions without facing exponential cost scaling.5 +* **LinkUp:** LinkUp distinguishes itself by exclusively indexing jobs directly from primary employer websites, thereby entirely bypassing the third-party job board ecosystem.5 For the immermatch architecture, this design choice natively solves the issue of misaligned links and aggregator paywalls, providing the highest possible fidelity of direct-source data. 
However, its geographic coverage is less comprehensive than Techmap.5 +* **JobDataAPI and JobsPikr:** At a highly competitive price point of approximately $1 per 1,000 jobs, JobDataAPI provides access to roughly 6.7 million new global job postings per month.5 It supports modern data formats including JSON, Parquet, and JSON-LD, making it highly compatible with big data processing pipelines.6 Similarly, JobsPikr offers customizable crawling solutions tailored for large-scale database ingestion.5 + +| API Provider | Primary Data Source Architecture | Core Strategic Advantage | Pricing Paradigm | Geographic Optimization | +| :---- | :---- | :---- | :---- | :---- | +| **Coresignal** | Multi-source aggregation (LinkedIn heavy) | Vast historical data, employee enrichment integration | Premium / Volume-based | Global | +| **Techmap** | Direct ATS & Company sites | High country coverage, frequent hourly updates | Fixed Subscription | Global | +| **LinkUp** | Strictly Employer ATS websites | Zero aggregator noise, guaranteed direct applications | Enterprise Subscription | Global (US Heavy) | +| **JobDataAPI** | Aggregated web sources | High cost efficiency ($1/1000 jobs), vast format support | Pay-per-use | Global | + +### **3.2. Exploiting Federal Data Architectures: The Bundesagentur für Arbeit API** + +For the immermatch platform, particularly when operating within or targeting the German labor market, leveraging the official digital infrastructure of the German government provides an unparalleled strategic advantage. 
The Bundesagentur für Arbeit (BA), Germany's Federal Employment Agency, maintains the largest, most tightly regulated, and authoritative job database in the nation.7 + +The BA provides a highly structured RESTful API (jobsuche.api.bund.dev), which allows developers to programmatically query the national registry of open positions.7 Unlike SERP scraping, this data is strictly regulated by federal mandates, drastically reducing the probability of encountering fraudulent listings, ghost jobs, or paywalled entry points. Employers posting on the BA portal undergo verification, ensuring a baseline of corporate legitimacy.8 + +The technical integration requires querying the endpoint https://rest.arbeitsagentur.de/jobboerse/jobsuche-service.7 Authentication is managed via a static client ID header, specifically passing X-API-Key: jobboerse-jobsuche in GET requests.7 The API supports complex, type-safe querying parameters, allowing the immermatch system to filter by geographic location, contract duration (Befristung), employment type (Arbeitszeit), and specific professional classifications utilizing endpoints such as /pc/v4/jobs for standard searches and /ed/v1/arbeitgeberlogo/{hashID} for retrieving corporate branding assets.7 + +To abstract the pagination, rate-limiting, and payload deserialization complexities of the BA API, robust open-source clients have been developed. 
For Rust environments, the jobsuche crate provides strong typing with enums for all parameters and supports both synchronous and asynchronous operations.10 For Python-centric data pipelines, the de-jobsuche PyPI package provides equivalent wrapper functionality, allowing rapid integration via pip install deutschland\[jobsuche\].12 Furthermore, data science teams utilizing R can leverage the bunddev CRAN package, which provides native functions like jobsuche\_search() and jobsuche\_logo() alongside built-in throttling and caching mechanisms.9 + +By directly integrating with the BA API, the immermatch platform immediately bypasses the SEO-manipulated Google Jobs environment, securing direct links to verified employer portals or official federal application systems. + +## **4\. Navigating the Aggregator Ecosystem and Strategic Domain Blacklisting** + +While transitioning to direct APIs is the optimal long-term strategy, legacy systems relying on SerpAPI must implement intermediate defensive measures. The most immediate method to prevent routing to paywalled aggregators or misaligned search pages is the implementation of stringent Boolean exclusion logic within the search query.14 + +Because Google Jobs aggregates from thousands of domains, many of which are parasitic entities that simply re-host content to harvest applicant data or serve advertisements, these domains must be algorithmically blacklisted at the query level. + +### **4.1. Differentiating Marketplaces from Meta-Aggregators** + +To build an effective exclusion list, the data engineering team must distinguish between primary job marketplaces and meta-aggregators (search engines).15 Job marketplaces, such as StepStone, XING, Monster, and LinkedIn, host primary content; hiring companies pay these platforms directly to host their requisitions.15 While links to these platforms are not as ideal as a direct corporate ATS link, they generally represent valid, actionable job postings. 
+ +Conversely, meta-aggregators function similarly to Google Jobs itself—they scrape the internet for job ads and syndicate content from other boards.15 When a candidate clicks a Google Jobs link that routes to a meta-aggregator, they are dumped into a secondary search interface rather than a targeted job application page, resulting in a severely degraded user experience.8 + +### **4.2. Implementing Exclusionary Query Logic** + +Implementing search operators such as the uppercase NOT (e.g., software engineer NOT jobot NOT bravado) forces the SERP engine to drop listings originating from known spam networks or meta-aggregators before the payload is ever returned via SerpAPI.14 This logic must be applied dynamically, backed by an internal database of known adversarial domains. + +For the German market specifically, the immermatch pipeline must implement permanent exclusion filters against meta-aggregators that loop traffic without providing direct ATS links. Domains that should be rigorously blacklisted from the search parameters include: + +* **Jobrapido:** A high-traffic search engine that scrapes primary boards, frequently resulting in infinite redirect loops for the end-user.15 +* **Kimeta:** Functions entirely as a crawling technology, rarely hosting primary application infrastructure.15 +* **Jooble, Adzuna, and Talent.com:** Global aggregators that syndicate content, heavily diluting the provenance of the original job posting.18 +* **Zuhausejobs.com:** Often cited as a lower-tier platform prone to hosting generic or aggregated remote listings that lack strict verification.8 + +By injecting an exclusion array (e.g., NOT site:jobrapido.de NOT site:kimeta.de) into every SerpAPI request payload, the immermatch system can artificially elevate the quality of the returned SERP data, forcing Google to surface primary ATS links or verified marketplaces like StepStone and Arbeitnow.17 + +## **5\. 
Mitigating Temporal Decay: The Engineering of Expired Link Detection** + +The phenomenon of "ghost jobs"—listings that remain active on the internet despite the position being filled, the hiring initiative being canceled, or the requisition being a facade to collect resumes—is a systemic issue in digital recruitment.20 Google Jobs indexation algorithms inherently lag behind the real-time state of corporate ATS databases.1 Consequently, the SerpAPI payload will inevitably contain links that are temporally decayed. To maintain platform integrity, the immermatch architecture must implement an asynchronous, multi-stage URL validation pipeline. + +### **5.1. HTTP Protocol Analysis and Redirect Tracing** + +The foundational layer of expired job detection relies on automated HTTP status code analysis. This can be achieved using libraries such as Python's native urllib, the popular requests library, or asynchronous equivalents like aiohttp for high-throughput concurrency.22 When a requisition is removed from a corporate ATS, the server rarely serves a standard HTTP 200 OK response containing the original content. Instead, the server behavior typically falls into one of three distinct paradigms: + +1. **Hard Deletion (HTTP 404 / 410):** The ATS returns a 404 Not Found or a 410 Gone status code.25 This is the most deterministic indicator of expiration. A simple Python script executing a HEAD request (which is significantly faster than a GET request as it omits the response body) can instantly flag these URLs, permitting immediate purging of the listing from the immermatch database.23 +2. **Access Revocation (HTTP 403 / 401):** Less commonly, the system may return a 403 Forbidden or 401 Unauthorized error, indicating that the listing has been transitioned from the public career site to an internal, authenticated tier accessible only to current employees.25 +3. 
**Soft Redirection (HTTP 301 / 302 / 307):** To preserve SEO authority and prevent users from encountering dead pages, many enterprise platforms implement a 301 Moved Permanently or a 302/307 Temporary Redirect.25 Instead of showing an error, the ATS redirects the user to the company’s main career homepage or a generic search interface. + +To programmatically identify these soft redirections, the validation script must execute requests with automatic redirection tracking explicitly disabled (e.g., setting allow\_redirects=False in the Python requests.get() method).27 By capturing the Location header in the 3xx response, the system can mathematically compare the destination URL against the original target. If the Uniform Resource Identifier (URI) path depth decreases significantly—for example, redirecting from a highly specific path like company.com/careers/engineering/req-1234 to a generic company.com/careers—the system can reliably infer that the specific requisition has been terminated and flag the job as expired.22 + +### **5.2. DOM Parsing and Lexical Heuristics for "Zombie" Pages** + +The most complex expiration scenario occurs when an ATS returns a valid HTTP 200 OK status code but dynamically replaces the job description with a generic expiration message.25 These "zombie" pages completely bypass HTTP-level status code detection. 
For example, enterprise systems like Oracle Taleo will frequently maintain the active URL structure but inject the text "Job is no longer available" or "This position has been filled" into the application container.28 + +Addressing this requires a secondary validation layer utilizing headless browser automation paired with robust HTML parsing frameworks.29 Because modern ATS platforms heavily utilize single-page application (SPA) frameworks like React, Angular, or Vue.js, the actual Document Object Model (DOM) content is rendered client-side via JavaScript.30 Attempting to scrape these pages using standard HTTP GET requests paired with BeautifulSoup will fail, returning only an empty HTML shell or loading scripts.30 + +To overcome JavaScript rendering, the validation pipeline must instantiate a headless browser. While legacy options like Scrapy coupled with Splash or Selenium exist, modern architectures favor Playwright or Pyppeteer due to their superior performance, native asynchronous support, and modern JavaScript engine compatibility.30 Tools like Crawl4AI can also be leveraged for extracting structured data from live, dynamic web pages without the overhead of manually managing browser contexts.32 + +A Playwright-based pipeline must instantiate a Chromium instance, navigate to the target URL, await network idle states to ensure all asynchronous API calls within the ATS have resolved, and then extract the fully rendered textual payload.30 Subsequently, a lexical analysis engine must scan the extracted text for predefined semantic markers of expiration. An array of regular expression (Regex) patterns targeting phrases such as (?i)(no longer available|position closed|role filled|not accepting applications) must be executed against the DOM text.28 If a match exceeds a predefined confidence threshold, the job is classified as a zombie page and removed from the immermatch index. 
+ +| Expiration Paradigm | Technical Indicator | Required Validation Protocol | Computational Cost | +| :---- | :---- | :---- | :---- | +| **Hard Deletion** | HTTP 404 / 410 | Asynchronous HTTP HEAD/GET request | Low | +| **Soft Redirection** | HTTP 301 / 302 / 307 | HTTP Request with allow\_redirects=False | Low | +| **Zombie Page (Static)** | HTTP 200 \+ Expiration Text | BeautifulSoup DOM Parsing \+ Regex | Medium | +| **Zombie Page (Dynamic)** | HTTP 200 \+ JS Rendered Text | Playwright / Crawl4AI \+ Regex | High | + +## **6\. Eradicating Adversarial Monetization: Scam and Paywall Defense Architectures** + +The proliferation of fraudulent job listings and paywalled aggregator traps represents a critical threat to user trust and platform viability. By 2025 and 2026, scammers have evolved beyond simple phishing emails, utilizing sophisticated techniques including AI-generated job descriptions, cloned corporate sites, and "task scams" to harvest personally identifiable information (PII) or extort upfront fees from candidates.33 Simultaneously, parasitic job boards institute strict paywalls, demanding subscription fees for access to listings that are freely available on primary corporate sites.37 The immermatch pipeline must implement autonomous defense mechanisms against these dual adversarial vectors. + +### **6.1. 
Programmatic Detection of Paywalls via Semantic Web Standards** + +The most elegant and deterministic method for detecting paywalls without requiring complex, site-specific web scraping relies on the semantic web standards established by Schema.org, specifically the application/ld+json structured data specifications.39 To comply with Google’s strict indexing guidelines and prevent algorithmic penalties for cloaking (where content served to Googlebot differs from content served to users), websites implementing paywalls are required to explicitly declare the hidden nature of their content.42 + +This declaration is achieved using the isAccessibleForFree property within their JSON-LD payload.39 An example implementation provided by search engine guidelines mandates the inclusion of this boolean flag, alongside a hasPart array that explicitly maps CSS selectors (e.g., .meteredContent, .paywall) to the gated content regions.39 + +A Python-based extraction module can be deployed to intercept and evaluate this metadata. 
Using the requests library to fetch the HTML document and BeautifulSoup to parse the DOM, the pipeline isolates all `<script type="application/ld+json">` blocks and evaluates the `isAccessibleForFree` declaration they carry before a listing is admitted to the index.

[NOTE(review): content was lost here to markup-stripping — the remainder of this document plus the head of the new file `immermatch/bundesagentur.py` (its diff header, module docstring, imports such as `import html as html_mod`, `import httpx`, the retry/backoff constants, and the opening of the ng-state regex) are missing. The chunk resumes mid-definition; reconstructed regex head shown below — TODO confirm the exact pattern against the original file:]
+_NG_STATE_RE = re.compile(
+    r'<script id="ng-state" type="application/json">(.*?)</script>',
+    re.DOTALL,
+)
+
+
+def _build_ba_link(refnr: str) -> str:
+    """Construct the public Arbeitsagentur URL for a listing."""
+    return f"https://www.arbeitsagentur.de/jobsuche/jobdetail/{refnr}"
+
+
+def _parse_location(arbeitsort: dict) -> str:
+    """Build a human-readable location string from the API's *arbeitsort*."""
+    parts: list[str] = []
+    if ort := arbeitsort.get("ort"):
+        parts.append(ort)
+    if region := arbeitsort.get("region"):
+        # Avoid duplicating city name when region == city
+        if region != ort:
+            parts.append(region)
+    if land := arbeitsort.get("land"):
+        if land not in parts:
+            parts.append(land)
+    return ", ".join(parts) if parts else "Germany"
+
+
+def _clean_html(raw: str) -> str:
+    """Strip HTML tags and decode entities, collapse whitespace."""
+    text = html_mod.unescape(raw)
+    text = re.sub(r"<[^>]+>", " ", text)
+    return re.sub(r"\s+", " ", text).strip()
+
+
+# ------------------------------------------------------------------
+# Detail page scraping
+# ------------------------------------------------------------------
+
+
+def _fetch_detail(client: httpx.Client, refnr: str) -> dict:
+    """Fetch the public detail page and extract the ng-state JSON.
+
+    Returns the ``jobdetail`` dict on success, or ``{}`` on any failure.
+ """ + url = _build_ba_link(refnr) + last_exc: Exception | None = None + for attempt in range(_MAX_RETRIES): + try: + resp = client.get(url) + if resp.status_code == 200: + match = _NG_STATE_RE.search(resp.text) + if match: + state = json.loads(match.group(1)) + return state.get("jobdetail", {}) # type: ignore[no-any-return] + logger.debug("BA detail %s: ng-state not found in HTML", refnr) + return {} + if resp.status_code in {403, 429, 500, 502, 503}: + delay = _BASE_DELAY * (2**attempt) + _BACKOFF_JITTER + logger.warning( + "BA detail page %s returned %s, retrying in %ss", + refnr, + resp.status_code, + delay, + ) + time.sleep(delay) + continue + logger.debug("BA detail page %s returned %s, skipping", refnr, resp.status_code) + return {} + except httpx.HTTPError as exc: + last_exc = exc + delay = _BASE_DELAY * (2**attempt) + _BACKOFF_JITTER + logger.warning("BA detail %s network error: %s, retrying in %ss", refnr, exc, delay) + time.sleep(delay) + if last_exc: + logger.error("BA detail %s failed after %d retries: %s", refnr, _MAX_RETRIES, last_exc) + return {} + + +def _fetch_detail_api(client: httpx.Client, refnr: str) -> dict: + """Fetch structured job detail JSON from the BA API using plain ``refnr``. + + Returns the detail dict on success, or ``{}`` on any failure. 
+ """ + url = f"{_BASE_URL}/pc/v4/jobdetails/{refnr}" + last_exc: Exception | None = None + for attempt in range(_MAX_RETRIES): + try: + resp = client.get(url) + if resp.status_code == 200: + data = resp.json() + return data if isinstance(data, dict) else {} + if resp.status_code in {403, 429, 500, 502, 503}: + delay = _BASE_DELAY * (2**attempt) + _BACKOFF_JITTER + logger.warning( + "BA API detail %s returned %s, retrying in %ss", + refnr, + resp.status_code, + delay, + ) + time.sleep(delay) + continue + logger.debug("BA API detail %s returned %s, skipping", refnr, resp.status_code) + return {} + except (httpx.HTTPError, ValueError) as exc: + last_exc = exc + delay = _BASE_DELAY * (2**attempt) + _BACKOFF_JITTER + logger.warning("BA API detail %s error: %s, retrying in %ss", refnr, exc, delay) + time.sleep(delay) + if last_exc: + logger.error("BA API detail %s failed after %d retries: %s", refnr, _MAX_RETRIES, last_exc) + return {} + + +# ------------------------------------------------------------------ +# Search result parsing +# ------------------------------------------------------------------ + + +def _parse_listing(item: dict, detail: dict | None = None) -> JobListing | None: + """Convert a search-result item (+ optional detail) into a :class:`JobListing`. + + Returns ``None`` when the item lacks a ``refnr`` (the unique job ID). + """ + refnr = item.get("refnr", "") + if not refnr: + return None + + arbeitsort = item.get("arbeitsort", {}) + link = _build_ba_link(refnr) + + titel = item.get("titel", "") + beruf = item.get("beruf", "") + arbeitgeber = item.get("arbeitgeber", "") + ort = _parse_location(arbeitsort) + + # Prefer the rich description from the detail page when available. + description = "" + if detail: + raw_desc = detail.get("stellenangebotsBeschreibung", "") + if raw_desc: + description = _clean_html(raw_desc) + + # Fallback: build a minimal description from search fields. 
+ if not description: + parts: list[str] = [] + if beruf and beruf != titel: + parts.append(f"Beruf: {beruf}") + if arbeitgeber: + parts.append(f"Arbeitgeber: {arbeitgeber}") + if ort: + parts.append(f"Standort: {ort}") + description = "\n".join(parts) + + # Build apply options — always include the Arbeitsagentur page link, + # plus an external career-site link when available in the detail data. + apply_options = [ApplyOption(source="Arbeitsagentur", url=link)] + if detail: + ext_url = str(detail.get("allianzpartnerUrl", "")).strip() + if ext_url: + if ext_url.startswith("//"): + ext_url = f"https:{ext_url}" + elif not re.match(r"^[a-zA-Z][a-zA-Z0-9+.-]*://", ext_url): + ext_url = f"https://{ext_url}" + ext_name = detail.get("allianzpartnerName", "Company Website") + apply_options.append(ApplyOption(source=ext_name, url=ext_url)) + + return JobListing( + title=titel or beruf or "Unknown", + company_name=arbeitgeber or "Unknown", + location=ort, + description=description, + link=link, + posted_at=item.get("aktuelleVeroeffentlichungsdatum", ""), + source="bundesagentur", + apply_options=apply_options, + ) + + +def _parse_search_results(data: dict) -> list[dict]: + """Return the raw search-result items (dicts) that have a ``refnr``.""" + return [item for item in data.get("stellenangebote", []) if item.get("refnr")] + + +class BundesagenturProvider: + """Job-search provider backed by the Bundesagentur für Arbeit API. + + Satisfies the :class:`~immermatch.search_provider.SearchProvider` protocol. 
+ """ + + name: str = "Bundesagentur für Arbeit" + + def __init__( + self, + days_published: int = _DEFAULT_DAYS_PUBLISHED, + detail_workers: int = 5, + detail_strategy: Literal["api_then_html", "api_only", "html_only"] = "api_then_html", + ) -> None: + self._days_published = days_published + self._detail_workers = detail_workers + self._detail_strategy = detail_strategy + + # ------------------------------------------------------------------ + # Public API (SearchProvider protocol) + # ------------------------------------------------------------------ + + def search( + self, + query: str, + location: str, + max_results: int = 50, + ) -> list[JobListing]: + """Search for jobs and return listings with full descriptions. + + Args: + query: Free-text keyword (job title, skill, …). + location: City / region in Germany. + max_results: Upper bound on total results. + + Returns: + List of ``JobListing`` objects. When possible, descriptions + are scraped from the public detail pages; otherwise a minimal + fallback description is built from the search data. 
+ """ + if not query or not query.strip(): + logger.debug("Skipping BA search: empty query") + return [] + + items = self._search_items(query, location, max_results) + if not items: + return [] + return self._enrich(items) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _search_items( + self, + query: str, + location: str, + max_results: int, + ) -> list[dict]: + """Paginate through the search endpoint and collect raw items.""" + page_size = min(max_results, 50) # BA allows up to 100, 50 is safe + items: list[dict] = [] + page = 1 # BA API pages are 1-indexed + + with httpx.Client(headers=_DEFAULT_HEADERS, timeout=30) as client: + while len(items) < max_results and page <= _MAX_PAGES: + params: dict[str, str | int] = { + "was": query, + "size": page_size, + "page": page, + "veroeffentlichtseit": self._days_published, + "angebotsart": 1, # jobs only (not self-employed / training) + } + if location.strip(): + params["wo"] = location + + resp = self._get_with_retry(client, f"{_BASE_URL}/pc/v4/jobs", params) + if resp is None: + break + + data = resp.json() + page_items = _parse_search_results(data) + if not page_items: + break + + items.extend(page_items) + total = int(data.get("maxErgebnisse", 0)) + if len(items) >= total or len(items) >= max_results: + break + + page += 1 + + if page > _MAX_PAGES and len(items) < max_results: + logger.warning("Reached BA page cap (%s) while searching query=%r", _MAX_PAGES, query) + + return items[:max_results] + + def _enrich(self, items: list[dict]) -> list[JobListing]: + """Fetch detail pages in parallel and build ``JobListing`` objects.""" + # Map refnr → detail dict (fetched in parallel). 
+ details: dict[str, dict] = {} + with ( + httpx.Client(headers=_DEFAULT_HEADERS, timeout=30) as api_client, + httpx.Client( + timeout=30, + headers={ + "User-Agent": "Mozilla/5.0 (compatible; Immermatch/1.0)", + "Accept": "text/html", + }, + follow_redirects=True, + ) as html_client, + ): + with ThreadPoolExecutor(max_workers=self._detail_workers) as pool: + future_to_refnr = { + pool.submit(self._get_detail, api_client, html_client, item["refnr"]): item["refnr"] + for item in items + } + for future in as_completed(future_to_refnr): + refnr = future_to_refnr[future] + try: + details[refnr] = future.result() + except Exception: + logger.exception("Failed to fetch detail for %s", refnr) + details[refnr] = {} + + listings: list[JobListing] = [] + for item in items: + refnr = item["refnr"] + listing = _parse_listing(item, detail=details.get(refnr)) + if listing is not None: + listings.append(listing) + return listings + + def _get_detail(self, api_client: httpx.Client, html_client: httpx.Client, refnr: str) -> dict: + """Resolve job detail using the configured endpoint strategy.""" + if self._detail_strategy == "api_only": + return _fetch_detail_api(api_client, refnr) + if self._detail_strategy == "html_only": + return _fetch_detail(html_client, refnr) + + detail = _fetch_detail_api(api_client, refnr) + if detail: + return detail + return _fetch_detail(html_client, refnr) + + @staticmethod + def _get_with_retry( + client: httpx.Client, + url: str, + params: dict, + ) -> httpx.Response | None: + """GET with retry on transient errors.""" + last_exc: Exception | None = None + for attempt in range(_MAX_RETRIES): + try: + resp = client.get(url, params=params) + if resp.status_code == 200: + return resp + if resp.status_code in {403, 429, 500, 502, 503}: + delay = _BASE_DELAY * (2**attempt) + _BACKOFF_JITTER + logger.warning("BA search %s returned %s, retry in %ss", url, resp.status_code, delay) + time.sleep(delay) + continue + logger.warning("BA search %s returned %s, 
giving up", url, resp.status_code) + return None + except httpx.HTTPError as exc: + last_exc = exc + delay = _BASE_DELAY * (2**attempt) + _BACKOFF_JITTER + logger.warning("BA search network error: %s, retry in %ss", exc, delay) + time.sleep(delay) + if last_exc: + logger.error("BA search failed after %d retries: %s", _MAX_RETRIES, last_exc) + return None diff --git a/immermatch/models.py b/immermatch/models.py index 5fb0d42..2dd44fc 100644 --- a/immermatch/models.py +++ b/immermatch/models.py @@ -67,7 +67,7 @@ class ApplyOption(BaseModel): class JobListing(BaseModel): - """A job listing from SerpApi.""" + """A job listing returned by a search provider.""" title: str company_name: str @@ -75,6 +75,7 @@ class JobListing(BaseModel): description: str = "" link: str = "" posted_at: str = "" + source: str = Field(default="", description="Search provider that produced this listing (e.g. 'bundesagentur')") apply_options: list[ApplyOption] = Field( default_factory=list, description="List of direct application links (LinkedIn, company site, etc.)" ) diff --git a/immermatch/search_agent.py b/immermatch/search_agent.py index 4f93534..e233d9c 100644 --- a/immermatch/search_agent.py +++ b/immermatch/search_agent.py @@ -1,253 +1,33 @@ -"""Search Agent module - Generates optimized job search queries using LLM.""" +"""Search Agent module - Generates optimized job search queries using LLM. + +The SerpApi-specific helpers (``_infer_gl``, ``_localise_query``, etc.) live +in :mod:`immermatch.serpapi_provider` and are re-exported here for backward +compatibility. 
+""" + +from __future__ import annotations -import os -import re import threading from collections.abc import Callable from concurrent.futures import ThreadPoolExecutor, as_completed from google import genai from pydantic import ValidationError -from serpapi import GoogleSearch from .llm import call_gemini, parse_json -from .models import ApplyOption, CandidateProfile, JobListing - -# Questionable job portals that often have expired listings or paywalls -_BLOCKED_PORTALS = { - "bebee", - "trabajo", - "jooble", - "adzuna", - "jobrapido", - "neuvoo", - "mitula", - "trovit", - "jobomas", - "jobijoba", - "talent", - "jobatus", - "jobsora", - "studysmarter", - "jobilize", - "learn4good", - "grabjobs", - "jobtensor", - "zycto", - "terra.do", - "jobzmall", - "simplyhired", -} - -# Map country/city names to Google gl= codes so SerpApi doesn't default to "us" -_GL_CODES: dict[str, str] = { - # Countries - "germany": "de", - "deutschland": "de", - "france": "fr", - "netherlands": "nl", - "holland": "nl", - "belgium": "be", - "austria": "at", - "österreich": "at", - "switzerland": "ch", - "schweiz": "ch", - "suisse": "ch", - "spain": "es", - "españa": "es", - "italy": "it", - "italia": "it", - "portugal": "pt", - "poland": "pl", - "polska": "pl", - "sweden": "se", - "sverige": "se", - "norway": "no", - "norge": "no", - "denmark": "dk", - "danmark": "dk", - "finland": "fi", - "suomi": "fi", - "ireland": "ie", - "czech republic": "cz", - "czechia": "cz", - "romania": "ro", - "hungary": "hu", - "greece": "gr", - "luxembourg": "lu", - "uk": "uk", - "united kingdom": "uk", - "england": "uk", - # Major cities → country - "berlin": "de", - "munich": "de", - "münchen": "de", - "hamburg": "de", - "frankfurt": "de", - "stuttgart": "de", - "düsseldorf": "de", - "köln": "de", - "cologne": "de", - "hannover": "de", - "nürnberg": "de", - "nuremberg": "de", - "leipzig": "de", - "dresden": "de", - "dortmund": "de", - "essen": "de", - "bremen": "de", - "paris": "fr", - "lyon": "fr", - 
"marseille": "fr", - "toulouse": "fr", - "amsterdam": "nl", - "rotterdam": "nl", - "eindhoven": "nl", - "utrecht": "nl", - "brussels": "be", - "bruxelles": "be", - "antwerp": "be", - "vienna": "at", - "wien": "at", - "graz": "at", - "zurich": "ch", - "zürich": "ch", - "geneva": "ch", - "genève": "ch", - "basel": "ch", - "bern": "ch", - "madrid": "es", - "barcelona": "es", - "rome": "it", - "milan": "it", - "milano": "it", - "lisbon": "pt", - "porto": "pt", - "warsaw": "pl", - "kraków": "pl", - "krakow": "pl", - "wrocław": "pl", - "stockholm": "se", - "gothenburg": "se", - "malmö": "se", - "oslo": "no", - "copenhagen": "dk", - "helsinki": "fi", - "dublin": "ie", - "prague": "cz", - "bucharest": "ro", - "budapest": "hu", - "athens": "gr", - "london": "uk", - "manchester": "uk", - "edinburgh": "uk", -} - - -# Tokens that signal a purely remote / worldwide search (no single country). -_REMOTE_TOKENS = {"remote", "worldwide", "global", "anywhere", "weltweit"} - - -def _is_remote_only(location: str) -> bool: - """Return True when the location string contains ONLY remote-like tokens.""" - words = {re.sub(r"[^\w]", "", w).lower() for w in location.split() if w.strip()} - return bool(words) and words <= _REMOTE_TOKENS - - -def _infer_gl(location: str) -> str | None: - """Infer a Google gl= country code from a free-form location string. - - Returns *None* for purely remote/global searches so CallerCode can - decide whether to set ``gl`` at all (SerpApi defaults to "us"). - Falls back to "de" when a location is given but no country can be - determined, since SerpApi defaults to "us" otherwise and returns - 0 European results. - """ - if _is_remote_only(location): - return None - - loc_lower = location.lower() - for name, code in _GL_CODES.items(): - if name in loc_lower: - return code - return "de" - - -# English city names → local names used by Google Jobs. -# Google Jobs with gl=de returns 0 results for "Munich" but 30 for "München". 
-_CITY_LOCALISE: dict[str, str] = { - # German - "munich": "München", - "cologne": "Köln", - "nuremberg": "Nürnberg", - "hanover": "Hannover", - "dusseldorf": "Düsseldorf", - # Austrian - "vienna": "Wien", - # Swiss - "zurich": "Zürich", - "geneva": "Genève", - # Czech - "prague": "Praha", - # Polish - "warsaw": "Warszawa", - "krakow": "Kraków", - "wroclaw": "Wrocław", - # Danish - "copenhagen": "København", - # Greek - "athens": "Athína", - # Romanian - "bucharest": "București", - # Italian - "milan": "Milano", - "rome": "Roma", - # Portuguese - "lisbon": "Lisboa", - # Belgian - "brussels": "Bruxelles", - "antwerp": "Antwerpen", - # Swedish - "gothenburg": "Göteborg", -} - -# English country names → local names for search queries. -_COUNTRY_LOCALISE: dict[str, str] = { - "germany": "Deutschland", - "austria": "Österreich", - "switzerland": "Schweiz", - "netherlands": "Niederlande", - "czech republic": "Česká republika", - "czechia": "Česko", - "poland": "Polska", - "sweden": "Sverige", - "norway": "Norge", - "denmark": "Danmark", - "finland": "Suomi", - "hungary": "Magyarország", - "romania": "România", - "greece": "Ελλάδα", -} - -# Build a case-insensitive regex that matches any English city name as a whole word -_LOCALISE_PATTERN = re.compile( - r"\b(" + "|".join(re.escape(k) for k in _CITY_LOCALISE) + r")\b", - re.IGNORECASE, -) - -# Same for country names (longer keys first so "czech republic" beats "czech") -_COUNTRY_LOCALISE_PATTERN = re.compile( - r"\b(" + "|".join(re.escape(k) for k in sorted(_COUNTRY_LOCALISE, key=len, reverse=True)) + r")\b", - re.IGNORECASE, -) - - -def _localise_query(query: str) -> str: - """Replace English city and country names with their local equivalents.""" - query = _LOCALISE_PATTERN.sub(lambda m: _CITY_LOCALISE[m.group(0).lower()], query) - query = _COUNTRY_LOCALISE_PATTERN.sub(lambda m: _COUNTRY_LOCALISE[m.group(0).lower()], query) - return query - +from .models import CandidateProfile, JobListing +from .search_provider import 
SearchProvider, get_provider + +# Re-export SerpApi helpers so existing imports keep working. +from .serpapi_provider import BLOCKED_PORTALS as _BLOCKED_PORTALS # noqa: F401 +from .serpapi_provider import CITY_LOCALISE as _CITY_LOCALISE # noqa: F401 +from .serpapi_provider import COUNTRY_LOCALISE as _COUNTRY_LOCALISE # noqa: F401 +from .serpapi_provider import GL_CODES as _GL_CODES # noqa: F401 +from .serpapi_provider import infer_gl as _infer_gl # noqa: F401 +from .serpapi_provider import is_remote_only as _is_remote_only # noqa: F401 +from .serpapi_provider import localise_query as _localise_query # noqa: F401 +from .serpapi_provider import parse_job_results as _parse_job_results # noqa: F401 +from .serpapi_provider import search_jobs # noqa: F401 # System prompt for the Profiler agent PROFILER_SYSTEM_PROMPT = """You are an expert technical recruiter with deep knowledge of European job markets. @@ -320,6 +100,27 @@ def _localise_query(query: str) -> str: Return ONLY a JSON array of 20 search query strings, no explanation.""" +# System prompt for keyword-only queries used with Bundesagentur für Arbeit API. +# The BA API has a dedicated ``wo`` (where) parameter, so queries must NOT +# contain any location tokens. +BA_HEADHUNTER_SYSTEM_PROMPT = """You are a Search Specialist generating keyword queries for the German Federal Employment Agency job search API (Bundesagentur für Arbeit). + +Based on the candidate's profile, generate distinct keyword queries to find relevant job openings. The API searches across German job listings and handles location filtering separately. + +IMPORTANT RULES: +- Queries must be SHORT: 1-3 words ONLY +- Do NOT include any city, region, or country names — location is handled by the API +- Do NOT include "remote", "hybrid", or similar work-mode keywords +- Include BOTH German and English job titles (the API indexes both) +- Use different synonyms for the same role + +Strategy: +1. 
First third: Exact role titles in German (e.g., "Softwareentwickler", "Datenanalyst", "Projektleiter") +2. Second third: Exact role titles in English (e.g., "Software Developer", "Data Analyst", "Project Manager") +3. Final third: Technology + role combinations and broader terms (e.g., "Python Entwickler", "Machine Learning", "DevOps Engineer") + +Return ONLY a JSON array of search query strings, no explanation.""" + def profile_candidate(client: genai.Client, cv_text: str) -> CandidateProfile: """ @@ -373,18 +174,35 @@ def generate_search_queries( profile: CandidateProfile, location: str = "", num_queries: int = 20, + *, + provider: SearchProvider | None = None, ) -> list[str]: - """ - Generate optimized job search queries based on candidate profile. + """Generate optimized job search queries based on candidate profile. + + When a :class:`~immermatch.bundesagentur.BundesagenturProvider` is active + the prompt asks the LLM for short keyword-only queries (no location + tokens). For SerpApi / Google Jobs the prompt includes location-enrichment + strategies. Args: client: Gemini client instance. profile: Structured candidate profile. location: Target job location. + num_queries: Number of queries to generate. + provider: Explicit provider; defaults to ``get_provider(location)``. Returns: List of search query strings. 
""" + if provider is None: + provider = get_provider(location) + + # Select system prompt based on active provider + if provider.name == "Bundesagentur für Arbeit": + system_prompt = BA_HEADHUNTER_SYSTEM_PROMPT + else: + system_prompt = HEADHUNTER_SYSTEM_PROMPT + profile_text = f"""Candidate Profile: - Skills: {", ".join(profile.skills)} - Experience Level: {profile.experience_level} @@ -393,7 +211,7 @@ def generate_search_queries( - Domain Expertise: {", ".join(profile.domain_expertise)} - Target Location: {location}""" - prompt = f"{HEADHUNTER_SYSTEM_PROMPT}\n\nGenerate exactly {num_queries} queries.\n\n{profile_text}" + prompt = f"{system_prompt}\n\nGenerate exactly {num_queries} queries.\n\n{profile_text}" retry_prompt = ( f"{prompt}\n\nIMPORTANT: Return ONLY a valid JSON array of strings with exactly {num_queries} queries." @@ -412,185 +230,56 @@ def generate_search_queries( return [] -def _parse_job_results(results: dict) -> list[JobListing]: - """Parse job listings from a SerpApi response dict.""" - jobs: list[JobListing] = [] - - for job_data in results.get("jobs_results", []): - description_parts = [] - if "description" in job_data: - description_parts.append(job_data["description"]) - if "highlights" in job_data: - for highlight in job_data.get("highlights", []): - if "items" in highlight: - description_parts.extend(highlight["items"]) - - # Extract apply options (LinkedIn, company website, etc.) 
- # Filter out questionable job portals - apply_options = [] - for option in job_data.get("apply_options", []): - if "title" in option and "link" in option: - url = option["link"].lower() - # Skip if the URL contains any blocked portal domain - if not any(blocked in url for blocked in _BLOCKED_PORTALS): - apply_options.append(ApplyOption(source=option["title"], url=option["link"])) - - # Skip jobs that only have questionable portal links or no links at all - if not apply_options: - continue - - job = JobListing( - title=job_data.get("title", "Unknown"), - company_name=job_data.get("company_name", "Unknown"), - location=job_data.get("location", "Unknown"), - description="\n".join(description_parts), - link=job_data.get("share_link", job_data.get("link", "")), - posted_at=job_data.get("detected_extensions", {}).get("posted_at", ""), - apply_options=apply_options, - ) - jobs.append(job) - - return jobs - - -def search_jobs( - query: str, - num_results: int = 10, - gl: str | None = "de", - location: str | None = None, -) -> list[JobListing]: - """ - Search for jobs using SerpApi Google Jobs engine with pagination. - - Args: - query: Search query string. - num_results: Maximum number of results to return. - gl: Google country code (e.g. "de", "fr"). *None* to omit. - location: SerpApi ``location`` parameter for geographic filtering - (e.g. "Germany", "Munich, Bavaria, Germany"). *None* to omit. - - Returns: - List of job listings. 
- """ - api_key = os.getenv("SERPAPI_KEY") - if not api_key: - raise ValueError("SERPAPI_KEY environment variable not set") - - all_jobs: list[JobListing] = [] - next_page_token = None - - while len(all_jobs) < num_results: - params: dict[str, str] = { - "engine": "google_jobs", - "q": query, - "hl": "en", # English results - "api_key": api_key, - } - if gl is not None: - params["gl"] = gl - if location is not None: - params["location"] = location - if next_page_token: - params["next_page_token"] = next_page_token - - search = GoogleSearch(params) - results = search.get_dict() - - page_jobs = _parse_job_results(results) - if not page_jobs: - break - - all_jobs.extend(page_jobs) - - # Check for next page - pagination = results.get("serpapi_pagination", {}) - next_page_token = pagination.get("next_page_token") - if not next_page_token: - break - - return all_jobs[:num_results] - - def search_all_queries( queries: list[str], jobs_per_query: int = 10, location: str = "", min_unique_jobs: int = 50, - on_progress: "None | Callable" = None, - on_jobs_found: "None | Callable[[list[JobListing]], None]" = None, + on_progress: None | Callable = None, + on_jobs_found: None | Callable[[list[JobListing]], None] = None, + *, + provider: SearchProvider | None = None, ) -> list[JobListing]: - """ - Search for jobs across multiple queries and deduplicate results. - Queries without a location keyword get the location appended automatically, - since Google Jobs returns nothing without geographic context. + """Search for jobs across multiple queries and deduplicate results. + + Each query is forwarded to the active :class:`SearchProvider` which handles + location filtering, API-specific localisation, and pagination internally. - Stops early once *min_unique_jobs* unique listings have been collected, - saving SerpAPI calls for candidates in active markets. + Stops early once *min_unique_jobs* unique listings have been collected. Args: - queries: List of search queries. 
+ queries: List of search queries (keywords). jobs_per_query: Number of jobs to fetch per query. - location: Target location to append to queries missing one. + location: Target location passed to the provider. min_unique_jobs: Stop after collecting this many unique jobs (0 to disable). on_progress: Optional callback(completed_count, total_queries, unique_jobs_count) - invoked after each query completes. Because queries run in + invoked after each query completes. Because queries run in parallel, completed_count reflects finish order, not the original query index. on_jobs_found: Optional callback(new_unique_jobs) invoked with each batch of newly discovered unique jobs as soon as a query completes. Enables the caller to start processing (e.g. evaluating) jobs before all searches finish. + provider: Explicit provider instance; defaults to ``get_provider(location)``. Returns: Deduplicated list of job listings. """ - # Translate English city/country names to local names (e.g. Munich → München) - local_location = _localise_query(location) - remote_search = _is_remote_only(location) - - # Build location keywords from BOTH original and localised forms - _location_words = set() - for loc in (location, local_location): - for w in loc.split(): - cleaned = re.sub(r"[^\w]", "", w).lower() - if len(cleaned) >= 3: - _location_words.add(cleaned) - _location_words.add("remote") - - # Infer Google country code for localisation (None for remote-only) - gl = _infer_gl(location) - - # Determine SerpApi `location` param for geographic filtering. - # For remote searches we omit it; for everything else we pass the - # raw user-supplied string which SerpApi resolves to its geo DB. 
- serpapi_location: str | None = None if remote_search else location or None + if provider is None: + provider = get_provider(location) all_jobs: dict[str, JobListing] = {} # Use title+company as key for dedup lock = threading.Lock() completed = 0 early_stop = threading.Event() - # Prepare all search queries upfront (localisation, location append) - prepared_queries: list[str] = [] - for query in queries: - query_lower = query.lower() - has_location = any(kw in query_lower for kw in _location_words) - search_query = query if has_location else f"{query} {local_location}" - search_query = _localise_query(search_query) - prepared_queries.append(search_query) - - def _search_one(search_query: str) -> list[JobListing]: + def _search_one(query: str) -> list[JobListing]: if early_stop.is_set(): return [] - return search_jobs( - search_query, - num_results=jobs_per_query, - gl=gl, - location=serpapi_location, - ) - - with ThreadPoolExecutor(max_workers=min(5, max(1, len(prepared_queries)))) as executor: - futures = [executor.submit(_search_one, sq) for sq in prepared_queries] + return provider.search(query, location, max_results=jobs_per_query) + + with ThreadPoolExecutor(max_workers=min(5, max(1, len(queries)))) as executor: + futures = [executor.submit(_search_one, q) for q in queries] for future in as_completed(futures): jobs = future.result() batch_new: list[JobListing] = [] diff --git a/immermatch/search_provider.py b/immermatch/search_provider.py new file mode 100644 index 0000000..4fa6745 --- /dev/null +++ b/immermatch/search_provider.py @@ -0,0 +1,60 @@ +"""Abstract search-provider interface and provider factory. + +Every job-search backend (Bundesagentur für Arbeit, SerpApi, …) implements +the ``SearchProvider`` protocol so the rest of the pipeline can be +search-engine-agnostic. 
+""" + +from __future__ import annotations + +import logging +from typing import Protocol, runtime_checkable + +from .models import JobListing + +logger = logging.getLogger(__name__) + + +@runtime_checkable +class SearchProvider(Protocol): + """Pluggable interface for job-search backends. + + Implementations must expose a ``name`` attribute and a ``search`` method + that translate a keyword + location into a list of ``JobListing`` objects. + """ + + name: str + """Human-readable provider name, e.g. ``"Bundesagentur für Arbeit"``.""" + + def search( + self, + query: str, + location: str, + max_results: int = 50, + ) -> list[JobListing]: + """Run a single search and return parsed job listings. + + Args: + query: Free-text keyword (job title, skill, …). + location: Free-text target location (city, region, country). + max_results: Upper bound on results to return. + + Returns: + De-duplicated list of ``JobListing`` objects. + """ + ... + + +def get_provider(location: str = "") -> SearchProvider: # noqa: ARG001 + """Return the appropriate ``SearchProvider`` for *location*. + + Currently always returns the Bundesagentur für Arbeit provider + (Germany-only). This factory is the single extension point for + future per-country routing — e.g. returning ``SerpApiProvider`` + for non-German locations. + """ + # Lazy import so the module can be loaded without pulling in httpx + # when only the protocol is needed (e.g. for type-checking). + from .bundesagentur import BundesagenturProvider # noqa: PLC0415 + + return BundesagenturProvider() diff --git a/immermatch/serpapi_provider.py b/immermatch/serpapi_provider.py new file mode 100644 index 0000000..cf9bdbf --- /dev/null +++ b/immermatch/serpapi_provider.py @@ -0,0 +1,363 @@ +"""SerpApi-backed job search provider (Google Jobs). + +This module wraps the existing SerpApi integration behind the +:class:`~immermatch.search_provider.SearchProvider` protocol so it can +be swapped in alongside other providers (e.g. 
Bundesagentur für Arbeit). +""" + +from __future__ import annotations + +import os +import re + +from serpapi import GoogleSearch + +from .models import ApplyOption, JobListing + +# --------------------------------------------------------------------------- +# Blocked portal list (questionable job aggregators / paywalls) +# --------------------------------------------------------------------------- + +BLOCKED_PORTALS = { + "bebee", + "trabajo", + "jooble", + "adzuna", + "jobrapido", + "neuvoo", + "mitula", + "trovit", + "jobomas", + "jobijoba", + "talent", + "jobatus", + "jobsora", + "studysmarter", + "jobilize", + "learn4good", + "grabjobs", + "jobtensor", + "zycto", + "terra.do", + "jobzmall", + "simplyhired", +} + +# --------------------------------------------------------------------------- +# Google gl= country codes +# --------------------------------------------------------------------------- + +GL_CODES: dict[str, str] = { + # Countries + "germany": "de", + "deutschland": "de", + "france": "fr", + "netherlands": "nl", + "holland": "nl", + "belgium": "be", + "austria": "at", + "österreich": "at", + "switzerland": "ch", + "schweiz": "ch", + "suisse": "ch", + "spain": "es", + "españa": "es", + "italy": "it", + "italia": "it", + "portugal": "pt", + "poland": "pl", + "polska": "pl", + "sweden": "se", + "sverige": "se", + "norway": "no", + "norge": "no", + "denmark": "dk", + "danmark": "dk", + "finland": "fi", + "suomi": "fi", + "ireland": "ie", + "czech republic": "cz", + "czechia": "cz", + "romania": "ro", + "hungary": "hu", + "greece": "gr", + "luxembourg": "lu", + "uk": "uk", + "united kingdom": "uk", + "england": "uk", + # Major cities → country + "berlin": "de", + "munich": "de", + "münchen": "de", + "hamburg": "de", + "frankfurt": "de", + "stuttgart": "de", + "düsseldorf": "de", + "köln": "de", + "cologne": "de", + "hannover": "de", + "nürnberg": "de", + "nuremberg": "de", + "leipzig": "de", + "dresden": "de", + "dortmund": "de", + "essen": "de", + 
"bremen": "de", + "paris": "fr", + "lyon": "fr", + "marseille": "fr", + "toulouse": "fr", + "amsterdam": "nl", + "rotterdam": "nl", + "eindhoven": "nl", + "utrecht": "nl", + "brussels": "be", + "bruxelles": "be", + "antwerp": "be", + "vienna": "at", + "wien": "at", + "graz": "at", + "zurich": "ch", + "zürich": "ch", + "geneva": "ch", + "genève": "ch", + "basel": "ch", + "bern": "ch", + "madrid": "es", + "barcelona": "es", + "rome": "it", + "milan": "it", + "milano": "it", + "lisbon": "pt", + "porto": "pt", + "warsaw": "pl", + "kraków": "pl", + "krakow": "pl", + "wrocław": "pl", + "stockholm": "se", + "gothenburg": "se", + "malmö": "se", + "oslo": "no", + "copenhagen": "dk", + "helsinki": "fi", + "dublin": "ie", + "prague": "cz", + "bucharest": "ro", + "budapest": "hu", + "athens": "gr", + "london": "uk", + "manchester": "uk", + "edinburgh": "uk", +} + +# --------------------------------------------------------------------------- +# Remote-search helpers +# --------------------------------------------------------------------------- + +_REMOTE_TOKENS = {"remote", "worldwide", "global", "anywhere", "weltweit"} + + +def is_remote_only(location: str) -> bool: + """Return True when the location string contains ONLY remote-like tokens.""" + words = {re.sub(r"[^\w]", "", w).lower() for w in location.split() if w.strip()} + return bool(words) and words <= _REMOTE_TOKENS + + +def infer_gl(location: str) -> str | None: + """Infer a Google gl= country code from a free-form location string. + + Returns *None* for purely remote/global searches so the caller can + decide whether to set ``gl`` at all (SerpApi defaults to "us"). + Falls back to "de" when a location is given but no country can be + determined. 
+ """ + if is_remote_only(location): + return None + loc_lower = location.lower() + for name, code in GL_CODES.items(): + if name in loc_lower: + return code + return "de" + + +# --------------------------------------------------------------------------- +# City / country localisation for Google Jobs queries +# --------------------------------------------------------------------------- + +CITY_LOCALISE: dict[str, str] = { + "munich": "München", + "cologne": "Köln", + "nuremberg": "Nürnberg", + "hanover": "Hannover", + "dusseldorf": "Düsseldorf", + "vienna": "Wien", + "zurich": "Zürich", + "geneva": "Genève", + "prague": "Praha", + "warsaw": "Warszawa", + "krakow": "Kraków", + "wroclaw": "Wrocław", + "copenhagen": "København", + "athens": "Athína", + "bucharest": "București", + "milan": "Milano", + "rome": "Roma", + "lisbon": "Lisboa", + "brussels": "Bruxelles", + "antwerp": "Antwerpen", + "gothenburg": "Göteborg", +} + +COUNTRY_LOCALISE: dict[str, str] = { + "germany": "Deutschland", + "austria": "Österreich", + "switzerland": "Schweiz", + "netherlands": "Niederlande", + "czech republic": "Česká republika", + "czechia": "Česko", + "poland": "Polska", + "sweden": "Sverige", + "norway": "Norge", + "denmark": "Danmark", + "finland": "Suomi", + "hungary": "Magyarország", + "romania": "România", + "greece": "Ελλάδα", +} + +_LOCALISE_PATTERN = re.compile( + r"\b(" + "|".join(re.escape(k) for k in CITY_LOCALISE) + r")\b", + re.IGNORECASE, +) + +_COUNTRY_LOCALISE_PATTERN = re.compile( + r"\b(" + "|".join(re.escape(k) for k in sorted(COUNTRY_LOCALISE, key=len, reverse=True)) + r")\b", + re.IGNORECASE, +) + + +def localise_query(query: str) -> str: + """Replace English city and country names with their local equivalents.""" + query = _LOCALISE_PATTERN.sub(lambda m: CITY_LOCALISE[m.group(0).lower()], query) + query = _COUNTRY_LOCALISE_PATTERN.sub(lambda m: COUNTRY_LOCALISE[m.group(0).lower()], query) + return query + + +# 
--------------------------------------------------------------------------- +# SerpApi response parsing +# --------------------------------------------------------------------------- + + +def parse_job_results(results: dict) -> list[JobListing]: + """Parse job listings from a SerpApi response dict.""" + jobs: list[JobListing] = [] + + for job_data in results.get("jobs_results", []): + description_parts = [] + if "description" in job_data: + description_parts.append(job_data["description"]) + if "highlights" in job_data: + for highlight in job_data.get("highlights", []): + if "items" in highlight: + description_parts.extend(highlight["items"]) + + apply_options = [] + for option in job_data.get("apply_options", []): + if "title" in option and "link" in option: + url = option["link"].lower() + if not any(blocked in url for blocked in BLOCKED_PORTALS): + apply_options.append(ApplyOption(source=option["title"], url=option["link"])) + + if not apply_options: + continue + + job = JobListing( + title=job_data.get("title", "Unknown"), + company_name=job_data.get("company_name", "Unknown"), + location=job_data.get("location", "Unknown"), + description="\n".join(description_parts), + link=job_data.get("share_link", job_data.get("link", "")), + posted_at=job_data.get("detected_extensions", {}).get("posted_at", ""), + source="serpapi", + apply_options=apply_options, + ) + jobs.append(job) + + return jobs + + +# --------------------------------------------------------------------------- +# Direct SerpApi search +# --------------------------------------------------------------------------- + + +def search_jobs( + query: str, + num_results: int = 10, + gl: str | None = "de", + location: str | None = None, +) -> list[JobListing]: + """Search for jobs using SerpApi Google Jobs engine with pagination.""" + api_key = os.getenv("SERPAPI_KEY") + if not api_key: + raise ValueError("SERPAPI_KEY environment variable not set") + + all_jobs: list[JobListing] = [] + next_page_token = None + 
+ while len(all_jobs) < num_results: + params: dict[str, str] = { + "engine": "google_jobs", + "q": query, + "hl": "en", + "api_key": api_key, + } + if gl is not None: + params["gl"] = gl + if location is not None: + params["location"] = location + if next_page_token: + params["next_page_token"] = next_page_token + + search = GoogleSearch(params) + results = search.get_dict() + + page_jobs = parse_job_results(results) + if not page_jobs: + break + + all_jobs.extend(page_jobs) + + pagination = results.get("serpapi_pagination", {}) + next_page_token = pagination.get("next_page_token") + if not next_page_token: + break + + return all_jobs[:num_results] + + +# --------------------------------------------------------------------------- +# SerpApiProvider (SearchProvider protocol) +# --------------------------------------------------------------------------- + + +class SerpApiProvider: + """Google Jobs search via SerpApi. + + Satisfies the :class:`~immermatch.search_provider.SearchProvider` protocol. 
+ """ + + name: str = "SerpApi (Google Jobs)" + + def search( + self, + query: str, + location: str, + max_results: int = 50, + ) -> list[JobListing]: + """Run a single SerpApi search with localisation and gl-code inference.""" + remote = is_remote_only(location) + gl = infer_gl(location) + serpapi_location: str | None = None if remote else location or None + localised_query = localise_query(query) + return search_jobs(localised_query, num_results=max_results, gl=gl, location=serpapi_location) diff --git a/tests/test_app_ui.py b/tests/test_app_ui.py index 629307e..83c9a35 100644 --- a/tests/test_app_ui.py +++ b/tests/test_app_ui.py @@ -10,7 +10,6 @@ _FAKE_ENV = { "GOOGLE_API_KEY": "fake-google-key", # pragma: allowlist secret - "SERPAPI_KEY": "fake-serpapi-key", # pragma: allowlist secret "SUPABASE_URL": "https://fake.supabase.co", "SUPABASE_KEY": "fake-anon-key", # pragma: allowlist secret "SUPABASE_SERVICE_KEY": "fake-service-key", # pragma: allowlist secret diff --git a/tests/test_bundesagentur.py b/tests/test_bundesagentur.py new file mode 100644 index 0000000..e220bac --- /dev/null +++ b/tests/test_bundesagentur.py @@ -0,0 +1,610 @@ +"""Tests for the Bundesagentur für Arbeit search provider.""" + +from __future__ import annotations + +import json +from unittest.mock import MagicMock, patch + +import httpx + +from immermatch.bundesagentur import ( + BundesagenturProvider, + _build_ba_link, + _clean_html, + _fetch_detail, + _fetch_detail_api, + _parse_listing, + _parse_location, + _parse_search_results, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_stellenangebot( + refnr: str = "10000-1234567890-S", + titel: str = "Python Entwickler (m/w/d)", + beruf: str = "Python Entwickler", + arbeitgeber: str = "ACME GmbH", + ort: str = "Berlin", + region: str = "Berlin", + land: str = "Deutschland", + posted: str = "2026-02-25", +) 
-> dict: + return { + "beruf": beruf, + "titel": titel, + "arbeitgeber": arbeitgeber, + "refnr": refnr, + "aktuelleVeroeffentlichungsdatum": posted, + "arbeitsort": {"ort": ort, "region": region, "land": land}, + } + + +def _make_search_response( + items: list[dict] | None = None, + total: int | None = None, +) -> dict: + items = items if items is not None else [_make_stellenangebot()] + return { + "stellenangebote": items, + "maxErgebnisse": str(total if total is not None else len(items)), + "page": "1", + "size": "50", + } + + +def _make_ng_state_html(jobdetail: dict) -> str: + """Wrap a jobdetail dict in the Angular SSR ng-state script tag.""" + state = {"jobdetail": jobdetail} + return f'' + + +def _make_detail( + description: str = "Great job & benefits", + partner_url: str = "", + partner_name: str = "", +) -> dict: + d: dict = {"stellenangebotsBeschreibung": description} + if partner_url: + d["allianzpartnerUrl"] = partner_url + if partner_name: + d["allianzpartnerName"] = partner_name + return d + + +# =========================================================================== +# Unit tests for helper functions +# =========================================================================== + + +class TestBuildBaLink: + def test_simple(self) -> None: + assert _build_ba_link("10000-123-S") == "https://www.arbeitsagentur.de/jobsuche/jobdetail/10000-123-S" + + +class TestParseLocation: + def test_full(self) -> None: + assert _parse_location({"ort": "Berlin", "region": "Berlin", "land": "Deutschland"}) == "Berlin, Deutschland" + + def test_city_and_different_region(self) -> None: + assert _parse_location({"ort": "München", "region": "Bayern", "land": "Deutschland"}) == ( + "München, Bayern, Deutschland" + ) + + def test_empty(self) -> None: + assert _parse_location({}) == "Germany" + + def test_city_only(self) -> None: + assert _parse_location({"ort": "Hamburg"}) == "Hamburg" + + +class TestCleanHtml: + def test_strips_tags(self) -> None: + assert 
_clean_html("bold text") == "bold text" + + def test_decodes_entities(self) -> None: + # & → & (plain entities are decoded) + assert _clean_html("AT&T rocks") == "AT&T rocks" + + def test_collapses_whitespace(self) -> None: + assert _clean_html("a b\n\nc") == "a b c" + + def test_combined(self) -> None: + assert _clean_html("

Hello & world


ok") == "Hello & world ok" + + def test_empty_string(self) -> None: + assert _clean_html("") == "" + + +class TestParseListing: + def test_valid_item(self) -> None: + item = _make_stellenangebot(refnr="REF1", titel="Dev (m/w/d)", beruf="Entwickler", arbeitgeber="Co") + listing = _parse_listing(item) + assert listing is not None + assert listing.title == "Dev (m/w/d)" + assert listing.company_name == "Co" + assert listing.source == "bundesagentur" + assert "REF1" in listing.link + assert len(listing.apply_options) == 1 + assert listing.apply_options[0].source == "Arbeitsagentur" + + def test_description_includes_beruf_when_different_from_title(self) -> None: + item = _make_stellenangebot(titel="Senior Dev", beruf="Softwareentwickler") + listing = _parse_listing(item) + assert listing is not None + assert "Beruf: Softwareentwickler" in listing.description + + def test_description_omits_beruf_when_equal_to_title(self) -> None: + item = _make_stellenangebot(titel="Python Dev", beruf="Python Dev") + listing = _parse_listing(item) + assert listing is not None + assert "Beruf:" not in listing.description + + def test_missing_refnr_returns_none(self) -> None: + item = {"beruf": "Dev", "arbeitgeber": "Co", "arbeitsort": {}} + assert _parse_listing(item) is None + + def test_fallback_title_from_beruf(self) -> None: + item = _make_stellenangebot(titel="", beruf="QA Engineer") + listing = _parse_listing(item) + assert listing is not None + assert listing.title == "QA Engineer" + + def test_with_detail_description(self) -> None: + item = _make_stellenangebot(refnr="REF1") + detail = _make_detail(description="

Full desc

& more") + listing = _parse_listing(item, detail=detail) + assert listing is not None + assert listing.description == "Full desc & more" + + def test_with_detail_empty_description_falls_back(self) -> None: + item = _make_stellenangebot(refnr="REF1", beruf="QA", arbeitgeber="Corp") + detail = {"stellenangebotsBeschreibung": ""} + listing = _parse_listing(item, detail=detail) + assert listing is not None + # Falls back to search-field description + assert "Arbeitgeber: Corp" in listing.description + + def test_with_detail_external_apply_url(self) -> None: + item = _make_stellenangebot(refnr="REF1") + detail = _make_detail(partner_url="https://careers.acme.com/apply", partner_name="ACME Careers") + listing = _parse_listing(item, detail=detail) + assert listing is not None + assert len(listing.apply_options) == 2 + assert listing.apply_options[1].source == "ACME Careers" + assert listing.apply_options[1].url == "https://careers.acme.com/apply" + + def test_with_detail_external_url_adds_https_prefix(self) -> None: + item = _make_stellenangebot(refnr="REF1") + detail = _make_detail(partner_url="careers.acme.com") + listing = _parse_listing(item, detail=detail) + assert listing is not None + assert listing.apply_options[1].url == "https://careers.acme.com" + + def test_with_detail_external_url_default_name(self) -> None: + item = _make_stellenangebot(refnr="REF1") + detail = _make_detail(partner_url="https://example.com") + listing = _parse_listing(item, detail=detail) + assert listing is not None + assert listing.apply_options[1].source == "Company Website" + + def test_with_no_detail(self) -> None: + item = _make_stellenangebot(refnr="REF1") + listing = _parse_listing(item, detail=None) + assert listing is not None + assert len(listing.apply_options) == 1 # Only Arbeitsagentur + + +class TestParseSearchResults: + def test_valid_items(self) -> None: + data = _make_search_response( + [ + _make_stellenangebot(refnr="r1", beruf="Dev", arbeitgeber="Co"), + 
_make_stellenangebot(refnr="r2", beruf="QA", arbeitgeber="Co2"), + ] + ) + results = _parse_search_results(data) + assert len(results) == 2 + # Returns raw dicts, not JobListing objects + assert results[0]["arbeitgeber"] == "Co" + assert results[1]["arbeitgeber"] == "Co2" + + def test_skips_missing_refnr(self) -> None: + data = {"stellenangebote": [{"beruf": "Dev"}]} + assert _parse_search_results(data) == [] + + def test_empty_response(self) -> None: + assert _parse_search_results({}) == [] + assert _parse_search_results({"stellenangebote": []}) == [] + + +# =========================================================================== +# Tests for _fetch_detail +# =========================================================================== + + +class TestFetchDetail: + def test_extracts_ng_state(self) -> None: + detail = {"stellenangebotsBeschreibung": "

Hello

", "firma": "ACME"} + html = _make_ng_state_html(detail) + + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.text = html + client = MagicMock(spec=httpx.Client) + client.get.return_value = mock_resp + + result = _fetch_detail(client, "REF-123") + assert result == detail + + def test_missing_ng_state_returns_empty(self) -> None: + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.text = "No state here" + client = MagicMock(spec=httpx.Client) + client.get.return_value = mock_resp + + assert _fetch_detail(client, "REF-123") == {} + + def test_non_200_returns_empty(self) -> None: + mock_resp = MagicMock() + mock_resp.status_code = 404 + client = MagicMock(spec=httpx.Client) + client.get.return_value = mock_resp + + assert _fetch_detail(client, "REF-123") == {} + + def test_retries_on_server_error(self) -> None: + error_resp = MagicMock() + error_resp.status_code = 503 + + detail = {"stellenangebotsBeschreibung": "ok"} + ok_resp = MagicMock() + ok_resp.status_code = 200 + ok_resp.text = _make_ng_state_html(detail) + + client = MagicMock(spec=httpx.Client) + client.get.side_effect = [error_resp, ok_resp] + + with patch("immermatch.bundesagentur.time.sleep"): + result = _fetch_detail(client, "REF-123") + assert result == detail + + def test_retries_on_403_then_succeeds(self) -> None: + blocked_resp = MagicMock() + blocked_resp.status_code = 403 + + detail = {"stellenangebotsBeschreibung": "ok"} + ok_resp = MagicMock() + ok_resp.status_code = 200 + ok_resp.text = _make_ng_state_html(detail) + + client = MagicMock(spec=httpx.Client) + client.get.side_effect = [blocked_resp, ok_resp] + + with patch("immermatch.bundesagentur.time.sleep"): + result = _fetch_detail(client, "REF-123") + assert result == detail + + def test_retries_on_network_error(self) -> None: + detail = {"stellenangebotsBeschreibung": "recovered"} + ok_resp = MagicMock() + ok_resp.status_code = 200 + ok_resp.text = _make_ng_state_html(detail) + + client = 
MagicMock(spec=httpx.Client) + client.get.side_effect = [httpx.ConnectError("timeout"), ok_resp] + + with patch("immermatch.bundesagentur.time.sleep"): + result = _fetch_detail(client, "REF-123") + assert result == detail + + def test_all_retries_fail(self) -> None: + client = MagicMock(spec=httpx.Client) + client.get.side_effect = httpx.ConnectError("down") + + with patch("immermatch.bundesagentur.time.sleep"): + result = _fetch_detail(client, "REF-123") + assert result == {} + + +# =========================================================================== +# Integration-style tests for BundesagenturProvider +# =========================================================================== + + +class TestBundesagenturProviderSearch: + """Test the full search pipeline with mocked HTTP.""" + + def test_search_returns_listings(self) -> None: + items = [ + _make_stellenangebot(refnr="r1", titel="Dev A", arbeitgeber="Co A"), + _make_stellenangebot(refnr="r2", titel="Dev B", arbeitgeber="Co B"), + ] + resp_data = _make_search_response(items, total=2) + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = resp_data + + provider = BundesagenturProvider(days_published=7) + with ( + patch.object(provider, "_get_with_retry", return_value=mock_resp), + patch.object(provider, "_enrich", side_effect=lambda it: [_parse_listing(i) for i in it]), + ): + jobs = provider.search("Python", "Berlin", max_results=10) + + assert len(jobs) == 2 + assert all(j.source == "bundesagentur" for j in jobs) + assert jobs[0].title == "Dev A" + assert jobs[1].title == "Dev B" + + def test_search_empty_results(self) -> None: + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = _make_search_response([], total=0) + + provider = BundesagenturProvider() + with ( + patch.object(provider, "_get_with_retry", return_value=mock_resp), + patch.object(provider, "_enrich", side_effect=lambda it: [_parse_listing(i) for i in it]), + ): + jobs = 
provider.search("Niche Job", "Berlin") + assert jobs == [] + + def test_search_empty_query_returns_empty(self) -> None: + """Empty or whitespace-only queries are rejected before hitting the API.""" + provider = BundesagenturProvider() + assert provider.search("", "Berlin") == [] + assert provider.search(" ", "Berlin") == [] + + def test_search_respects_max_results(self) -> None: + resp_data = _make_search_response( + [_make_stellenangebot(refnr=f"r{i}") for i in range(5)], + total=5, + ) + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = resp_data + + provider = BundesagenturProvider() + with ( + patch.object(provider, "_get_with_retry", return_value=mock_resp), + patch.object(provider, "_enrich", side_effect=lambda it: [_parse_listing(i) for i in it]), + ): + jobs = provider.search("Dev", "Berlin", max_results=3) + + assert len(jobs) == 3 + + def test_veroeffentlichtseit_custom(self) -> None: + provider = BundesagenturProvider(days_published=3) + assert provider._days_published == 3 + + +class TestBundesagenturProviderPagination: + """Test pagination logic via _search_items.""" + + def test_single_page(self) -> None: + resp_data = _make_search_response( + [_make_stellenangebot(refnr=f"r{i}") for i in range(5)], + total=5, + ) + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = resp_data + + provider = BundesagenturProvider() + with ( + patch.object(provider, "_get_with_retry", return_value=mock_resp), + patch("immermatch.bundesagentur.httpx.Client"), + ): + items = provider._search_items("Dev", "Berlin", max_results=50) + + assert len(items) == 5 + + def test_multi_page(self) -> None: + page_1 = _make_search_response( + [_make_stellenangebot(refnr=f"p1_{i}") for i in range(50)], + total=60, + ) + page_2 = _make_search_response( + [_make_stellenangebot(refnr=f"p2_{i}") for i in range(10)], + total=60, + ) + call_count = 0 + + def mock_get(client, url, params): + nonlocal call_count + 
call_count += 1 + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = page_2 if params.get("page", 1) >= 2 else page_1 + return mock_resp + + provider = BundesagenturProvider() + with ( + patch.object(provider, "_get_with_retry", side_effect=mock_get), + patch("immermatch.bundesagentur.httpx.Client"), + ): + items = provider._search_items("Dev", "Berlin", max_results=100) + + assert len(items) == 60 + assert call_count == 2 + + +class TestBundesagenturProviderErrors: + """Test error handling in the provider.""" + + def test_search_items_server_error_returns_empty(self) -> None: + """A persistent failure from the search endpoint returns an empty list.""" + provider = BundesagenturProvider() + with ( + patch.object(provider, "_get_with_retry", return_value=None), + patch("immermatch.bundesagentur.httpx.Client"), + ): + items = provider._search_items("Dev", "Berlin", max_results=50) + assert items == [] + + def test_get_with_retry_retries_on_403(self) -> None: + blocked_resp = MagicMock() + blocked_resp.status_code = 403 + + ok_resp = MagicMock() + ok_resp.status_code = 200 + + client = MagicMock(spec=httpx.Client) + client.get.side_effect = [blocked_resp, ok_resp] + + with patch("immermatch.bundesagentur.time.sleep"): + result = BundesagenturProvider._get_with_retry(client, "https://example.com", {}) + + assert result is ok_resp + + +class TestFetchDetailApi: + def test_fetches_json_detail(self) -> None: + detail = {"stellenangebotsBeschreibung": "API detail"} + ok_resp = MagicMock() + ok_resp.status_code = 200 + ok_resp.json.return_value = detail + + client = MagicMock(spec=httpx.Client) + client.get.return_value = ok_resp + + assert _fetch_detail_api(client, "REF-123") == detail + + def test_retries_on_403_then_succeeds(self) -> None: + blocked_resp = MagicMock() + blocked_resp.status_code = 403 + + detail = {"stellenangebotsBeschreibung": "API detail"} + ok_resp = MagicMock() + ok_resp.status_code = 200 + ok_resp.json.return_value 
= detail + + client = MagicMock(spec=httpx.Client) + client.get.side_effect = [blocked_resp, ok_resp] + + with patch("immermatch.bundesagentur.time.sleep"): + result = _fetch_detail_api(client, "REF-123") + + assert result == detail + + +class TestEnrich: + """Test the _enrich detail-fetching pipeline.""" + + def test_enriches_items_with_details(self) -> None: + items = [ + _make_stellenangebot(refnr="r1", titel="Dev A"), + _make_stellenangebot(refnr="r2", titel="Dev B"), + ] + details = { + "r1": _make_detail(description="Desc A"), + "r2": _make_detail(description="

Desc B

"), + } + + provider = BundesagenturProvider() + with ( + patch("immermatch.bundesagentur._fetch_detail_api", return_value={}), + patch("immermatch.bundesagentur._fetch_detail", side_effect=lambda _c, refnr: details.get(refnr, {})), + patch("immermatch.bundesagentur.httpx.Client"), + ): + listings = provider._enrich(items) + + assert len(listings) == 2 + assert listings[0].description == "Desc A" + assert listings[1].description == "Desc B" + + def test_enrich_falls_back_on_failed_detail(self) -> None: + items = [_make_stellenangebot(refnr="r1", titel="Dev", arbeitgeber="Corp")] + + provider = BundesagenturProvider() + with ( + patch("immermatch.bundesagentur._fetch_detail_api", return_value={}), + patch("immermatch.bundesagentur._fetch_detail", return_value={}), + patch("immermatch.bundesagentur.httpx.Client"), + ): + listings = provider._enrich(items) + + assert len(listings) == 1 + # Uses fallback description from search fields + assert "Arbeitgeber: Corp" in listings[0].description + + def test_enrich_with_external_apply_url(self) -> None: + items = [_make_stellenangebot(refnr="r1")] + detail = _make_detail(partner_url="https://jobs.example.com", partner_name="Example") + + provider = BundesagenturProvider() + with ( + patch("immermatch.bundesagentur._fetch_detail_api", return_value={}), + patch("immermatch.bundesagentur._fetch_detail", return_value=detail), + patch("immermatch.bundesagentur.httpx.Client"), + ): + listings = provider._enrich(items) + + assert len(listings[0].apply_options) == 2 + assert listings[0].apply_options[1].source == "Example" + assert listings[0].apply_options[1].url == "https://jobs.example.com" + + def test_api_then_html_strategy_falls_back_to_html(self) -> None: + items = [_make_stellenangebot(refnr="r1", titel="Dev", arbeitgeber="Corp")] + html_detail = _make_detail(description="HTML fallback") + + provider = BundesagenturProvider(detail_strategy="api_then_html") + with ( + patch("immermatch.bundesagentur._fetch_detail_api", 
return_value={}), + patch("immermatch.bundesagentur._fetch_detail", return_value=html_detail), + patch("immermatch.bundesagentur.httpx.Client"), + ): + listings = provider._enrich(items) + + assert len(listings) == 1 + assert listings[0].description == "HTML fallback" + + def test_api_only_strategy_uses_api_detail(self) -> None: + items = [_make_stellenangebot(refnr="r1", titel="Dev", arbeitgeber="Corp")] + api_detail = {"stellenangebotsBeschreibung": "API detail"} + + provider = BundesagenturProvider(detail_strategy="api_only") + with ( + patch("immermatch.bundesagentur._fetch_detail_api", return_value=api_detail), + patch("immermatch.bundesagentur._fetch_detail", return_value={}), + patch("immermatch.bundesagentur.httpx.Client"), + ): + listings = provider._enrich(items) + + assert len(listings) == 1 + assert listings[0].description == "API detail" + + def test_html_only_strategy_uses_html_detail(self) -> None: + items = [_make_stellenangebot(refnr="r1", titel="Dev", arbeitgeber="Corp")] + html_detail = _make_detail(description="HTML only detail") + + provider = BundesagenturProvider(detail_strategy="html_only") + with ( + patch("immermatch.bundesagentur._fetch_detail", return_value=html_detail), + patch("immermatch.bundesagentur._fetch_detail_api") as mock_api, + patch("immermatch.bundesagentur.httpx.Client"), + ): + listings = provider._enrich(items) + + assert len(listings) == 1 + assert listings[0].description == "HTML only detail" + mock_api.assert_not_called() + + +class TestSearchProviderProtocol: + """Verify BundesagenturProvider satisfies the SearchProvider protocol.""" + + def test_conforms_to_protocol(self) -> None: + from immermatch.search_provider import SearchProvider + + provider = BundesagenturProvider() + assert isinstance(provider, SearchProvider) + + def test_has_name(self) -> None: + provider = BundesagenturProvider() + assert provider.name == "Bundesagentur für Arbeit" diff --git a/tests/test_integration.py b/tests/test_integration.py index 
5273ec7..8108e1b 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -331,14 +331,12 @@ def mock_client() -> MagicMock: class TestFullPipelineTechCV: """End-to-end pipeline with the tech CV (sample.md).""" - @patch("immermatch.search_agent.search_jobs") @patch("immermatch.evaluator_agent.call_gemini") @patch("immermatch.search_agent.call_gemini") def test_full_pipeline_happy_path( self, mock_search_gemini: MagicMock, mock_eval_gemini: MagicMock, - mock_search_jobs: MagicMock, mock_client: MagicMock, tech_cv_text: str, ) -> None: @@ -347,8 +345,10 @@ def test_full_pipeline_happy_path( # search_agent.call_gemini: 1st call → profile, 2nd call → queries mock_search_gemini.side_effect = [TECH_PROFILE_JSON, QUERIES_JSON] - # search_jobs returns our fixture jobs (spread across queries) - mock_search_jobs.side_effect = ( + # Build a mock provider whose search() returns jobs in batches + mock_provider = MagicMock() + mock_provider.name = "test" + mock_provider.search.side_effect = ( [ MOCK_JOBS[:2], # query 1 → 2 jobs MOCK_JOBS[2:4], # query 2 → 2 jobs @@ -373,7 +373,13 @@ def test_full_pipeline_happy_path( assert len(queries) == 20 # --- Act: Stage 3 — Search --- - jobs = search_all_queries(queries, jobs_per_query=10, location="Munich, Germany", min_unique_jobs=0) + jobs = search_all_queries( + queries, + jobs_per_query=10, + location="Munich, Germany", + min_unique_jobs=0, + provider=mock_provider, + ) assert len(jobs) == 5 assert all(isinstance(j, JobListing) for j in jobs) @@ -397,14 +403,12 @@ def test_full_pipeline_happy_path( class TestFullPipelineSustainabilityCV: """End-to-end pipeline with the non-tech sustainability CV.""" - @patch("immermatch.search_agent.search_jobs") @patch("immermatch.evaluator_agent.call_gemini") @patch("immermatch.search_agent.call_gemini") def test_full_pipeline_non_tech_cv( self, mock_search_gemini: MagicMock, mock_eval_gemini: MagicMock, - mock_search_jobs: MagicMock, mock_client: MagicMock, sustainability_cv_text: 
str, ) -> None: @@ -431,7 +435,9 @@ def test_full_pipeline_non_tech_cv( apply_options=[ApplyOption(source="Company Website", url="https://sustaincorp.de/jobs/2")], ), ] - mock_search_jobs.side_effect = [sustainability_jobs] + [[] for _ in range(19)] + mock_provider = MagicMock() + mock_provider.name = "test" + mock_provider.search.side_effect = [sustainability_jobs] + [[] for _ in range(19)] eval_responses = [ json.dumps({"score": 88, "reasoning": "Excellent CSRD/GHG match.", "missing_skills": []}), @@ -449,7 +455,13 @@ def test_full_pipeline_non_tech_cv( queries = generate_search_queries(mock_client, profile, "Munich, Germany") assert len(queries) == 20 - jobs = search_all_queries(queries, jobs_per_query=10, location="Munich, Germany", min_unique_jobs=0) + jobs = search_all_queries( + queries, + jobs_per_query=10, + location="Munich, Germany", + min_unique_jobs=0, + provider=mock_provider, + ) assert len(jobs) == 2 evaluated = evaluate_all_jobs(mock_client, profile, jobs, max_workers=2) @@ -533,11 +545,7 @@ def test_queries_are_strings_and_correct_count( class TestSearchDeduplication: """Verify search_all_queries deduplicates overlapping results.""" - @patch("immermatch.search_agent.search_jobs") - def test_duplicate_jobs_across_queries_are_merged( - self, - mock_search_jobs: MagicMock, - ) -> None: + def test_duplicate_jobs_across_queries_are_merged(self) -> None: """Jobs with the same title+company from different queries appear only once.""" duplicate_job = JobListing( title="Senior Python Developer", @@ -557,7 +565,9 @@ def test_duplicate_jobs_across_queries_are_merged( ) # Three queries all return the same duplicate + one unique in the second - mock_search_jobs.side_effect = [ + mock_provider = MagicMock() + mock_provider.name = "test" + mock_provider.search.side_effect = [ [duplicate_job], [duplicate_job, unique_job], [duplicate_job], @@ -568,6 +578,7 @@ def test_duplicate_jobs_across_queries_are_merged( jobs_per_query=10, location="Munich, Germany", 
min_unique_jobs=0, + provider=mock_provider, ) assert len(jobs) == 2 @@ -659,26 +670,33 @@ class TestEmptySearchResults: """Verify the pipeline handles empty search results gracefully.""" @patch("immermatch.evaluator_agent.call_gemini") - @patch("immermatch.search_agent.search_jobs") @patch("immermatch.search_agent.call_gemini") def test_empty_search_produces_empty_evaluations( self, mock_search_gemini: MagicMock, - mock_search_jobs: MagicMock, mock_eval_gemini: MagicMock, mock_client: MagicMock, tech_cv_text: str, ) -> None: """When search returns no jobs, evaluate and summary still work.""" mock_search_gemini.side_effect = [TECH_PROFILE_JSON, QUERIES_JSON] - # All searches return empty - mock_search_jobs.return_value = [] # Summary for empty results mock_eval_gemini.return_value = "No strong matches found. Consider broadening your search." + # All searches return empty + mock_provider = MagicMock() + mock_provider.name = "test" + mock_provider.search.return_value = [] + profile = profile_candidate(mock_client, tech_cv_text) queries = generate_search_queries(mock_client, profile, "Munich, Germany") - jobs = search_all_queries(queries, jobs_per_query=10, location="Munich, Germany", min_unique_jobs=0) + jobs = search_all_queries( + queries, + jobs_per_query=10, + location="Munich, Germany", + min_unique_jobs=0, + provider=mock_provider, + ) assert jobs == [] @@ -694,25 +712,26 @@ def test_empty_search_produces_empty_evaluations( class TestDataFlowBetweenStages: """Verify that data produced by earlier stages reaches later stages.""" - @patch("immermatch.search_agent.search_jobs") @patch("immermatch.evaluator_agent.call_gemini") @patch("immermatch.search_agent.call_gemini") def test_cv_data_flows_through_all_stages( self, mock_search_gemini: MagicMock, mock_eval_gemini: MagicMock, - mock_search_jobs: MagicMock, mock_client: MagicMock, tech_cv_text: str, ) -> None: """Data from the CV reaches the profile, queries, and evaluation prompts.""" 
mock_search_gemini.side_effect = [TECH_PROFILE_JSON, QUERIES_JSON] - mock_search_jobs.side_effect = [MOCK_JOBS[:1]] + [[] for _ in range(19)] mock_eval_gemini.side_effect = [ json.dumps(EVAL_RESPONSES[0]), SUMMARY_RESPONSE, ] + mock_provider = MagicMock() + mock_provider.name = "test" + mock_provider.search.side_effect = [MOCK_JOBS[:1]] + [[] for _ in range(19)] + # Stage 1: Profile — verify CV text was sent to Gemini profile = profile_candidate(mock_client, tech_cv_text) profile_prompt = mock_search_gemini.call_args_list[0][0][1] # 2nd positional arg @@ -727,7 +746,13 @@ def test_cv_data_flows_through_all_stages( assert "Munich" in query_prompt # location passed through # Stage 3: Search - jobs = search_all_queries(queries, jobs_per_query=10, location="Munich, Germany", min_unique_jobs=0) + jobs = search_all_queries( + queries, + jobs_per_query=10, + location="Munich, Germany", + min_unique_jobs=0, + provider=mock_provider, + ) assert len(jobs) == 1 # Stage 4: Evaluate — verify profile data is in the evaluation prompt diff --git a/tests/test_search_agent.py b/tests/test_search_agent.py index ceee6eb..ba047af 100644 --- a/tests/test_search_agent.py +++ b/tests/test_search_agent.py @@ -176,7 +176,7 @@ def test_highlights_in_description(self): class TestSearchAllQueries: - """Tests for search_all_queries() — mock search_jobs to test orchestration logic.""" + """Tests for search_all_queries() — mock provider to test orchestration logic.""" def _make_job(self, title: str, company: str = "Co") -> JobListing: return JobListing( @@ -186,80 +186,95 @@ def _make_job(self, title: str, company: str = "Co") -> JobListing: apply_options=[ApplyOption(source="LinkedIn", url="https://linkedin.com/1")], ) - @patch("immermatch.search_agent.search_jobs") - def test_appends_localised_location_to_query_without_one(self, mock_search: MagicMock): - mock_search.return_value = [self._make_job("Dev")] + def _make_provider(self, jobs: list[JobListing] | None = None) -> MagicMock: + provider 
= MagicMock() + provider.name = "test" + provider.search.return_value = jobs if jobs is not None else [] + return provider + + def test_passes_query_and_location_to_provider(self): + provider = self._make_provider([self._make_job("Dev")]) search_all_queries( queries=["Python Developer"], location="Munich, Germany", min_unique_jobs=0, + provider=provider, ) - # The query should have "München" appended (localised) and then localised again (no-op) - actual_query = mock_search.call_args[0][0] - assert "München" in actual_query - # Should pass gl and location to search_jobs - assert mock_search.call_args[1]["gl"] == "de" - assert mock_search.call_args[1]["location"] == "Munich, Germany" + provider.search.assert_called_once_with( + "Python Developer", + "Munich, Germany", + max_results=10, + ) - @patch("immermatch.search_agent.search_jobs") - def test_does_not_double_append_location(self, mock_search: MagicMock): - mock_search.return_value = [self._make_job("Dev")] + def test_deduplicates_by_title_and_company(self): + provider = self._make_provider([self._make_job("Dev"), self._make_job("Dev")]) - search_all_queries( - queries=["Python Developer München"], - location="Munich, Germany", + results = search_all_queries( + queries=["query1", "query2"], + location="Berlin", min_unique_jobs=0, + provider=provider, ) - # Query already contains "münchen" (location keyword) so location should NOT be appended - actual_query = mock_search.call_args[0][0] - assert actual_query.count("München") == 1 + assert len(results) == 1 - @patch("immermatch.search_agent.search_jobs") - def test_remote_search_omits_gl_and_serpapi_location(self, mock_search: MagicMock): - mock_search.return_value = [self._make_job("Remote Dev")] + def test_stops_early_when_min_unique_jobs_reached(self): + provider = self._make_provider([self._make_job("Unique Job")]) + + results = search_all_queries( + queries=["query1", "query2", "query3"], + location="Berlin, Germany", + min_unique_jobs=1, + provider=provider, 
+ ) + + # Parallel dispatch: all futures may fire before early_stop takes effect + # (mocks return instantly). The guarantee is correct dedup + results. + assert len(results) == 1 + assert provider.search.call_count <= 3 + + def test_on_progress_callback(self): + provider = self._make_provider([self._make_job("Dev")]) + progress_calls: list[tuple] = [] search_all_queries( - queries=["Python Developer remote"], - location="remote", + queries=["query1"], + location="Berlin", min_unique_jobs=0, + provider=provider, + on_progress=lambda *args: progress_calls.append(args), ) - assert mock_search.call_args[1]["gl"] is None - assert mock_search.call_args[1]["location"] is None + assert len(progress_calls) == 1 + assert progress_calls[0] == (1, 1, 1) - @patch("immermatch.search_agent.search_jobs") - def test_country_search_passes_location_param(self, mock_search: MagicMock): - mock_search.return_value = [self._make_job("Dev")] + def test_on_jobs_found_callback(self): + provider = self._make_provider([self._make_job("Dev")]) + found_batches: list[list[JobListing]] = [] search_all_queries( - queries=["Python Developer"], - location="Germany", + queries=["query1"], + location="Berlin", min_unique_jobs=0, + provider=provider, + on_jobs_found=lambda batch: found_batches.append(batch), ) - assert mock_search.call_args[1]["gl"] == "de" - assert mock_search.call_args[1]["location"] == "Germany" - # Query should have localised country name appended - actual_query = mock_search.call_args[0][0] - assert "Deutschland" in actual_query + assert len(found_batches) == 1 + assert found_batches[0][0].title == "Dev" - @patch("immermatch.search_agent.search_jobs") - def test_stops_early_when_min_unique_jobs_reached(self, mock_search: MagicMock): - mock_search.return_value = [self._make_job("Unique Job")] + @patch("immermatch.search_agent.get_provider") + def test_defaults_to_get_provider(self, mock_gp: MagicMock): + """When no provider given, get_provider(location) is called.""" + 
mock_provider = MagicMock() + mock_provider.search.return_value = [] + mock_gp.return_value = mock_provider - results = search_all_queries( - queries=["query1", "query2", "query3"], - location="Berlin, Germany", - min_unique_jobs=1, - ) + search_all_queries(queries=["test"], location="Berlin") - # Parallel dispatch: all futures may fire before early_stop takes effect - # (mocks return instantly). The guarantee is correct dedup + results. - assert len(results) == 1 - assert mock_search.call_count <= 3 + mock_gp.assert_called_once_with("Berlin") class TestLlmJsonRecovery: @@ -411,3 +426,57 @@ def test_profile_candidate_retries_when_json_is_not_dict(self, mock_call_gemini: assert result.experience_level == "Mid" assert mock_call_gemini.call_count == 2 + + +class TestGenerateSearchQueriesProviderPrompt: + """Verify that generate_search_queries picks the right prompt per provider.""" + + _PROFILE = CandidateProfile( + skills=["Python"], + experience_level="Mid", + years_of_experience=3, + roles=["Backend Developer", "Python Developer", "Software Engineer", "Entwickler", "Engineer"], + languages=["English C1"], + domain_expertise=["SaaS"], + certifications=[], + education=[], + summary="", + work_history=[], + education_history=[], + ) + + @patch("immermatch.search_agent.call_gemini") + def test_ba_provider_uses_ba_prompt(self, mock_call_gemini: MagicMock): + mock_call_gemini.return_value = '["Softwareentwickler", "Python Developer"]' + ba_provider = MagicMock() + ba_provider.name = "Bundesagentur für Arbeit" + + generate_search_queries( + MagicMock(), + self._PROFILE, + location="Berlin", + num_queries=2, + provider=ba_provider, + ) + + prompt_sent = mock_call_gemini.call_args[0][1] + assert "Bundesagentur" in prompt_sent + assert "Do NOT include any city" in prompt_sent + + @patch("immermatch.search_agent.call_gemini") + def test_other_provider_uses_default_prompt(self, mock_call_gemini: MagicMock): + mock_call_gemini.return_value = '["Python Developer Berlin"]' + 
other_provider = MagicMock() + other_provider.name = "SerpApi (Google Jobs)" + + generate_search_queries( + MagicMock(), + self._PROFILE, + location="Berlin", + num_queries=2, + provider=other_provider, + ) + + prompt_sent = mock_call_gemini.call_args[0][1] + assert "Google Jobs" in prompt_sent + assert "LOCAL names" in prompt_sent