diff --git a/examples/us_fundamental/workflow_config.yaml b/examples/us_fundamental/workflow_config.yaml new file mode 100644 index 00000000000..93da20fba49 --- /dev/null +++ b/examples/us_fundamental/workflow_config.yaml @@ -0,0 +1,155 @@ +qlib_init: + provider_uri: "~/.qlib/qlib_data/us_data" + region: us + +market: &market sp500 +benchmark: &benchmark ^GSPC + +data_handler_config: &data_handler_config + start_time: "2018-01-01" + end_time: "2024-12-31" + fit_start_time: "2018-01-01" + fit_end_time: "2022-12-31" + instruments: *market + infer_processors: + - class: ProcessInf + kwargs: {} + - class: ZScoreNorm + kwargs: {} + - class: Fillna + kwargs: {} + learn_processors: + - class: DropnaLabel + - class: CSZScoreNorm + kwargs: + fields_group: label + +data_loader_config: &data_loader_config + class: QlibDataLoader + kwargs: + config: + feature: + # ── Technical factors (Alpha158 US-tuned) ────────────── + # K-bar features + - ["($close-$open)/$open", "KMID"] + - ["($high-$low)/$open", "KLEN"] + - ["($close-$open)/($high-$low+1e-12)", "KMID2"] + + # Price features (no VWAP) + - ["$open/$close", "OPEN0"] + - ["$high/$close", "HIGH0"] + - ["$low/$close", "LOW0"] + + # Momentum (extended windows for US market) + - ["Ref($close, 5)/$close", "ROC5"] + - ["Ref($close, 10)/$close", "ROC10"] + - ["Ref($close, 20)/$close", "ROC20"] + - ["Ref($close, 60)/$close", "ROC60"] + - ["Ref($close, 120)/$close", "ROC120"] + - ["Ref($close, 250)/$close", "ROC250"] + + # Moving averages + - ["Mean($close, 5)/$close", "MA5"] + - ["Mean($close, 10)/$close", "MA10"] + - ["Mean($close, 20)/$close", "MA20"] + - ["Mean($close, 60)/$close", "MA60"] + - ["Mean($close, 120)/$close", "MA120"] + - ["Mean($close, 250)/$close", "MA250"] + + # Volatility + - ["Std($close, 5)/$close", "STD5"] + - ["Std($close, 20)/$close", "STD20"] + - ["Std($close, 60)/$close", "STD60"] + - ["Std($close, 250)/$close", "STD250"] + + # 12-1 month momentum (Jegadeesh & Titman) + - ["Ref($close, 250)/$close - 
Ref($close, 20)/$close", "MOM_12_1"] + + # Overnight gap + - ["$open / Ref($close, 1) - 1", "GAP"] + + # ── Fundamental factors ──────────────────────────────── + # Quality + - ["$roe", "ROE"] + - ["$roa", "ROA"] + - ["$gross_margin", "GMARGIN"] + - ["$accruals", "ACCRUALS"] + + # Leverage + - ["$debt_to_equity", "DE_RATIO"] + + # Growth + - ["$revenue_yoy", "REV_YOY"] + - ["$earnings_yoy", "EARN_YOY"] + + # Value (price-relative) + - ["$netincome / ($close + 1e-12)", "EARN_YIELD"] + - ["$totalrevenue / ($close + 1e-12)", "SALES_YIELD"] + - ["$freecashflow / ($close + 1e-12)", "FCF_YIELD"] + - ["$stockholdersequity / ($close + 1e-12)", "BOOK_YIELD"] + + label: + - ["Ref($close, -2)/Ref($close, -1) - 1", "LABEL0"] + +task: + model: + class: LGBModel + module_path: qlib.contrib.model.gbdt + kwargs: + loss: mse + colsample_bytree: 0.8879 + learning_rate: 0.05 + subsample: 0.8789 + lambda_l1: 205.6999 + lambda_l2: 580.9768 + max_depth: 8 + num_leaves: 210 + num_threads: 20 + n_estimators: 1000 + early_stopping_rounds: 50 + verbose: -1 + dataset: + class: DatasetH + module_path: qlib.data.dataset + kwargs: + handler: + class: USAlphaFundamental + module_path: qlib.contrib.data.handler_us + kwargs: *data_handler_config + segments: + train: ["2018-01-01", "2021-12-31"] + valid: ["2022-01-01", "2022-12-31"] + test: ["2023-01-01", "2024-12-31"] + record: + - class: SignalRecord + module_path: qlib.workflow.record_temp + kwargs: + model: + dataset: + - class: SigAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + ana_long_short: false + ann_scaler: 252 + - class: PortAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + config: + strategy: + class: TopkDropoutStrategy + module_path: qlib.contrib.strategy + kwargs: + signal: + topk: 50 + n_drop: 5 + backtest: + start_time: "2023-01-01" + end_time: "2024-12-31" + account: 100000000 + benchmark: *benchmark + exchange_kwargs: + limit_threshold: null # No price limits in US + deal_price: close + open_cost: 
0.0005 + close_cost: 0.0015 + min_cost: 5 diff --git a/examples/workflow_by_code.py b/examples/workflow_by_code.py index 0f2623b5f01..072bc18c5f7 100644 --- a/examples/workflow_by_code.py +++ b/examples/workflow_by_code.py @@ -13,7 +13,7 @@ from qlib.workflow import R from qlib.workflow.record_temp import SignalRecord, PortAnaRecord, SigAnaRecord from qlib.tests.data import GetData -from qlib.tests.config import CSI300_BENCH, CSI300_GBDT_TASK +from qlib.tests.config import CSI300_BENCH, CSI300_MARKET, GBDT_MODEL if __name__ == "__main__": @@ -22,8 +22,40 @@ GetData().qlib_data(target_dir=provider_uri, region=REG_CN, exists_skip=True) qlib.init(provider_uri=provider_uri, region=REG_CN) - model = init_instance_by_config(CSI300_GBDT_TASK["model"]) - dataset = init_instance_by_config(CSI300_GBDT_TASK["dataset"]) + # ---------- 自定義:只預測 2017-01-01 起的連續 10 個交易日 ---------- + # A 股 2017 年第一個交易日是 1/3(二),10 個交易日大約到 1/16(一) + # 設 test end 為 1/20 留點餘裕,實際只會取到有交易日的部分 + TEST_START = "2017-01-01" + TEST_END = "2017-01-20" + + task_config = { + "model": GBDT_MODEL, + "dataset": { + "class": "DatasetH", + "module_path": "qlib.data.dataset", + "kwargs": { + "handler": { + "class": "Alpha158", + "module_path": "qlib.contrib.data.handler", + "kwargs": { + "start_time": "2008-01-01", + "end_time": TEST_END, + "fit_start_time": "2008-01-01", + "fit_end_time": "2014-12-31", + "instruments": CSI300_MARKET, + }, + }, + "segments": { + "train": ("2008-01-01", "2014-12-31"), + "valid": ("2015-01-01", "2016-12-31"), + "test": (TEST_START, TEST_END), + }, + }, + }, + } + + model = init_instance_by_config(task_config["model"]) + dataset = init_instance_by_config(task_config["dataset"]) port_analysis_config = { "executor": { @@ -44,8 +76,8 @@ }, }, "backtest": { - "start_time": "2017-01-01", - "end_time": "2020-08-01", + "start_time": TEST_START, + "end_time": TEST_END, "account": 100000000, "benchmark": CSI300_BENCH, "exchange_kwargs": { @@ -66,7 +98,7 @@ # start exp with 
R.start(experiment_name="workflow"): - R.log_params(**flatten_dict(CSI300_GBDT_TASK)) + R.log_params(**flatten_dict(task_config)) model.fit(dataset) R.save_objects(**{"params.pkl": model}) diff --git a/qlib/contrib/data/handler_us.py b/qlib/contrib/data/handler_us.py new file mode 100644 index 00000000000..07340826820 --- /dev/null +++ b/qlib/contrib/data/handler_us.py @@ -0,0 +1,283 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +US stock data handlers with fundamental factors. + +These handlers extend Alpha158 with fundamental factors collected from +Yahoo Finance + SEC EDGAR filing dates (Route 1.5 approach). + +The fundamental factors are stored as Qlib features (bin files) alongside +the standard OHLCV data. They are expected to be pre-computed and forward- +filled to daily frequency before being dumped to Qlib format. + +Available handlers: + - USAlpha158: Alpha158 technical factors only, tuned for US market + - USFundamental: Fundamental factors only + - USAlphaFundamental: Combined technical + fundamental factors (recommended) + +Usage: + See examples/us_fundamental/workflow_config.yaml for a complete example. +""" + +from qlib.contrib.data.handler import Alpha158, _DEFAULT_LEARN_PROCESSORS, _DEFAULT_INFER_PROCESSORS, check_transform_proc +from qlib.contrib.data.loader import Alpha158DL +from qlib.data.dataset.handler import DataHandlerLP + + +# ── Fundamental factor features ────────────────────────────────────────────── +# These correspond to bin files produced by build_factors.py: +# features//roe.day.bin, features//roa.day.bin, etc. 
+ +FUNDAMENTAL_FIELDS = [ + # Quality factors + "$roe", # Return on Equity (quarterly, forward-filled) + "$roa", # Return on Assets + "$gross_margin", # Gross Profit / Revenue + "$accruals", # (NetIncome - OperatingCashFlow) / TotalAssets + + # Leverage + "$debt_to_equity", # TotalDebt / StockholdersEquity + + # Growth factors (YOY) + "$revenue_yoy", # Revenue growth vs same quarter last year + "$earnings_yoy", # Earnings growth vs same quarter last year +] + +FUNDAMENTAL_NAMES = [ + "ROE", "ROA", "GMARGIN", "ACCRUALS", + "DE_RATIO", + "REV_YOY", "EARN_YOY", +] + +# Price-relative factors (need to be divided by market cap or price) +# These use raw fundamental values from bin files + current price +PRICE_RELATIVE_FIELDS = [ + # EP = NetIncome / (Close * SharesOutstanding) ≈ NetIncome / MarketCap + # Since we don't have shares outstanding in daily data, we use the + # pre-computed quarterly NetIncome and normalize by close price. + # This gives a "per-dollar-of-price" measure, comparable across stocks + # within the cross-sectional normalization. + "$netincome / ($close + 1e-12)", # Earnings yield proxy + "$totalrevenue / ($close + 1e-12)", # Sales yield proxy + "$freecashflow / ($close + 1e-12)", # FCF yield proxy + "$stockholdersequity / ($close + 1e-12)", # Book yield proxy + "$ebitda / ($close + 1e-12)", # EBITDA yield proxy +] + +PRICE_RELATIVE_NAMES = [ + "EARN_YIELD", "SALES_YIELD", "FCF_YIELD", "BOOK_YIELD", "EBITDA_YIELD", +] + + +class USAlpha158(DataHandlerLP): + """Alpha158 technical factors tuned for US stocks. 
+ + Changes from standard Alpha158: + - Extended rolling windows (up to 250 days) for momentum + - Added 12-1 month momentum factor (academically proven for US) + - Added overnight gap factor (no price limits in US market) + - Removed VWAP from price features (often unavailable in free data) + """ + + def __init__( + self, + instruments="sp500", + start_time=None, + end_time=None, + freq="day", + infer_processors=[], + learn_processors=_DEFAULT_LEARN_PROCESSORS, + fit_start_time=None, + fit_end_time=None, + process_type=DataHandlerLP.PTYPE_A, + filter_pipe=None, + inst_processors=None, + **kwargs, + ): + infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time) + learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time) + + data_loader = { + "class": "QlibDataLoader", + "kwargs": { + "config": { + "feature": self.get_feature_config(), + "label": kwargs.pop("label", self.get_label_config()), + }, + "filter_pipe": filter_pipe, + "freq": freq, + "inst_processors": inst_processors, + }, + } + super().__init__( + instruments=instruments, + start_time=start_time, + end_time=end_time, + data_loader=data_loader, + infer_processors=infer_processors, + learn_processors=learn_processors, + process_type=process_type, + **kwargs, + ) + + def get_feature_config(self): + # Alpha158 with US-tuned config + conf = { + "kbar": {}, + "price": { + "windows": [0], + "feature": ["OPEN", "HIGH", "LOW"], # No VWAP + }, + "rolling": { + "windows": [5, 10, 20, 30, 60, 120, 250], # Extended windows + "exclude": ["RANK"], + }, + } + fields, names = Alpha158DL.get_feature_config(conf) + + # Add US-specific technical factors + extra_fields = [ + # 12-1 month momentum (Jegadeesh & Titman) + "Ref($close, 250)/$close - Ref($close, 20)/$close", + # Annualized volatility + "Std($close/Ref($close,1)-1, 250)", + # Volume surge (relative to long-term average) + "Mean($volume, 5) / (Mean($volume, 120) + 1e-12)", + # Overnight gap (US market has 
no price limits) + "$open / Ref($close, 1) - 1", + # Intraday range trend + "Mean(($high-$low)/$open, 20) / (Mean(($high-$low)/$open, 120) + 1e-12)", + ] + extra_names = [ + "MOM_12_1", "VOL_250", "VOLUME_SURGE", "GAP", "RANGE_TREND", + ] + + return fields + extra_fields, names + extra_names + + def get_label_config(self): + return ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"] + + +class USFundamental(DataHandlerLP): + """Fundamental-only factors for US stocks. + + Uses pre-computed fundamental factors stored as Qlib features. + Requires running the us_fundamental data collector pipeline first. + """ + + def __init__( + self, + instruments="sp500", + start_time=None, + end_time=None, + freq="day", + infer_processors=_DEFAULT_INFER_PROCESSORS, + learn_processors=_DEFAULT_LEARN_PROCESSORS, + fit_start_time=None, + fit_end_time=None, + process_type=DataHandlerLP.PTYPE_A, + filter_pipe=None, + inst_processors=None, + **kwargs, + ): + infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time) + learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time) + + data_loader = { + "class": "QlibDataLoader", + "kwargs": { + "config": { + "feature": self.get_feature_config(), + "label": kwargs.pop("label", self.get_label_config()), + }, + "filter_pipe": filter_pipe, + "freq": freq, + "inst_processors": inst_processors, + }, + } + super().__init__( + instruments=instruments, + start_time=start_time, + end_time=end_time, + data_loader=data_loader, + infer_processors=infer_processors, + learn_processors=learn_processors, + process_type=process_type, + **kwargs, + ) + + def get_feature_config(self): + fields = FUNDAMENTAL_FIELDS + PRICE_RELATIVE_FIELDS + names = FUNDAMENTAL_NAMES + PRICE_RELATIVE_NAMES + return fields, names + + def get_label_config(self): + return ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"] + + +class USAlphaFundamental(DataHandlerLP): + """Combined technical (Alpha158) + fundamental factors 
for US stocks. + + This is the recommended handler for US stock prediction. It combines: + - Alpha158 technical factors (tuned for US market) + - Fundamental quality/value/growth factors + - Price-relative fundamental factors + + Total: ~180 features (158 tech + ~12 US-specific tech + ~12 fundamental) + """ + + def __init__( + self, + instruments="sp500", + start_time=None, + end_time=None, + freq="day", + infer_processors=[], + learn_processors=_DEFAULT_LEARN_PROCESSORS, + fit_start_time=None, + fit_end_time=None, + process_type=DataHandlerLP.PTYPE_A, + filter_pipe=None, + inst_processors=None, + **kwargs, + ): + infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time) + learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time) + + data_loader = { + "class": "QlibDataLoader", + "kwargs": { + "config": { + "feature": self.get_feature_config(), + "label": kwargs.pop("label", self.get_label_config()), + }, + "filter_pipe": filter_pipe, + "freq": freq, + "inst_processors": inst_processors, + }, + } + super().__init__( + instruments=instruments, + start_time=start_time, + end_time=end_time, + data_loader=data_loader, + infer_processors=infer_processors, + learn_processors=learn_processors, + process_type=process_type, + **kwargs, + ) + + def get_feature_config(self): + # Start with US-tuned Alpha158 technical factors + tech_handler = USAlpha158.__new__(USAlpha158) + tech_fields, tech_names = tech_handler.get_feature_config() + + # Add fundamental factors + fund_fields = FUNDAMENTAL_FIELDS + PRICE_RELATIVE_FIELDS + fund_names = FUNDAMENTAL_NAMES + PRICE_RELATIVE_NAMES + + return tech_fields + fund_fields, tech_names + fund_names + + def get_label_config(self): + return ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"] diff --git a/scripts/data_collector/us_fundamental/README.md b/scripts/data_collector/us_fundamental/README.md new file mode 100644 index 00000000000..881f66cd2e8 --- /dev/null +++ 
b/scripts/data_collector/us_fundamental/README.md @@ -0,0 +1,157 @@ +# US Fundamental Data Collector (Route 1.5) + +Collect fundamental factors for US stocks using **Yahoo Finance** (free) + **SEC EDGAR** filing dates (free) to avoid look-ahead bias. + +## Architecture + +``` +Yahoo Finance (yahooquery) SEC EDGAR + income_statement() CIK submissions API + balance_sheet() → filing dates only + cash_flow() (lightweight, no XBRL parsing) + │ │ + ▼ ▼ + yahoo_fundamental.py edgar_filing_dates.py + (quarterly financials) (when each 10-Q/10-K was filed) + │ │ + └──────────────┬─────────────────────┘ + ▼ + build_factors.py + ├── merge on (symbol, reportDate) + ├── use filingDate as availableDate (no look-ahead!) + ├── compute factors (ROE, EP, Growth, ...) + ├── forward-fill to daily frequency + └── output per-symbol CSVs + │ + ▼ + dump_bin.py dump_all + (existing Qlib tool) + │ + ▼ + Qlib binary features: + features/AAPL/roe.day.bin + features/AAPL/roa.day.bin + features/AAPL/netincome.day.bin + ... 
+``` + +## Quick Start + +### Step 1: Prepare symbol list + +```bash +# Use existing Qlib US data instrument list, or create your own +echo -e "AAPL\nMSFT\nGOOGL\nAMZN\nMETA\nNVDA\nTSLA" > symbols.txt +``` + +### Step 2: Collect Yahoo Finance fundamental data + +```bash +python yahoo_fundamental.py collect_from_file \ + --symbol_file symbols.txt \ + --save_dir ./yahoo_data \ + --start 2018-01-01 \ + --delay 0.5 +``` + +### Step 3: Collect SEC EDGAR filing dates + +```bash +python edgar_filing_dates.py fetch_from_file \ + --symbol_file symbols.txt \ + --save_path ./edgar_filing_dates.csv \ + --delay 0.15 +``` + +### Step 4: Build daily factor CSVs + +```bash +python build_factors.py build \ + --yahoo_data_path ./yahoo_data/_all_fundamentals.csv \ + --edgar_data_path ./edgar_filing_dates.csv \ + --qlib_dir ~/.qlib/qlib_data/us_data \ + --output_dir ./fundamental_daily \ + --start 2018-01-01 +``` + +### Step 5: Dump to Qlib binary format + +```bash +# IMPORTANT: Use dump_update (not dump_all) to ADD fundamental features +# to an existing Qlib dataset that already has OHLCV data +python ../../../dump_bin.py dump_update \ + --data_path ./fundamental_daily \ + --qlib_dir ~/.qlib/qlib_data/us_data \ + --freq day \ + --exclude_fields symbol,date +``` + +### Step 6: Run a model with fundamental factors + +```bash +cd ../../examples/us_fundamental +python -m qlib.workflow -c workflow_config.yaml +``` + +Or use the handler directly in Python: + +```python +from qlib.contrib.data.handler_us import USAlphaFundamental + +handler = USAlphaFundamental( + instruments="sp500", + start_time="2018-01-01", + end_time="2024-12-31", + fit_start_time="2018-01-01", + fit_end_time="2022-12-31", +) +``` + +## Without SEC EDGAR (Simpler but Less Accurate) + +If you want to skip the SEC EDGAR step, you can use a conservative fallback +lag. 
The `build_factors.py` script will add 90 days to each report period +date, which is safe but means you'll use data slightly later than necessary: + +```bash +python build_factors.py build \ + --yahoo_data_path ./yahoo_data/_all_fundamentals.csv \ + --qlib_dir ~/.qlib/qlib_data/us_data \ + --output_dir ./fundamental_daily \ + --fallback_lag_days 90 +``` + +## Available Factors + +| Category | Factor | Description | +|----------|--------|-------------| +| Quality | `roe` | Return on Equity | +| Quality | `roa` | Return on Assets | +| Quality | `gross_margin` | Gross Profit / Revenue | +| Quality | `accruals` | (NI - OCF) / Assets (earnings quality) | +| Growth | `revenue_yoy` | Revenue growth YOY | +| Growth | `earnings_yoy` | Earnings growth YOY | +| Leverage | `debt_to_equity` | Total Debt / Equity | +| Value* | `netincome` | Used by handler as `$netincome/$close` | +| Value* | `totalrevenue` | Used by handler as `$totalrevenue/$close` | +| Value* | `freecashflow` | Used by handler as `$freecashflow/$close` | +| Value* | `stockholdersequity` | Used by handler as `$stockholdersequity/$close` | +| Value* | `ebitda` | Used by handler as `$ebitda/$close` | + +*Value factors are computed as price-relative ratios in the handler, not in the CSV. 
+ +## Available Handlers + +| Handler | Features | Use Case | +|---------|----------|----------| +| `USAlpha158` | ~170 tech factors | When you only have OHLCV data | +| `USFundamental` | ~12 fundamental factors | When you only want fundamentals | +| `USAlphaFundamental` | ~182 combined | **Recommended** for best results | + +## Limitations + +- **Yahoo Finance data depth**: Only ~4 years of quarterly data available +- **Filing date accuracy**: SEC EDGAR API returns recent filings; very old + filings may not be available, in which case the fallback lag is used +- **No analyst estimates**: Yahoo Finance free tier doesn't provide consensus + estimates or earnings surprises diff --git a/scripts/data_collector/us_fundamental/__init__.py b/scripts/data_collector/us_fundamental/__init__.py new file mode 100644 index 00000000000..59e481eb93d --- /dev/null +++ b/scripts/data_collector/us_fundamental/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. diff --git a/scripts/data_collector/us_fundamental/build_factors.py b/scripts/data_collector/us_fundamental/build_factors.py new file mode 100644 index 00000000000..be36cfe4dd7 --- /dev/null +++ b/scripts/data_collector/us_fundamental/build_factors.py @@ -0,0 +1,355 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Build fundamental factors from Yahoo Finance data + SEC EDGAR filing dates. + +This is the core pipeline of "Route 1.5": + 1. Read Yahoo fundamental data (quarterly financial statements) + 2. Read SEC EDGAR filing dates (when each 10-Q/10-K was actually filed) + 3. Use the filing date (NOT the report period date) as the availability date + 4. Compute fundamental factors (EP, BP, ROE, etc.) + 5. Forward-fill to daily frequency aligned with an existing Qlib calendar + 6. Output per-symbol CSVs ready for `dump_bin.py` + +The key insight: a Q1 report (period ending 3/31) filed on 5/15 should only +be usable from 5/15 onwards. 
Using it from 4/1 would be look-ahead bias. + +Usage: + # Full pipeline + python build_factors.py build \ + --yahoo_data_path ./yahoo_fundamental/_all_fundamentals.csv \ + --edgar_data_path ./edgar_filing_dates.csv \ + --qlib_dir ~/.qlib/qlib_data/us_data \ + --output_dir ./fundamental_daily + + # Then dump to Qlib binary format: + python ../../dump_bin.py dump_all \ + --data_path ./fundamental_daily \ + --qlib_dir ~/.qlib/qlib_data/us_data \ + --freq day \ + --exclude_fields symbol,date + + # Alternative: if you have NO SEC EDGAR data, use a conservative lag + python build_factors.py build \ + --yahoo_data_path ./yahoo_fundamental/_all_fundamentals.csv \ + --qlib_dir ~/.qlib/qlib_data/us_data \ + --output_dir ./fundamental_daily \ + --fallback_lag_days 90 +""" + +from pathlib import Path +from typing import Optional + +import fire +import numpy as np +import pandas as pd +from loguru import logger + + +# ── Factor definitions ──────────────────────────────────────────────────────── + +def compute_factors(df: pd.DataFrame) -> pd.DataFrame: + """Compute fundamental factors from raw financial statement data. 
+ + Input columns (from Yahoo Finance): + TotalRevenue, GrossProfit, NetIncome, EBIT, EBITDA, + TotalAssets, StockholdersEquity, TotalDebt, + OperatingCashFlow, FreeCashFlow + + Output columns (added): + gross_margin, roe, roa, accruals, debt_to_equity, + revenue_yoy, earnings_yoy + """ + df = df.copy() + + # ── Quality factors ── + # Gross Margin + df["gross_margin"] = df["GrossProfit"] / df["TotalRevenue"].replace(0, np.nan) + + # ROE = Net Income / Stockholders' Equity + df["roe"] = df["NetIncome"] / df["StockholdersEquity"].replace(0, np.nan) + + # ROA = Net Income / Total Assets + df["roa"] = df["NetIncome"] / df["TotalAssets"].replace(0, np.nan) + + # Accruals = (Net Income - Operating Cash Flow) / Total Assets + # High accruals = low earnings quality + df["accruals"] = (df["NetIncome"] - df["OperatingCashFlow"]) / df["TotalAssets"].replace(0, np.nan) + + # ── Leverage ── + df["debt_to_equity"] = df["TotalDebt"] / df["StockholdersEquity"].replace(0, np.nan) + + # ── Growth factors (YOY) ── + # Sort by symbol and date first for proper shift + df = df.sort_values(["symbol", "reportDate"]).reset_index(drop=True) + for col, out_col in [("TotalRevenue", "revenue_yoy"), ("NetIncome", "earnings_yoy")]: + if col in df.columns: + # YOY = current quarter vs same quarter last year (shift 4 quarters) + df[out_col] = df.groupby("symbol")[col].transform( + lambda x: x / x.shift(4).replace(0, np.nan) - 1 + ) + + return df + + +def _merge_with_filing_dates( + yahoo_df: pd.DataFrame, + edgar_df: pd.DataFrame, + fallback_lag_days: int = 90, +) -> pd.DataFrame: + """Merge Yahoo fundamental data with SEC EDGAR filing dates. + + For each (symbol, reportDate) pair, find the corresponding filing date + from SEC EDGAR. If no match is found, use reportDate + fallback_lag_days. + + Parameters + ---------- + yahoo_df : pd.DataFrame + Must have columns: [symbol, reportDate, ...] 
+ edgar_df : pd.DataFrame + Must have columns: [symbol, filingDate, reportDate] + fallback_lag_days : int + Days to add to reportDate when no EDGAR match is found. + + Returns + ------- + pd.DataFrame + With added column 'availableDate': the date from which this data + can be used without look-ahead bias. + """ + yahoo_df = yahoo_df.copy() + yahoo_df["reportDate"] = pd.to_datetime(yahoo_df["reportDate"]) + + if edgar_df is not None and not edgar_df.empty: + edgar_df = edgar_df.copy() + edgar_df["filingDate"] = pd.to_datetime(edgar_df["filingDate"]) + edgar_df["reportDate"] = pd.to_datetime(edgar_df["reportDate"]) + + # Merge on (symbol, reportDate) + merged = yahoo_df.merge( + edgar_df[["symbol", "reportDate", "filingDate"]], + on=["symbol", "reportDate"], + how="left", + ) + + # For unmatched rows, use conservative fallback + no_match = merged["filingDate"].isna() + if no_match.any(): + logger.info( + f"{no_match.sum()}/{len(merged)} records have no EDGAR match, " + f"using fallback lag of {fallback_lag_days} days" + ) + merged.loc[no_match, "filingDate"] = ( + merged.loc[no_match, "reportDate"] + pd.Timedelta(days=fallback_lag_days) + ) + merged["availableDate"] = merged["filingDate"] + else: + logger.warning( + f"No EDGAR data provided. Using fallback lag of {fallback_lag_days} days " + f"for all records. This may introduce minor look-ahead bias." + ) + yahoo_df["availableDate"] = yahoo_df["reportDate"] + pd.Timedelta(days=fallback_lag_days) + merged = yahoo_df + + return merged + + +def _forward_fill_to_daily( + factor_df: pd.DataFrame, + calendar: pd.DatetimeIndex, + factor_columns: list, +) -> pd.DataFrame: + """Forward-fill quarterly factor data to daily frequency. + + For each symbol, at each calendar date, use the most recent factor values + that were available (based on availableDate, not reportDate). 
+ + Parameters + ---------- + factor_df : pd.DataFrame + Must have columns: [symbol, availableDate] + factor_columns + calendar : pd.DatetimeIndex + The trading calendar to align to. + factor_columns : list of str + Which columns to forward-fill. + + Returns + ------- + pd.DataFrame + Daily data with columns: [date, symbol] + factor_columns + """ + all_daily = [] + symbols = factor_df["symbol"].unique() + + for symbol in symbols: + sym_df = factor_df[factor_df["symbol"] == symbol].copy() + sym_df = sym_df.sort_values("availableDate").drop_duplicates("availableDate", keep="last") + + # Create a daily series using the calendar + daily = pd.DataFrame({"date": calendar}) + daily["symbol"] = symbol + + # For each factor column, forward-fill from availableDate + for col in factor_columns: + if col not in sym_df.columns: + daily[col] = np.nan + continue + + # Build a series indexed by availableDate + values = sym_df.set_index("availableDate")[col] + values = values[~values.index.duplicated(keep="last")] + + # Reindex to calendar and forward-fill + aligned = values.reindex(calendar, method="ffill") + daily[col] = aligned.values + + all_daily.append(daily) + + if not all_daily: + return pd.DataFrame() + + result = pd.concat(all_daily, ignore_index=True) + return result + + +def build( + yahoo_data_path: str, + output_dir: str, + edgar_data_path: Optional[str] = None, + qlib_dir: Optional[str] = None, + calendar_path: Optional[str] = None, + fallback_lag_days: int = 90, + start: Optional[str] = None, + end: Optional[str] = None, +): + """Build daily fundamental factor CSVs from Yahoo + EDGAR data. + + Parameters + ---------- + yahoo_data_path : str + Path to Yahoo fundamental CSV (output of yahoo_fundamental.py). + output_dir : str + Directory to save per-symbol daily CSVs (input to dump_bin.py). + edgar_data_path : str, optional + Path to EDGAR filing dates CSV (output of edgar_filing_dates.py). + If None, uses conservative fallback lag. 
+ qlib_dir : str, optional + Path to existing Qlib data directory (to read trading calendar). + Either qlib_dir or calendar_path must be provided. + calendar_path : str, optional + Path to calendar file (one date per line). Overrides qlib_dir. + fallback_lag_days : int + Days to add to reportDate when no EDGAR filing date is available. + Default 90 (conservative: SEC requires 10-Q within 40-45 days). + start : str, optional + Start date filter for output data. + end : str, optional + End date filter for output data. + """ + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + # ── Step 1: Load data ── + logger.info("Loading Yahoo fundamental data...") + yahoo_df = pd.read_csv(yahoo_data_path) + yahoo_df["reportDate"] = pd.to_datetime(yahoo_df["reportDate"]) + logger.info(f" {len(yahoo_df)} records, {yahoo_df['symbol'].nunique()} symbols") + + edgar_df = None + if edgar_data_path and Path(edgar_data_path).exists(): + logger.info("Loading SEC EDGAR filing dates...") + edgar_df = pd.read_csv(edgar_data_path) + logger.info(f" {len(edgar_df)} filing records") + + # ── Step 2: Load calendar ── + if calendar_path: + calendar = pd.to_datetime( + pd.read_csv(calendar_path, header=None)[0] + ) + elif qlib_dir: + cal_file = Path(qlib_dir) / "calendars" / "day.txt" + if not cal_file.exists(): + raise FileNotFoundError(f"Calendar not found: {cal_file}") + calendar = pd.to_datetime( + pd.read_csv(cal_file, header=None)[0] + ) + else: + raise ValueError("Must provide either qlib_dir or calendar_path") + + if start: + calendar = calendar[calendar >= pd.Timestamp(start)] + if end: + calendar = calendar[calendar <= pd.Timestamp(end)] + calendar = pd.DatetimeIndex(sorted(calendar)) + logger.info(f"Calendar: {calendar[0].date()} to {calendar[-1].date()}, {len(calendar)} days") + + # ── Step 3: Merge with filing dates ── + logger.info("Merging with filing dates...") + merged = _merge_with_filing_dates(yahoo_df, edgar_df, fallback_lag_days) + + # ── Step 4: 
Compute factors ── + logger.info("Computing fundamental factors...") + factor_df = compute_factors(merged) + + # Factor columns to output (these will become Qlib features) + factor_columns = [ + # Raw values (for computing price-relative factors in handler) + "NetIncome", + "TotalRevenue", + "StockholdersEquity", + "TotalAssets", + "TotalDebt", + "OperatingCashFlow", + "FreeCashFlow", + "EBITDA", + # Computed factors + "gross_margin", + "roe", + "roa", + "accruals", + "debt_to_equity", + "revenue_yoy", + "earnings_yoy", + ] + # Only keep columns that actually exist + factor_columns = [c for c in factor_columns if c in factor_df.columns] + + # ── Step 5: Forward-fill to daily ── + logger.info("Forward-filling to daily frequency...") + daily_df = _forward_fill_to_daily(factor_df, calendar, factor_columns) + logger.info(f"Daily data: {len(daily_df)} rows, {daily_df['symbol'].nunique()} symbols") + + # ── Step 6: Save per-symbol CSVs ── + logger.info(f"Saving to {output_dir}...") + # Rename columns to lowercase for Qlib convention + rename_map = {c: c.lower() for c in factor_columns if c != c.lower()} + daily_df.rename(columns=rename_map, inplace=True) + factor_columns_lower = [c.lower() for c in factor_columns] + + saved_count = 0 + for symbol, sym_df in daily_df.groupby("symbol"): + # Drop rows where ALL factors are NaN (before first filing) + sym_df = sym_df.dropna(subset=factor_columns_lower, how="all") + if sym_df.empty: + continue + sym_df.to_csv(output_dir / f"{symbol}.csv", index=False) + saved_count += 1 + + logger.info(f"Saved {saved_count} symbol files to {output_dir}") + logger.info( + f"\nNext step: dump to Qlib binary format:\n" + f" python scripts/dump_bin.py dump_all \\\n" + f" --data_path {output_dir} \\\n" + f" --qlib_dir \\\n" + f" --freq day \\\n" + f" --exclude_fields symbol,date\n" + f"\n" + f" NOTE: Use dump_update instead of dump_all if you want to ADD\n" + f" fundamental features to an existing Qlib dataset that already\n" + f" has OHLCV data." 
+ ) + + +if __name__ == "__main__": + fire.Fire({"build": build}) diff --git a/scripts/data_collector/us_fundamental/edgar_filing_dates.py b/scripts/data_collector/us_fundamental/edgar_filing_dates.py new file mode 100644 index 00000000000..6316e063ff8 --- /dev/null +++ b/scripts/data_collector/us_fundamental/edgar_filing_dates.py @@ -0,0 +1,201 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Fetch SEC EDGAR filing dates for US stocks. + +This module fetches the actual filing dates of 10-Q and 10-K reports from +SEC EDGAR, which is essential for avoiding look-ahead bias when using +fundamental data. Yahoo Finance provides financial statement values but +NOT the date they were publicly filed -- only the report period end date. + +Without filing dates, you risk using Q1 data (period ending 3/31) on 4/1, +even though the company might not file until 5/15. + +Usage: + python edgar_filing_dates.py fetch \ + --symbols AAPL,MSFT,GOOGL \ + --save_path ./edgar_filing_dates.csv + + python edgar_filing_dates.py fetch_from_file \ + --symbol_file ./symbols.txt \ + --save_path ./edgar_filing_dates.csv +""" + +import time +import json +from pathlib import Path +from typing import Dict, List, Optional, Union + +import fire +import pandas as pd +import requests +from loguru import logger + +# SEC requires a User-Agent header with contact info +SEC_HEADERS = { + "User-Agent": "QlibResearch research@example.com", + "Accept-Encoding": "gzip, deflate", +} + +# CIK lookup endpoint +CIK_LOOKUP_URL = "https://efts.sec.gov/LATEST/search-index?q=%22{ticker}%22&dateRange=custom&forms=10-K,10-Q" +SUBMISSIONS_URL = "https://data.sec.gov/submissions/CIK{cik}.json" +TICKERS_URL = "https://www.sec.gov/files/company_tickers.json" + + +def _load_ticker_to_cik_map() -> Dict[str, str]: + """Load the SEC ticker-to-CIK mapping. + + Returns a dict mapping uppercase ticker symbols to zero-padded CIK strings. 
+ """ + resp = requests.get(TICKERS_URL, headers=SEC_HEADERS, timeout=30) + resp.raise_for_status() + data = resp.json() + mapping = {} + for entry in data.values(): + ticker = str(entry["ticker"]).upper() + cik = str(entry["cik_str"]).zfill(10) + mapping[ticker] = cik + return mapping + + +def get_filing_dates_for_cik(cik: str) -> pd.DataFrame: + """Fetch 10-Q and 10-K filing dates from SEC EDGAR for a given CIK. + + Parameters + ---------- + cik : str + The CIK number, zero-padded to 10 digits. + + Returns + ------- + pd.DataFrame + Columns: [form, filingDate, reportDate] + - form: "10-Q" or "10-K" + - filingDate: the date the filing was submitted to SEC (public date) + - reportDate: the period end date of the financial report + """ + url = SUBMISSIONS_URL.format(cik=cik) + resp = requests.get(url, headers=SEC_HEADERS, timeout=30) + resp.raise_for_status() + data = resp.json() + + recent = data.get("filings", {}).get("recent", {}) + if not recent: + return pd.DataFrame(columns=["form", "filingDate", "reportDate"]) + + forms = recent.get("form", []) + filing_dates = recent.get("filingDate", []) + report_dates = recent.get("reportDate", []) + + records = [] + for form, f_date, r_date in zip(forms, filing_dates, report_dates): + if form in ("10-Q", "10-K", "10-Q/A", "10-K/A"): + records.append( + { + "form": form.replace("/A", ""), # treat amendments same as original + "filingDate": f_date, + "reportDate": r_date, + } + ) + + df = pd.DataFrame(records) + if not df.empty: + # Keep only the earliest filing for each (form, reportDate) pair + # This handles amendments: the original filing date is what matters + df = df.sort_values("filingDate").drop_duplicates( + subset=["form", "reportDate"], keep="first" + ) + return df + + +def fetch_filing_dates( + symbols: Union[str, List[str]], + save_path: Optional[str] = None, + delay: float = 0.15, +) -> pd.DataFrame: + """Fetch filing dates for a list of symbols. 
+ + Parameters + ---------- + symbols : str or list of str + Comma-separated string or list of ticker symbols. + save_path : str, optional + Path to save the result CSV. + delay : float + Delay between SEC API requests (SEC rate limit: 10 req/sec). + + Returns + ------- + pd.DataFrame + Columns: [symbol, form, filingDate, reportDate] + """ + if isinstance(symbols, str): + symbols = [s.strip().upper() for s in symbols.split(",")] + else: + symbols = [s.strip().upper() for s in symbols] + + logger.info(f"Loading SEC ticker-to-CIK mapping...") + try: + ticker_cik_map = _load_ticker_to_cik_map() + except Exception as e: + logger.error(f"Failed to load ticker-to-CIK mapping: {e}") + return pd.DataFrame() + + all_records = [] + skipped = [] + + for i, symbol in enumerate(symbols): + cik = ticker_cik_map.get(symbol) + if cik is None: + skipped.append(symbol) + continue + + try: + df = get_filing_dates_for_cik(cik) + if not df.empty: + df["symbol"] = symbol + all_records.append(df) + logger.info(f"[{i+1}/{len(symbols)}] {symbol}: {len(df)} filings") + else: + logger.warning(f"[{i+1}/{len(symbols)}] {symbol}: no filings found") + except Exception as e: + logger.warning(f"[{i+1}/{len(symbols)}] {symbol}: error - {e}") + + time.sleep(delay) + + if skipped: + logger.warning(f"Skipped {len(skipped)} symbols (CIK not found): {skipped[:20]}...") + + if not all_records: + logger.warning("No filing date data collected") + return pd.DataFrame(columns=["symbol", "form", "filingDate", "reportDate"]) + + result = pd.concat(all_records, ignore_index=True) + result = result[["symbol", "form", "filingDate", "reportDate"]] + result["filingDate"] = pd.to_datetime(result["filingDate"]) + result["reportDate"] = pd.to_datetime(result["reportDate"]) + + if save_path: + save_path = Path(save_path) + save_path.parent.mkdir(parents=True, exist_ok=True) + result.to_csv(save_path, index=False) + logger.info(f"Saved {len(result)} filing records to {save_path}") + + return result + + +def 
fetch_from_file(
+    symbol_file: str,
+    save_path: str = "./edgar_filing_dates.csv",
+    delay: float = 0.15,
+) -> pd.DataFrame:
+    """Fetch filing dates from a file containing one symbol per line."""
+    symbols = Path(symbol_file).read_text().strip().split("\n")
+    symbols = [s.strip() for s in symbols if s.strip()]
+    return fetch_filing_dates(symbols, save_path=save_path, delay=delay)
+
+
+if __name__ == "__main__":
+    fire.Fire({"fetch": fetch_filing_dates, "fetch_from_file": fetch_from_file})
diff --git a/scripts/data_collector/us_fundamental/run_all.py b/scripts/data_collector/us_fundamental/run_all.py
new file mode 100644
index 00000000000..393020caa88
--- /dev/null
+++ b/scripts/data_collector/us_fundamental/run_all.py
@@ -0,0 +1,151 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""
+One-command pipeline to collect US fundamental data and prepare it for Qlib.
+
+Usage:
+    python run_all.py \
+        --symbols AAPL,MSFT,GOOGL,AMZN,META,NVDA \
+        --qlib_dir ~/.qlib/qlib_data/us_data \
+        --start 2018-01-01 \
+        --work_dir ./us_fundamental_workdir
+
+    # Or with a symbol file:
+    python run_all.py \
+        --symbol_file ./symbols.txt \
+        --qlib_dir ~/.qlib/qlib_data/us_data \
+        --work_dir ./us_fundamental_workdir
+
+    # Skip SEC EDGAR (faster, slightly less accurate):
+    python run_all.py \
+        --symbols AAPL,MSFT \
+        --qlib_dir ~/.qlib/qlib_data/us_data \
+        --skip_edgar \
+        --fallback_lag_days 90
+"""
+
+import sys
+from pathlib import Path
+from typing import List, Optional, Union
+
+import fire
+from loguru import logger
+
+# Make data_collector/ importable: the `us_fundamental` package lives there
+CUR_DIR = Path(__file__).resolve().parent
+sys.path.append(str(CUR_DIR.parent))
+
+from us_fundamental.yahoo_fundamental import collect_fundamental_data
+from us_fundamental.edgar_filing_dates import fetch_filing_dates
+from us_fundamental.build_factors import build
+
+
+def run(
+    qlib_dir: str,
+    symbols: Optional[str] = None,
+    symbol_file: Optional[str] = None,
+    work_dir: str
= "./us_fundamental_workdir", + start: Optional[str] = "2018-01-01", + skip_edgar: bool = False, + fallback_lag_days: int = 90, + yahoo_delay: float = 0.5, + edgar_delay: float = 0.15, +): + """Run the complete US fundamental data pipeline. + + Parameters + ---------- + qlib_dir : str + Path to existing Qlib data directory with OHLCV data. + symbols : str, optional + Comma-separated ticker symbols. + symbol_file : str, optional + Path to file with one symbol per line. + work_dir : str + Working directory for intermediate files. + start : str + Start date for data collection. + skip_edgar : bool + Skip SEC EDGAR filing date collection (use fallback lag instead). + fallback_lag_days : int + Days to add to reportDate when no EDGAR match. Default 90. + yahoo_delay : float + Delay between Yahoo Finance API requests. + edgar_delay : float + Delay between SEC EDGAR API requests. + """ + work_dir = Path(work_dir) + work_dir.mkdir(parents=True, exist_ok=True) + + # Resolve symbol list + if symbol_file: + symbol_list = Path(symbol_file).read_text().strip().split("\n") + symbol_list = [s.strip() for s in symbol_list if s.strip()] + elif symbols: + symbol_list = [s.strip().upper() for s in symbols.split(",")] + else: + raise ValueError("Must provide either --symbols or --symbol_file") + + logger.info(f"Pipeline starting for {len(symbol_list)} symbols") + + # ── Step 1: Yahoo Finance ── + yahoo_dir = work_dir / "yahoo_data" + logger.info("=" * 60) + logger.info("Step 1/3: Collecting Yahoo Finance fundamental data...") + logger.info("=" * 60) + collect_fundamental_data( + symbols=symbol_list, + save_dir=str(yahoo_dir), + start=start, + delay=yahoo_delay, + ) + yahoo_csv = yahoo_dir / "_all_fundamentals.csv" + + # ── Step 2: SEC EDGAR ── + edgar_csv = work_dir / "edgar_filing_dates.csv" + if not skip_edgar: + logger.info("=" * 60) + logger.info("Step 2/3: Collecting SEC EDGAR filing dates...") + logger.info("=" * 60) + fetch_filing_dates( + symbols=symbol_list, + 
save_path=str(edgar_csv), + delay=edgar_delay, + ) + else: + logger.info("=" * 60) + logger.info(f"Step 2/3: Skipping EDGAR (using {fallback_lag_days}-day lag)") + logger.info("=" * 60) + edgar_csv = None + + # ── Step 3: Build factors ── + output_dir = work_dir / "fundamental_daily" + logger.info("=" * 60) + logger.info("Step 3/3: Building daily factor CSVs...") + logger.info("=" * 60) + build( + yahoo_data_path=str(yahoo_csv), + output_dir=str(output_dir), + edgar_data_path=str(edgar_csv) if edgar_csv else None, + qlib_dir=qlib_dir, + fallback_lag_days=fallback_lag_days, + start=start, + ) + + # ── Done ── + logger.info("=" * 60) + logger.info("Pipeline complete!") + logger.info(f"Daily factor CSVs: {output_dir}") + logger.info("") + logger.info("Next step: dump to Qlib binary format:") + logger.info(f" python scripts/dump_bin.py dump_update \\") + logger.info(f" --data_path {output_dir} \\") + logger.info(f" --qlib_dir {qlib_dir} \\") + logger.info(f" --freq day \\") + logger.info(f' --exclude_fields symbol,date') + logger.info("=" * 60) + + +if __name__ == "__main__": + fire.Fire(run) diff --git a/scripts/data_collector/us_fundamental/yahoo_fundamental.py b/scripts/data_collector/us_fundamental/yahoo_fundamental.py new file mode 100644 index 00000000000..48490537dc1 --- /dev/null +++ b/scripts/data_collector/us_fundamental/yahoo_fundamental.py @@ -0,0 +1,223 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Collect fundamental data from Yahoo Finance for US stocks. + +This module uses yahooquery (already a Qlib dependency) to fetch quarterly +and annual financial statements (income statement, balance sheet, cash flow). 
+From these raw statements, we compute standard fundamental factors: + + Value: EP, BP, SP, CFP + Quality: ROE, ROA, GrossMargin, Accruals + Growth: RevenueYOY, EarningsYOY + Leverage: DebtToEquity + +Usage: + python yahoo_fundamental.py collect \ + --symbols AAPL,MSFT,GOOGL \ + --save_dir ./yahoo_fundamental \ + --start 2018-01-01 + + python yahoo_fundamental.py collect_from_file \ + --symbol_file ./symbols.txt \ + --save_dir ./yahoo_fundamental +""" + +import time +from pathlib import Path +from typing import List, Optional, Union + +import fire +import numpy as np +import pandas as pd +from loguru import logger +from yahooquery import Ticker + + +# ── Fields we extract from Yahoo Finance ────────────────────────────────────── +# Income statement fields +INCOME_FIELDS = [ + "TotalRevenue", + "GrossProfit", + "NetIncome", + "EBIT", + "EBITDA", + "CostOfRevenue", +] + +# Balance sheet fields +BALANCE_FIELDS = [ + "TotalAssets", + "StockholdersEquity", + "TotalDebt", + "CurrentAssets", + "CurrentLiabilities", +] + +# Cash flow fields +CASHFLOW_FIELDS = [ + "OperatingCashFlow", + "FreeCashFlow", + "CapitalExpenditure", +] + + +def _safe_get_financial(ticker_obj: Ticker, method: str, frequency: str = "q") -> pd.DataFrame: + """Safely call a yahooquery financial method. + + Returns an empty DataFrame if the call fails or returns a dict (error). + """ + try: + func = getattr(ticker_obj, method) + result = func(frequency=frequency) + if isinstance(result, pd.DataFrame) and not result.empty: + return result + return pd.DataFrame() + except Exception as e: + logger.debug(f"Failed to get {method}: {e}") + return pd.DataFrame() + + +def _collect_single_symbol(symbol: str, start: Optional[str] = None) -> pd.DataFrame: + """Collect fundamental data for a single symbol. + + Returns a DataFrame with columns: [date, symbol, field1, field2, ...] + where each row is a quarterly snapshot. 
+ """ + ticker = Ticker(symbol, asynchronous=False) + + # Collect quarterly financial data + income_df = _safe_get_financial(ticker, "income_statement", "q") + balance_df = _safe_get_financial(ticker, "balance_sheet", "q") + cashflow_df = _safe_get_financial(ticker, "cash_flow", "q") + + if income_df.empty and balance_df.empty and cashflow_df.empty: + logger.warning(f"{symbol}: no financial data available") + return pd.DataFrame() + + # Normalize index: yahooquery returns MultiIndex (symbol, asOfDate) + dfs = {} + for name, df, fields in [ + ("income", income_df, INCOME_FIELDS), + ("balance", balance_df, BALANCE_FIELDS), + ("cashflow", cashflow_df, CASHFLOW_FIELDS), + ]: + if df.empty: + continue + # Reset index to get asOfDate as column + if isinstance(df.index, pd.MultiIndex): + df = df.reset_index() + # Standardize date column + if "asOfDate" in df.columns: + df["asOfDate"] = pd.to_datetime(df["asOfDate"]) + elif "index" in df.columns: + df.rename(columns={"index": "asOfDate"}, inplace=True) + df["asOfDate"] = pd.to_datetime(df["asOfDate"]) + + # Select only the fields we care about + available_fields = [f for f in fields if f in df.columns] + if not available_fields: + continue + + keep_cols = ["asOfDate"] + available_fields + df = df[keep_cols].copy() + df = df.drop_duplicates("asOfDate").sort_values("asOfDate") + dfs[name] = df + + if not dfs: + return pd.DataFrame() + + # Merge all financial data on asOfDate + merged = None + for df in dfs.values(): + if merged is None: + merged = df + else: + merged = pd.merge(merged, df, on="asOfDate", how="outer") + + merged = merged.sort_values("asOfDate").reset_index(drop=True) + merged["symbol"] = symbol + merged.rename(columns={"asOfDate": "reportDate"}, inplace=True) + + if start: + merged = merged[merged["reportDate"] >= pd.Timestamp(start)] + + return merged + + +def collect_fundamental_data( + symbols: Union[str, List[str]], + save_dir: Optional[str] = None, + start: Optional[str] = None, + delay: float = 0.5, +) 
-> pd.DataFrame: + """Collect fundamental data for multiple symbols. + + Parameters + ---------- + symbols : str or list of str + Comma-separated string or list of ticker symbols. + save_dir : str, optional + Directory to save per-symbol CSV files. + start : str, optional + Start date filter (e.g., "2018-01-01"). + delay : float + Delay between Yahoo API requests. + + Returns + ------- + pd.DataFrame + All symbols' fundamental data concatenated. + """ + if isinstance(symbols, str): + symbols = [s.strip().upper() for s in symbols.split(",")] + else: + symbols = [s.strip().upper() for s in symbols] + + if save_dir: + save_dir = Path(save_dir) + save_dir.mkdir(parents=True, exist_ok=True) + + all_data = [] + for i, symbol in enumerate(symbols): + try: + df = _collect_single_symbol(symbol, start=start) + if df.empty: + logger.warning(f"[{i+1}/{len(symbols)}] {symbol}: no data") + continue + all_data.append(df) + logger.info(f"[{i+1}/{len(symbols)}] {symbol}: {len(df)} quarters") + + if save_dir: + df.to_csv(save_dir / f"{symbol}.csv", index=False) + except Exception as e: + logger.warning(f"[{i+1}/{len(symbols)}] {symbol}: error - {e}") + + time.sleep(delay) + + if not all_data: + return pd.DataFrame() + + result = pd.concat(all_data, ignore_index=True) + if save_dir: + result.to_csv(save_dir / "_all_fundamentals.csv", index=False) + logger.info(f"Saved {len(result)} total records to {save_dir}") + + return result + + +def collect_from_file( + symbol_file: str, + save_dir: str = "./yahoo_fundamental", + start: Optional[str] = None, + delay: float = 0.5, +) -> pd.DataFrame: + """Collect fundamental data from a file containing one symbol per line.""" + symbols = Path(symbol_file).read_text().strip().split("\n") + symbols = [s.strip() for s in symbols if s.strip()] + return collect_fundamental_data(symbols, save_dir=save_dir, start=start, delay=delay) + + +if __name__ == "__main__": + fire.Fire({"collect": collect_fundamental_data, "collect_from_file": collect_from_file}) 
diff --git a/scripts/prepare_us_data.py b/scripts/prepare_us_data.py new file mode 100644 index 00000000000..645329ea23d --- /dev/null +++ b/scripts/prepare_us_data.py @@ -0,0 +1,278 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +One-command US stock data preparation for Qlib + Alpha158. + +Usage +----- + # Full pipeline: download from Yahoo, normalize, dump to bin, generate SP500 instruments + python scripts/prepare_us_data.py all + + # Only download raw CSV from Yahoo Finance + python scripts/prepare_us_data.py download --start 2000-01-01 --end 2025-01-01 + + # Only normalize (requires download first) + python scripts/prepare_us_data.py normalize + + # Only dump to qlib bin format (requires normalize first) + python scripts/prepare_us_data.py dump + + # Only generate SP500/NASDAQ100/DJIA instrument lists + python scripts/prepare_us_data.py instruments + + # Use pre-packaged data from Azure Blob instead of Yahoo + python scripts/prepare_us_data.py from_qlib_data + + # Full pipeline with custom paths + python scripts/prepare_us_data.py all --source_dir ~/my_data/source --qlib_dir ~/my_data/us_data +""" + +import sys +import datetime +from pathlib import Path + +import fire +import pandas as pd +from loguru import logger + +CUR_DIR = Path(__file__).resolve().parent +sys.path.append(str(CUR_DIR)) + + +DEFAULT_QLIB_DIR = "~/.qlib/qlib_data/us_data" +DEFAULT_SOURCE_DIR = "~/.qlib/stock_data/source/us_data" +DEFAULT_NORMALIZE_DIR = "~/.qlib/stock_data/normalize/us_data" + + +class PrepareUSData: + """One-command US stock data preparation pipeline.""" + + def __init__( + self, + source_dir: str = DEFAULT_SOURCE_DIR, + normalize_dir: str = DEFAULT_NORMALIZE_DIR, + qlib_dir: str = DEFAULT_QLIB_DIR, + max_workers: int = 1, + ): + self.source_dir = Path(source_dir).expanduser().resolve() + self.normalize_dir = Path(normalize_dir).expanduser().resolve() + self.qlib_dir = Path(qlib_dir).expanduser().resolve() + self.max_workers = 
max_workers + + # ------------------------------------------------------------------ + # Step 1: Download raw CSV from Yahoo Finance + # ------------------------------------------------------------------ + def download( + self, + start: str = "2000-01-01", + end: str = None, + delay: float = 1.0, + max_collector_count: int = 2, + check_data_length: int = None, + limit_nums: int = None, + ): + """Download US stock OHLCV data from Yahoo Finance. + + Parameters + ---------- + start : str + Start date (inclusive), default "2000-01-01". + end : str + End date (exclusive), default today. + delay : float + Seconds between API requests, default 1.0. + """ + if end is None: + end = pd.Timestamp(datetime.datetime.now()).strftime("%Y-%m-%d") + + logger.info(f"[Step 1/4] Downloading US stock data: {start} ~ {end}") + logger.info(f" source_dir: {self.source_dir}") + + from data_collector.yahoo.collector import Run as YahooRun + + runner = YahooRun( + source_dir=str(self.source_dir), + normalize_dir=str(self.normalize_dir), + max_workers=self.max_workers, + interval="1d", + region="US", + ) + runner.download_data( + max_collector_count=max_collector_count, + delay=delay, + start=start, + end=end, + check_data_length=check_data_length, + limit_nums=limit_nums, + ) + logger.info("[Step 1/4] Download complete.") + + # ------------------------------------------------------------------ + # Step 2: Normalize (adjust price + scale) + # ------------------------------------------------------------------ + def normalize(self): + """Normalize downloaded CSV data (adjust price, scale to first close = 1).""" + logger.info("[Step 2/4] Normalizing data...") + logger.info(f" source_dir: {self.source_dir}") + logger.info(f" normalize_dir: {self.normalize_dir}") + + from data_collector.yahoo.collector import Run as YahooRun + + runner = YahooRun( + source_dir=str(self.source_dir), + normalize_dir=str(self.normalize_dir), + max_workers=self.max_workers, + interval="1d", + region="US", + ) + 
runner.normalize_data( + date_field_name="date", + symbol_field_name="symbol", + ) + logger.info("[Step 2/4] Normalize complete.") + + # ------------------------------------------------------------------ + # Step 3: Dump to qlib binary format + # ------------------------------------------------------------------ + def dump(self): + """Convert normalized CSV to qlib binary format.""" + import multiprocessing + + logger.info("[Step 3/4] Dumping to qlib binary format...") + logger.info(f" normalize_dir: {self.normalize_dir}") + logger.info(f" qlib_dir: {self.qlib_dir}") + + from dump_bin import DumpDataAll + + dumper = DumpDataAll( + data_path=str(self.normalize_dir), + qlib_dir=str(self.qlib_dir), + freq="day", + max_workers=max(multiprocessing.cpu_count() - 2, 1), + exclude_fields="date,symbol", + file_suffix=".csv", + ) + dumper.dump() + logger.info("[Step 3/4] Dump complete.") + + # ------------------------------------------------------------------ + # Step 4: Generate US index instrument lists + # ------------------------------------------------------------------ + def instruments(self, index_list: str = "SP500,NASDAQ100,DJIA,SP400"): + """Generate instrument lists for US indices (SP500, NASDAQ100, etc.). + + Parameters + ---------- + index_list : str + Comma-separated index names, default "SP500,NASDAQ100,DJIA,SP400". 
+ """ + logger.info("[Step 4/4] Generating US index instrument files...") + logger.info(f" qlib_dir: {self.qlib_dir}") + logger.info(f" indices: {index_list}") + + sys.path.append(str(CUR_DIR / "data_collector")) + + from data_collector.us_index.collector import get_instruments + + for index_name in index_list.split(","): + index_name = index_name.strip() + if not index_name: + continue + logger.info(f" Generating {index_name}...") + try: + get_instruments( + str(self.qlib_dir), + index_name, + market_index="us_index", + ) + except Exception as e: + logger.warning(f" Failed to generate {index_name}: {e}") + logger.info("[Step 4/4] Instruments generation complete.") + + # ------------------------------------------------------------------ + # Full pipeline + # ------------------------------------------------------------------ + def all( + self, + start: str = "2000-01-01", + end: str = None, + delay: float = 1.0, + max_collector_count: int = 2, + check_data_length: int = None, + limit_nums: int = None, + index_list: str = "SP500,NASDAQ100,DJIA,SP400", + ): + """Run the full pipeline: download -> normalize -> dump -> instruments. + + Parameters + ---------- + start : str + Start date (inclusive), default "2000-01-01". + end : str + End date (exclusive), default today. + delay : float + Seconds between Yahoo API requests, default 1.0. + index_list : str + Comma-separated index names for instrument generation. 
+ """ + logger.info("=" * 60) + logger.info("US Stock Data Preparation Pipeline") + logger.info("=" * 60) + logger.info(f" source_dir: {self.source_dir}") + logger.info(f" normalize_dir: {self.normalize_dir}") + logger.info(f" qlib_dir: {self.qlib_dir}") + logger.info("=" * 60) + + self.download( + start=start, + end=end, + delay=delay, + max_collector_count=max_collector_count, + check_data_length=check_data_length, + limit_nums=limit_nums, + ) + self.normalize() + self.dump() + self.instruments(index_list=index_list) + + logger.info("=" * 60) + logger.info("All done! You can now use the data:") + logger.info("") + logger.info(" import qlib") + logger.info(f' qlib.init(provider_uri="{self.qlib_dir}", region="us")') + logger.info("") + logger.info(" from qlib.contrib.data.handler import Alpha158") + logger.info(' h = Alpha158(instruments="sp500", start_time="2008-01-01", end_time="2024-12-31")') + logger.info(" df = h.fetch() # shape: (N, 158)") + logger.info("=" * 60) + + # ------------------------------------------------------------------ + # Alternative: use pre-packaged data from Azure Blob + # ------------------------------------------------------------------ + def from_qlib_data(self, delete_old: bool = True, exists_skip: bool = False): + """Download pre-packaged US data from Qlib's Azure Blob storage. + + This is the fastest way to get started, but data may not be up-to-date. + After downloading, you can use `update_data_to_bin` to update incrementally. + """ + logger.info("Downloading pre-packaged US data from Azure Blob...") + logger.info(f" qlib_dir: {self.qlib_dir}") + + from qlib.tests.data import GetData + + GetData().qlib_data( + name="qlib_data", + target_dir=str(self.qlib_dir), + version=None, + interval="1d", + region="us", + delete_old=delete_old, + exists_skip=exists_skip, + ) + logger.info("Download complete. Generating instruments...") + self.instruments() + + +if __name__ == "__main__": + fire.Fire(PrepareUSData)