Situational Awareness LP
Situational Awareness LP is a hedge fund founded in 2024 by Leopold Aschenbrenner and co-managed by Carl Shulman. It is backed by Patrick and John Collison, Daniel Gross, and Nat Friedman. The fund’s thesis is explicitly AGI-focused: it takes an opportunistic approach to public equities and makes strategic investments in semiconductor companies and energy infrastructure.
The copycat approach
Investing directly in the fund requires a $25M minimum, a two-year lockup followed by quarterly redemptions over two more years, and Qualified Purchaser status. The fund also charges the standard “2 and 20” fees. If those barriers are too high for you, there is an alternative. Institutional investment managers with over $100M in qualifying assets must file Form 13F with the United States Securities and Exchange Commission (SEC) each quarter, disclosing their long equity positions, options, and convertible bonds. Filings are due 45 days after quarter-end and are published on the SEC’s EDGAR system.
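The 45-day deadline is easy to compute. Here is a minimal sketch (`filing_deadline` is a hypothetical helper; in practice the SEC rolls a deadline that lands on a weekend or federal holiday to the next business day, which this ignores):

```python
from datetime import date, timedelta

# Hypothetical helper: naive 13F due date, 45 calendar days after quarter-end.
# Weekend/holiday rollover is deliberately omitted.
def filing_deadline(quarter_end: date) -> date:
    return quarter_end + timedelta(days=45)

print(filing_deadline(date(2025, 3, 31)))  # 2025-05-15
```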
Such a copycat strategy has significant limitations, however. The copycat sees each portfolio only after it is disclosed, not when the fund actually trades into it;1 13F filings exclude short positions, foreign-listed securities, and non-equity assets; and, for options, filings identify the underlying security but omit strike prices, expirations, and premiums. Still, the loss of fidelity may be acceptable if one is sufficiently bullish on the fund’s strategy and wants a simple way to get exposure to its public equity bets.
Backtesting the strategy
A trader following this strategy would rebalance to each new 13F on its filing date and hold unchanged until the next filing. Backtesting the rule is then mostly mechanical: replay the rebalances across all available filings and compound the period returns.
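The compounding step itself is a one-liner; as a sketch, with made-up period returns:

```python
# Chain filing-to-filing period returns into one cumulative return.
# The three return figures below are illustrative, not real backtest output.
def compound(period_returns):
    total = 1.0
    for r in period_returns:
        total *= 1 + r
    return total - 1

print(compound([0.10, -0.05, 0.02]))  # cumulative return over three periods
```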
As noted, however, 13F filings report options only partially: they name the underlying security and give a dollar value, but omit strike, expiration, and premium. Since the disclosed information is not sufficient to reconstruct the actual option positions, the script below reports the backtest under two modes, labelled equity proxy and option proxy (explained below). The final period runs from the most recent filing to today, so re-running the script updates the last row automatically.2
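To see why the two modes can diverge sharply, consider a toy example with made-up numbers: a disclosed option row with $1M of underlying notional, where the underlying rises 10% over the period and a representative call’s premium goes from $5 to $8. This mirrors the sizing convention used in the script below (notional converted to premium cost via the start-of-period price ratio), but the figures are purely illustrative:

```python
# Toy contrast of the two proxy modes on one disclosed option row.
value = 1_000_000          # disclosed 13F value, read as underlying notional
s0, s1 = 100.0, 110.0      # underlying price at period start/end
c0, c1 = 5.0, 8.0          # representative call premium at period start/end

equity_proxy = (s1 - s0) / s0            # treat the row as plain stock
cost = value * (c0 / s0)                 # estimated premium actually deployed
pnl = value * ((c1 - c0) / s0)           # premium P&L on that notional
option_proxy = pnl / cost
print(f"{equity_proxy:.0%} vs {option_proxy:.0%}")  # 10% vs 60%
```

The leverage embedded in the option position turns a 10% move in the underlying into a 60% return on deployed capital, which is why the two modes are reported separately.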
Code
# ── SA LP 13F data fetcher ─────────────────────────────────────────
# Fetches all 13F-HR and 13F-HR/A filings from SEC EDGAR for
# Situational Awareness LP, parses the infotable XML, and resolves CUSIPs.
# Output: JSON with one entry per report quarter, each containing the
# rebalance filing date, source filing metadata, and holdings.
# Caches results in CACHE_DIR to avoid redundant SEC requests.
import urllib.request, re, json, sys, time, os, xml.etree.ElementTree as ET
from datetime import datetime
# ── Configuration (update these for your environment) ──────────────
SEC_UA = os.environ.get(
'SEC_USER_AGENT',
'stafforini.com situational-awareness-lp research; contact via stafforini.com')
CACHE_DIR = os.path.expanduser('~/.cache')
CIK = '2045724'
BASE = f'https://www.sec.gov/Archives/edgar/data/{CIK}'
NS = {'ns': 'http://www.sec.gov/edgar/document/thirteenf/informationtable'}
CACHE = os.path.join(CACHE_DIR, 'sa-lp-13f.json')
CUSIP_TICKER = {
'038169207': 'APLD', '05614L209': 'BW', '09173B107': 'BITF',
'093712107': 'BE', '093712AH0': 'BE', '11135F101': 'AVGO',
'12514G108': 'CIFR', '17253J106': 'CIFR', '17253JAA4': 'CIFR',
'18452B209': 'CLSK', '19247G107': 'COHR', '21037T109': 'CEG',
'21873S108': 'CRWV', '21874A106': 'CORZ', '26884L109': 'EQT',
'36168Q104': 'GLXY', '36317J209': 'GLXY', '44282L109': 'HUT',
'44812J104': 'HUT', '456788108': 'INFY', '458140100': 'INTC',
'49338L103': 'KRC', '49427F108': 'KRC', '53115L104': 'LBRT',
'55024U109': 'LITE', '55024UAD1': 'LITE', '573874104': 'MRVL',
'577933104': 'MRVL', '593787101': 'MU', '593787105': 'MU',
'595112103': 'MU', '607828100': 'MOD', '67066G104': 'NVDA',
'683344105': 'ONTO', '68340J108': 'ONTO', '73933G202': 'PSIX',
'73933H100': 'PSIX', '743344109': 'PUMP', '74347M108': 'PUMP',
'76754A103': 'RIOT', '767292105': 'RIOT', '80004C200': 'SNDK',
'80106M109': 'SNDK', '83418M103': 'SEI', '87422Q109': 'TLN',
'87425V106': 'TLN', '874039100': 'TSM', '89854H102': 'TSEM',
'92189F106': 'SMH', '92189F676': 'SMH', '92535P101': 'VRT',
'92537N108': 'VRT', '92840M102': 'VST', '958102105': 'WDC',
'958102AT2': 'WDC', '98321C108': 'WYFI',
'G1110V104': 'BITF', 'G1189L107': 'BTDR', 'G11448100': 'BTDR',
'G7945J104': 'STX', 'G7997R103': 'STX', 'G96115103': 'WYFI',
'M87915274': 'TSEM', 'Q4982L109': 'IREN',
}
def fetch(url, timeout=10):
time.sleep(0.5)
req = urllib.request.Request(url, headers={'User-Agent': SEC_UA})
with urllib.request.urlopen(req, timeout=timeout) as resp:
return resp.read()
def find_infotable_filename(acc):
"""Discover the infotable XML filename for a filing via EFTS, then -index.htm."""
# EFTS search (fast, reliable)
try:
efts = f'https://efts.sec.gov/LATEST/search-index?q=%22{acc}%22'
data = json.loads(fetch(efts))
for hit in data.get('hits', {}).get('hits', []):
doc_id = hit['_id'] # format: "accession:filename"
filename = doc_id.split(':', 1)[1] if ':' in doc_id else ''
if filename.endswith('.xml') and 'primary_doc' not in filename:
return filename
except Exception:
pass
# Fallback: filing index page
acc_path = acc.replace('-', '')
try:
html = fetch(f'{BASE}/{acc_path}/{acc}-index.htm').decode()
for href in re.findall(r'href="([^"]*\.xml)"', html):
fn = href.split('/')[-1]
if fn != 'primary_doc.xml' and 'xslForm' not in href:
return fn
except Exception:
pass
return None
def parse_infotable(xml_data):
root = ET.fromstring(xml_data)
holdings = []
for info in root.findall('.//ns:infoTable', NS):
cusip = info.findtext('ns:cusip', '', NS).strip()
value = int(info.findtext('ns:value', '0', NS))
putcall = info.findtext('ns:putCall', '', NS).strip().lower()
ticker = CUSIP_TICKER.get(cusip, '')
if not ticker:
issuer = info.findtext('ns:nameOfIssuer', '', NS)
raise RuntimeError(
f"Unknown CUSIP {cusip} ({issuer}); add it to CUSIP_TICKER")
pos_type = 'put' if putcall == 'put' else 'call' if putcall == 'call' else 'long'
holdings.append({"ticker": ticker, "type": pos_type, "value": value})
return holdings
def quarter_from_filing_date(fdate):
d = datetime.strptime(fdate, '%Y-%m-%d')
m, y = d.month, d.year
if m <= 3: return f'Q4_{y-1}', f'{y-1}-12-31'
elif m <= 6: return f'Q1_{y}', f'{y}-03-31'
elif m <= 9: return f'Q2_{y}', f'{y}-06-30'
else: return f'Q3_{y}', f'{y}-09-30'
def quarter_from_report_date(rdate):
d = datetime.strptime(rdate, '%Y-%m-%d')
q = (d.month - 1) // 3 + 1
return f'Q{q}_{d.year}', d.strftime('%Y-%m-%d')
def load_cache():
if os.path.exists(CACHE):
with open(CACHE) as f:
return json.load(f)
return {"filings": []}
def save_cache(data):
os.makedirs(os.path.dirname(CACHE), exist_ok=True)
with open(CACHE, 'w') as f:
json.dump(data, f)
cached = load_cache()
cached_by_quarter = {f["quarter"]: f for f in cached["filings"]}
result = {"filings": list(cached_by_quarter.values())}
try:
subs_url = f'https://data.sec.gov/submissions/CIK{CIK.zfill(10)}.json'
subs = json.loads(fetch(subs_url))
recent = subs['filings']['recent']
def recent_field(field, i, default=''):
vals = recent.get(field, [])
return vals[i] if i < len(vals) else default
filings_by_quarter = {}
for i, form in enumerate(recent['form']):
if form not in ('13F-HR', '13F-HR/A'):
continue
fdate = recent_field('filingDate', i)
acc = recent_field('accessionNumber', i)
rdate = recent_field('reportDate', i)
if rdate:
quarter, quarter_end = quarter_from_report_date(rdate)
else:
quarter, quarter_end = quarter_from_filing_date(fdate)
filings_by_quarter.setdefault(quarter, []).append({
"quarter": quarter, "quarter_end": quarter_end,
"filing_date": fdate, "form": form, "accession": acc})
accessions = []
for quarter, rows in filings_by_quarter.items():
rows.sort(key=lambda r: (r["filing_date"], r["accession"]))
selected = rows[-1].copy()
# Use the original filing date as the rebalance date. If an amendment
# exists, treat it as a restatement of that quarter's disclosed holdings.
selected["original_filing_date"] = rows[0]["filing_date"]
accessions.append(selected)
accessions.sort(key=lambda r: r["quarter_end"])
for filing in accessions:
quarter = filing["quarter"]
quarter_end = filing["quarter_end"]
fdate = filing["filing_date"]
acc = filing["accession"]
form = filing["form"]
original_fdate = filing["original_filing_date"]
cached_filing = cached_by_quarter.get(quarter)
if cached_filing and cached_filing.get("accession") == acc:
continue
filename = find_infotable_filename(acc)
if not filename:
print(f"Could not find infotable for {acc}", file=sys.stderr)
continue
acc_path = acc.replace('-', '')
xml = fetch(f'{BASE}/{acc_path}/{filename}')
holdings = parse_infotable(xml)
cached_by_quarter[quarter] = {
"quarter": quarter, "quarter_end": quarter_end,
"filing_date": original_fdate,
"source_filing_date": fdate, "form": form,
"accession": acc, "holdings": holdings}
result["filings"] = sorted(
cached_by_quarter.values(),
key=lambda f: f.get("quarter_end", f["filing_date"]))
save_cache(result)
except Exception as e:
if result["filings"]:
print(f"SEC fetch error ({e}); using cache", file=sys.stderr)
else:
raise
data = json.dumps(result)  # consumed by the backtest block below
import json
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import requests
import time
import os
import warnings
warnings.filterwarnings('ignore')
# Parse data from the scraper block
parsed = json.loads(data) if isinstance(data, str) else data
filings = parsed["filings"]
# Build internal structures
filing_dates = {f["quarter"]: f["filing_date"] for f in filings}
quarter_end_dates = {f["quarter"]: f["quarter_end"] for f in filings}
quarters = [f["quarter"] for f in filings]
# Convert holdings list to dict keyed by quarter.
# Multiple positions in the same ticker with different types are aggregated
# by value per (ticker, type) pair.
holdings = {}
for f in filings:
positions = {}
for h in f["holdings"]:
ticker = h["ticker"]
pos_type = h["type"]
value = h["value"]
key = (ticker, pos_type)
positions[key] = positions.get(key, 0) + value
holdings[f["quarter"]] = positions
def _extract_close_series(df, ticker):
"""Extract a single close-price series from a yfinance result."""
if df.empty:
return pd.Series(dtype=float)
if isinstance(df.columns, pd.MultiIndex):
if 'Close' not in df.columns.get_level_values(0):
return pd.Series(dtype=float)
close = df['Close']
if isinstance(close, pd.DataFrame):
if ticker in close.columns:
series = close[ticker]
elif len(close.columns) == 1:
series = close.iloc[:, 0]
else:
return pd.Series(dtype=float)
else:
series = close
elif 'Close' in df.columns:
series = df['Close']
if isinstance(series, pd.DataFrame):
series = series.iloc[:, 0]
else:
return pd.Series(dtype=float)
return pd.to_numeric(series, errors='coerce').dropna()
def _download_close_series(ticker, start, end):
"""Download one ticker's close series; used to repair flaky batch misses."""
df = yf.download(ticker, start=start, end=end, progress=False,
auto_adjust=True)
return _extract_close_series(df, ticker)
def get_prices(tickers, dates):
"""Fetch close prices for tickers on specific dates."""
unique_tickers = sorted(set(tickers))
all_dates = [datetime.strptime(d, '%Y-%m-%d') for d in dates]
start = min(all_dates) - timedelta(days=5)
end = max(all_dates) + timedelta(days=5)
df = yf.download(unique_tickers, start=start, end=end, progress=False, auto_adjust=True)
# yf.download returns MultiIndex columns (metric, ticker) for multiple tickers
if df.empty:
close = pd.DataFrame()
elif isinstance(df.columns, pd.MultiIndex) and 'Close' in df.columns.get_level_values(0):
close = df['Close'].copy()
elif 'Close' in df.columns:
close = df[['Close']].copy()
close.columns = unique_tickers
else:
close = pd.DataFrame()
prices = {}
for ticker in unique_tickers:
if ticker in close.columns:
series = pd.to_numeric(close[ticker], errors='coerce').dropna()
else:
series = pd.Series(dtype=float)
if series.empty:
series = _download_close_series(ticker, start, end)
if series.empty:
continue
prices[ticker] = {}
for date_str in dates:
target = pd.Timestamp(datetime.strptime(date_str, '%Y-%m-%d'))
after = series[series.index >= target]
if not after.empty:
prices[ticker][date_str] = float(after.iloc[0])
else:
before = series[series.index <= target]
if not before.empty:
prices[ticker][date_str] = float(before.iloc[-1])
return prices
def _price_on_or_after(px_by_date, target_date):
"""Return (date, price) for the first available price on/after target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d >= target_date)
if not dates:
return None
d = dates[0]
return d, px_by_date[d]
def _price_on_or_before(px_by_date, target_date):
"""Return (date, price) for the last available price on/before target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d <= target_date)
if not dates:
return None
d = dates[-1]
return d, px_by_date[d]
def _period_price_pair(px_by_date, start_date, end_date):
"""Return start/end prices for a period using sensible boundary alignment."""
start = _price_on_or_after(px_by_date, start_date)
end = _price_on_or_before(px_by_date, end_date)
if start is None or end is None:
return None
start_actual, p0 = start
end_actual, p1 = end
if end_actual < start_actual:
return None
return start_actual, end_actual, p0, p1
def _option_position_key(ticker, pos_type):
return (ticker, pos_type)
def _linear_underlying_sign(pos_type):
"""Direction when option rows are converted to underlying equity exposure."""
return -1 if pos_type == 'put' else 1
def compute_return(positions, prices, start_date, end_date, mode='equity_only',
option_prices=None):
"""Compute portfolio return between two dates.
The 13F value for an option row is treated as underlying notional, not
option premium. Option contracts are sized from that notional, but the
portfolio denominator is estimated deployed capital: stock value plus option
premium cost. This avoids treating the gap between option notional and
option premium as cash. In 'full' mode, every option row requires a
MarketData price series; missing data raises rather than falling back.
"""
total_cost = 0
portfolio_pnl = 0
for (ticker, pos_type), value in positions.items():
is_option = pos_type in ('call', 'put')
stock_px = prices.get(ticker)
if mode == 'equity_only':
if pos_type not in ('long', 'call', 'put'):
continue
pair = _period_price_pair(stock_px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
stock_ret = (p1 - p0) / p0
total_cost += value
portfolio_pnl += value * _linear_underlying_sign(pos_type) * stock_ret
continue
if is_option:
opt_key = _option_position_key(ticker, pos_type)
opt_px = option_prices.get(opt_key) if option_prices else None
if not opt_px:
raise RuntimeError(
f"No MarketData option prices for {opt_key} in period "
f"{start_date}..{end_date}")
pair = _period_price_pair(opt_px, start_date, end_date)
if pair is None:
raise RuntimeError(
f"MarketData option price series for {opt_key} does not "
f"cover {start_date}..{end_date}")
start_actual, end_actual, opt_p0, opt_p1 = pair
stock_start = _price_on_or_after(stock_px, start_actual)
if stock_start is None or stock_start[1] <= 0:
stock_start = _price_on_or_after(stock_px, start_date)
if stock_start is None or stock_start[1] <= 0:
raise RuntimeError(
f"No underlying price for {ticker} at {start_date}")
p0, p1 = opt_p0, opt_p1
underlying_p0 = stock_start[1]
if p0 <= 0 or underlying_p0 <= 0:
continue
position_cost = value * (p0 / underlying_p0)
position_pnl = value * ((p1 - p0) / underlying_p0)
else:
pair = _period_price_pair(stock_px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
stock_ret = (p1 - p0) / p0
position_cost = value
position_pnl = value * stock_ret
if position_cost <= 0:
continue
total_cost += position_cost
portfolio_pnl += position_pnl
return portfolio_pnl / total_cost if total_cost else None
def annualize(ret, days):
"""Annualize a return over a given number of calendar days."""
if ret is None or days <= 0:
return None
return (1 + ret) ** (365.25 / days) - 1
def fmt(ret):
return f"{ret * 100:+.2f}%" if ret is not None else "N/A"
# Collect all tickers and dates
all_tickers = set()
for positions in holdings.values():
for (ticker, _) in positions:
all_tickers.add(ticker)
all_tickers.add('SPY')
all_tickers.add('AIS')
today = datetime.now().strftime('%Y-%m-%d')
first_date = filing_dates[quarters[0]]
all_dates = set(filing_dates.values()) | set(quarter_end_dates.values()) | {today}
prices = get_prices(sorted(all_tickers), sorted(all_dates))
# Resolve `today` to the actual last available closing date.
# yfinance may not have data for today (market still open or holiday),
# so we look up what date SPY's price actually corresponds to.
def _resolve_price_date(prices, requested_date):
"""Return the actual trading date of the price stored under requested_date."""
ref = 'SPY' if 'SPY' in prices else next(iter(prices), None)
if not ref or requested_date not in prices[ref]:
return requested_date
target_price = prices[ref][requested_date]
# Re-download a small window to find the real date of this price
start = datetime.strptime(requested_date, '%Y-%m-%d') - timedelta(days=10)
end = datetime.strptime(requested_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(ref, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return requested_date
if isinstance(df.columns, pd.MultiIndex):
close = df['Close'][ref].dropna()
elif 'Close' in df.columns:
close = df['Close'].dropna()
else:
close = df.iloc[:, 0].dropna()
for dt, px in close.items():
val = float(px.iloc[0]) if isinstance(px, pd.Series) else float(px)
if abs(val - target_price) < 0.01:
ts = dt[0] if isinstance(dt, tuple) else dt
return pd.Timestamp(ts).strftime('%Y-%m-%d')
return requested_date
today_resolved = _resolve_price_date(prices, today)
if today_resolved != today:
for ticker in prices:
if today in prices[ticker]:
prices[ticker][today_resolved] = prices[ticker].pop(today)
today = today_resolved
def download_daily(tickers, start_date, end_date):
"""Download daily close prices from yfinance, handling MultiIndex.
Dates are 'YYYY-MM-DD' strings. Adds a small buffer for trading-day alignment."""
tickers_sorted = sorted(tickers)
start = datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=5)
end = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(tickers_sorted, start=start, end=end,
progress=False, auto_adjust=True)
if df.empty:
close = pd.DataFrame()
elif isinstance(df.columns, pd.MultiIndex) and 'Close' in df.columns.get_level_values(0):
close = df['Close'].copy()
elif 'Close' in df.columns:
close = df[['Close']].copy()
close.columns = tickers_sorted
else:
close = pd.DataFrame()
for ticker in tickers_sorted:
if ticker in close.columns and not close[ticker].dropna().empty:
continue
series = _download_close_series(ticker, start, end)
if not series.empty:
close[ticker] = series
return close.sort_index()
# -- Historical option prices via MarketData --------------------------------
OPTION_CACHE_DIR = os.path.expanduser('~/My Drive/notes/.sa-lp-option-cache')
_MD_BASE = 'https://api.marketdata.app/v1'
_MD_RATE_DELAY = 0.15
OPTION_CACHE_COLUMNS = [
'date', 'selected_on', 'option_type', 'symbol', 'strike', 'expiry',
'delta', 'price']
# Default contract selection parameters. The option proxy picks a contract
# matching option type, with expiry between min_days and max_days of the
# period start, and |delta| closest to delta_target. When the chain is
# sparse, the achieved |delta| may be far from the target; the sensitivity
# block reports achieved |delta| so this is visible rather than silent.
OPTION_DELTA = 0.15
EXPIRY_MIN_DAYS = 270 # ~9 months
EXPIRY_MAX_DAYS = 456 # ~15 months
def _normalize_option_type(option_type):
option_type = str(option_type).lower()
if option_type not in ('call', 'put'):
raise ValueError(f"Unsupported option type: {option_type}")
return option_type
def _empty_option_cache():
return pd.DataFrame(columns=OPTION_CACHE_COLUMNS)
def _option_cache_path(ticker, option_type, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Return the cache CSV path for (ticker, type, delta_target, window).
When the parameter triple equals the baseline (0.15, 270-456 days), the
historical filename ``TICKER-TYPE.csv`` is used so the main-backtest
cache is reused automatically. Any non-baseline combo lives in a
separate ``TICKER-TYPE-d<delta>-e<min>-<max>.csv`` file so a sensitivity
sweep never pollutes the baseline cache (which the portfolio calculator
reads to pick the representative contract for the current filing).
"""
option_type = _normalize_option_type(option_type)
is_baseline = (
abs(delta_target - OPTION_DELTA) < 1e-9
and min_days == EXPIRY_MIN_DAYS
and max_days == EXPIRY_MAX_DAYS)
if is_baseline:
return os.path.join(OPTION_CACHE_DIR, f'{ticker}-{option_type}.csv')
return os.path.join(
OPTION_CACHE_DIR,
f'{ticker}-{option_type}-d{delta_target:g}-e{min_days}-{max_days}.csv')
def _load_option_cache(ticker, option_type, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Load cached MarketData rows for a ticker/type/target/window. Returns DataFrame or empty."""
option_type = _normalize_option_type(option_type)
path = _option_cache_path(ticker, option_type, delta_target,
min_days, max_days)
if not os.path.exists(path):
return _empty_option_cache()
df = pd.read_csv(path)
if df.empty:
return _empty_option_cache()
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
for col in ('date', 'selected_on'):
df[col] = pd.to_datetime(
df[col], errors='coerce').dt.strftime('%Y-%m-%d')
df['option_type'] = df['option_type'].fillna(option_type).str.lower()
cache = df[OPTION_CACHE_COLUMNS].copy()
cache = cache[cache['option_type'] == option_type].copy()
cache.dropna(subset=['date'], inplace=True)
for col in ('strike', 'delta', 'price'):
cache[col] = pd.to_numeric(cache[col], errors='coerce')
cache.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
return cache[OPTION_CACHE_COLUMNS]
def _save_option_cache(ticker, option_type, df, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Persist typed option cache to CSV."""
option_type = _normalize_option_type(option_type)
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
path = _option_cache_path(ticker, option_type, delta_target,
min_days, max_days)
if df.empty:
df = _empty_option_cache()
else:
df = df.copy()
df['option_type'] = option_type
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
df.sort_values(['date', 'expiry', 'strike'], inplace=True)
df.to_csv(path, index=False)
def _contract_window(ref_date_str, min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
return ref + timedelta(days=min_days), ref + timedelta(days=max_days)
def _contract_from_cache_row(row, ref_date_str, option_type,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
option_type = _normalize_option_type(option_type)
if str(row.get('option_type', option_type)).lower() != option_type:
return None
lo, hi = _contract_window(ref_date_str, min_days, max_days)
try:
exp = datetime.strptime(str(row['expiry']), '%Y-%m-%d')
except (KeyError, TypeError, ValueError):
return None
if not (lo <= exp <= hi):
return None
strike = _safe_float(row.get('strike'))
delta = _safe_float(row.get('delta'))
price = _safe_float(row.get('price'))
if strike is None or delta is None or price is None or price <= 0:
return None
return {
'selected_on': row.get('selected_on'),
'option_type': option_type,
'symbol': row.get('symbol'),
'strike': strike,
'expiry': str(row['expiry']),
'delta': delta,
'price': price,
}
def _select_cached_contract(cache, option_type, ref_date_str,
delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS,
require_selected=False):
rows = cache[(cache['date'] == ref_date_str)
& (cache['option_type'] == option_type)]
selected_rows = rows[rows['selected_on'] == ref_date_str]
if not selected_rows.empty:
rows = selected_rows
elif require_selected:
rows = selected_rows
candidates = []
for _, row in rows.iterrows():
contract = _contract_from_cache_row(row, ref_date_str, option_type,
min_days, max_days)
if contract:
candidates.append(contract)
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - delta_target))
return candidates[0]
def _parse_option_price(contract):
"""Extract a mark price from an option contract record."""
mid = _safe_float(contract.get('mid'))
if mid and mid > 0:
return mid
bid = _safe_float(contract.get('bid'))
ask = _safe_float(contract.get('ask'))
last = _safe_float(contract.get('last'))
if bid and ask and bid > 0 and ask > 0:
return (bid + ask) / 2
if last and last > 0:
return last
return None
def _safe_float(val):
try:
out = float(val)
if np.isnan(out):
return None
return out
except (TypeError, ValueError):
return None
def _marketdata_key():
"""Return the MarketData API key, or None if unavailable.
Resolution order:
1. ``MARKETDATA_KEY`` / ``MARKETDATA_API_KEY`` environment variables.
2. ``pass env/marketdata-token`` (local ``pass`` store).
The result is memoised on the function object so repeated lookups
during a sweep do not reshell. Fetch helpers raise themselves when
called without a key, so a fully cached run still succeeds without
requiring either source.
"""
if hasattr(_marketdata_key, '_cached'):
return _marketdata_key._cached
key = (os.environ.get('MARKETDATA_KEY', '')
or os.environ.get('MARKETDATA_API_KEY', ''))
if not key:
try:
import subprocess
out = subprocess.run(
['pass', 'show', 'env/marketdata-token'],
capture_output=True, text=True, timeout=5, check=False)
if out.returncode == 0:
key = out.stdout.strip().splitlines()[0] if out.stdout else ''
except (FileNotFoundError, subprocess.TimeoutExpired):
key = ''
_marketdata_key._cached = key or None
return _marketdata_key._cached
def _marketdata_get(path, params, api_key):
"""Fetch a MarketData endpoint, returning normalized row dictionaries.
Raises on HTTP errors or a non-'ok' status. 'no_data' is returned as
an empty list so that callers can distinguish 'nothing available' from
'request failed'.
"""
headers = {'Accept': 'application/json', 'Authorization': f'Bearer {api_key}'}
resp = requests.get(_MD_BASE + path, params=params, headers=headers,
timeout=30)
resp.raise_for_status()
body = resp.json()
status = body.get('s')
if status == 'no_data':
return []
if status != 'ok':
raise RuntimeError(
f"MarketData {path} returned status={status!r}: "
f"{body.get('errmsg') or body}")
lengths = [len(v) for v in body.values() if isinstance(v, list)]
n = max(lengths) if lengths else 0
rows = []
for i in range(n):
row = {}
for key, val in body.items():
if isinstance(val, list):
row[key] = val[i] if i < len(val) else None
else:
row[key] = val
rows.append(row)
return rows
def _marketdata_date(timestamp):
try:
return datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d')
except (TypeError, ValueError, OSError):
return None
def _occ_symbol(ticker, option_type, strike, expiry):
"""Build a standard OCC option symbol from contract fields."""
cp = 'C' if _normalize_option_type(option_type) == 'call' else 'P'
exp = datetime.strptime(str(expiry), '%Y-%m-%d').strftime('%y%m%d')
strike_int = int(round(float(strike) * 1000))
root = ticker.upper().replace('.', '')
return f'{root}{exp}{cp}{strike_int:08d}'
# Chains are always fetched with a broad expiry window so they can be cached
# and reused for in-memory selection across any (delta_target, expiry window)
# combination in the sensitivity sweep.
CHAIN_FETCH_MIN_DAYS = 30
CHAIN_FETCH_MAX_DAYS = 760
def _fetch_marketdata_chain(ticker, date_str, option_type, api_key,
min_days=CHAIN_FETCH_MIN_DAYS,
max_days=CHAIN_FETCH_MAX_DAYS):
lo, hi = _contract_window(date_str, min_days, max_days)
params = {
'date': date_str,
'from': lo.strftime('%Y-%m-%d'),
'to': hi.strftime('%Y-%m-%d'),
'side': _normalize_option_type(option_type),
'expiration': 'all',
}
return _marketdata_get(f'/options/chain/{ticker}/', params, api_key)
# Chain cache: one CSV per (ticker, type, date) storing the broad-window chain.
# Lets the sensitivity sweep re-select contracts for different delta targets
# and expiry windows without refetching.
CHAIN_CACHE_DIR = os.path.join(OPTION_CACHE_DIR, 'chains')
def _chain_cache_path(ticker, option_type, date_str):
option_type = _normalize_option_type(option_type)
return os.path.join(CHAIN_CACHE_DIR,
f'{ticker}-{option_type}-{date_str}.csv')
def _load_chain_cache(ticker, option_type, date_str):
path = _chain_cache_path(ticker, option_type, date_str)
if not os.path.exists(path):
return None
df = pd.read_csv(path)
if df.empty:
return []
return df.to_dict('records')
def _save_chain_cache(ticker, option_type, date_str, chain):
if not chain:
return
os.makedirs(CHAIN_CACHE_DIR, exist_ok=True)
path = _chain_cache_path(ticker, option_type, date_str)
pd.DataFrame(chain).to_csv(path, index=False)
def _get_or_fetch_chain(ticker, date_str, option_type, api_key,
fetched_counter=None):
"""Return the cached broad chain for (ticker, type, date), fetching if absent.
Requires ``api_key`` only when a fetch is actually needed.
"""
chain = _load_chain_cache(ticker, option_type, date_str)
if chain is not None:
return chain
if not api_key:
raise RuntimeError(
"MARKETDATA_KEY is not set but a chain fetch is required for "
f"{ticker} {option_type} on {date_str}.")
time.sleep(_MD_RATE_DELAY)
chain = _fetch_marketdata_chain(ticker, date_str, option_type, api_key)
if fetched_counter is not None:
fetched_counter['marketdata_chains'] += 1
_save_chain_cache(ticker, option_type, date_str, chain)
return chain
def _fetch_marketdata_quotes(symbol, start_date, end_date, api_key):
to_date = (datetime.strptime(end_date, '%Y-%m-%d')
+ timedelta(days=1)).strftime('%Y-%m-%d')
rows = _marketdata_get(f'/options/quotes/{symbol}/',
{'from': start_date, 'to': to_date}, api_key)
prices = {}
for row in rows:
date_str = _marketdata_date(row.get('updated'))
if not date_str:
continue
price = _parse_option_price(row)
if price is not None and price > 0:
prices[date_str] = price
return prices
def _implied_vol_from_price(S, K, T, option_price, option_type):
"""Infer Black-Scholes volatility from an observed option mid price."""
if any(x is None for x in (S, K, T, option_price)):
return None
if S <= 0 or K <= 0 or T <= 0 or option_price <= 0:
return None
intrinsic = max(S - K, 0) if option_type == 'call' else max(K - S, 0)
upper = S if option_type == 'call' else K
if option_price < intrinsic - 1e-6 or option_price > upper * 1.5:
return None
lo, hi = 1e-4, 5.0
try:
if (option_price < bs_price(S, K, T, lo, option_type) - 1e-4
or option_price > bs_price(S, K, T, hi, option_type) + 1e-4):
return None
for _ in range(80):
mid = (lo + hi) / 2
if bs_price(S, K, T, mid, option_type) < option_price:
lo = mid
else:
hi = mid
return (lo + hi) / 2
except (FloatingPointError, ValueError, ZeroDivisionError):
return None
def _marketdata_delta(row, ref_date_str, expiry, option_type, price):
"""Use vendor delta when present; otherwise infer it from the quote."""
native = _safe_float(row.get('delta'))
if native is not None and native != 0:
return native
S = _safe_float(row.get('underlyingPrice'))
K = _safe_float(row.get('strike'))
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
exp = datetime.strptime(expiry, '%Y-%m-%d')
T = max((exp - ref).days / 365.25, 1e-6)
sigma = _safe_float(row.get('iv'))
if sigma is None or sigma <= 0:
sigma = _implied_vol_from_price(S, K, T, price, option_type)
if S is None or K is None or sigma is None or sigma <= 0:
return None
return bs_delta(S, K, T, sigma, option_type)
def _select_marketdata_contract(chain, ref_date_str, option_type,
delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
option_type = _normalize_option_type(option_type)
lo, hi = _contract_window(ref_date_str, min_days, max_days)
candidates = []
for c in chain:
if str(c.get('side', '')).lower() != option_type:
continue
expiry = _marketdata_date(c.get('expiration'))
if not expiry:
continue
exp = datetime.strptime(expiry, '%Y-%m-%d')
if not (lo <= exp <= hi):
continue
price = _parse_option_price(c)
if price is None or price <= 0:
continue
delta = _marketdata_delta(c, ref_date_str, expiry, option_type, price)
if delta is None or delta == 0:
continue
strike = _safe_float(c.get('strike'))
symbol = c.get('optionSymbol')
if strike is None or not symbol:
continue
candidates.append({
'option_type': option_type,
'symbol': symbol,
'strike': strike,
'expiry': expiry,
'delta': delta,
'price': price,
})
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - delta_target))
return candidates[0]
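# Toy illustration of the selection rule (made-up chain rows): after the
# filters, the contract whose |delta| is closest to the target wins.
_demo_chain = [
    {'strike': 120, 'delta': 0.28, 'price': 4.10},
    {'strike': 140, 'delta': 0.16, 'price': 2.05},
    {'strike': 160, 'delta': 0.09, 'price': 0.95},
]
_demo_best = min(_demo_chain, key=lambda c: abs(abs(c['delta']) - 0.15))
assert _demo_best['strike'] == 140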
def download_option_prices(option_positions, quarters, holdings, filing_dates,
today, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
"""Download historical representative option prices from MarketData.
MarketData is the sole supported provider. MARKETDATA_KEY must be set.
For each (ticker, option_type) and each filing period in which that
position is held:
1. On the first trading day, select a contract matching type, with
expiry between ``min_days`` and ``max_days`` of the period start, and
|delta| closest to ``delta_target``. MarketData's Starter plan often
returns null Greeks, so delta is inferred from the observed mid price
via Black-Scholes when the vendor delta is missing.
2. Lock in that contract for the period.
3. Track its historical mid price through the period.
The broad option chain for each (ticker, type, first_day) is cached to
disk so that sensitivity sweeps over (delta_target, expiry window) reuse
a single fetch.
Raises ``RuntimeError`` if no suitable contract can be selected for any
required (ticker, type, period), or if MarketData returns no price series
for the selected contract.
Parameters
----------
delta_target : float
Target |delta| for contract selection (default ``OPTION_DELTA``).
min_days, max_days : int
Contract expiry window in days from period start (default 270-456,
i.e. 9-15 months).
Returns
-------
per_period : dict {quarter_str: {(ticker, type): {date_str: float}}}
Option prices keyed by filing period then option position. Each
period has its own contract's prices.
"""
option_positions = sorted({
(ticker, _normalize_option_type(pos_type))
for ticker, pos_type in option_positions})
md_key = _marketdata_key()
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
per_period = {} # {q: {(ticker, type): {date_str: price}}}
fetched = {'marketdata_chains': 0, 'marketdata_quotes': 0}
for ticker, option_type in option_positions:
opt_key = _option_position_key(ticker, option_type)
cache = _load_option_cache(ticker, option_type, delta_target,
min_days, max_days)
new_rows = []
for i, q in enumerate(quarters):
# Skip quarters where this exact option position is absent.
if opt_key not in holdings[q]:
continue
period_start = filing_dates[q]
period_end = (filing_dates[quarters[i + 1]]
if i < len(quarters) - 1 else today)
trading_days = pd.bdate_range(period_start, period_end)
if len(trading_days) == 0:
continue
first_day = trading_days[0].strftime('%Y-%m-%d')
# -- Select contract on first trading day --
contract = _select_cached_contract(
cache, option_type, first_day,
delta_target=delta_target,
min_days=min_days, max_days=max_days,
require_selected=True)
if contract is None:
chain = _get_or_fetch_chain(
ticker, first_day, option_type, md_key, fetched)
contract = _select_marketdata_contract(
chain, first_day, option_type,
delta_target=delta_target,
min_days=min_days, max_days=max_days)
if contract is None:
raise RuntimeError(
f"MarketData returned no usable {option_type} contract "
f"for {ticker} on {first_day} (period {q}) at "
f"delta={delta_target}, "
f"expiry {min_days}-{max_days}d")
new_rows.append({
'date': first_day,
'selected_on': first_day,
'option_type': option_type,
'symbol': contract.get('symbol'),
'strike': contract['strike'],
'expiry': contract['expiry'],
'delta': contract['delta'],
'price': contract['price'],
})
strike = contract['strike']
expiry = contract['expiry']
symbol = contract.get('symbol') or _occ_symbol(
ticker, option_type, strike, expiry)
# -- Collect prices for this period (fresh dict per period) --
period_prices = {}
# Fast path: read matching prices from cache.
rows = cache[
(cache['date'] >= period_start)
& (cache['date'] <= period_end)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))
& pd.notna(cache['price'])]
selected_rows = rows[rows['selected_on'] == first_day]
if not selected_rows.empty:
rows = selected_rows
for _, row in rows.iterrows():
period_prices[row['date']] = float(row['price'])
# Decide whether to refresh quotes. With a key, refresh whenever
# the cached series does not reach period_end. Without a key,
# only fail if the cached series is empty; a slightly stale
# tail is acceptable for cache-only runs (e.g. sensitivity
# sweeps replaying the baseline contract).
has_partial = bool(period_prices)
reaches_end = has_partial and max(period_prices) >= period_end
if md_key and not reaches_end:
time.sleep(_MD_RATE_DELAY)
quote_prices = _fetch_marketdata_quotes(
symbol, period_start, period_end, md_key)
fetched['marketdata_quotes'] += 1
for day_str, price in quote_prices.items():
if period_start <= day_str <= period_end:
period_prices[day_str] = price
new_rows.append({
'date': day_str,
'selected_on': first_day,
'option_type': option_type,
'symbol': symbol,
'strike': strike,
'expiry': expiry,
'delta': contract['delta'],
'price': price,
})
if contract.get('price') and first_day not in period_prices:
period_prices[first_day] = contract['price']
elif not md_key and not has_partial:
raise RuntimeError(
"MARKETDATA_KEY is not set and no cached quotes exist "
f"for {symbol} in {period_start}..{period_end}.")
if not period_prices:
raise RuntimeError(
f"MarketData returned no quotes for {symbol} "
f"({opt_key}) in {period_start}..{period_end}")
per_period.setdefault(q, {})[opt_key] = period_prices
# Persist new data to cache
if new_rows:
new_df = pd.DataFrame(new_rows)
cache = pd.concat([cache, new_df], ignore_index=True)
cache.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
_save_option_cache(ticker, option_type, cache, delta_target,
min_days, max_days)
if any(fetched.values()):
import sys
parts = []
if fetched['marketdata_chains']:
parts.append(f"{fetched['marketdata_chains']} MarketData chains")
if fetched['marketdata_quotes']:
parts.append(f"{fetched['marketdata_quotes']} MarketData quote series")
print(f"[options] Fetched {', '.join(parts)}", file=sys.stderr)
return per_period
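# Shape of the returned structure, mocked with made-up numbers (NVDA here
# is purely illustrative): prices keyed by filing period, then
# (ticker, type), then date. ISO date strings sort chronologically, which
# is why the code above can use max(period_prices) to find the latest
# quoted day.
_demo_per_period = {
    '2025Q1': {('NVDA', 'call'): {'2025-02-18': 4.10, '2025-02-19': 4.35}},
}
assert max(_demo_per_period['2025Q1'][('NVDA', 'call')]) == '2025-02-19'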
# -- Black-Scholes helpers (used only to infer delta when MarketData's
# Starter-plan historical Greeks are null; never to reprice returns) -----
from scipy.stats import norm as _norm
def bs_price(S, K, T, sigma, option_type='call'):
"""Black-Scholes option price (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return max(S - K, 0)
return max(K - S, 0)
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
d2 = d1 - sigma * np.sqrt(T)
if option_type == 'call':
return S * _norm.cdf(d1) - K * _norm.cdf(d2)
return K * _norm.cdf(-d2) - S * _norm.cdf(-d1)
def bs_delta(S, K, T, sigma, option_type='call'):
"""Black-Scholes delta (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return 1.0 if S > K else 0.0
return -1.0 if S < K else 0.0
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
if option_type == 'call':
return _norm.cdf(d1)
return _norm.cdf(d1) - 1
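# Quick checks of the delta helper's behaviour, via a local copy so the
# sketch stands alone: at the money the call delta sits just above 0.5
# (d1 = sigma*sqrt(T)/2 > 0 at zero rates), and call and put deltas at the
# same strike differ by exactly 1.
from math import log, sqrt, erf
def _demo_delta(S, K, T, sigma, kind='call'):
    d1 = (log(S / K) + sigma ** 2 / 2 * T) / (sigma * sqrt(T))
    cdf = 0.5 * (1 + erf(d1 / sqrt(2)))
    return cdf if kind == 'call' else cdf - 1
assert 0.5 < _demo_delta(100, 100, 1.0, 0.2) < 0.56
assert abs(_demo_delta(100, 120, 1.0, 0.3) - _demo_delta(100, 120, 1.0, 0.3, 'put') - 1) < 1e-12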
def daily_cumulative(holdings, quarters, filing_dates, close, today, mode,
per_period_opt=None):
"""Build a daily series of cumulative growth factors for a given mode.
For each filing period, stock shares and option contracts are fixed. In
equity-proxy mode, option rows are converted to linear underlying exposure:
calls are long underlying and puts are short underlying. In option-proxy
mode, option rows are sized by 13F underlying notional and returns come
from MarketData quotes; returns are divided by deployed capital (stock
value plus option premium cost). Option-proxy mode raises if MarketData
prices are missing for any required position.
"""
cum_growth = 1.0
dates_out = []
values_out = []
for i, q in enumerate(quarters):
period_start = filing_dates[q]
period_end = filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today
ps = pd.Timestamp(period_start)
pe = pd.Timestamp(period_end)
# Trading days in this period
mask = (close.index >= ps) & (close.index <= pe)
period_close = close[mask]
if period_close.empty:
continue
# Option prices for this period (keyed by (ticker, type) → prices)
quarter_opt = per_period_opt.get(q, {}) if per_period_opt else {}
# Determine starting prices, fixed exposure, and deployed capital.
positions = holdings[q]
exposure = {}
costs = {}
start_prices = {}
start_underlying = {}
use_opt_px = {} # track which positions use option prices
total_cost = 0
for (ticker, pos_type), value in positions.items():
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
if mode == 'equity_only':
if pos_type not in ('long', 'call', 'put'):
continue
if ticker not in close.columns:
continue
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
stock_start = float(avail.iloc[0])
if stock_start <= 0:
continue
start_prices[(ticker, pos_type)] = stock_start
start_underlying[(ticker, pos_type)] = stock_start
costs[(ticker, pos_type)] = value
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = False
total_cost += value
continue
# mode == 'full' (option proxy)
if is_option:
if opt_key not in quarter_opt:
raise RuntimeError(
f"No MarketData option prices for {opt_key} in "
f"period {q}")
ticker_opt = quarter_opt[opt_key]
opt_dates = sorted(d for d in ticker_opt if d >= period_start)
if not opt_dates:
raise RuntimeError(
f"MarketData option prices for {opt_key} in period "
f"{q} contain no dates at or after {period_start}")
if ticker not in close.columns:
raise RuntimeError(
f"No underlying close series for {ticker}")
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
raise RuntimeError(
f"No underlying price for {ticker} at {period_start}")
opt_start = ticker_opt[opt_dates[0]]
underlying_start = float(avail.iloc[0])
if opt_start <= 0 or underlying_start <= 0:
raise RuntimeError(
f"Non-positive starting price for {opt_key} in "
f"period {q}")
start_prices[(ticker, pos_type)] = opt_start
start_underlying[(ticker, pos_type)] = underlying_start
costs[(ticker, pos_type)] = value * opt_start / underlying_start
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = True
total_cost += costs[(ticker, pos_type)]
continue
# Plain stock in full mode
if ticker not in close.columns:
continue
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
stock_start = float(avail.iloc[0])
if stock_start <= 0:
continue
start_prices[(ticker, pos_type)] = stock_start
start_underlying[(ticker, pos_type)] = stock_start
costs[(ticker, pos_type)] = value
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = False
total_cost += value
if total_cost == 0:
continue
# Daily P&L relative to period start.
# Skip first day of subsequent periods (already recorded as last day
# of the prior period) to avoid duplicate boundary dates.
start_idx = 1 if i > 0 else 0
# Forward-fill: track last known option price so that gaps in
# option data don't cause positions to vanish mid-period.
last_opt = {k: v for k, v in start_prices.items()
if use_opt_px.get(k)}
for day_idx in range(start_idx, len(period_close)):
day = period_close.index[day_idx]
day_str = day.strftime('%Y-%m-%d')
period_pnl = 0
for (ticker, pos_type), value in exposure.items():
p0 = start_prices[(ticker, pos_type)]
if p0 == 0:
continue
if use_opt_px[(ticker, pos_type)]:
opt_key = _option_position_key(ticker, pos_type)
p1_val = quarter_opt.get(opt_key, {}).get(day_str)
if p1_val is not None:
last_opt[(ticker, pos_type)] = p1_val
else:
p1_val = last_opt.get((ticker, pos_type))
if p1_val is None:
continue
underlying_p0 = start_underlying.get((ticker, pos_type))
if not underlying_p0 or underlying_p0 <= 0:
continue
position_pnl = value * (float(p1_val) - p0) / underlying_p0
else:
if ticker not in period_close.columns:
continue
p1_val = period_close[ticker].iloc[day_idx]
if pd.isna(p1_val):
continue
stock_ret = (float(p1_val) - p0) / p0
if mode == 'equity_only':
position_pnl = (
value * _linear_underlying_sign(pos_type) * stock_ret)
else:
position_pnl = value * stock_ret
period_pnl += position_pnl
dates_out.append(day)
values_out.append(cum_growth * (1 + period_pnl / total_cost))
# Chain: next period starts from the last day's growth factor
if values_out:
cum_growth = values_out[-1]
return dates_out, values_out
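# The chaining rule in miniature, with made-up period P&L fractions: within
# a period each value is the period-start factor times (1 + cumulative
# P&L / deployed capital); at a boundary the next period starts from the
# last recorded factor.
_demo_periods = [
    [0.01, 0.03, 0.02],  # cumulative P&L fractions, period 1
    [-0.01, 0.04],       # period 2
]
_demo_cum = 1.0
_demo_series = []
for _pnls in _demo_periods:
    for _p in _pnls:
        _demo_series.append(_demo_cum * (1 + _p))
    _demo_cum = _demo_series[-1]
assert abs(_demo_series[-1] - 1.02 * 1.04) < 1e-12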
# Download representative option prices from MarketData (raises on missing data).
option_positions = sorted({
(t, pt)
for q in quarters
for (t, pt) in holdings[q]
if pt in ('call', 'put')
})
per_period_opt = download_option_prices(
option_positions, quarters, holdings, filing_dates, today)
# Compute copycat returns
header = (f"{'Period':<16} {'Dates':<24} "
f"{'Eq. proxy':>9} {'Opt. proxy':>10} {'SPY':>9}")
print("COPYCAT STRATEGY RETURNS")
print("=" * 72)
print(header)
print("-" * 72)
cum_eq = 1.0
cum_full = 1.0
cum_spy = 1.0
for i, q in enumerate(quarters):
start = filing_dates[q]
end = filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today
suffix = " †" if i == len(quarters) - 1 else ""
ret_eq = compute_return(holdings[q], prices, start, end, 'equity_only')
ret_full = compute_return(holdings[q], prices, start, end, 'full',
option_prices=per_period_opt.get(q, {}))
ret_spy = None
if 'SPY' in prices and start in prices['SPY'] and end in prices['SPY']:
spy_p0, spy_p1 = prices['SPY'][start], prices['SPY'][end]
if spy_p0 != 0:
ret_spy = (spy_p1 - spy_p0) / spy_p0
if ret_eq is not None:
cum_eq *= (1 + ret_eq)
if ret_full is not None:
cum_full *= (1 + ret_full)
if ret_spy is not None:
cum_spy *= (1 + ret_spy)
dates_str = f"{start} to {end}"
    print(f"{q + suffix:<16} {dates_str:<24} "
          f"{fmt(ret_eq):>9} {fmt(ret_full):>10} {fmt(ret_spy):>9}")
print("-" * 72)
cum_eq_ret = cum_eq - 1
cum_full_ret = cum_full - 1
cum_spy_ret = cum_spy - 1
dates_str = f"{first_date} to {today}"
print(f"{'Cumulative':<16} {dates_str:<24} "
      f"{fmt(cum_eq_ret):>9} {fmt(cum_full_ret):>10} {fmt(cum_spy_ret):>9}")
print()
print("† = partial period (still holding; updates on re-evaluation)")
print("Eq. proxy = stocks plus option rows as linear underlying exposure")
print("Opt. proxy = options sized to 13F notional; returns on deployed capital")
# ── Risk-adjusted returns ──────────────────────────────────────────
daily_close = download_daily(all_tickers, first_date, today)
def daily_returns_from_cumulative(mode, per_period_opt=None):
if daily_close.empty:
return pd.Series(dtype=float)
dates, values = daily_cumulative(
holdings, quarters, filing_dates, daily_close, today, mode,
per_period_opt=per_period_opt)
if not dates:
return pd.Series(dtype=float)
growth = pd.Series(values, index=dates)
return growth.pct_change().dropna()
ret_eq_d = daily_returns_from_cumulative('equity_only')
ret_full_d = daily_returns_from_cumulative(
'full', per_period_opt=per_period_opt)
if 'SPY' in daily_close.columns:
spy_close = daily_close['SPY'].dropna()
spy_period = spy_close[spy_close.index >= pd.Timestamp(first_date)]
ret_spy_d = spy_period.pct_change().dropna()
else:
ret_spy_d = pd.Series(dtype=float)
def sharpe(daily_rets, rf_annual=0.04):
if daily_rets.empty:
return float('nan')
rf_daily = (1 + rf_annual) ** (1 / 252) - 1
excess = daily_rets - rf_daily
    sd = excess.std()
    if sd == 0 or pd.isna(sd):
        return float('nan')
    return float(excess.mean() / sd * 252 ** 0.5)
def max_drawdown(daily_rets):
if daily_rets.empty:
return float('nan')
cum = (1 + daily_rets).cumprod()
return float(((cum - cum.cummax()) / cum.cummax()).min() * 100)
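# Worked example of the drawdown formula with made-up daily returns: the
# running peak after the first day is 1.10, the trough after the -20% day
# is 0.88, so the maximum drawdown is -20%.
import pandas as pd
_demo_rets = pd.Series([0.10, -0.20, 0.05])
_demo_cum = (1 + _demo_rets).cumprod()
_demo_dd = float(((_demo_cum - _demo_cum.cummax()) / _demo_cum.cummax()).min() * 100)
assert abs(_demo_dd - (-20.0)) < 1e-9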
print()
print("RISK-ADJUSTED RETURNS")
print("=" * 55)
print(f"{'Metric':<25} {'Eq.proxy':>9} {'Opt.proxy':>9} {'SPY':>9}")
print("-" * 55)
vol_eq = float(ret_eq_d.std() * 252 ** 0.5 * 100)
vol_full = float(ret_full_d.std() * 252 ** 0.5 * 100)
vol_spy = float(ret_spy_d.std() * 252 ** 0.5 * 100)
print(f"{'Ann. volatility':<25} {vol_eq:>8.1f}% {vol_full:>8.1f}% {vol_spy:>8.1f}%")
sh_eq = sharpe(ret_eq_d)
sh_full = sharpe(ret_full_d)
sh_spy = sharpe(ret_spy_d)
print(f"{'Sharpe (rf=4%)':<25} {sh_eq:>9.2f} {sh_full:>9.2f} {sh_spy:>9.2f}")
mdd_eq = max_drawdown(ret_eq_d)
mdd_full = max_drawdown(ret_full_d)
mdd_spy = max_drawdown(ret_spy_d)
print(f"{'Max drawdown':<25} {mdd_eq:>8.1f}% {mdd_full:>8.1f}% {mdd_spy:>8.1f}%")
import json
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import requests
import time
import os
import warnings
warnings.filterwarnings('ignore')
# Parse data from the scraper block
parsed = json.loads(data) if isinstance(data, str) else data
filings = parsed["filings"]
# Build internal structures
filing_dates = {f["quarter"]: f["filing_date"] for f in filings}
quarter_end_dates = {f["quarter"]: f["quarter_end"] for f in filings}
quarters = [f["quarter"] for f in filings]
# Convert holdings list to dict keyed by quarter.
# Multiple positions in the same ticker with different types are aggregated
# by value per (ticker, type) pair.
holdings = {}
for f in filings:
positions = {}
for h in f["holdings"]:
ticker = h["ticker"]
pos_type = h["type"]
value = h["value"]
key = (ticker, pos_type)
positions[key] = positions.get(key, 0) + value
holdings[f["quarter"]] = positions
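# Aggregation in miniature (made-up rows; NVDA is illustrative): two call
# rows on the same ticker collapse into one (ticker, type) position whose
# value is the sum, while the stock row stays a separate key.
_demo_rows = [
    {'ticker': 'NVDA', 'type': 'call', 'value': 100},
    {'ticker': 'NVDA', 'type': 'call', 'value': 50},
    {'ticker': 'NVDA', 'type': 'long', 'value': 200},
]
_demo_positions = {}
for _h in _demo_rows:
    _k = (_h['ticker'], _h['type'])
    _demo_positions[_k] = _demo_positions.get(_k, 0) + _h['value']
assert _demo_positions[('NVDA', 'call')] == 150
assert _demo_positions[('NVDA', 'long')] == 200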
def _extract_close_series(df, ticker):
"""Extract a single close-price series from a yfinance result."""
if df.empty:
return pd.Series(dtype=float)
if isinstance(df.columns, pd.MultiIndex):
if 'Close' not in df.columns.get_level_values(0):
return pd.Series(dtype=float)
close = df['Close']
if isinstance(close, pd.DataFrame):
if ticker in close.columns:
series = close[ticker]
elif len(close.columns) == 1:
series = close.iloc[:, 0]
else:
return pd.Series(dtype=float)
else:
series = close
elif 'Close' in df.columns:
series = df['Close']
if isinstance(series, pd.DataFrame):
series = series.iloc[:, 0]
else:
return pd.Series(dtype=float)
return pd.to_numeric(series, errors='coerce').dropna()
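# What the MultiIndex branch handles, on a hand-built frame shaped like a
# yfinance batch result with (metric, ticker) columns (made-up prices):
# selecting 'Close' then the ticker yields the per-ticker series.
import pandas as pd
_demo_df = pd.DataFrame(
    {('Close', 'NVDA'): [100.0, 102.0], ('Volume', 'NVDA'): [1e6, 1.1e6]},
    index=pd.to_datetime(['2025-02-18', '2025-02-19']))
_demo_df.columns = pd.MultiIndex.from_tuples(_demo_df.columns)
assert list(_demo_df['Close']['NVDA']) == [100.0, 102.0]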
def _download_close_series(ticker, start, end):
"""Download one ticker's close series; used to repair flaky batch misses."""
df = yf.download(ticker, start=start, end=end, progress=False,
auto_adjust=True)
return _extract_close_series(df, ticker)
def get_prices(tickers, dates):
"""Fetch close prices for tickers on specific dates."""
unique_tickers = sorted(set(tickers))
all_dates = [datetime.strptime(d, '%Y-%m-%d') for d in dates]
start = min(all_dates) - timedelta(days=5)
end = max(all_dates) + timedelta(days=5)
df = yf.download(unique_tickers, start=start, end=end, progress=False, auto_adjust=True)
# yf.download returns MultiIndex columns (metric, ticker) for multiple tickers
if df.empty:
close = pd.DataFrame()
elif isinstance(df.columns, pd.MultiIndex) and 'Close' in df.columns.get_level_values(0):
close = df['Close'].copy()
elif 'Close' in df.columns:
close = df[['Close']].copy()
close.columns = unique_tickers
else:
close = pd.DataFrame()
prices = {}
for ticker in unique_tickers:
if ticker in close.columns:
series = pd.to_numeric(close[ticker], errors='coerce').dropna()
else:
series = pd.Series(dtype=float)
if series.empty:
series = _download_close_series(ticker, start, end)
if series.empty:
continue
prices[ticker] = {}
for date_str in dates:
target = pd.Timestamp(datetime.strptime(date_str, '%Y-%m-%d'))
after = series[series.index >= target]
if not after.empty:
prices[ticker][date_str] = float(after.iloc[0])
else:
before = series[series.index <= target]
if not before.empty:
prices[ticker][date_str] = float(before.iloc[-1])
return prices
def _price_on_or_after(px_by_date, target_date):
"""Return (date, price) for the first available price on/after target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d >= target_date)
if not dates:
return None
d = dates[0]
return d, px_by_date[d]
def _price_on_or_before(px_by_date, target_date):
"""Return (date, price) for the last available price on/before target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d <= target_date)
if not dates:
return None
d = dates[-1]
return d, px_by_date[d]
def _period_price_pair(px_by_date, start_date, end_date):
"""Return start/end prices for a period using sensible boundary alignment."""
start = _price_on_or_after(px_by_date, start_date)
end = _price_on_or_before(px_by_date, end_date)
if start is None or end is None:
return None
start_actual, p0 = start
end_actual, p1 = end
if end_actual < start_actual:
return None
return start_actual, end_actual, p0, p1
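# Boundary alignment on a tiny made-up series, via local mirrors of the two
# lookup helpers above: ISO date strings compare lexicographically in
# chronological order, so plain string comparisons pick the first trading
# day on/after the start and the last on/before the end.
def _demo_on_or_after(px, d):
    ds = sorted(k for k in px if k >= d)
    return (ds[0], px[ds[0]]) if ds else None
def _demo_on_or_before(px, d):
    ds = sorted(k for k in px if k <= d)
    return (ds[-1], px[ds[-1]]) if ds else None
_demo_px = {'2025-02-14': 10.0, '2025-02-18': 11.0, '2025-05-14': 12.5}
assert _demo_on_or_after(_demo_px, '2025-02-15') == ('2025-02-18', 11.0)
assert _demo_on_or_before(_demo_px, '2025-05-15') == ('2025-05-14', 12.5)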
def _option_position_key(ticker, pos_type):
return (ticker, pos_type)
def _linear_underlying_sign(pos_type):
"""Direction when option rows are converted to underlying equity exposure."""
return -1 if pos_type == 'put' else 1
def compute_return(positions, prices, start_date, end_date, mode='equity_only',
option_prices=None):
"""Compute portfolio return between two dates.
The 13F value for an option row is treated as underlying notional, not
option premium. Option contracts are sized from that notional, but the
portfolio denominator is estimated deployed capital: stock value plus option
premium cost. This avoids treating the gap between option notional and
option premium as cash. In 'full' mode, every option row requires a
MarketData price series; missing data raises rather than falling back.
"""
total_cost = 0
portfolio_pnl = 0
for (ticker, pos_type), value in positions.items():
is_option = pos_type in ('call', 'put')
stock_px = prices.get(ticker)
if mode == 'equity_only':
if pos_type not in ('long', 'call', 'put'):
continue
pair = _period_price_pair(stock_px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
stock_ret = (p1 - p0) / p0
total_cost += value
portfolio_pnl += value * _linear_underlying_sign(pos_type) * stock_ret
continue
if is_option:
opt_key = _option_position_key(ticker, pos_type)
opt_px = option_prices.get(opt_key) if option_prices else None
if not opt_px:
raise RuntimeError(
f"No MarketData option prices for {opt_key} in period "
f"{start_date}..{end_date}")
pair = _period_price_pair(opt_px, start_date, end_date)
if pair is None:
raise RuntimeError(
f"MarketData option price series for {opt_key} does not "
f"cover {start_date}..{end_date}")
start_actual, end_actual, opt_p0, opt_p1 = pair
stock_start = _price_on_or_after(stock_px, start_actual)
if stock_start is None or stock_start[1] <= 0:
stock_start = _price_on_or_after(stock_px, start_date)
if stock_start is None or stock_start[1] <= 0:
raise RuntimeError(
f"No underlying price for {ticker} at {start_date}")
p0, p1 = opt_p0, opt_p1
underlying_p0 = stock_start[1]
if p0 <= 0 or underlying_p0 <= 0:
continue
position_cost = value * (p0 / underlying_p0)
position_pnl = value * ((p1 - p0) / underlying_p0)
else:
pair = _period_price_pair(stock_px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
stock_ret = (p1 - p0) / p0
position_cost = value
position_pnl = value * stock_ret
if position_cost <= 0:
continue
total_cost += position_cost
portfolio_pnl += position_pnl
return portfolio_pnl / total_cost if total_cost else None
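# Worked example of the sizing rule with made-up numbers: a 13F call row
# reporting $1,000,000 of underlying notional, underlying at $100, option
# at $5. The row controls the full notional but only deploys the premium,
# so the capital denominator uses $50,000 rather than $1,000,000.
_demo_notional = 1_000_000.0
_demo_under_p0, _demo_opt_p0, _demo_opt_p1 = 100.0, 5.0, 8.0
_demo_cost = _demo_notional * _demo_opt_p0 / _demo_under_p0
_demo_pnl = _demo_notional * (_demo_opt_p1 - _demo_opt_p0) / _demo_under_p0
assert _demo_cost == 50_000.0
assert _demo_pnl == 30_000.0
assert _demo_pnl / _demo_cost == 0.6  # +60% on deployed capital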
def annualize(ret, days):
"""Annualize a return over a given number of calendar days."""
if ret is None or days <= 0:
return None
return (1 + ret) ** (365.25 / days) - 1
def fmt(ret):
return f"{ret * 100:+.2f}%" if ret is not None else "N/A"
# Collect all tickers and dates
all_tickers = set()
for positions in holdings.values():
for (ticker, _) in positions:
all_tickers.add(ticker)
all_tickers.add('SPY')
all_tickers.add('AIS')
today = datetime.now().strftime('%Y-%m-%d')
first_date = filing_dates[quarters[0]]
all_dates = set(filing_dates.values()) | set(quarter_end_dates.values()) | {today}
prices = get_prices(sorted(all_tickers), sorted(all_dates))
# Resolve `today` to the actual last available closing date.
# yfinance may not have data for today (market still open or holiday),
# so we look up what date SPY's price actually corresponds to.
def _resolve_price_date(prices, requested_date):
"""Return the actual trading date of the price stored under requested_date."""
ref = 'SPY' if 'SPY' in prices else next(iter(prices), None)
if not ref or requested_date not in prices[ref]:
return requested_date
target_price = prices[ref][requested_date]
# Re-download a small window to find the real date of this price
start = datetime.strptime(requested_date, '%Y-%m-%d') - timedelta(days=10)
end = datetime.strptime(requested_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(ref, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return requested_date
if isinstance(df.columns, pd.MultiIndex):
close = df['Close'][ref].dropna()
elif 'Close' in df.columns:
close = df['Close'].dropna()
else:
close = df.iloc[:, 0].dropna()
for dt, px in close.items():
val = float(px.iloc[0]) if isinstance(px, pd.Series) else float(px)
if abs(val - target_price) < 0.01:
ts = dt[0] if isinstance(dt, tuple) else dt
return pd.Timestamp(ts).strftime('%Y-%m-%d')
return requested_date
today_resolved = _resolve_price_date(prices, today)
if today_resolved != today:
for ticker in prices:
if today in prices[ticker]:
prices[ticker][today_resolved] = prices[ticker].pop(today)
today = today_resolved
def download_daily(tickers, start_date, end_date):
"""Download daily close prices from yfinance, handling MultiIndex.
Dates are 'YYYY-MM-DD' strings. Adds a small buffer for trading-day alignment."""
tickers_sorted = sorted(tickers)
start = datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=5)
end = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(tickers_sorted, start=start, end=end,
progress=False, auto_adjust=True)
if df.empty:
close = pd.DataFrame()
elif isinstance(df.columns, pd.MultiIndex) and 'Close' in df.columns.get_level_values(0):
close = df['Close'].copy()
elif 'Close' in df.columns:
close = df[['Close']].copy()
close.columns = tickers_sorted
else:
close = pd.DataFrame()
for ticker in tickers_sorted:
if ticker in close.columns and not close[ticker].dropna().empty:
continue
series = _download_close_series(ticker, start, end)
if not series.empty:
close[ticker] = series
return close.sort_index()
# -- Historical option prices via MarketData --------------------------------
OPTION_CACHE_DIR = os.path.expanduser('~/My Drive/notes/.sa-lp-option-cache')
_MD_BASE = 'https://api.marketdata.app/v1'
_MD_RATE_DELAY = 0.15
OPTION_CACHE_COLUMNS = [
'date', 'selected_on', 'option_type', 'symbol', 'strike', 'expiry',
'delta', 'price']
# Default contract selection parameters. The option proxy picks a contract
# matching option type, with expiry between min_days and max_days of the
# period start, and |delta| closest to delta_target. When the chain is
# sparse, the achieved |delta| may be far from the target; the sensitivity
# block reports achieved |delta| so this is visible rather than silent.
OPTION_DELTA = 0.15
EXPIRY_MIN_DAYS = 270 # ~9 months
EXPIRY_MAX_DAYS = 456 # ~15 months
def _normalize_option_type(option_type):
option_type = str(option_type).lower()
if option_type not in ('call', 'put'):
raise ValueError(f"Unsupported option type: {option_type}")
return option_type
def _empty_option_cache():
return pd.DataFrame(columns=OPTION_CACHE_COLUMNS)
def _option_cache_path(ticker, option_type, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Return the cache CSV path for (ticker, type, delta_target, window).
When the parameter triple equals the baseline (0.15, 270-456 days), the
historical filename ``TICKER-TYPE.csv`` is used so the main-backtest
cache is reused automatically. Any non-baseline combo lives in a
separate ``TICKER-TYPE-d<delta>-e<min>-<max>.csv`` file so a sensitivity
sweep never pollutes the baseline cache (which the portfolio calculator
reads to pick the representative contract for the current filing).
"""
option_type = _normalize_option_type(option_type)
is_baseline = (
abs(delta_target - OPTION_DELTA) < 1e-9
and min_days == EXPIRY_MIN_DAYS
and max_days == EXPIRY_MAX_DAYS)
if is_baseline:
return os.path.join(OPTION_CACHE_DIR, f'{ticker}-{option_type}.csv')
return os.path.join(
OPTION_CACHE_DIR,
f'{ticker}-{option_type}-d{delta_target:g}-e{min_days}-{max_days}.csv')
def _load_option_cache(ticker, option_type, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Load cached MarketData rows for a ticker/type/target/window. Returns DataFrame or empty."""
option_type = _normalize_option_type(option_type)
path = _option_cache_path(ticker, option_type, delta_target,
min_days, max_days)
if not os.path.exists(path):
return _empty_option_cache()
df = pd.read_csv(path)
if df.empty:
return _empty_option_cache()
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
for col in ('date', 'selected_on'):
df[col] = pd.to_datetime(
df[col], errors='coerce').dt.strftime('%Y-%m-%d')
df['option_type'] = df['option_type'].fillna(option_type).str.lower()
cache = df[OPTION_CACHE_COLUMNS].copy()
cache = cache[cache['option_type'] == option_type].copy()
cache.dropna(subset=['date'], inplace=True)
for col in ('strike', 'delta', 'price'):
cache[col] = pd.to_numeric(cache[col], errors='coerce')
cache.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
return cache[OPTION_CACHE_COLUMNS]
def _save_option_cache(ticker, option_type, df, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Persist typed option cache to CSV."""
option_type = _normalize_option_type(option_type)
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
path = _option_cache_path(ticker, option_type, delta_target,
min_days, max_days)
if df.empty:
df = _empty_option_cache()
else:
df = df.copy()
df['option_type'] = option_type
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
df.sort_values(['date', 'expiry', 'strike'], inplace=True)
df.to_csv(path, index=False)
def _contract_window(ref_date_str, min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
return ref + timedelta(days=min_days), ref + timedelta(days=max_days)
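# The window in numbers: from a 2025-02-18 period start, the default
# 270-456 day band admits expiries from mid-November 2025 into May 2026,
# a 186-day-wide window.
from datetime import datetime, timedelta
_demo_lo, _demo_hi = (datetime(2025, 2, 18) + timedelta(days=270),
                      datetime(2025, 2, 18) + timedelta(days=456))
assert _demo_lo == datetime(2025, 11, 15)
assert (_demo_hi - _demo_lo).days == 186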
def _contract_from_cache_row(row, ref_date_str, option_type,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
option_type = _normalize_option_type(option_type)
if str(row.get('option_type', option_type)).lower() != option_type:
return None
lo, hi = _contract_window(ref_date_str, min_days, max_days)
try:
exp = datetime.strptime(str(row['expiry']), '%Y-%m-%d')
except (KeyError, TypeError, ValueError):
return None
if not (lo <= exp <= hi):
return None
strike = _safe_float(row.get('strike'))
delta = _safe_float(row.get('delta'))
price = _safe_float(row.get('price'))
if strike is None or delta is None or price is None or price <= 0:
return None
return {
'selected_on': row.get('selected_on'),
'option_type': option_type,
'symbol': row.get('symbol'),
'strike': strike,
'expiry': str(row['expiry']),
'delta': delta,
'price': price,
}
def _select_cached_contract(cache, option_type, ref_date_str,
delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS,
require_selected=False):
rows = cache[(cache['date'] == ref_date_str)
& (cache['option_type'] == option_type)]
selected_rows = rows[rows['selected_on'] == ref_date_str]
if not selected_rows.empty:
rows = selected_rows
elif require_selected:
rows = selected_rows
candidates = []
for _, row in rows.iterrows():
contract = _contract_from_cache_row(row, ref_date_str, option_type,
min_days, max_days)
if contract:
candidates.append(contract)
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - delta_target))
return candidates[0]
def _parse_option_price(contract):
"""Extract a mark price from an option contract record."""
mid = _safe_float(contract.get('mid'))
if mid and mid > 0:
return mid
bid = _safe_float(contract.get('bid'))
ask = _safe_float(contract.get('ask'))
last = _safe_float(contract.get('last'))
if bid and ask and bid > 0 and ask > 0:
return (bid + ask) / 2
if last and last > 0:
return last
return None
def _safe_float(val):
try:
out = float(val)
if np.isnan(out):
return None
return out
except (TypeError, ValueError):
return None
def _marketdata_key():
"""Return the MarketData API key, or None if unavailable.
Resolution order:
1. ``MARKETDATA_KEY`` / ``MARKETDATA_API_KEY`` environment variables.
2. ``pass env/marketdata-token`` (local ``pass`` store).
The result is memoised on the function object so repeated lookups
during a sweep do not reshell. Fetch helpers raise themselves when
called without a key, so a fully cached run still succeeds without
requiring either source.
"""
if hasattr(_marketdata_key, '_cached'):
return _marketdata_key._cached
key = (os.environ.get('MARKETDATA_KEY', '')
or os.environ.get('MARKETDATA_API_KEY', ''))
if not key:
try:
import subprocess
out = subprocess.run(
['pass', 'show', 'env/marketdata-token'],
capture_output=True, text=True, timeout=5, check=False)
if out.returncode == 0:
key = out.stdout.strip().splitlines()[0] if out.stdout else ''
except (FileNotFoundError, subprocess.TimeoutExpired):
key = ''
_marketdata_key._cached = key or None
return _marketdata_key._cached
def _marketdata_get(path, params, api_key):
"""Fetch a MarketData endpoint, returning normalized row dictionaries.
Raises on HTTP errors or a non-'ok' status. 'no_data' is returned as
an empty list so that callers can distinguish 'nothing available' from
'request failed'.
"""
headers = {'Accept': 'application/json', 'Authorization': f'Bearer {api_key}'}
resp = requests.get(_MD_BASE + path, params=params, headers=headers,
timeout=30)
resp.raise_for_status()
body = resp.json()
status = body.get('s')
if status == 'no_data':
return []
if status != 'ok':
raise RuntimeError(
f"MarketData {path} returned status={status!r}: "
f"{body.get('errmsg') or body}")
lengths = [len(v) for v in body.values() if isinstance(v, list)]
n = max(lengths) if lengths else 0
rows = []
for i in range(n):
row = {}
for key, val in body.items():
if isinstance(val, list):
row[key] = val[i] if i < len(val) else None
else:
row[key] = val
rows.append(row)
return rows
def _marketdata_date(timestamp):
try:
return datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d')
except (TypeError, ValueError, OSError):
return None
def _occ_symbol(ticker, option_type, strike, expiry):
"""Build a standard OCC option symbol from contract fields."""
cp = 'C' if _normalize_option_type(option_type) == 'call' else 'P'
exp = datetime.strptime(str(expiry), '%Y-%m-%d').strftime('%y%m%d')
strike_int = int(round(float(strike) * 1000))
root = ticker.upper().replace('.', '')
return f'{root}{exp}{cp}{strike_int:08d}'
# Chains are always fetched with a broad expiry window so they can be cached
# and reused for in-memory selection across any (delta_target, expiry window)
# combination in the sensitivity sweep.
CHAIN_FETCH_MIN_DAYS = 30
CHAIN_FETCH_MAX_DAYS = 760
def _fetch_marketdata_chain(ticker, date_str, option_type, api_key,
min_days=CHAIN_FETCH_MIN_DAYS,
max_days=CHAIN_FETCH_MAX_DAYS):
lo, hi = _contract_window(date_str, min_days, max_days)
params = {
'date': date_str,
'from': lo.strftime('%Y-%m-%d'),
'to': hi.strftime('%Y-%m-%d'),
'side': _normalize_option_type(option_type),
'expiration': 'all',
}
return _marketdata_get(f'/options/chain/{ticker}/', params, api_key)
# Chain cache: one CSV per (ticker, type, date) storing the broad-window chain.
# Lets the sensitivity sweep re-select contracts for different delta targets
# and expiry windows without refetching.
CHAIN_CACHE_DIR = os.path.join(OPTION_CACHE_DIR, 'chains')
def _chain_cache_path(ticker, option_type, date_str):
option_type = _normalize_option_type(option_type)
return os.path.join(CHAIN_CACHE_DIR,
f'{ticker}-{option_type}-{date_str}.csv')
def _load_chain_cache(ticker, option_type, date_str):
path = _chain_cache_path(ticker, option_type, date_str)
if not os.path.exists(path):
return None
df = pd.read_csv(path)
if df.empty:
return []
return df.to_dict('records')
def _save_chain_cache(ticker, option_type, date_str, chain):
if not chain:
return
os.makedirs(CHAIN_CACHE_DIR, exist_ok=True)
path = _chain_cache_path(ticker, option_type, date_str)
pd.DataFrame(chain).to_csv(path, index=False)
def _get_or_fetch_chain(ticker, date_str, option_type, api_key,
fetched_counter=None):
"""Return the cached broad chain for (ticker, type, date), fetching if absent.
Requires ``api_key`` only when a fetch is actually needed.
"""
chain = _load_chain_cache(ticker, option_type, date_str)
if chain is not None:
return chain
if not api_key:
raise RuntimeError(
"MARKETDATA_KEY is not set but a chain fetch is required for "
f"{ticker} {option_type} on {date_str}.")
time.sleep(_MD_RATE_DELAY)
chain = _fetch_marketdata_chain(ticker, date_str, option_type, api_key)
if fetched_counter is not None:
fetched_counter['marketdata_chains'] += 1
_save_chain_cache(ticker, option_type, date_str, chain)
return chain
def _fetch_marketdata_quotes(symbol, start_date, end_date, api_key):
to_date = (datetime.strptime(end_date, '%Y-%m-%d')
+ timedelta(days=1)).strftime('%Y-%m-%d')
rows = _marketdata_get(f'/options/quotes/{symbol}/',
{'from': start_date, 'to': to_date}, api_key)
prices = {}
for row in rows:
date_str = _marketdata_date(row.get('updated'))
if not date_str:
continue
price = _parse_option_price(row)
if price is not None and price > 0:
prices[date_str] = price
return prices
def _implied_vol_from_price(S, K, T, option_price, option_type):
"""Infer Black-Scholes volatility from an observed option mid price."""
if any(x is None for x in (S, K, T, option_price)):
return None
if S <= 0 or K <= 0 or T <= 0 or option_price <= 0:
return None
intrinsic = max(S - K, 0) if option_type == 'call' else max(K - S, 0)
upper = S if option_type == 'call' else K
if option_price < intrinsic - 1e-6 or option_price > upper * 1.5:
return None
lo, hi = 1e-4, 5.0
try:
if (option_price < bs_price(S, K, T, lo, option_type) - 1e-4
or option_price > bs_price(S, K, T, hi, option_type) + 1e-4):
return None
for _ in range(80):
mid = (lo + hi) / 2
if bs_price(S, K, T, mid, option_type) < option_price:
lo = mid
else:
hi = mid
return (lo + hi) / 2
except (FloatingPointError, ValueError, ZeroDivisionError):
return None
def _marketdata_delta(row, ref_date_str, expiry, option_type, price):
"""Use vendor delta when present; otherwise infer it from the quote."""
native = _safe_float(row.get('delta'))
if native is not None and native != 0:
return native
S = _safe_float(row.get('underlyingPrice'))
K = _safe_float(row.get('strike'))
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
exp = datetime.strptime(expiry, '%Y-%m-%d')
T = max((exp - ref).days / 365.25, 1e-6)
sigma = _safe_float(row.get('iv'))
if sigma is None or sigma <= 0:
sigma = _implied_vol_from_price(S, K, T, price, option_type)
if S is None or K is None or sigma is None or sigma <= 0:
return None
return bs_delta(S, K, T, sigma, option_type)
def _select_marketdata_contract(chain, ref_date_str, option_type,
delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
option_type = _normalize_option_type(option_type)
lo, hi = _contract_window(ref_date_str, min_days, max_days)
candidates = []
for c in chain:
if str(c.get('side', '')).lower() != option_type:
continue
expiry = _marketdata_date(c.get('expiration'))
if not expiry:
continue
exp = datetime.strptime(expiry, '%Y-%m-%d')
if not (lo <= exp <= hi):
continue
price = _parse_option_price(c)
if price is None or price <= 0:
continue
delta = _marketdata_delta(c, ref_date_str, expiry, option_type, price)
if delta is None or delta == 0:
continue
strike = _safe_float(c.get('strike'))
symbol = c.get('optionSymbol')
if strike is None or not symbol:
continue
candidates.append({
'option_type': option_type,
'symbol': symbol,
'strike': strike,
'expiry': expiry,
'delta': delta,
'price': price,
})
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - delta_target))
return candidates[0]
def download_option_prices(option_positions, quarters, holdings, filing_dates,
today, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
"""Download historical representative option prices from MarketData.
MarketData is the sole supported provider. MARKETDATA_KEY must be set.
For each (ticker, option_type) and each filing period in which that
position is held:
1. On the first trading day, select a contract matching type, with
expiry between ``min_days`` and ``max_days`` of the period start, and
|delta| closest to ``delta_target``. MarketData's Starter plan often
returns null Greeks, so delta is inferred from the observed mid price
via Black-Scholes when the vendor delta is missing.
2. Lock in that contract for the period.
3. Track its historical mid price through the period.
The broad option chain for each (ticker, type, first_day) is cached to
disk so that sensitivity sweeps over (delta_target, expiry window) reuse
a single fetch.
Raises ``RuntimeError`` if no suitable contract can be selected for any
required (ticker, type, period), or if MarketData returns no price series
for the selected contract.
Parameters
----------
delta_target : float
Target |delta| for contract selection (default ``OPTION_DELTA``).
min_days, max_days : int
Contract expiry window in days from period start (default 270-456,
i.e. 9-15 months).
Returns
-------
per_period : dict {quarter_str: {(ticker, type): {date_str: float}}}
Option prices keyed by filing period then option position. Each
period has its own contract's prices.
"""
option_positions = sorted({
(ticker, _normalize_option_type(pos_type))
for ticker, pos_type in option_positions})
md_key = _marketdata_key()
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
per_period = {} # {q: {(ticker, type): {date_str: price}}}
fetched = {'marketdata_chains': 0, 'marketdata_quotes': 0}
for ticker, option_type in option_positions:
opt_key = _option_position_key(ticker, option_type)
cache = _load_option_cache(ticker, option_type, delta_target,
min_days, max_days)
new_rows = []
for i, q in enumerate(quarters):
# Skip quarters where this exact option position is absent.
if opt_key not in holdings[q]:
continue
period_start = filing_dates[q]
period_end = (filing_dates[quarters[i + 1]]
if i < len(quarters) - 1 else today)
trading_days = pd.bdate_range(period_start, period_end)
if len(trading_days) == 0:
continue
first_day = trading_days[0].strftime('%Y-%m-%d')
# -- Select contract on first trading day --
contract = _select_cached_contract(
cache, option_type, first_day,
delta_target=delta_target,
min_days=min_days, max_days=max_days,
require_selected=True)
if contract is None:
chain = _get_or_fetch_chain(
ticker, first_day, option_type, md_key, fetched)
contract = _select_marketdata_contract(
chain, first_day, option_type,
delta_target=delta_target,
min_days=min_days, max_days=max_days)
if contract is None:
raise RuntimeError(
f"MarketData returned no usable {option_type} contract "
f"for {ticker} on {first_day} (period {q}) at "
f"delta={delta_target}, "
f"expiry {min_days}-{max_days}d")
new_rows.append({
'date': first_day,
'selected_on': first_day,
'option_type': option_type,
'symbol': contract.get('symbol'),
'strike': contract['strike'],
'expiry': contract['expiry'],
'delta': contract['delta'],
'price': contract['price'],
})
strike = contract['strike']
expiry = contract['expiry']
symbol = contract.get('symbol') or _occ_symbol(
ticker, option_type, strike, expiry)
# -- Collect prices for this period (fresh dict per period) --
period_prices = {}
# Fast path: read matching prices from cache.
rows = cache[
(cache['date'] >= period_start)
& (cache['date'] <= period_end)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))
& pd.notna(cache['price'])]
selected_rows = rows[rows['selected_on'] == first_day]
if not selected_rows.empty:
rows = selected_rows
for _, row in rows.iterrows():
period_prices[row['date']] = float(row['price'])
# Decide whether to refresh quotes. With a key, refresh whenever
# the cached series does not reach period_end. Without a key,
# only fail if the cached series is empty; a slightly stale
# tail is acceptable for cache-only runs (e.g. sensitivity
# sweeps replaying the baseline contract).
has_partial = bool(period_prices)
reaches_end = has_partial and max(period_prices) >= period_end
if md_key and not reaches_end:
time.sleep(_MD_RATE_DELAY)
quote_prices = _fetch_marketdata_quotes(
symbol, period_start, period_end, md_key)
fetched['marketdata_quotes'] += 1
for day_str, price in quote_prices.items():
if period_start <= day_str <= period_end:
period_prices[day_str] = price
new_rows.append({
'date': day_str,
'selected_on': first_day,
'option_type': option_type,
'symbol': symbol,
'strike': strike,
'expiry': expiry,
'delta': contract['delta'],
'price': price,
})
if contract.get('price') and first_day not in period_prices:
period_prices[first_day] = contract['price']
elif not md_key and not has_partial:
raise RuntimeError(
"MARKETDATA_KEY is not set and no cached quotes exist "
f"for {symbol} in {period_start}..{period_end}.")
if not period_prices:
raise RuntimeError(
f"MarketData returned no quotes for {symbol} "
f"({opt_key}) in {period_start}..{period_end}")
per_period.setdefault(q, {})[opt_key] = period_prices
# Persist new data to cache
if new_rows:
new_df = pd.DataFrame(new_rows)
cache = pd.concat([cache, new_df], ignore_index=True)
cache.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
_save_option_cache(ticker, option_type, cache, delta_target,
min_days, max_days)
if any(fetched.values()):
import sys
parts = []
if fetched['marketdata_chains']:
parts.append(f"{fetched['marketdata_chains']} MarketData chains")
if fetched['marketdata_quotes']:
parts.append(f"{fetched['marketdata_quotes']} MarketData quote series")
print(f"[options] Fetched {', '.join(parts)}", file=sys.stderr)
return per_period
# -- Black-Scholes helpers (used only to infer delta when MarketData's
# Starter-plan historical Greeks are null; never to reprice returns) -----
from scipy.stats import norm as _norm
def bs_price(S, K, T, sigma, option_type='call'):
"""Black-Scholes option price (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return max(S - K, 0)
return max(K - S, 0)
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
d2 = d1 - sigma * np.sqrt(T)
if option_type == 'call':
return S * _norm.cdf(d1) - K * _norm.cdf(d2)
return K * _norm.cdf(-d2) - S * _norm.cdf(-d1)
def bs_delta(S, K, T, sigma, option_type='call'):
"""Black-Scholes delta (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return 1.0 if S > K else 0.0
return -1.0 if S < K else 0.0
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
if option_type == 'call':
return _norm.cdf(d1)
return _norm.cdf(d1) - 1
def daily_cumulative(holdings, quarters, filing_dates, close, today, mode,
per_period_opt=None):
"""Build a daily series of cumulative growth factors for a given mode.
For each filing period, stock shares and option contracts are fixed. In
equity-proxy mode, option rows are converted to linear underlying exposure:
calls are long underlying and puts are short underlying. In option-proxy
mode, option rows are sized by 13F underlying notional and returns come
from MarketData quotes; returns are divided by deployed capital (stock
value plus option premium cost). Option-proxy mode raises if MarketData
prices are missing for any required position.
"""
cum_growth = 1.0
dates_out = []
values_out = []
for i, q in enumerate(quarters):
period_start = filing_dates[q]
period_end = filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today
ps = pd.Timestamp(period_start)
pe = pd.Timestamp(period_end)
# Trading days in this period
mask = (close.index >= ps) & (close.index <= pe)
period_close = close[mask]
if period_close.empty:
continue
# Option prices for this period (keyed by (ticker, type) → prices)
quarter_opt = per_period_opt.get(q, {}) if per_period_opt else {}
# Determine starting prices, fixed exposure, and deployed capital.
positions = holdings[q]
exposure = {}
costs = {}
start_prices = {}
start_underlying = {}
use_opt_px = {} # track which positions use option prices
total_cost = 0
for (ticker, pos_type), value in positions.items():
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
if mode == 'equity_only':
if pos_type not in ('long', 'call', 'put'):
continue
if ticker not in close.columns:
continue
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
stock_start = float(avail.iloc[0])
if stock_start <= 0:
continue
start_prices[(ticker, pos_type)] = stock_start
start_underlying[(ticker, pos_type)] = stock_start
costs[(ticker, pos_type)] = value
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = False
total_cost += value
continue
# mode == 'full' (option proxy)
if is_option:
if opt_key not in quarter_opt:
raise RuntimeError(
f"No MarketData option prices for {opt_key} in "
f"period {q}")
ticker_opt = quarter_opt[opt_key]
opt_dates = sorted(d for d in ticker_opt if d >= period_start)
if not opt_dates:
raise RuntimeError(
f"MarketData option prices for {opt_key} in period "
f"{q} contain no dates at or after {period_start}")
if ticker not in close.columns:
raise RuntimeError(
f"No underlying close series for {ticker}")
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
raise RuntimeError(
f"No underlying price for {ticker} at {period_start}")
opt_start = ticker_opt[opt_dates[0]]
underlying_start = float(avail.iloc[0])
if opt_start <= 0 or underlying_start <= 0:
raise RuntimeError(
f"Non-positive starting price for {opt_key} in "
f"period {q}")
start_prices[(ticker, pos_type)] = opt_start
start_underlying[(ticker, pos_type)] = underlying_start
costs[(ticker, pos_type)] = value * opt_start / underlying_start
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = True
total_cost += costs[(ticker, pos_type)]
continue
# Plain stock in full mode
if ticker not in close.columns:
continue
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
stock_start = float(avail.iloc[0])
if stock_start <= 0:
continue
start_prices[(ticker, pos_type)] = stock_start
start_underlying[(ticker, pos_type)] = stock_start
costs[(ticker, pos_type)] = value
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = False
total_cost += value
if total_cost == 0:
continue
# Daily P&L relative to period start.
# Skip first day of subsequent periods (already recorded as last day
# of the prior period) to avoid duplicate boundary dates.
start_idx = 1 if i > 0 else 0
# Forward-fill: track last known option price so that gaps in
# option data don't cause positions to vanish mid-period.
last_opt = {k: v for k, v in start_prices.items()
if use_opt_px.get(k)}
for day_idx in range(start_idx, len(period_close)):
day = period_close.index[day_idx]
day_str = day.strftime('%Y-%m-%d')
period_pnl = 0
for (ticker, pos_type), value in exposure.items():
p0 = start_prices[(ticker, pos_type)]
if p0 == 0:
continue
if use_opt_px[(ticker, pos_type)]:
opt_key = _option_position_key(ticker, pos_type)
p1_val = quarter_opt.get(opt_key, {}).get(day_str)
if p1_val is not None:
last_opt[(ticker, pos_type)] = p1_val
else:
p1_val = last_opt.get((ticker, pos_type))
if p1_val is None:
continue
underlying_p0 = start_underlying.get((ticker, pos_type))
if not underlying_p0 or underlying_p0 <= 0:
continue
position_pnl = value * (float(p1_val) - p0) / underlying_p0
else:
if ticker not in period_close.columns:
continue
p1_val = period_close[ticker].iloc[day_idx]
if pd.isna(p1_val):
continue
stock_ret = (float(p1_val) - p0) / p0
if mode == 'equity_only':
position_pnl = (
value * _linear_underlying_sign(pos_type) * stock_ret)
else:
position_pnl = value * stock_ret
period_pnl += position_pnl
dates_out.append(day)
values_out.append(cum_growth * (1 + period_pnl / total_cost))
# Chain: next period starts from the last day's growth factor
if values_out:
cum_growth = values_out[-1]
return dates_out, values_out
import plotly.graph_objects as go
HUGO_BASE = os.path.expanduser('~/My Drive/repos/stafforini.com')
# ── Fetch daily prices ────────────────────────────────────────────
close = download_daily(all_tickers, first_date, today)
dates_eq, vals_eq = daily_cumulative(
holdings, quarters, filing_dates, close, today, 'equity_only')
# ── Option proxy with representative notional-matched options ───────
option_positions = sorted({
(t, pt)
for q in quarters
for (t, pt) in holdings[q]
if pt in ('call', 'put')
})
per_period_opt = download_option_prices(
option_positions, quarters, holdings, filing_dates, today)
dates_full, vals_full = daily_cumulative(
holdings, quarters, filing_dates, close, today, 'full',
per_period_opt=per_period_opt)
# ── Compute SPY benchmark ─────────────────────────────────────────
spy_series = close['SPY'].dropna()
spy_start = spy_series[spy_series.index >= pd.Timestamp(first_date)]
if not spy_start.empty:
spy_p0 = float(spy_start.iloc[0])
spy_dates = spy_start.index.tolist()
spy_vals = [float(p) / spy_p0 for p in spy_start.values]
else:
spy_dates, spy_vals = [], []
# ── Plot with Plotly ───────────────────────────────────────────────
eq_pct = [round((v - 1) * 100, 1) for v in vals_eq]
full_pct = [round((v - 1) * 100, 1) for v in vals_full]
spy_pct = [round((v - 1) * 100, 1) for v in spy_vals]
fig = go.Figure()
fig.add_trace(go.Scatter(
x=dates_eq, y=eq_pct, mode='lines',
name='Equity proxy',
line=dict(color='#2563eb', width=2)))
fig.add_trace(go.Scatter(
x=dates_full, y=full_pct, mode='lines',
name='Option proxy',
line=dict(color='#dc2626', width=2)))
fig.add_trace(go.Scatter(
x=spy_dates, y=spy_pct, mode='lines',
name='S&P 500 (SPY)',
line=dict(color='#16a34a', width=2, dash='dot')))
# Vertical lines at filing dates (rebalancing points)
for fd in filing_dates.values():
fig.add_vline(x=fd, line=dict(color='gray', width=0.5), opacity=0.4)
fig.add_hline(y=0, line=dict(color='gray', width=0.8))
fig.update_layout(
title=dict(text='SA LP copycat: cumulative returns',
font=dict(size=15)),
yaxis=dict(title='Cumulative return', hoverformat='+.1f',
ticksuffix='%'),
hovermode='x unified',
xaxis=dict(spikemode='across', spikethickness=0.5,
spikedash='solid', spikecolor='gray'),
template='plotly_white',
legend=dict(x=0.02, y=0.98, bgcolor='rgba(255,255,255,0.8)'),
margin=dict(l=60, r=20, t=50, b=40),
height=500,
)
# ── Generate HTML with dark-mode support ──────────────────────────
import re
chart_html = fig.to_html(full_html=False, include_plotlyjs='cdn',
config={'responsive': True, 'displayModeBar': False})
div_id = re.search(r'id="([^"]+)"', chart_html).group(1)
dark_script = """
<script>
(function() {
var gd = document.getElementById('%s');
function isDark() {
try { return parent.document.documentElement.getAttribute('data-theme') === 'dark'; }
catch(e) { return window.matchMedia('(prefers-color-scheme: dark)').matches; }
}
function apply() {
var dk = isDark();
Plotly.relayout(gd, {
paper_bgcolor: 'rgba(0,0,0,0)',
plot_bgcolor: dk ? 'rgba(30,30,30,0.5)' : 'rgba(255,255,255,0.8)',
font: {color: dk ? '#d4d4d4' : '#333'},
'title.font.color': dk ? '#d4d4d4' : '#333',
'xaxis.gridcolor': dk ? 'rgba(255,255,255,0.1)' : 'rgba(0,0,0,0.1)',
'yaxis.gridcolor': dk ? 'rgba(255,255,255,0.1)' : 'rgba(0,0,0,0.1)',
'legend.bgcolor': dk ? 'rgba(30,30,30,0.8)' : 'rgba(255,255,255,0.8)',
'legend.font.color': dk ? '#d4d4d4' : '#333',
});
}
apply();
new MutationObserver(function() { apply(); }).observe(
parent.document.documentElement, {attributes: true, attributeFilter: ['data-theme']});
})();
</script>""" % div_id
outpath = os.path.join(HUGO_BASE, 'static', 'images', 'sa-lp-returns.html')
with open(outpath, 'w') as f:
f.write('<!DOCTYPE html>\n<html>\n<head><meta charset="utf-8">\n'
'<meta http-equiv="Cache-Control" content="no-cache, no-store, must-revalidate">\n'
'<style>body { margin: 0; background: transparent; }</style>\n'
'</head>\n<body>\n' + chart_html + dark_script +
'\n</body>\n</html>')
COPYCAT STRATEGY RETURNS
========================================================================
Period Dates Eq. proxy Opt. proxy SPY
------------------------------------------------------------------------
Q4_2024 2025-02-12 to 2025-05-14 -14.73% -14.73% -2.32%
Q1_2025 2025-05-14 to 2025-08-14 +24.14% +35.28% +10.09%
Q2_2025 2025-08-14 to 2025-11-14 +16.45% +22.37% +4.47%
Q3_2025 2025-11-14 to 2026-02-11 +14.54% +19.94% +3.29%
Q4_2025 † 2026-02-11 to 2026-05-04 +51.40% +54.97% +4.05%
------------------------------------------------------------------------
Cumulative 2025-02-12 to 2026-05-04 +113.77% +162.39% +20.73%
† = partial period (still holding; updates on re-evaluation)
Eq. proxy = stocks plus option rows as linear underlying exposure
Opt. proxy = options sized to 13F notional; returns on deployed capital
RISK-ADJUSTED RETURNS
=======================================================
Metric Eq.proxy Opt.proxy SPY
-------------------------------------------------------
Ann. volatility 52.4% 61.4% 18.7%
Sharpe (rf=4%) 1.38 1.54 0.71
Max drawdown -45.8% -45.8% -18.8%
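The summary rows above are mechanical to reproduce. A minimal sketch, assuming 252 trading days per year for the annualization and using the table's rf = 4%; the period returns are copied from the table, and `compound` / `risk_metrics` are illustrative helpers, not functions from the script:

```python
import numpy as np

def compound(returns):
    """Cumulative return from compounding per-period simple returns."""
    growth = 1.0
    for r in returns:
        growth *= 1 + r
    return growth - 1

# Per-period equity-proxy returns from the table above.
eq_cum = compound([-0.1473, 0.2414, 0.1645, 0.1454, 0.5140])
# eq_cum ≈ 1.138, i.e. ≈ +113.8% (matches the table up to input rounding)

def risk_metrics(growth, rf=0.04, periods_per_year=252):
    """Annualized volatility, Sharpe, and max drawdown from a series of
    daily cumulative growth factors (1.0 = breakeven)."""
    g = np.asarray(growth, dtype=float)
    daily = g[1:] / g[:-1] - 1                    # daily simple returns
    ann_vol = daily.std(ddof=1) * np.sqrt(periods_per_year)
    ann_ret = (g[-1] / g[0]) ** (periods_per_year / len(daily)) - 1
    sharpe = (ann_ret - rf) / ann_vol
    max_dd = (g / np.maximum.accumulate(g) - 1).min()
    return ann_vol, sharpe, max_dd
```

In the script, the growth series fed to `risk_metrics` would be the `values_out` list returned by `daily_cumulative`.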
Understanding the equity and option proxies requires a brief excursus into how 13F reports options. Special Instruction 10 to Form 13F directs filers to give entries for option positions “in terms of the securities underlying the options, not the options themselves.” In particular, the reported dollar value is the number of underlying shares controlled by the option multiplied by the underlying’s closing price on the last day of the quarter. It is not the premium the fund paid or the option’s own market value. A filing showing \(N\) of INTC calls is therefore roughly \(N\) of INTC exposure held through calls, not \(N\) of capital spent on call premiums. (I know many people, a past version of me included, who misunderstood this point and ended up taking far more levered positions as a consequence.)
The inflation of apparent option weight this introduces is easier to see with numbers. Imagine a fund whose 13F reports three positions:
| Row | 13F value | 13F % |
|---|---|---|
| INTC stock | $100M | 50% |
| INTC calls | $50M | 25% |
| NVDA stock | $50M | 25% |
| Total | $200M | 100% |
The $50M on the calls is underlying notional, not premium. If the representative out-of-the-money 0.15-delta call trades at roughly 5% of spot, the premium actually paid on that position is around $2.5M. Recomputed on deployed capital instead, the picture shifts drastically:
| Row | Capital | Capital % |
|---|---|---|
| INTC stock | $100M | 65.6% |
| INTC calls | $2.5M | 1.6% |
| NVDA stock | $50M | 32.8% |
| Total | $152.5M | 100% |
The INTC calls look like 25% of the portfolio in the 13F but consume only ~1.6% of the fund’s deployed capital. The two proxies handle this mismatch differently.
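The recomputation is mechanical. A sketch, assuming (as in the example) that the representative call trades at 5% of spot; `deployed_capital` is an illustrative helper, not a function from the script:

```python
# 13F values: underlying notional for option rows, market value for stock rows.
positions = {
    ('INTC', 'long'): 100e6,
    ('INTC', 'call'): 50e6,   # underlying notional, NOT premium
    ('NVDA', 'long'): 50e6,
}
PREMIUM_RATIO = 0.05          # assumed: OTM 0.15-delta call ≈ 5% of spot

def deployed_capital(positions, premium_ratio=PREMIUM_RATIO):
    """Convert 13F values into capital actually deployed per row."""
    return {key: value * (premium_ratio if key[1] in ('call', 'put') else 1)
            for key, value in positions.items()}

capital = deployed_capital(positions)
total = sum(capital.values())             # $152.5M, not $200M
weights = {key: c / total for key, c in capital.items()}
# weights[('INTC', 'call')] ≈ 0.016: ~1.6% of deployed capital
```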
The equity proxy converts every row into linear exposure to the underlying, sized by the reported dollar value: long stock for calls, short stock for puts. The denominator is the sum of reported values. This proxy makes no assumption about the missing option details: it simply asks what the disclosed directional bets would have earned if executed as vanilla equities.
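In code, the equity proxy reduces to a value-weighted sum of signed underlying returns. A minimal sketch of the convention just described (not the script's `_linear_underlying_sign` verbatim):

```python
SIGN = {'long': 1, 'call': 1, 'put': -1}   # puts become short underlying

def equity_proxy_return(positions, stock_returns):
    """positions: {(ticker, type): 13F dollar value};
    stock_returns: {ticker: underlying's simple return over the period}."""
    pnl = sum(value * SIGN[pos_type] * stock_returns[ticker]
              for (ticker, pos_type), value in positions.items())
    return pnl / sum(positions.values())

# Calls count as long stock: a +10% INTC move lifts both rows by 10%.
r = equity_proxy_return({('INTC', 'long'): 100e6, ('INTC', 'call'): 50e6},
                        {'INTC': 0.10})
# r ≈ 0.10
```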
The option proxy, by contrast, tries to preserve the option-like payoff shape. A call is not just levered stock, and a put is not just a short position: options can express a view about tail size, volatility, or downside capped at the premium. A low-delta call, for instance, pays off on a large move while risking only the premium.
Since, as noted above, the filings reveal the underlying but not the actual options contract, the proxy picks a deliberately narrow representative contract for each option row: same type (call or put), expiring 9–15 months out, with absolute delta closest to 0.15. This is not an estimate of the fund’s actual strike or expiry. Rather, it is an attempt to preserve the qualitative thesis: out-of-the-money optionality and convex exposure to large moves.
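The selection rule amounts to a filter and an argmin. A condensed sketch of the idea (`pick_contract` is a hypothetical simplification of the script's `_select_marketdata_contract`, which also handles prices and vendor quirks):

```python
from datetime import date

def pick_contract(chain, ref_date, delta_target=0.15,
                  min_days=270, max_days=456):
    """chain: dicts with 'expiry' (date), 'delta', 'strike'. Returns the
    contract in the expiry window whose |delta| is closest to the target."""
    in_window = [c for c in chain
                 if min_days <= (c['expiry'] - ref_date).days <= max_days]
    if not in_window:
        return None
    return min(in_window, key=lambda c: abs(abs(c['delta']) - delta_target))

chain = [
    {'expiry': date(2026, 6, 19), 'delta': 0.30, 'strike': 40.0},
    {'expiry': date(2026, 6, 19), 'delta': 0.14, 'strike': 55.0},
    {'expiry': date(2025, 9, 19), 'delta': 0.15, 'strike': 50.0},  # too near-dated
]
best = pick_contract(chain, ref_date=date(2025, 8, 1))
# best['strike'] == 55.0: inside the 9-15 month window, |delta| closest to 0.15
```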
With the contract fixed, the position is sized from the 13F underlying notional. If a filing reports \(N\) of INTC underlying notional and INTC starts the period at \(S_0\), the proxy holds approximately \(N / (100 \cdot S_0)\) contracts (one contract covers 100 shares). The contract’s daily mid price is then pulled from MarketData.app and used to compute the period’s option P&L.3
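Because the position holds \(N / (100 \cdot S_0)\) contracts and each contract covers 100 shares, the 100s cancel and the dollar P&L collapses to \(N (P_1 - P_0) / S_0\), which is the form the backtest applies day by day. A worked sketch with illustrative numbers:

```python
def option_row_pnl(notional, underlying_start, opt_start, opt_now):
    """P&L of a 13F option row sized to underlying notional.
    contracts = notional / (100 * underlying_start); multiplier = 100,
    so pnl = notional * (opt_now - opt_start) / underlying_start."""
    contracts = notional / (100 * underlying_start)
    return contracts * 100 * (opt_now - opt_start)

# $50M notional, underlying at $20, contract marked $1.00 then $2.00:
pnl = option_row_pnl(50e6, 20.0, 1.00, 2.00)
# pnl == 2.5e6: the ~$2.5M premium outlay has doubled
```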
Note that the option proxy divides the period’s total P&L by the capital actually deployed (stock market value plus option premium paid), not by the sum of reported 13F values. Returning to the three-position example, suppose that, over the period of interest, INTC stock gains 10% (+$10M), the INTC calls roughly double (+$2.5M), and NVDA gains 20% (+$10M), for a total P&L of $22.5M. On deployed capital, that is a $22.5M / $152.5M = 14.75% return; on the 13F total, it is $22.5M / $200M = 11.25%. The latter incorrectly dilutes the return by treating the $47.5M gap between notional and premium as if it were cash sitting idle.
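The two denominators can be checked directly (a quick sketch using the figures from the example):

```python
stock_pnl = 100e6 * 0.10     # INTC stock +10%  -> +$10M
call_pnl = 2.5e6             # calls roughly double -> gain ≈ premium paid
nvda_pnl = 50e6 * 0.20       # NVDA +20%        -> +$10M
total_pnl = stock_pnl + call_pnl + nvda_pnl      # $22.5M

deployed = 100e6 + 2.5e6 + 50e6                  # $152.5M actually at work
notional_13f = 200e6                             # sum of reported 13F values

ret_deployed = total_pnl / deployed              # ≈ 0.1475
ret_13f = total_pnl / notional_13f               # 0.1125, diluted
```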
Sensitivity to contract choice
The option proxy has two free parameters that control which contract stands in for each disclosed option row: the target absolute delta (0.15 in the main backtest) and the expiry window (9–15 months). Because the fund’s actual contracts are undisclosed, neither choice is constrained by evidence. The question is then how much the proxy’s reported performance depends on these choices. If the cumulative return swings dramatically across plausible alternatives, the option proxy is really a family of proxies and the headline figure should be read as one point in a wide band. If it barely moves, the default choice becomes more defensible.
The block below reruns the option proxy under (a) six different delta targets at the baseline 9–15 month expiry window, and (b) four different expiry windows at the baseline |delta| = 0.15. The sweep extends down to |delta| = 0.05 and out to a 12–24 month expiry window, spanning as much of the deeper-OTM, longer-dated end of the LEAPS spectrum as the available chain data supports.
Code
import json
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import requests
import time
import os
import warnings
warnings.filterwarnings('ignore')
# Parse data from the scraper block
parsed = json.loads(data) if isinstance(data, str) else data
filings = parsed["filings"]
# Build internal structures
filing_dates = {f["quarter"]: f["filing_date"] for f in filings}
quarter_end_dates = {f["quarter"]: f["quarter_end"] for f in filings}
quarters = [f["quarter"] for f in filings]
# Convert holdings list to dict keyed by quarter.
# Multiple positions in the same ticker with different types are aggregated
# by value per (ticker, type) pair.
holdings = {}
for f in filings:
positions = {}
for h in f["holdings"]:
ticker = h["ticker"]
pos_type = h["type"]
value = h["value"]
key = (ticker, pos_type)
positions[key] = positions.get(key, 0) + value
holdings[f["quarter"]] = positions
def _extract_close_series(df, ticker):
"""Extract a single close-price series from a yfinance result."""
if df.empty:
return pd.Series(dtype=float)
if isinstance(df.columns, pd.MultiIndex):
if 'Close' not in df.columns.get_level_values(0):
return pd.Series(dtype=float)
close = df['Close']
if isinstance(close, pd.DataFrame):
if ticker in close.columns:
series = close[ticker]
elif len(close.columns) == 1:
series = close.iloc[:, 0]
else:
return pd.Series(dtype=float)
else:
series = close
elif 'Close' in df.columns:
series = df['Close']
if isinstance(series, pd.DataFrame):
series = series.iloc[:, 0]
else:
return pd.Series(dtype=float)
return pd.to_numeric(series, errors='coerce').dropna()
def _download_close_series(ticker, start, end):
"""Download one ticker's close series; used to repair flaky batch misses."""
df = yf.download(ticker, start=start, end=end, progress=False,
auto_adjust=True)
return _extract_close_series(df, ticker)
def get_prices(tickers, dates):
"""Fetch close prices for tickers on specific dates."""
unique_tickers = sorted(set(tickers))
all_dates = [datetime.strptime(d, '%Y-%m-%d') for d in dates]
start = min(all_dates) - timedelta(days=5)
end = max(all_dates) + timedelta(days=5)
df = yf.download(unique_tickers, start=start, end=end, progress=False, auto_adjust=True)
# yf.download returns MultiIndex columns (metric, ticker) for multiple tickers
if df.empty:
close = pd.DataFrame()
elif isinstance(df.columns, pd.MultiIndex) and 'Close' in df.columns.get_level_values(0):
close = df['Close'].copy()
elif 'Close' in df.columns:
close = df[['Close']].copy()
close.columns = unique_tickers
else:
close = pd.DataFrame()
prices = {}
for ticker in unique_tickers:
if ticker in close.columns:
series = pd.to_numeric(close[ticker], errors='coerce').dropna()
else:
series = pd.Series(dtype=float)
if series.empty:
series = _download_close_series(ticker, start, end)
if series.empty:
continue
prices[ticker] = {}
for date_str in dates:
target = pd.Timestamp(datetime.strptime(date_str, '%Y-%m-%d'))
after = series[series.index >= target]
if not after.empty:
prices[ticker][date_str] = float(after.iloc[0])
else:
before = series[series.index <= target]
if not before.empty:
prices[ticker][date_str] = float(before.iloc[-1])
return prices
def _price_on_or_after(px_by_date, target_date):
"""Return (date, price) for the first available price on/after target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d >= target_date)
if not dates:
return None
d = dates[0]
return d, px_by_date[d]
def _price_on_or_before(px_by_date, target_date):
"""Return (date, price) for the last available price on/before target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d <= target_date)
if not dates:
return None
d = dates[-1]
return d, px_by_date[d]
def _period_price_pair(px_by_date, start_date, end_date):
"""Return start/end prices for a period using sensible boundary alignment."""
start = _price_on_or_after(px_by_date, start_date)
end = _price_on_or_before(px_by_date, end_date)
if start is None or end is None:
return None
start_actual, p0 = start
end_actual, p1 = end
if end_actual < start_actual:
return None
return start_actual, end_actual, p0, p1
def _option_position_key(ticker, pos_type):
return (ticker, pos_type)
def _linear_underlying_sign(pos_type):
"""Direction when option rows are converted to underlying equity exposure."""
return -1 if pos_type == 'put' else 1
def compute_return(positions, prices, start_date, end_date, mode='equity_only',
option_prices=None):
"""Compute portfolio return between two dates.
The 13F value for an option row is treated as underlying notional, not
option premium. Option contracts are sized from that notional, but the
portfolio denominator is estimated deployed capital: stock value plus option
premium cost. This avoids treating the gap between option notional and
option premium as cash. In 'full' mode, every option row requires a
MarketData price series; missing data raises rather than falling back.
"""
total_cost = 0
portfolio_pnl = 0
for (ticker, pos_type), value in positions.items():
is_option = pos_type in ('call', 'put')
stock_px = prices.get(ticker)
if mode == 'equity_only':
if pos_type not in ('long', 'call', 'put'):
continue
pair = _period_price_pair(stock_px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
stock_ret = (p1 - p0) / p0
total_cost += value
portfolio_pnl += value * _linear_underlying_sign(pos_type) * stock_ret
continue
if is_option:
opt_key = _option_position_key(ticker, pos_type)
opt_px = option_prices.get(opt_key) if option_prices else None
if not opt_px:
raise RuntimeError(
f"No MarketData option prices for {opt_key} in period "
f"{start_date}..{end_date}")
pair = _period_price_pair(opt_px, start_date, end_date)
if pair is None:
raise RuntimeError(
f"MarketData option price series for {opt_key} does not "
f"cover {start_date}..{end_date}")
start_actual, end_actual, opt_p0, opt_p1 = pair
stock_start = _price_on_or_after(stock_px, start_actual)
if stock_start is None or stock_start[1] <= 0:
stock_start = _price_on_or_after(stock_px, start_date)
if stock_start is None or stock_start[1] <= 0:
raise RuntimeError(
f"No underlying price for {ticker} at {start_date}")
p0, p1 = opt_p0, opt_p1
underlying_p0 = stock_start[1]
if p0 <= 0 or underlying_p0 <= 0:
continue
position_cost = value * (p0 / underlying_p0)
position_pnl = value * ((p1 - p0) / underlying_p0)
else:
pair = _period_price_pair(stock_px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
stock_ret = (p1 - p0) / p0
position_cost = value
position_pnl = value * stock_ret
if position_cost <= 0:
continue
total_cost += position_cost
portfolio_pnl += position_pnl
return portfolio_pnl / total_cost if total_cost else None
def annualize(ret, days):
"""Annualize a return over a given number of calendar days."""
if ret is None or days <= 0:
return None
return (1 + ret) ** (365.25 / days) - 1
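# Quick check of the convention: annualize(0.05, 91) ≈ 0.216, i.e. a 5%
# quarterly return compounds to roughly 21.6% per year.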
def fmt(ret):
return f"{ret * 100:+.2f}%" if ret is not None else "N/A"
# Collect all tickers and dates
all_tickers = set()
for positions in holdings.values():
for (ticker, _) in positions:
all_tickers.add(ticker)
all_tickers.add('SPY')
all_tickers.add('AIS')
today = datetime.now().strftime('%Y-%m-%d')
first_date = filing_dates[quarters[0]]
all_dates = set(filing_dates.values()) | set(quarter_end_dates.values()) | {today}
prices = get_prices(sorted(all_tickers), sorted(all_dates))
# Resolve `today` to the actual last available closing date.
# yfinance may not have data for today (market still open or holiday),
# so we look up what date SPY's price actually corresponds to.
def _resolve_price_date(prices, requested_date):
"""Return the actual trading date of the price stored under requested_date."""
ref = 'SPY' if 'SPY' in prices else next(iter(prices), None)
if not ref or requested_date not in prices[ref]:
return requested_date
target_price = prices[ref][requested_date]
# Re-download a small window to find the real date of this price
start = datetime.strptime(requested_date, '%Y-%m-%d') - timedelta(days=10)
end = datetime.strptime(requested_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(ref, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return requested_date
if isinstance(df.columns, pd.MultiIndex):
close = df['Close'][ref].dropna()
elif 'Close' in df.columns:
close = df['Close'].dropna()
else:
close = df.iloc[:, 0].dropna()
for dt, px in close.items():
val = float(px.iloc[0]) if isinstance(px, pd.Series) else float(px)
if abs(val - target_price) < 0.01:
ts = dt[0] if isinstance(dt, tuple) else dt
return pd.Timestamp(ts).strftime('%Y-%m-%d')
return requested_date
today_resolved = _resolve_price_date(prices, today)
if today_resolved != today:
for ticker in prices:
if today in prices[ticker]:
prices[ticker][today_resolved] = prices[ticker].pop(today)
today = today_resolved
def download_daily(tickers, start_date, end_date):
"""Download daily close prices from yfinance, handling MultiIndex.
Dates are 'YYYY-MM-DD' strings. Adds a small buffer for trading-day alignment."""
tickers_sorted = sorted(tickers)
start = datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=5)
end = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(tickers_sorted, start=start, end=end,
progress=False, auto_adjust=True)
if df.empty:
close = pd.DataFrame()
elif isinstance(df.columns, pd.MultiIndex) and 'Close' in df.columns.get_level_values(0):
close = df['Close'].copy()
elif 'Close' in df.columns:
close = df[['Close']].copy()
close.columns = tickers_sorted
else:
close = pd.DataFrame()
for ticker in tickers_sorted:
if ticker in close.columns and not close[ticker].dropna().empty:
continue
series = _download_close_series(ticker, start, end)
if not series.empty:
close[ticker] = series
return close.sort_index()
# -- Historical option prices via MarketData --------------------------------
OPTION_CACHE_DIR = os.path.expanduser('~/My Drive/notes/.sa-lp-option-cache')
_MD_BASE = 'https://api.marketdata.app/v1'
_MD_RATE_DELAY = 0.15
OPTION_CACHE_COLUMNS = [
'date', 'selected_on', 'option_type', 'symbol', 'strike', 'expiry',
'delta', 'price']
# Default contract selection parameters. The option proxy picks a contract
# matching option type, with expiry between min_days and max_days of the
# period start, and |delta| closest to delta_target. When the chain is
# sparse, the achieved |delta| may be far from the target; the sensitivity
# block reports achieved |delta| so this is visible rather than silent.
OPTION_DELTA = 0.15
EXPIRY_MIN_DAYS = 270 # ~9 months
EXPIRY_MAX_DAYS = 456 # ~15 months
def _normalize_option_type(option_type):
option_type = str(option_type).lower()
if option_type not in ('call', 'put'):
raise ValueError(f"Unsupported option type: {option_type}")
return option_type
def _empty_option_cache():
return pd.DataFrame(columns=OPTION_CACHE_COLUMNS)
def _option_cache_path(ticker, option_type, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Return the cache CSV path for (ticker, type, delta_target, window).
When the parameter triple equals the baseline (0.15, 270-456 days), the
historical filename ``TICKER-TYPE.csv`` is used so the main-backtest
cache is reused automatically. Any non-baseline combo lives in a
separate ``TICKER-TYPE-d<delta>-e<min>-<max>.csv`` file so a sensitivity
sweep never pollutes the baseline cache (which the portfolio calculator
reads to pick the representative contract for the current filing).
"""
option_type = _normalize_option_type(option_type)
is_baseline = (
abs(delta_target - OPTION_DELTA) < 1e-9
and min_days == EXPIRY_MIN_DAYS
and max_days == EXPIRY_MAX_DAYS)
if is_baseline:
return os.path.join(OPTION_CACHE_DIR, f'{ticker}-{option_type}.csv')
return os.path.join(
OPTION_CACHE_DIR,
f'{ticker}-{option_type}-d{delta_target:g}-e{min_days}-{max_days}.csv')
def _load_option_cache(ticker, option_type, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Load cached MarketData rows for a ticker/type/target/window. Returns DataFrame or empty."""
option_type = _normalize_option_type(option_type)
path = _option_cache_path(ticker, option_type, delta_target,
min_days, max_days)
if not os.path.exists(path):
return _empty_option_cache()
df = pd.read_csv(path)
if df.empty:
return _empty_option_cache()
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
for col in ('date', 'selected_on'):
df[col] = pd.to_datetime(
df[col], errors='coerce').dt.strftime('%Y-%m-%d')
df['option_type'] = df['option_type'].fillna(option_type).str.lower()
cache = df[OPTION_CACHE_COLUMNS].copy()
cache = cache[cache['option_type'] == option_type].copy()
cache.dropna(subset=['date'], inplace=True)
for col in ('strike', 'delta', 'price'):
cache[col] = pd.to_numeric(cache[col], errors='coerce')
cache.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
return cache[OPTION_CACHE_COLUMNS]
def _save_option_cache(ticker, option_type, df, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Persist typed option cache to CSV."""
option_type = _normalize_option_type(option_type)
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
path = _option_cache_path(ticker, option_type, delta_target,
min_days, max_days)
if df.empty:
df = _empty_option_cache()
else:
df = df.copy()
df['option_type'] = option_type
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
df.sort_values(['date', 'expiry', 'strike'], inplace=True)
df.to_csv(path, index=False)
def _contract_window(ref_date_str, min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
return ref + timedelta(days=min_days), ref + timedelta(days=max_days)
def _contract_from_cache_row(row, ref_date_str, option_type,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
option_type = _normalize_option_type(option_type)
if str(row.get('option_type', option_type)).lower() != option_type:
return None
lo, hi = _contract_window(ref_date_str, min_days, max_days)
try:
exp = datetime.strptime(str(row['expiry']), '%Y-%m-%d')
except (KeyError, TypeError, ValueError):
return None
if not (lo <= exp <= hi):
return None
strike = _safe_float(row.get('strike'))
delta = _safe_float(row.get('delta'))
price = _safe_float(row.get('price'))
if strike is None or delta is None or price is None or price <= 0:
return None
return {
'selected_on': row.get('selected_on'),
'option_type': option_type,
'symbol': row.get('symbol'),
'strike': strike,
'expiry': str(row['expiry']),
'delta': delta,
'price': price,
}
def _select_cached_contract(cache, option_type, ref_date_str,
delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS,
require_selected=False):
rows = cache[(cache['date'] == ref_date_str)
& (cache['option_type'] == option_type)]
    selected_rows = rows[rows['selected_on'] == ref_date_str]
    # Prefer rows recorded at selection time; with require_selected, accept
    # nothing else even if that leaves no candidates.
    if not selected_rows.empty or require_selected:
        rows = selected_rows
candidates = []
for _, row in rows.iterrows():
contract = _contract_from_cache_row(row, ref_date_str, option_type,
min_days, max_days)
if contract:
candidates.append(contract)
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - delta_target))
return candidates[0]
def _parse_option_price(contract):
"""Extract a mark price from an option contract record."""
mid = _safe_float(contract.get('mid'))
if mid and mid > 0:
return mid
bid = _safe_float(contract.get('bid'))
ask = _safe_float(contract.get('ask'))
last = _safe_float(contract.get('last'))
if bid and ask and bid > 0 and ask > 0:
return (bid + ask) / 2
if last and last > 0:
return last
return None
def _safe_float(val):
try:
out = float(val)
if np.isnan(out):
return None
return out
except (TypeError, ValueError):
return None
def _marketdata_key():
"""Return the MarketData API key, or None if unavailable.
Resolution order:
1. ``MARKETDATA_KEY`` / ``MARKETDATA_API_KEY`` environment variables.
2. ``pass env/marketdata-token`` (local ``pass`` store).
    The result is memoised on the function object so repeated lookups
    during a sweep do not shell out again. The fetch helpers raise only
    when actually called without a key, so a fully cached run still
    succeeds without requiring either source.
"""
if hasattr(_marketdata_key, '_cached'):
return _marketdata_key._cached
key = (os.environ.get('MARKETDATA_KEY', '')
or os.environ.get('MARKETDATA_API_KEY', ''))
if not key:
try:
import subprocess
out = subprocess.run(
['pass', 'show', 'env/marketdata-token'],
capture_output=True, text=True, timeout=5, check=False)
if out.returncode == 0:
key = out.stdout.strip().splitlines()[0] if out.stdout else ''
except (FileNotFoundError, subprocess.TimeoutExpired):
key = ''
_marketdata_key._cached = key or None
return _marketdata_key._cached
def _marketdata_get(path, params, api_key):
"""Fetch a MarketData endpoint, returning normalized row dictionaries.
Raises on HTTP errors or a non-'ok' status. 'no_data' is returned as
an empty list so that callers can distinguish 'nothing available' from
'request failed'.
"""
headers = {'Accept': 'application/json', 'Authorization': f'Bearer {api_key}'}
resp = requests.get(_MD_BASE + path, params=params, headers=headers,
timeout=30)
resp.raise_for_status()
body = resp.json()
status = body.get('s')
if status == 'no_data':
return []
if status != 'ok':
raise RuntimeError(
f"MarketData {path} returned status={status!r}: "
f"{body.get('errmsg') or body}")
lengths = [len(v) for v in body.values() if isinstance(v, list)]
n = max(lengths) if lengths else 0
rows = []
for i in range(n):
row = {}
for key, val in body.items():
if isinstance(val, list):
row[key] = val[i] if i < len(val) else None
else:
row[key] = val
rows.append(row)
return rows
def _marketdata_date(timestamp):
try:
return datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d')
except (TypeError, ValueError, OSError):
return None
def _occ_symbol(ticker, option_type, strike, expiry):
"""Build a standard OCC option symbol from contract fields."""
cp = 'C' if _normalize_option_type(option_type) == 'call' else 'P'
exp = datetime.strptime(str(expiry), '%Y-%m-%d').strftime('%y%m%d')
strike_int = int(round(float(strike) * 1000))
root = ticker.upper().replace('.', '')
return f'{root}{exp}{cp}{strike_int:08d}'
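# Example of the OCC layout (hypothetical contract): a NVDA 2026-06-18 call
# struck at 150.0 becomes 'NVDA260618C00150000': root, yymmdd expiry, C/P,
# then the strike in thousandths zero-padded to eight digits.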
# Chains are always fetched with a broad expiry window so they can be cached
# and reused for in-memory selection across any (delta_target, expiry window)
# combination in the sensitivity sweep.
CHAIN_FETCH_MIN_DAYS = 30
CHAIN_FETCH_MAX_DAYS = 760
def _fetch_marketdata_chain(ticker, date_str, option_type, api_key,
min_days=CHAIN_FETCH_MIN_DAYS,
max_days=CHAIN_FETCH_MAX_DAYS):
lo, hi = _contract_window(date_str, min_days, max_days)
params = {
'date': date_str,
'from': lo.strftime('%Y-%m-%d'),
'to': hi.strftime('%Y-%m-%d'),
'side': _normalize_option_type(option_type),
'expiration': 'all',
}
return _marketdata_get(f'/options/chain/{ticker}/', params, api_key)
# Chain cache: one CSV per (ticker, type, date) storing the broad-window chain.
# Lets the sensitivity sweep re-select contracts for different delta targets
# and expiry windows without refetching.
CHAIN_CACHE_DIR = os.path.join(OPTION_CACHE_DIR, 'chains')
def _chain_cache_path(ticker, option_type, date_str):
option_type = _normalize_option_type(option_type)
return os.path.join(CHAIN_CACHE_DIR,
f'{ticker}-{option_type}-{date_str}.csv')
def _load_chain_cache(ticker, option_type, date_str):
path = _chain_cache_path(ticker, option_type, date_str)
if not os.path.exists(path):
return None
df = pd.read_csv(path)
if df.empty:
return []
return df.to_dict('records')
def _save_chain_cache(ticker, option_type, date_str, chain):
if not chain:
return
os.makedirs(CHAIN_CACHE_DIR, exist_ok=True)
path = _chain_cache_path(ticker, option_type, date_str)
pd.DataFrame(chain).to_csv(path, index=False)
def _get_or_fetch_chain(ticker, date_str, option_type, api_key,
fetched_counter=None):
"""Return the cached broad chain for (ticker, type, date), fetching if absent.
Requires ``api_key`` only when a fetch is actually needed.
"""
chain = _load_chain_cache(ticker, option_type, date_str)
if chain is not None:
return chain
if not api_key:
raise RuntimeError(
"MARKETDATA_KEY is not set but a chain fetch is required for "
f"{ticker} {option_type} on {date_str}.")
time.sleep(_MD_RATE_DELAY)
chain = _fetch_marketdata_chain(ticker, date_str, option_type, api_key)
if fetched_counter is not None:
fetched_counter['marketdata_chains'] += 1
_save_chain_cache(ticker, option_type, date_str, chain)
return chain
def _fetch_marketdata_quotes(symbol, start_date, end_date, api_key):
to_date = (datetime.strptime(end_date, '%Y-%m-%d')
+ timedelta(days=1)).strftime('%Y-%m-%d')
rows = _marketdata_get(f'/options/quotes/{symbol}/',
{'from': start_date, 'to': to_date}, api_key)
prices = {}
for row in rows:
date_str = _marketdata_date(row.get('updated'))
if not date_str:
continue
price = _parse_option_price(row)
if price is not None and price > 0:
prices[date_str] = price
return prices
def _implied_vol_from_price(S, K, T, option_price, option_type):
"""Infer Black-Scholes volatility from an observed option mid price."""
if any(x is None for x in (S, K, T, option_price)):
return None
if S <= 0 or K <= 0 or T <= 0 or option_price <= 0:
return None
intrinsic = max(S - K, 0) if option_type == 'call' else max(K - S, 0)
upper = S if option_type == 'call' else K
if option_price < intrinsic - 1e-6 or option_price > upper * 1.5:
return None
lo, hi = 1e-4, 5.0
try:
if (option_price < bs_price(S, K, T, lo, option_type) - 1e-4
or option_price > bs_price(S, K, T, hi, option_type) + 1e-4):
return None
for _ in range(80):
mid = (lo + hi) / 2
if bs_price(S, K, T, mid, option_type) < option_price:
lo = mid
else:
hi = mid
return (lo + hi) / 2
except (FloatingPointError, ValueError, ZeroDivisionError):
return None
def _marketdata_delta(row, ref_date_str, expiry, option_type, price):
"""Use vendor delta when present; otherwise infer it from the quote."""
native = _safe_float(row.get('delta'))
if native is not None and native != 0:
return native
S = _safe_float(row.get('underlyingPrice'))
K = _safe_float(row.get('strike'))
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
exp = datetime.strptime(expiry, '%Y-%m-%d')
T = max((exp - ref).days / 365.25, 1e-6)
sigma = _safe_float(row.get('iv'))
if sigma is None or sigma <= 0:
sigma = _implied_vol_from_price(S, K, T, price, option_type)
if S is None or K is None or sigma is None or sigma <= 0:
return None
return bs_delta(S, K, T, sigma, option_type)
def _select_marketdata_contract(chain, ref_date_str, option_type,
delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
option_type = _normalize_option_type(option_type)
lo, hi = _contract_window(ref_date_str, min_days, max_days)
candidates = []
for c in chain:
if str(c.get('side', '')).lower() != option_type:
continue
expiry = _marketdata_date(c.get('expiration'))
if not expiry:
continue
exp = datetime.strptime(expiry, '%Y-%m-%d')
if not (lo <= exp <= hi):
continue
price = _parse_option_price(c)
if price is None or price <= 0:
continue
delta = _marketdata_delta(c, ref_date_str, expiry, option_type, price)
if delta is None or delta == 0:
continue
strike = _safe_float(c.get('strike'))
symbol = c.get('optionSymbol')
if strike is None or not symbol:
continue
candidates.append({
'option_type': option_type,
'symbol': symbol,
'strike': strike,
'expiry': expiry,
'delta': delta,
'price': price,
})
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - delta_target))
return candidates[0]
def download_option_prices(option_positions, quarters, holdings, filing_dates,
today, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
"""Download historical representative option prices from MarketData.
MarketData is the sole supported provider. MARKETDATA_KEY must be set.
For each (ticker, option_type) and each filing period in which that
position is held:
1. On the first trading day, select a contract matching type, with
expiry between ``min_days`` and ``max_days`` of the period start, and
|delta| closest to ``delta_target``. MarketData's Starter plan often
returns null Greeks, so delta is inferred from the observed mid price
via Black-Scholes when the vendor delta is missing.
2. Lock in that contract for the period.
3. Track its historical mid price through the period.
The broad option chain for each (ticker, type, first_day) is cached to
disk so that sensitivity sweeps over (delta_target, expiry window) reuse
a single fetch.
Raises ``RuntimeError`` if no suitable contract can be selected for any
required (ticker, type, period), or if MarketData returns no price series
for the selected contract.
Parameters
----------
delta_target : float
Target |delta| for contract selection (default ``OPTION_DELTA``).
min_days, max_days : int
Contract expiry window in days from period start (default 270-456,
i.e. 9-15 months).
Returns
-------
per_period : dict {quarter_str: {(ticker, type): {date_str: float}}}
Option prices keyed by filing period then option position. Each
period has its own contract's prices.
"""
option_positions = sorted({
(ticker, _normalize_option_type(pos_type))
for ticker, pos_type in option_positions})
md_key = _marketdata_key()
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
per_period = {} # {q: {(ticker, type): {date_str: price}}}
fetched = {'marketdata_chains': 0, 'marketdata_quotes': 0}
for ticker, option_type in option_positions:
opt_key = _option_position_key(ticker, option_type)
cache = _load_option_cache(ticker, option_type, delta_target,
min_days, max_days)
new_rows = []
for i, q in enumerate(quarters):
# Skip quarters where this exact option position is absent.
if opt_key not in holdings[q]:
continue
period_start = filing_dates[q]
period_end = (filing_dates[quarters[i + 1]]
if i < len(quarters) - 1 else today)
trading_days = pd.bdate_range(period_start, period_end)
if len(trading_days) == 0:
continue
first_day = trading_days[0].strftime('%Y-%m-%d')
# -- Select contract on first trading day --
contract = _select_cached_contract(
cache, option_type, first_day,
delta_target=delta_target,
min_days=min_days, max_days=max_days,
require_selected=True)
if contract is None:
chain = _get_or_fetch_chain(
ticker, first_day, option_type, md_key, fetched)
contract = _select_marketdata_contract(
chain, first_day, option_type,
delta_target=delta_target,
min_days=min_days, max_days=max_days)
if contract is None:
raise RuntimeError(
f"MarketData returned no usable {option_type} contract "
f"for {ticker} on {first_day} (period {q}) at "
f"delta={delta_target}, "
f"expiry {min_days}-{max_days}d")
new_rows.append({
'date': first_day,
'selected_on': first_day,
'option_type': option_type,
'symbol': contract.get('symbol'),
'strike': contract['strike'],
'expiry': contract['expiry'],
'delta': contract['delta'],
'price': contract['price'],
})
strike = contract['strike']
expiry = contract['expiry']
symbol = contract.get('symbol') or _occ_symbol(
ticker, option_type, strike, expiry)
# -- Collect prices for this period (fresh dict per period) --
period_prices = {}
# Fast path: read matching prices from cache.
rows = cache[
(cache['date'] >= period_start)
& (cache['date'] <= period_end)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))
& pd.notna(cache['price'])]
selected_rows = rows[rows['selected_on'] == first_day]
if not selected_rows.empty:
rows = selected_rows
for _, row in rows.iterrows():
period_prices[row['date']] = float(row['price'])
# Decide whether to refresh quotes. With a key, refresh whenever
# the cached series does not reach period_end. Without a key,
# only fail if the cached series is empty; a slightly stale
# tail is acceptable for cache-only runs (e.g. sensitivity
# sweeps replaying the baseline contract).
has_partial = bool(period_prices)
reaches_end = has_partial and max(period_prices) >= period_end
if md_key and not reaches_end:
time.sleep(_MD_RATE_DELAY)
quote_prices = _fetch_marketdata_quotes(
symbol, period_start, period_end, md_key)
fetched['marketdata_quotes'] += 1
for day_str, price in quote_prices.items():
if period_start <= day_str <= period_end:
period_prices[day_str] = price
new_rows.append({
'date': day_str,
'selected_on': first_day,
'option_type': option_type,
'symbol': symbol,
'strike': strike,
'expiry': expiry,
'delta': contract['delta'],
'price': price,
})
if contract.get('price') and first_day not in period_prices:
period_prices[first_day] = contract['price']
elif not md_key and not has_partial:
raise RuntimeError(
"MARKETDATA_KEY is not set and no cached quotes exist "
f"for {symbol} in {period_start}..{period_end}.")
if not period_prices:
raise RuntimeError(
f"MarketData returned no quotes for {symbol} "
f"({opt_key}) in {period_start}..{period_end}")
per_period.setdefault(q, {})[opt_key] = period_prices
# Persist new data to cache
if new_rows:
new_df = pd.DataFrame(new_rows)
cache = pd.concat([cache, new_df], ignore_index=True)
cache.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
_save_option_cache(ticker, option_type, cache, delta_target,
min_days, max_days)
if any(fetched.values()):
import sys
parts = []
if fetched['marketdata_chains']:
parts.append(f"{fetched['marketdata_chains']} MarketData chains")
if fetched['marketdata_quotes']:
parts.append(f"{fetched['marketdata_quotes']} MarketData quote series")
print(f"[options] Fetched {', '.join(parts)}", file=sys.stderr)
return per_period
# -- Black-Scholes helpers (used only to infer delta when MarketData's
# Starter-plan historical Greeks are null; never to reprice returns) -----
from scipy.stats import norm as _norm
def bs_price(S, K, T, sigma, option_type='call'):
"""Black-Scholes option price (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return max(S - K, 0)
return max(K - S, 0)
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
d2 = d1 - sigma * np.sqrt(T)
if option_type == 'call':
return S * _norm.cdf(d1) - K * _norm.cdf(d2)
return K * _norm.cdf(-d2) - S * _norm.cdf(-d1)
def bs_delta(S, K, T, sigma, option_type='call'):
"""Black-Scholes delta (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return 1.0 if S > K else 0.0
return -1.0 if S < K else 0.0
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
if option_type == 'call':
return _norm.cdf(d1)
return _norm.cdf(d1) - 1
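# Sanity check: with S == K the zero-rate d1 reduces to sigma*sqrt(T)/2, so an
# ATM 1-year call at sigma=0.5 has delta = cdf(0.25) ≈ 0.60, slightly above 0.5.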
def daily_cumulative(holdings, quarters, filing_dates, close, today, mode,
per_period_opt=None):
"""Build a daily series of cumulative growth factors for a given mode.
For each filing period, stock shares and option contracts are fixed. In
equity-proxy mode, option rows are converted to linear underlying exposure:
calls are long underlying and puts are short underlying. In option-proxy
mode, option rows are sized by 13F underlying notional and returns come
from MarketData quotes; returns are divided by deployed capital (stock
value plus option premium cost). Option-proxy mode raises if MarketData
prices are missing for any required position.
"""
cum_growth = 1.0
dates_out = []
values_out = []
for i, q in enumerate(quarters):
period_start = filing_dates[q]
period_end = filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today
ps = pd.Timestamp(period_start)
pe = pd.Timestamp(period_end)
# Trading days in this period
mask = (close.index >= ps) & (close.index <= pe)
period_close = close[mask]
if period_close.empty:
continue
# Option prices for this period (keyed by (ticker, type) → prices)
quarter_opt = per_period_opt.get(q, {}) if per_period_opt else {}
# Determine starting prices, fixed exposure, and deployed capital.
positions = holdings[q]
exposure = {}
costs = {}
start_prices = {}
start_underlying = {}
use_opt_px = {} # track which positions use option prices
total_cost = 0
for (ticker, pos_type), value in positions.items():
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
if mode == 'equity_only':
if pos_type not in ('long', 'call', 'put'):
continue
if ticker not in close.columns:
continue
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
stock_start = float(avail.iloc[0])
if stock_start <= 0:
continue
start_prices[(ticker, pos_type)] = stock_start
start_underlying[(ticker, pos_type)] = stock_start
costs[(ticker, pos_type)] = value
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = False
total_cost += value
continue
# mode == 'full' (option proxy)
if is_option:
if opt_key not in quarter_opt:
raise RuntimeError(
f"No MarketData option prices for {opt_key} in "
f"period {q}")
ticker_opt = quarter_opt[opt_key]
opt_dates = sorted(d for d in ticker_opt if d >= period_start)
if not opt_dates:
raise RuntimeError(
f"MarketData option prices for {opt_key} in period "
f"{q} contain no dates at or after {period_start}")
if ticker not in close.columns:
raise RuntimeError(
f"No underlying close series for {ticker}")
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
raise RuntimeError(
f"No underlying price for {ticker} at {period_start}")
opt_start = ticker_opt[opt_dates[0]]
underlying_start = float(avail.iloc[0])
if opt_start <= 0 or underlying_start <= 0:
raise RuntimeError(
f"Non-positive starting price for {opt_key} in "
f"period {q}")
start_prices[(ticker, pos_type)] = opt_start
start_underlying[(ticker, pos_type)] = underlying_start
costs[(ticker, pos_type)] = value * opt_start / underlying_start
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = True
total_cost += costs[(ticker, pos_type)]
continue
# Plain stock in full mode
if ticker not in close.columns:
continue
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
stock_start = float(avail.iloc[0])
if stock_start <= 0:
continue
start_prices[(ticker, pos_type)] = stock_start
start_underlying[(ticker, pos_type)] = stock_start
costs[(ticker, pos_type)] = value
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = False
total_cost += value
if total_cost == 0:
continue
# Daily P&L relative to period start.
# Skip first day of subsequent periods (already recorded as last day
# of the prior period) to avoid duplicate boundary dates.
start_idx = 1 if i > 0 else 0
# Forward-fill: track last known option price so that gaps in
# option data don't cause positions to vanish mid-period.
last_opt = {k: v for k, v in start_prices.items()
if use_opt_px.get(k)}
for day_idx in range(start_idx, len(period_close)):
day = period_close.index[day_idx]
day_str = day.strftime('%Y-%m-%d')
period_pnl = 0
for (ticker, pos_type), value in exposure.items():
p0 = start_prices[(ticker, pos_type)]
if p0 == 0:
continue
if use_opt_px[(ticker, pos_type)]:
opt_key = _option_position_key(ticker, pos_type)
p1_val = quarter_opt.get(opt_key, {}).get(day_str)
if p1_val is not None:
last_opt[(ticker, pos_type)] = p1_val
else:
p1_val = last_opt.get((ticker, pos_type))
if p1_val is None:
continue
underlying_p0 = start_underlying.get((ticker, pos_type))
if not underlying_p0 or underlying_p0 <= 0:
continue
position_pnl = value * (float(p1_val) - p0) / underlying_p0
else:
if ticker not in period_close.columns:
continue
p1_val = period_close[ticker].iloc[day_idx]
if pd.isna(p1_val):
continue
stock_ret = (float(p1_val) - p0) / p0
if mode == 'equity_only':
position_pnl = (
value * _linear_underlying_sign(pos_type) * stock_ret)
else:
position_pnl = value * stock_ret
period_pnl += position_pnl
dates_out.append(day)
values_out.append(cum_growth * (1 + period_pnl / total_cost))
# Chain: next period starts from the last day's growth factor
if values_out:
cum_growth = values_out[-1]
return dates_out, values_out
# -- Setup (shared across combos) ------------------------------------
option_positions = sorted({
(t, pt)
for q in quarters
for (t, pt) in holdings[q]
if pt in ('call', 'put')
})
daily_close = download_daily(all_tickers, first_date, today)
def _sharpe(daily_rets, rf_annual=0.04):
if daily_rets.empty:
return float('nan')
rf_daily = (1 + rf_annual) ** (1 / 252) - 1
excess = daily_rets - rf_daily
if excess.std() == 0 or pd.isna(excess.std()):
return float('nan')
return float(excess.mean() / excess.std() * 252 ** 0.5)
def _max_drawdown(daily_rets):
if daily_rets.empty:
return float('nan')
cum = (1 + daily_rets).cumprod()
return float(((cum - cum.cummax()) / cum.cummax()).min() * 100)
def _mean_achieved_delta(delta_target, min_days, max_days):
"""Mean |delta| of contracts selected under a given (target, window),
across all (ticker, type, period) positions in that sweep's cache."""
deltas = []
for q in quarters:
for (ticker, pos_type) in holdings[q]:
if pos_type not in ('call', 'put'):
continue
cache = _load_option_cache(ticker, pos_type, delta_target,
min_days, max_days)
if cache.empty:
continue
rows = cache[(cache['selected_on'] == filing_dates[q])
& (cache['option_type'] == pos_type)]
if rows.empty:
continue
d = rows.iloc[0]['delta']
if pd.notna(d):
deltas.append(abs(float(d)))
if not deltas:
return float('nan')
return sum(deltas) / len(deltas)
def backtest_combo(delta_target, min_days, max_days):
"""Run the option-proxy backtest under one contract-selection rule."""
per_period_opt = download_option_prices(
option_positions, quarters, holdings, filing_dates, today,
delta_target=delta_target,
min_days=min_days, max_days=max_days)
cum = 1.0
for i, q in enumerate(quarters):
start = filing_dates[q]
end = filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today
ret = compute_return(holdings[q], prices, start, end, 'full',
option_prices=per_period_opt.get(q, {}))
if ret is not None:
cum *= (1 + ret)
achieved = _mean_achieved_delta(delta_target, min_days, max_days)
if daily_close.empty:
return {'cum_ret': cum - 1, 'vol': float('nan'),
'sharpe': float('nan'), 'max_dd': float('nan'),
'achieved': achieved}
dates, values = daily_cumulative(
holdings, quarters, filing_dates, daily_close, today, 'full',
per_period_opt=per_period_opt)
if not dates:
return {'cum_ret': cum - 1, 'vol': float('nan'),
'sharpe': float('nan'), 'max_dd': float('nan'),
'achieved': achieved}
growth = pd.Series(values, index=dates)
daily_rets = growth.pct_change().dropna()
return {
'cum_ret': cum - 1,
'vol': float(daily_rets.std() * 252 ** 0.5 * 100),
'sharpe': _sharpe(daily_rets),
'max_dd': _max_drawdown(daily_rets),
'achieved': achieved,
}
def _safe_combo(label, delta_target, min_days, max_days):
try:
return backtest_combo(delta_target, min_days, max_days)
except RuntimeError as e:
import sys
print(f"[{label}] {e}", file=sys.stderr)
return {'cum_ret': None, 'vol': float('nan'),
'sharpe': float('nan'), 'max_dd': float('nan'),
'achieved': float('nan'), 'error': str(e)}
def _print_row(label, r, baseline):
flag = " *" if baseline else " "
cum = fmt(r['cum_ret']) if r['cum_ret'] is not None else "err"
vol = f"{r['vol']:>8.1f}%" if r['vol'] == r['vol'] else " N/A"
sh = f"{r['sharpe']:>9.2f}" if r['sharpe'] == r['sharpe'] else " N/A"
mdd = f"{r['max_dd']:>8.1f}%" if r['max_dd'] == r['max_dd'] else " N/A"
ach = f"{r['achieved']:>7.2f}" if r.get('achieved') == r.get('achieved') else " N/A"
print(f"{label:<10}{flag}{cum:>10} {vol:>10} {sh:>10} {mdd:>10} {ach:>8}")
# -- Delta sweep (9-15 month expiry) ---------------------------------
DELTAS = [0.05, 0.10, 0.15, 0.25, 0.40, 0.50]
BASELINE_DELTA = OPTION_DELTA
BASELINE_EXPIRY = (EXPIRY_MIN_DAYS, EXPIRY_MAX_DAYS)
print("SENSITIVITY TO |DELTA| (expiry 9-15m)")
print("=" * 72)
print(f"{'|Delta|':<10} {'Cum ret':>10} {'Ann vol':>10} "
f"{'Sharpe':>10} {'Max DD':>10} {'Ach |d|':>8}")
print("-" * 72)
delta_results = {}
for d in DELTAS:
r = _safe_combo(f"{d:.2f}", d,
BASELINE_EXPIRY[0], BASELINE_EXPIRY[1])
delta_results[d] = r
_print_row(f"{d:.2f}", r, baseline=(d == BASELINE_DELTA))
cum_rets = [r['cum_ret'] for r in delta_results.values()
if r['cum_ret'] is not None]
print("-" * 72)
if cum_rets:
spread = max(cum_rets) - min(cum_rets)
print(f"{'Spread':<10} {fmt(spread):>10} "
f"(range across delta choices)")
# -- Expiry sweep (|delta| = 0.15) -----------------------------------
EXPIRIES = [
('3-6m', 90, 180),
('6-12m', 180, 365),
('9-15m', 270, 456),
('12-24m', 365, 730),
]
print()
print(f"SENSITIVITY TO EXPIRY WINDOW (|delta| = {BASELINE_DELTA:.2f})")
print("=" * 72)
print(f"{'Expiry':<10} {'Cum ret':>10} {'Ann vol':>10} "
f"{'Sharpe':>10} {'Max DD':>10} {'Ach |d|':>8}")
print("-" * 72)
expiry_results = {}
for label, mn, mx in EXPIRIES:
r = _safe_combo(label, BASELINE_DELTA, mn, mx)
expiry_results[label] = r
_print_row(label, r,
baseline=((mn, mx) == BASELINE_EXPIRY))
cum_rets = [r['cum_ret'] for r in expiry_results.values()
if r['cum_ret'] is not None]
print("-" * 72)
if cum_rets:
spread = max(cum_rets) - min(cum_rets)
print(f"{'Spread':<10} {fmt(spread):>10} "
f"(range across expiry choices)")
print()
print("* = parameter combination used in the main backtest")
print("Ach |d| = mean |delta| of contracts actually selected; differs from "
"target when the chain is sparse")
SENSITIVITY TO |DELTA| (expiry 9-15m)
========================================================================
|Delta| Cum ret Ann vol Sharpe Max DD Ach |d|
------------------------------------------------------------------------
0.05 +162.18% 62.0% 1.53 -45.8% 0.15
0.10 +161.32% 61.8% 1.53 -45.8% 0.18
0.15 * +162.39% 61.4% 1.54 -45.8% 0.21
0.25 +167.85% 61.2% 1.57 -45.8% 0.28
0.40 +173.60% 61.1% 1.60 -45.8% 0.41
0.50 +173.85% 60.7% 1.61 -45.8% 0.50
------------------------------------------------------------------------
Spread +12.54% (range across delta choices)
SENSITIVITY TO EXPIRY WINDOW (|delta| = 0.15)
========================================================================
Expiry Cum ret Ann vol Sharpe Max DD Ach |d|
------------------------------------------------------------------------
3-6m +147.23% 62.2% 1.45 -45.8% 0.16
6-12m +158.99% 61.6% 1.52 -45.8% 0.19
9-15m * +162.39% 61.4% 1.54 -45.8% 0.21
12-24m +165.72% 61.6% 1.55 -45.8% 0.24
------------------------------------------------------------------------
Spread +18.49% (range across expiry choices)
* = parameter combination used in the main backtest
Ach |d| = mean |delta| of contracts actually selected; differs from target when the chain is sparse
The headline is mostly robust to these parameter choices: Sharpe and volatility are nearly constant across both sweeps, and the cumulative-return spread is modest (~12.5pp across delta, ~18.5pp across expiry). Two caveats about the corners are worth spelling out.
First, the |delta|=0.05 row’s mean achieved |delta| of 0.15 (rather than 0.05) hides real heterogeneity across tickers. On the mature, heavily traded names that dominate the portfolio’s put hedges—NVDA, AVGO, TSM, MU, SMH—the chain lists strikes deep enough out of the money that the proxy hits the 0.05 target cleanly. The same holds for INTC calls in early 2025, when the underlying was depressed and far-OTM strikes were liquid. But for thinner LEAP chains—BE calls (closest available ~0.47), CRWV calls (0.13–0.23), EQT calls (0.14), INFY puts (0.03), and late-2025 INTC calls (0.23–0.29)—the chain doesn’t list strikes that far OTM at the 9–15 month tenor, and the proxy picks the deepest contract the chain offers. The achieved mean reflects this mix.
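The selection mechanics behind that mix can be sketched in a few lines (made-up chain data; the script’s `_select_cached_contract` performs the same closest-|delta| argmin over cached MarketData rows):

```python
# Minimal sketch with hypothetical chains: the proxy picks the contract
# whose |delta| is closest to the target, so a sparse chain can only
# offer its deepest listed strike.
def pick_by_delta(chain, target):
    return min(chain, key=lambda c: abs(abs(c["delta"]) - target))

dense_puts = [{"strike": s, "delta": d}
              for s, d in [(80, -0.05), (90, -0.11), (100, -0.22)]]
sparse_puts = [{"strike": s, "delta": d}
               for s, d in [(100, -0.15), (110, -0.31)]]

print(pick_by_delta(dense_puts, 0.05)["delta"])   # -0.05: hits the target
print(pick_by_delta(sparse_puts, 0.05)["delta"])  # -0.15: best available
```

Averaging over a mix of both kinds of chain is exactly how a 0.05 target ends up with a 0.15 achieved mean.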
Second, the expiry sweep stops at 12–24m because that is the deepest window the chain data fully supports across all positions. A wider 18–30m bucket would be the natural next step for modelling a “longest-dated LEAPs available” approach, but it can’t be priced: several chains’ maximum listed expiry doesn’t reach 540 days from the filing date. INTC’s chain at the August-2025 and November-2025 filings caps at January 2027 and March 2027 respectively (~17 and ~16 months out); TSM’s chain at the November-2025 filing also caps at January 2027 (~14 months out). Listed LEAPs for these names simply don’t go that far out, so any sweep that demands ≥18 month expiry has no contract to price for at least one position. 12–24m is therefore the broadest expiry bucket reported here.
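As a quick sanity check on those tenors, the day counts can be redone with approximate dates (assumptions: filings land on the 13F deadline, and the last listed expiries are the standard third-Friday dates in January and March 2027). All fall short of the 540-day floor an 18–30m window would need:

```python
from datetime import date

# Approximate dates assumed: 13F-deadline filing days and third-Friday
# 2027 expiries. An 18-30m bucket requires an expiry >= 540 days out.
def days_out(filing, last_expiry):
    return (last_expiry - filing).days

print(days_out(date(2025, 8, 14), date(2027, 1, 15)))   # INTC, Aug-25: 519 (~17m)
print(days_out(date(2025, 11, 14), date(2027, 3, 19)))  # INTC, Nov-25: 490 (~16m)
print(days_out(date(2025, 11, 14), date(2027, 1, 15)))  # TSM,  Nov-25: 427 (~14m)
```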
The 3–6m bucket is the one material outlier on the short end, trailing the longer-dated buckets by roughly 12–18pp of cumulative return.
What about AIS?
SPY is a generic equity benchmark, useful for showing that the copycat clears the low bar of beating the broad market. A more pointed comparison is an AI-themed ETF that any retail investor can buy without 13F machinery. Popular among some of my friends is AIS (VistaShares Artificial Intelligence Supercycle ETF), an actively managed fund holding a broad basket of AI-exposed names: semiconductors, hyperscalers, AI software, and selected datacenter and energy-infrastructure plays.
The chart below replays the same backtest as above, but with AIS in place of SPY as the benchmark.
Code
import json
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import requests
import time
import os
import warnings
warnings.filterwarnings('ignore')
# Parse data from the scraper block
parsed = json.loads(data) if isinstance(data, str) else data
filings = parsed["filings"]
# Build internal structures
filing_dates = {f["quarter"]: f["filing_date"] for f in filings}
quarter_end_dates = {f["quarter"]: f["quarter_end"] for f in filings}
quarters = [f["quarter"] for f in filings]
# Convert holdings list to dict keyed by quarter.
# Multiple positions in the same ticker with different types are aggregated
# by value per (ticker, type) pair.
holdings = {}
for f in filings:
positions = {}
for h in f["holdings"]:
ticker = h["ticker"]
pos_type = h["type"]
value = h["value"]
key = (ticker, pos_type)
positions[key] = positions.get(key, 0) + value
holdings[f["quarter"]] = positions
def _extract_close_series(df, ticker):
"""Extract a single close-price series from a yfinance result."""
if df.empty:
return pd.Series(dtype=float)
if isinstance(df.columns, pd.MultiIndex):
if 'Close' not in df.columns.get_level_values(0):
return pd.Series(dtype=float)
close = df['Close']
if isinstance(close, pd.DataFrame):
if ticker in close.columns:
series = close[ticker]
elif len(close.columns) == 1:
series = close.iloc[:, 0]
else:
return pd.Series(dtype=float)
else:
series = close
elif 'Close' in df.columns:
series = df['Close']
if isinstance(series, pd.DataFrame):
series = series.iloc[:, 0]
else:
return pd.Series(dtype=float)
return pd.to_numeric(series, errors='coerce').dropna()
def _download_close_series(ticker, start, end):
"""Download one ticker's close series; used to repair flaky batch misses."""
df = yf.download(ticker, start=start, end=end, progress=False,
auto_adjust=True)
return _extract_close_series(df, ticker)
def get_prices(tickers, dates):
"""Fetch close prices for tickers on specific dates."""
unique_tickers = sorted(set(tickers))
all_dates = [datetime.strptime(d, '%Y-%m-%d') for d in dates]
start = min(all_dates) - timedelta(days=5)
end = max(all_dates) + timedelta(days=5)
df = yf.download(unique_tickers, start=start, end=end, progress=False, auto_adjust=True)
# yf.download returns MultiIndex columns (metric, ticker) for multiple tickers
if df.empty:
close = pd.DataFrame()
elif isinstance(df.columns, pd.MultiIndex) and 'Close' in df.columns.get_level_values(0):
close = df['Close'].copy()
elif 'Close' in df.columns:
close = df[['Close']].copy()
close.columns = unique_tickers
else:
close = pd.DataFrame()
prices = {}
for ticker in unique_tickers:
if ticker in close.columns:
series = pd.to_numeric(close[ticker], errors='coerce').dropna()
else:
series = pd.Series(dtype=float)
if series.empty:
series = _download_close_series(ticker, start, end)
if series.empty:
continue
prices[ticker] = {}
for date_str in dates:
target = pd.Timestamp(datetime.strptime(date_str, '%Y-%m-%d'))
after = series[series.index >= target]
if not after.empty:
prices[ticker][date_str] = float(after.iloc[0])
else:
before = series[series.index <= target]
if not before.empty:
prices[ticker][date_str] = float(before.iloc[-1])
return prices
def _price_on_or_after(px_by_date, target_date):
"""Return (date, price) for the first available price on/after target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d >= target_date)
if not dates:
return None
d = dates[0]
return d, px_by_date[d]
def _price_on_or_before(px_by_date, target_date):
"""Return (date, price) for the last available price on/before target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d <= target_date)
if not dates:
return None
d = dates[-1]
return d, px_by_date[d]
def _period_price_pair(px_by_date, start_date, end_date):
"""Return start/end prices for a period using sensible boundary alignment."""
start = _price_on_or_after(px_by_date, start_date)
end = _price_on_or_before(px_by_date, end_date)
if start is None or end is None:
return None
start_actual, p0 = start
end_actual, p1 = end
if end_actual < start_actual:
return None
return start_actual, end_actual, p0, p1
def _option_position_key(ticker, pos_type):
return (ticker, pos_type)
def _linear_underlying_sign(pos_type):
"""Direction when option rows are converted to underlying equity exposure."""
return -1 if pos_type == 'put' else 1
def compute_return(positions, prices, start_date, end_date, mode='equity_only',
option_prices=None):
"""Compute portfolio return between two dates.
The 13F value for an option row is treated as underlying notional, not
option premium. Option contracts are sized from that notional, but the
portfolio denominator is estimated deployed capital: stock value plus option
premium cost. This avoids treating the gap between option notional and
option premium as cash. In 'full' mode, every option row requires a
MarketData price series; missing data raises rather than falling back.
"""
total_cost = 0
portfolio_pnl = 0
for (ticker, pos_type), value in positions.items():
is_option = pos_type in ('call', 'put')
stock_px = prices.get(ticker)
if mode == 'equity_only':
if pos_type not in ('long', 'call', 'put'):
continue
pair = _period_price_pair(stock_px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
stock_ret = (p1 - p0) / p0
total_cost += value
portfolio_pnl += value * _linear_underlying_sign(pos_type) * stock_ret
continue
if is_option:
opt_key = _option_position_key(ticker, pos_type)
opt_px = option_prices.get(opt_key) if option_prices else None
if not opt_px:
raise RuntimeError(
f"No MarketData option prices for {opt_key} in period "
f"{start_date}..{end_date}")
pair = _period_price_pair(opt_px, start_date, end_date)
if pair is None:
raise RuntimeError(
f"MarketData option price series for {opt_key} does not "
f"cover {start_date}..{end_date}")
start_actual, end_actual, opt_p0, opt_p1 = pair
stock_start = _price_on_or_after(stock_px, start_actual)
if stock_start is None or stock_start[1] <= 0:
stock_start = _price_on_or_after(stock_px, start_date)
if stock_start is None or stock_start[1] <= 0:
raise RuntimeError(
f"No underlying price for {ticker} at {start_date}")
p0, p1 = opt_p0, opt_p1
underlying_p0 = stock_start[1]
if p0 <= 0 or underlying_p0 <= 0:
continue
position_cost = value * (p0 / underlying_p0)
position_pnl = value * ((p1 - p0) / underlying_p0)
else:
pair = _period_price_pair(stock_px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
stock_ret = (p1 - p0) / p0
position_cost = value
position_pnl = value * stock_ret
if position_cost <= 0:
continue
total_cost += position_cost
portfolio_pnl += position_pnl
return portfolio_pnl / total_cost if total_cost else None
def annualize(ret, days):
"""Annualize a return over a given number of calendar days."""
if ret is None or days <= 0:
return None
return (1 + ret) ** (365.25 / days) - 1
def fmt(ret):
return f"{ret * 100:+.2f}%" if ret is not None else "N/A"
# Collect all tickers and dates
all_tickers = set()
for positions in holdings.values():
for (ticker, _) in positions:
all_tickers.add(ticker)
all_tickers.add('SPY')
all_tickers.add('AIS')
today = datetime.now().strftime('%Y-%m-%d')
first_date = filing_dates[quarters[0]]
all_dates = set(filing_dates.values()) | set(quarter_end_dates.values()) | {today}
prices = get_prices(sorted(all_tickers), sorted(all_dates))
# Resolve `today` to the actual last available closing date.
# yfinance may not have data for today (market still open or holiday),
# so we look up what date SPY's price actually corresponds to.
def _resolve_price_date(prices, requested_date):
"""Return the actual trading date of the price stored under requested_date."""
ref = 'SPY' if 'SPY' in prices else next(iter(prices), None)
if not ref or requested_date not in prices[ref]:
return requested_date
target_price = prices[ref][requested_date]
# Re-download a small window to find the real date of this price
start = datetime.strptime(requested_date, '%Y-%m-%d') - timedelta(days=10)
end = datetime.strptime(requested_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(ref, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return requested_date
if isinstance(df.columns, pd.MultiIndex):
close = df['Close'][ref].dropna()
elif 'Close' in df.columns:
close = df['Close'].dropna()
else:
close = df.iloc[:, 0].dropna()
for dt, px in close.items():
val = float(px.iloc[0]) if isinstance(px, pd.Series) else float(px)
if abs(val - target_price) < 0.01:
ts = dt[0] if isinstance(dt, tuple) else dt
return pd.Timestamp(ts).strftime('%Y-%m-%d')
return requested_date
today_resolved = _resolve_price_date(prices, today)
if today_resolved != today:
for ticker in prices:
if today in prices[ticker]:
prices[ticker][today_resolved] = prices[ticker].pop(today)
today = today_resolved
def download_daily(tickers, start_date, end_date):
"""Download daily close prices from yfinance, handling MultiIndex.
Dates are 'YYYY-MM-DD' strings. Adds a small buffer for trading-day alignment."""
tickers_sorted = sorted(tickers)
start = datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=5)
end = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(tickers_sorted, start=start, end=end,
progress=False, auto_adjust=True)
if df.empty:
close = pd.DataFrame()
elif isinstance(df.columns, pd.MultiIndex) and 'Close' in df.columns.get_level_values(0):
close = df['Close'].copy()
elif 'Close' in df.columns:
close = df[['Close']].copy()
close.columns = tickers_sorted
else:
close = pd.DataFrame()
for ticker in tickers_sorted:
if ticker in close.columns and not close[ticker].dropna().empty:
continue
series = _download_close_series(ticker, start, end)
if not series.empty:
close[ticker] = series
return close.sort_index()
# -- Historical option prices via MarketData --------------------------------
OPTION_CACHE_DIR = os.path.expanduser('~/My Drive/notes/.sa-lp-option-cache')
_MD_BASE = 'https://api.marketdata.app/v1'
_MD_RATE_DELAY = 0.15
OPTION_CACHE_COLUMNS = [
'date', 'selected_on', 'option_type', 'symbol', 'strike', 'expiry',
'delta', 'price']
# Default contract selection parameters. The option proxy picks a contract
# matching option type, with expiry between min_days and max_days of the
# period start, and |delta| closest to delta_target. When the chain is
# sparse, the achieved |delta| may be far from the target; the sensitivity
# block reports achieved |delta| so this is visible rather than silent.
OPTION_DELTA = 0.15
EXPIRY_MIN_DAYS = 270 # ~9 months
EXPIRY_MAX_DAYS = 456 # ~15 months
def _normalize_option_type(option_type):
option_type = str(option_type).lower()
if option_type not in ('call', 'put'):
raise ValueError(f"Unsupported option type: {option_type}")
return option_type
def _empty_option_cache():
return pd.DataFrame(columns=OPTION_CACHE_COLUMNS)
def _option_cache_path(ticker, option_type, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Return the cache CSV path for (ticker, type, delta_target, window).
When the parameter triple equals the baseline (0.15, 270-456 days), the
historical filename ``TICKER-TYPE.csv`` is used so the main-backtest
cache is reused automatically. Any non-baseline combo lives in a
separate ``TICKER-TYPE-d<delta>-e<min>-<max>.csv`` file so a sensitivity
sweep never pollutes the baseline cache (which the portfolio calculator
reads to pick the representative contract for the current filing).
"""
option_type = _normalize_option_type(option_type)
is_baseline = (
abs(delta_target - OPTION_DELTA) < 1e-9
and min_days == EXPIRY_MIN_DAYS
and max_days == EXPIRY_MAX_DAYS)
if is_baseline:
return os.path.join(OPTION_CACHE_DIR, f'{ticker}-{option_type}.csv')
return os.path.join(
OPTION_CACHE_DIR,
f'{ticker}-{option_type}-d{delta_target:g}-e{min_days}-{max_days}.csv')
def _load_option_cache(ticker, option_type, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Load cached MarketData rows for a ticker/type/target/window. Returns DataFrame or empty."""
option_type = _normalize_option_type(option_type)
path = _option_cache_path(ticker, option_type, delta_target,
min_days, max_days)
if not os.path.exists(path):
return _empty_option_cache()
df = pd.read_csv(path)
if df.empty:
return _empty_option_cache()
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
for col in ('date', 'selected_on'):
df[col] = pd.to_datetime(
df[col], errors='coerce').dt.strftime('%Y-%m-%d')
df['option_type'] = df['option_type'].fillna(option_type).str.lower()
cache = df[OPTION_CACHE_COLUMNS].copy()
cache = cache[cache['option_type'] == option_type].copy()
cache.dropna(subset=['date'], inplace=True)
for col in ('strike', 'delta', 'price'):
cache[col] = pd.to_numeric(cache[col], errors='coerce')
cache.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
return cache[OPTION_CACHE_COLUMNS]
def _save_option_cache(ticker, option_type, df, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Persist typed option cache to CSV."""
option_type = _normalize_option_type(option_type)
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
path = _option_cache_path(ticker, option_type, delta_target,
min_days, max_days)
if df.empty:
df = _empty_option_cache()
else:
df = df.copy()
df['option_type'] = option_type
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
df.sort_values(['date', 'expiry', 'strike'], inplace=True)
df.to_csv(path, index=False)
def _contract_window(ref_date_str, min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
return ref + timedelta(days=min_days), ref + timedelta(days=max_days)
def _contract_from_cache_row(row, ref_date_str, option_type,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
option_type = _normalize_option_type(option_type)
if str(row.get('option_type', option_type)).lower() != option_type:
return None
lo, hi = _contract_window(ref_date_str, min_days, max_days)
try:
exp = datetime.strptime(str(row['expiry']), '%Y-%m-%d')
except (KeyError, TypeError, ValueError):
return None
if not (lo <= exp <= hi):
return None
strike = _safe_float(row.get('strike'))
delta = _safe_float(row.get('delta'))
price = _safe_float(row.get('price'))
if strike is None or delta is None or price is None or price <= 0:
return None
return {
'selected_on': row.get('selected_on'),
'option_type': option_type,
'symbol': row.get('symbol'),
'strike': strike,
'expiry': str(row['expiry']),
'delta': delta,
'price': price,
}
def _select_cached_contract(cache, option_type, ref_date_str,
delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS,
require_selected=False):
rows = cache[(cache['date'] == ref_date_str)
& (cache['option_type'] == option_type)]
selected_rows = rows[rows['selected_on'] == ref_date_str]
if not selected_rows.empty:
rows = selected_rows
elif require_selected:
rows = selected_rows
candidates = []
for _, row in rows.iterrows():
contract = _contract_from_cache_row(row, ref_date_str, option_type,
min_days, max_days)
if contract:
candidates.append(contract)
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - delta_target))
return candidates[0]
def _parse_option_price(contract):
"""Extract a mark price from an option contract record."""
mid = _safe_float(contract.get('mid'))
if mid and mid > 0:
return mid
bid = _safe_float(contract.get('bid'))
ask = _safe_float(contract.get('ask'))
last = _safe_float(contract.get('last'))
if bid and ask and bid > 0 and ask > 0:
return (bid + ask) / 2
if last and last > 0:
return last
return None
def _safe_float(val):
try:
out = float(val)
if np.isnan(out):
return None
return out
except (TypeError, ValueError):
return None
def _marketdata_key():
"""Return the MarketData API key, or None if unavailable.
Resolution order:
1. ``MARKETDATA_KEY`` / ``MARKETDATA_API_KEY`` environment variables.
2. ``pass env/marketdata-token`` (local ``pass`` store).
The result is memoised on the function object so repeated lookups
during a sweep do not shell out repeatedly. The fetch helpers raise when
called without a key, so a fully cached run still succeeds without
requiring either source.
"""
if hasattr(_marketdata_key, '_cached'):
return _marketdata_key._cached
key = (os.environ.get('MARKETDATA_KEY', '')
or os.environ.get('MARKETDATA_API_KEY', ''))
if not key:
try:
import subprocess
out = subprocess.run(
['pass', 'show', 'env/marketdata-token'],
capture_output=True, text=True, timeout=5, check=False)
if out.returncode == 0:
key = out.stdout.strip().splitlines()[0] if out.stdout else ''
except (FileNotFoundError, subprocess.TimeoutExpired):
key = ''
_marketdata_key._cached = key or None
return _marketdata_key._cached
def _marketdata_get(path, params, api_key):
"""Fetch a MarketData endpoint, returning normalized row dictionaries.
Raises on HTTP errors or a non-'ok' status. 'no_data' is returned as
an empty list so that callers can distinguish 'nothing available' from
'request failed'.
"""
headers = {'Accept': 'application/json', 'Authorization': f'Bearer {api_key}'}
resp = requests.get(_MD_BASE + path, params=params, headers=headers,
timeout=30)
resp.raise_for_status()
body = resp.json()
status = body.get('s')
if status == 'no_data':
return []
if status != 'ok':
raise RuntimeError(
f"MarketData {path} returned status={status!r}: "
f"{body.get('errmsg') or body}")
lengths = [len(v) for v in body.values() if isinstance(v, list)]
n = max(lengths) if lengths else 0
rows = []
for i in range(n):
row = {}
for key, val in body.items():
if isinstance(val, list):
row[key] = val[i] if i < len(val) else None
else:
row[key] = val
rows.append(row)
return rows
def _marketdata_date(timestamp):
try:
return datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d')
except (TypeError, ValueError, OSError):
return None
def _occ_symbol(ticker, option_type, strike, expiry):
"""Build a standard OCC option symbol from contract fields."""
cp = 'C' if _normalize_option_type(option_type) == 'call' else 'P'
exp = datetime.strptime(str(expiry), '%Y-%m-%d').strftime('%y%m%d')
strike_int = int(round(float(strike) * 1000))
root = ticker.upper().replace('.', '')
return f'{root}{exp}{cp}{strike_int:08d}'
# Chains are always fetched with a broad expiry window so they can be cached
# and reused for in-memory selection across any (delta_target, expiry window)
# combination in the sensitivity sweep.
CHAIN_FETCH_MIN_DAYS = 30
CHAIN_FETCH_MAX_DAYS = 760
def _fetch_marketdata_chain(ticker, date_str, option_type, api_key,
min_days=CHAIN_FETCH_MIN_DAYS,
max_days=CHAIN_FETCH_MAX_DAYS):
lo, hi = _contract_window(date_str, min_days, max_days)
params = {
'date': date_str,
'from': lo.strftime('%Y-%m-%d'),
'to': hi.strftime('%Y-%m-%d'),
'side': _normalize_option_type(option_type),
'expiration': 'all',
}
return _marketdata_get(f'/options/chain/{ticker}/', params, api_key)
# Chain cache: one CSV per (ticker, type, date) storing the broad-window chain.
# Lets the sensitivity sweep re-select contracts for different delta targets
# and expiry windows without refetching.
CHAIN_CACHE_DIR = os.path.join(OPTION_CACHE_DIR, 'chains')
def _chain_cache_path(ticker, option_type, date_str):
option_type = _normalize_option_type(option_type)
return os.path.join(CHAIN_CACHE_DIR,
f'{ticker}-{option_type}-{date_str}.csv')
def _load_chain_cache(ticker, option_type, date_str):
path = _chain_cache_path(ticker, option_type, date_str)
if not os.path.exists(path):
return None
df = pd.read_csv(path)
if df.empty:
return []
return df.to_dict('records')
def _save_chain_cache(ticker, option_type, date_str, chain):
if not chain:
return
os.makedirs(CHAIN_CACHE_DIR, exist_ok=True)
path = _chain_cache_path(ticker, option_type, date_str)
pd.DataFrame(chain).to_csv(path, index=False)
def _get_or_fetch_chain(ticker, date_str, option_type, api_key,
fetched_counter=None):
"""Return the cached broad chain for (ticker, type, date), fetching if absent.
Requires ``api_key`` only when a fetch is actually needed.
"""
chain = _load_chain_cache(ticker, option_type, date_str)
if chain is not None:
return chain
if not api_key:
raise RuntimeError(
"MARKETDATA_KEY is not set but a chain fetch is required for "
f"{ticker} {option_type} on {date_str}.")
time.sleep(_MD_RATE_DELAY)
chain = _fetch_marketdata_chain(ticker, date_str, option_type, api_key)
if fetched_counter is not None:
fetched_counter['marketdata_chains'] += 1
_save_chain_cache(ticker, option_type, date_str, chain)
return chain
def _fetch_marketdata_quotes(symbol, start_date, end_date, api_key):
to_date = (datetime.strptime(end_date, '%Y-%m-%d')
+ timedelta(days=1)).strftime('%Y-%m-%d')
rows = _marketdata_get(f'/options/quotes/{symbol}/',
{'from': start_date, 'to': to_date}, api_key)
prices = {}
for row in rows:
date_str = _marketdata_date(row.get('updated'))
if not date_str:
continue
price = _parse_option_price(row)
if price is not None and price > 0:
prices[date_str] = price
return prices
def _implied_vol_from_price(S, K, T, option_price, option_type):
"""Infer Black-Scholes volatility from an observed option mid price."""
if any(x is None for x in (S, K, T, option_price)):
return None
if S <= 0 or K <= 0 or T <= 0 or option_price <= 0:
return None
intrinsic = max(S - K, 0) if option_type == 'call' else max(K - S, 0)
upper = S if option_type == 'call' else K
if option_price < intrinsic - 1e-6 or option_price > upper * 1.5:
return None
lo, hi = 1e-4, 5.0
try:
if (option_price < bs_price(S, K, T, lo, option_type) - 1e-4
or option_price > bs_price(S, K, T, hi, option_type) + 1e-4):
return None
for _ in range(80):
mid = (lo + hi) / 2
if bs_price(S, K, T, mid, option_type) < option_price:
lo = mid
else:
hi = mid
return (lo + hi) / 2
except (FloatingPointError, ValueError, ZeroDivisionError):
return None
def _marketdata_delta(row, ref_date_str, expiry, option_type, price):
"""Use vendor delta when present; otherwise infer it from the quote."""
native = _safe_float(row.get('delta'))
if native is not None and native != 0:
return native
S = _safe_float(row.get('underlyingPrice'))
K = _safe_float(row.get('strike'))
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
exp = datetime.strptime(expiry, '%Y-%m-%d')
T = max((exp - ref).days / 365.25, 1e-6)
sigma = _safe_float(row.get('iv'))
if sigma is None or sigma <= 0:
sigma = _implied_vol_from_price(S, K, T, price, option_type)
if S is None or K is None or sigma is None or sigma <= 0:
return None
return bs_delta(S, K, T, sigma, option_type)
def _select_marketdata_contract(chain, ref_date_str, option_type,
delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
option_type = _normalize_option_type(option_type)
lo, hi = _contract_window(ref_date_str, min_days, max_days)
candidates = []
for c in chain:
if str(c.get('side', '')).lower() != option_type:
continue
expiry = _marketdata_date(c.get('expiration'))
if not expiry:
continue
exp = datetime.strptime(expiry, '%Y-%m-%d')
if not (lo <= exp <= hi):
continue
price = _parse_option_price(c)
if price is None or price <= 0:
continue
delta = _marketdata_delta(c, ref_date_str, expiry, option_type, price)
if delta is None or delta == 0:
continue
strike = _safe_float(c.get('strike'))
symbol = c.get('optionSymbol')
if strike is None or not symbol:
continue
candidates.append({
'option_type': option_type,
'symbol': symbol,
'strike': strike,
'expiry': expiry,
'delta': delta,
'price': price,
})
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - delta_target))
return candidates[0]
def download_option_prices(option_positions, quarters, holdings, filing_dates,
today, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
"""Download historical representative option prices from MarketData.
MarketData is the sole supported provider. MARKETDATA_KEY must be set.
For each (ticker, option_type) and each filing period in which that
position is held:
1. On the first trading day, select a contract matching type, with
expiry between ``min_days`` and ``max_days`` of the period start, and
|delta| closest to ``delta_target``. MarketData's Starter plan often
returns null Greeks, so delta is inferred from the observed mid price
via Black-Scholes when the vendor delta is missing.
2. Lock in that contract for the period.
3. Track its historical mid price through the period.
The broad option chain for each (ticker, type, first_day) is cached to
disk so that sensitivity sweeps over (delta_target, expiry window) reuse
a single fetch.
Raises ``RuntimeError`` if no suitable contract can be selected for any
required (ticker, type, period), or if MarketData returns no price series
for the selected contract.
Parameters
----------
delta_target : float
Target |delta| for contract selection (default ``OPTION_DELTA``).
min_days, max_days : int
Contract expiry window in days from period start (default 270-456,
i.e. 9-15 months).
Returns
-------
per_period : dict {quarter_str: {(ticker, type): {date_str: float}}}
Option prices keyed by filing period then option position. Each
period has its own contract's prices.
"""
option_positions = sorted({
(ticker, _normalize_option_type(pos_type))
for ticker, pos_type in option_positions})
md_key = _marketdata_key()
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
per_period = {} # {q: {(ticker, type): {date_str: price}}}
fetched = {'marketdata_chains': 0, 'marketdata_quotes': 0}
for ticker, option_type in option_positions:
opt_key = _option_position_key(ticker, option_type)
cache = _load_option_cache(ticker, option_type, delta_target,
min_days, max_days)
new_rows = []
for i, q in enumerate(quarters):
# Skip quarters where this exact option position is absent.
if opt_key not in holdings[q]:
continue
period_start = filing_dates[q]
period_end = (filing_dates[quarters[i + 1]]
if i < len(quarters) - 1 else today)
trading_days = pd.bdate_range(period_start, period_end)
if len(trading_days) == 0:
continue
first_day = trading_days[0].strftime('%Y-%m-%d')
# -- Select contract on first trading day --
contract = _select_cached_contract(
cache, option_type, first_day,
delta_target=delta_target,
min_days=min_days, max_days=max_days,
require_selected=True)
if contract is None:
chain = _get_or_fetch_chain(
ticker, first_day, option_type, md_key, fetched)
contract = _select_marketdata_contract(
chain, first_day, option_type,
delta_target=delta_target,
min_days=min_days, max_days=max_days)
if contract is None:
raise RuntimeError(
f"MarketData returned no usable {option_type} contract "
f"for {ticker} on {first_day} (period {q}) at "
f"delta={delta_target}, "
f"expiry {min_days}-{max_days}d")
new_rows.append({
'date': first_day,
'selected_on': first_day,
'option_type': option_type,
'symbol': contract.get('symbol'),
'strike': contract['strike'],
'expiry': contract['expiry'],
'delta': contract['delta'],
'price': contract['price'],
})
strike = contract['strike']
expiry = contract['expiry']
symbol = contract.get('symbol') or _occ_symbol(
ticker, option_type, strike, expiry)
# -- Collect prices for this period (fresh dict per period) --
period_prices = {}
# Fast path: read matching prices from cache.
rows = cache[
(cache['date'] >= period_start)
& (cache['date'] <= period_end)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))
& pd.notna(cache['price'])]
selected_rows = rows[rows['selected_on'] == first_day]
if not selected_rows.empty:
rows = selected_rows
for _, row in rows.iterrows():
period_prices[row['date']] = float(row['price'])
# Decide whether to refresh quotes. With a key, refresh whenever
# the cached series does not reach period_end. Without a key,
# only fail if the cached series is empty; a slightly stale
# tail is acceptable for cache-only runs (e.g. sensitivity
# sweeps replaying the baseline contract).
has_partial = bool(period_prices)
reaches_end = has_partial and max(period_prices) >= period_end
if md_key and not reaches_end:
time.sleep(_MD_RATE_DELAY)
quote_prices = _fetch_marketdata_quotes(
symbol, period_start, period_end, md_key)
fetched['marketdata_quotes'] += 1
for day_str, price in quote_prices.items():
if period_start <= day_str <= period_end:
period_prices[day_str] = price
new_rows.append({
'date': day_str,
'selected_on': first_day,
'option_type': option_type,
'symbol': symbol,
'strike': strike,
'expiry': expiry,
'delta': contract['delta'],
'price': price,
})
if contract.get('price') and first_day not in period_prices:
period_prices[first_day] = contract['price']
elif not md_key and not has_partial:
raise RuntimeError(
"MARKETDATA_KEY is not set and no cached quotes exist "
f"for {symbol} in {period_start}..{period_end}.")
if not period_prices:
raise RuntimeError(
f"MarketData returned no quotes for {symbol} "
f"({opt_key}) in {period_start}..{period_end}")
per_period.setdefault(q, {})[opt_key] = period_prices
# Persist new data to cache
if new_rows:
new_df = pd.DataFrame(new_rows)
cache = pd.concat([cache, new_df], ignore_index=True)
cache.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
_save_option_cache(ticker, option_type, cache, delta_target,
min_days, max_days)
if any(fetched.values()):
import sys
parts = []
if fetched['marketdata_chains']:
parts.append(f"{fetched['marketdata_chains']} MarketData chains")
if fetched['marketdata_quotes']:
parts.append(f"{fetched['marketdata_quotes']} MarketData quote series")
print(f"[options] Fetched {', '.join(parts)}", file=sys.stderr)
return per_period
# -- Black-Scholes helpers (used only to infer delta when MarketData's
# Starter-plan historical Greeks are null; never to reprice returns) -----
from scipy.stats import norm as _norm
def bs_price(S, K, T, sigma, option_type='call'):
"""Black-Scholes option price (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return max(S - K, 0)
return max(K - S, 0)
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
d2 = d1 - sigma * np.sqrt(T)
if option_type == 'call':
return S * _norm.cdf(d1) - K * _norm.cdf(d2)
return K * _norm.cdf(-d2) - S * _norm.cdf(-d1)
def bs_delta(S, K, T, sigma, option_type='call'):
"""Black-Scholes delta (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return 1.0 if S > K else 0.0
return -1.0 if S < K else 0.0
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
if option_type == 'call':
return _norm.cdf(d1)
return _norm.cdf(d1) - 1
def daily_cumulative(holdings, quarters, filing_dates, close, today, mode,
per_period_opt=None):
"""Build a daily series of cumulative growth factors for a given mode.
For each filing period, stock shares and option contracts are fixed. In
equity-proxy mode, option rows are converted to linear underlying exposure:
calls are long underlying and puts are short underlying. In option-proxy
mode, option rows are sized by 13F underlying notional and returns come
from MarketData quotes; returns are divided by deployed capital (stock
value plus option premium cost). Option-proxy mode raises if MarketData
prices are missing for any required position.
"""
cum_growth = 1.0
dates_out = []
values_out = []
for i, q in enumerate(quarters):
period_start = filing_dates[q]
period_end = filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today
ps = pd.Timestamp(period_start)
pe = pd.Timestamp(period_end)
# Trading days in this period
mask = (close.index >= ps) & (close.index <= pe)
period_close = close[mask]
if period_close.empty:
continue
# Option prices for this period (keyed by (ticker, type) → prices)
quarter_opt = per_period_opt.get(q, {}) if per_period_opt else {}
# Determine starting prices, fixed exposure, and deployed capital.
positions = holdings[q]
exposure = {}
costs = {}
start_prices = {}
start_underlying = {}
use_opt_px = {} # track which positions use option prices
total_cost = 0
for (ticker, pos_type), value in positions.items():
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
if mode == 'equity_only':
if pos_type not in ('long', 'call', 'put'):
continue
if ticker not in close.columns:
continue
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
stock_start = float(avail.iloc[0])
if stock_start <= 0:
continue
start_prices[(ticker, pos_type)] = stock_start
start_underlying[(ticker, pos_type)] = stock_start
costs[(ticker, pos_type)] = value
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = False
total_cost += value
continue
# mode == 'full' (option proxy)
if is_option:
if opt_key not in quarter_opt:
raise RuntimeError(
f"No MarketData option prices for {opt_key} in "
f"period {q}")
ticker_opt = quarter_opt[opt_key]
opt_dates = sorted(d for d in ticker_opt if d >= period_start)
if not opt_dates:
raise RuntimeError(
f"MarketData option prices for {opt_key} in period "
f"{q} contain no dates at or after {period_start}")
if ticker not in close.columns:
raise RuntimeError(
f"No underlying close series for {ticker}")
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
raise RuntimeError(
f"No underlying price for {ticker} at {period_start}")
opt_start = ticker_opt[opt_dates[0]]
underlying_start = float(avail.iloc[0])
if opt_start <= 0 or underlying_start <= 0:
raise RuntimeError(
f"Non-positive starting price for {opt_key} in "
f"period {q}")
start_prices[(ticker, pos_type)] = opt_start
start_underlying[(ticker, pos_type)] = underlying_start
costs[(ticker, pos_type)] = value * opt_start / underlying_start
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = True
total_cost += costs[(ticker, pos_type)]
continue
# Plain stock in full mode
if ticker not in close.columns:
continue
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
stock_start = float(avail.iloc[0])
if stock_start <= 0:
continue
start_prices[(ticker, pos_type)] = stock_start
start_underlying[(ticker, pos_type)] = stock_start
costs[(ticker, pos_type)] = value
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = False
total_cost += value
if total_cost == 0:
continue
# Daily P&L relative to period start.
# Skip first day of subsequent periods (already recorded as last day
# of the prior period) to avoid duplicate boundary dates.
start_idx = 1 if i > 0 else 0
# Forward-fill: track last known option price so that gaps in
# option data don't cause positions to vanish mid-period.
last_opt = {k: v for k, v in start_prices.items()
if use_opt_px.get(k)}
for day_idx in range(start_idx, len(period_close)):
day = period_close.index[day_idx]
day_str = day.strftime('%Y-%m-%d')
period_pnl = 0
for (ticker, pos_type), value in exposure.items():
p0 = start_prices[(ticker, pos_type)]
if p0 == 0:
continue
if use_opt_px[(ticker, pos_type)]:
opt_key = _option_position_key(ticker, pos_type)
p1_val = quarter_opt.get(opt_key, {}).get(day_str)
if p1_val is not None:
last_opt[(ticker, pos_type)] = p1_val
else:
p1_val = last_opt.get((ticker, pos_type))
if p1_val is None:
continue
underlying_p0 = start_underlying.get((ticker, pos_type))
if not underlying_p0 or underlying_p0 <= 0:
continue
position_pnl = value * (float(p1_val) - p0) / underlying_p0
else:
if ticker not in period_close.columns:
continue
p1_val = period_close[ticker].iloc[day_idx]
if pd.isna(p1_val):
continue
stock_ret = (float(p1_val) - p0) / p0
if mode == 'equity_only':
position_pnl = (
value * _linear_underlying_sign(pos_type) * stock_ret)
else:
position_pnl = value * stock_ret
period_pnl += position_pnl
dates_out.append(day)
values_out.append(cum_growth * (1 + period_pnl / total_cost))
# Chain: next period starts from the last day's growth factor
if values_out:
cum_growth = values_out[-1]
return dates_out, values_out
import plotly.graph_objects as go
HUGO_BASE = os.path.expanduser('~/My Drive/repos/stafforini.com')
# ── Fetch daily prices ────────────────────────────────────────────
close = download_daily(all_tickers, first_date, today)
dates_eq, vals_eq = daily_cumulative(
holdings, quarters, filing_dates, close, today, 'equity_only')
option_positions = sorted({
(t, pt)
for q in quarters
for (t, pt) in holdings[q]
if pt in ('call', 'put')
})
per_period_opt = download_option_prices(
option_positions, quarters, holdings, filing_dates, today)
dates_full, vals_full = daily_cumulative(
holdings, quarters, filing_dates, close, today, 'full',
per_period_opt=per_period_opt)
# ── Compute AIS benchmark ─────────────────────────────────────────
if 'AIS' in close.columns:
ais_series = close['AIS'].dropna()
ais_start = ais_series[ais_series.index >= pd.Timestamp(first_date)]
if not ais_start.empty:
ais_p0 = float(ais_start.iloc[0])
ais_dates = ais_start.index.tolist()
ais_vals = [float(p) / ais_p0 for p in ais_start.values]
else:
ais_dates, ais_vals = [], []
else:
ais_dates, ais_vals = [], []
# ── Plot with Plotly ───────────────────────────────────────────────
eq_pct = [round((v - 1) * 100, 1) for v in vals_eq]
full_pct = [round((v - 1) * 100, 1) for v in vals_full]
ais_pct = [round((v - 1) * 100, 1) for v in ais_vals]
fig = go.Figure()
fig.add_trace(go.Scatter(
x=dates_eq, y=eq_pct, mode='lines',
name='Equity proxy',
line=dict(color='#2563eb', width=2)))
fig.add_trace(go.Scatter(
x=dates_full, y=full_pct, mode='lines',
name='Option proxy',
line=dict(color='#dc2626', width=2)))
fig.add_trace(go.Scatter(
x=ais_dates, y=ais_pct, mode='lines',
name='AI ETF (AIS)',
line=dict(color='#9333ea', width=2, dash='dot')))
# Vertical lines at filing dates (rebalancing points)
for fd in filing_dates.values():
fig.add_vline(x=fd, line=dict(color='gray', width=0.5), opacity=0.4)
fig.add_hline(y=0, line=dict(color='gray', width=0.8))
fig.update_layout(
title=dict(text='SA LP copycat vs AIS (VistaShares AI ETF)',
font=dict(size=15)),
yaxis=dict(title='Cumulative return', hoverformat='+.1f',
ticksuffix='%'),
hovermode='x unified',
xaxis=dict(spikemode='across', spikethickness=0.5,
spikedash='solid', spikecolor='gray'),
template='plotly_white',
legend=dict(x=0.02, y=0.98, bgcolor='rgba(255,255,255,0.8)'),
margin=dict(l=60, r=20, t=50, b=40),
height=500,
)
# ── Generate HTML with dark-mode support ──────────────────────────
import re
chart_html = fig.to_html(full_html=False, include_plotlyjs='cdn',
config={'responsive': True, 'displayModeBar': False})
div_id = re.search(r'id="([^"]+)"', chart_html).group(1)
dark_script = """
<script>
(function() {
var gd = document.getElementById('%s');
function isDark() {
try { return parent.document.documentElement.getAttribute('data-theme') === 'dark'; }
catch(e) { return window.matchMedia('(prefers-color-scheme: dark)').matches; }
}
function apply() {
var dk = isDark();
Plotly.relayout(gd, {
paper_bgcolor: 'rgba(0,0,0,0)',
plot_bgcolor: dk ? 'rgba(30,30,30,0.5)' : 'rgba(255,255,255,0.8)',
font: {color: dk ? '#d4d4d4' : '#333'},
'title.font.color': dk ? '#d4d4d4' : '#333',
'xaxis.gridcolor': dk ? 'rgba(255,255,255,0.1)' : 'rgba(0,0,0,0.1)',
'yaxis.gridcolor': dk ? 'rgba(255,255,255,0.1)' : 'rgba(0,0,0,0.1)',
'legend.bgcolor': dk ? 'rgba(30,30,30,0.8)' : 'rgba(255,255,255,0.8)',
'legend.font.color': dk ? '#d4d4d4' : '#333',
});
}
apply();
new MutationObserver(function() { apply(); }).observe(
parent.document.documentElement, {attributes: true, attributeFilter: ['data-theme']});
})();
</script>""" % div_id
outpath = os.path.join(HUGO_BASE, 'static', 'images', 'sa-lp-returns-ais.html')
with open(outpath, 'w') as f:
f.write('<!DOCTYPE html>\n<html>\n<head><meta charset="utf-8">\n'
'<meta http-equiv="Cache-Control" content="no-cache, no-store, must-revalidate">\n'
'<style>body { margin: 0; background: transparent; }</style>\n'
'</head>\n<body>\n' + chart_html + dark_script +
'\n</body>\n</html>')
# ── Risk-adjusted comparison vs AIS ───────────────────────────────
def _sharpe_ais(series, rf_annual=0.04):
if series.empty:
return float('nan')
rf_daily = (1 + rf_annual) ** (1 / 252) - 1
excess = series - rf_daily
if excess.std() == 0 or pd.isna(excess.std()):
return float('nan')
return float(excess.mean() / excess.std() * 252 ** 0.5)
def _max_dd_ais(series):
if series.empty:
return float('nan')
cum = (1 + series).cumprod()
return float(((cum - cum.cummax()) / cum.cummax()).min() * 100)
def _ret_series_ais(values, dates):
if not values:
return pd.Series(dtype=float)
g = pd.Series(values, index=dates)
return g.pct_change().dropna()
ret_eq_d = _ret_series_ais(vals_eq, dates_eq)
ret_full_d = _ret_series_ais(vals_full, dates_full)
ret_ais_d = (pd.Series(ais_vals, index=ais_dates).pct_change().dropna()
if ais_vals else pd.Series(dtype=float))
cum_eq_ret = (vals_eq[-1] - 1) if vals_eq else float('nan')
cum_full_ret = (vals_full[-1] - 1) if vals_full else float('nan')
cum_ais_ret = (ais_vals[-1] - 1) if ais_vals else float('nan')
print("COPYCAT vs AIS")
print("=" * 55)
print(f"{'Metric':<25} {'Eq.proxy':>9} {'Opt.proxy':>9} {'AIS':>9}")
print("-" * 55)
print(f"{'Cum. return':<25} "
f"{fmt(cum_eq_ret):>9} {fmt(cum_full_ret):>9} {fmt(cum_ais_ret):>9}")
print(f"{'Ann. volatility':<25} "
f"{ret_eq_d.std() * 252 ** 0.5 * 100:>8.1f}% "
f"{ret_full_d.std() * 252 ** 0.5 * 100:>8.1f}% "
f"{ret_ais_d.std() * 252 ** 0.5 * 100:>8.1f}%")
print(f"{'Sharpe (rf=4%)':<25} "
f"{_sharpe_ais(ret_eq_d):>9.2f} "
f"{_sharpe_ais(ret_full_d):>9.2f} "
f"{_sharpe_ais(ret_ais_d):>9.2f}")
print(f"{'Max drawdown':<25} "
f"{_max_dd_ais(ret_eq_d):>8.1f}% "
f"{_max_dd_ais(ret_full_d):>8.1f}% "
f"{_max_dd_ais(ret_ais_d):>8.1f}%")
COPYCAT vs AIS
=======================================================
Metric Eq.proxy Opt.proxy AIS
-------------------------------------------------------
Cum. return +110.14% +157.89% +144.04%
Ann. volatility 52.5% 61.5% 37.1%
Sharpe (rf=4%) 1.36 1.52 2.07
Max drawdown -45.8% -45.8% -32.2%
The results are striking: over the available window, AIS roughly matches the option-proxy copycat on cumulative return, with materially better Sharpe and a smaller drawdown; the equity-proxy copycat trails AIS on all three measures. This substantially weakens the case for the copycat. Still, with little more than a year of data, the backtest is far from conclusive, and theoretical considerations—such as confidence in SA’s specific stock-picking thesis—may still favor the copycat. Alternatively, one may decide to hold both, treating the copycat and AIS as complementary positions for diversification.
Copycat delays
Between one filing and the next (~90 days), the copycat holds a fixed portfolio while the fund’s actual portfolio evolves continuously. We observe the fund’s positions only at quarter-end snapshots; its holdings between snapshots are unknown. Moreover, these snapshots are not published immediately, but up to 45 days after quarter-end. These two delays—between the fund’s quarterly rebalance and quarter-end, and between quarter-end and the filing date—create a gap during which the copycat’s holdings are stale relative to the fund’s actual positions.
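To put rough numbers on the combined lag, here is an illustrative calculation. It assumes a 91-day quarter, a rebalance day distributed uniformly within it, and filings landing at the full 45-day statutory deadline; none of these figures come from the filings themselves.

```python
# Illustrative staleness bounds (assumptions: 91-day quarter, rebalance day
# uniform within the quarter, filings at the 45-day statutory deadline).
QUARTER_DAYS = 91
FILING_LAG_DAYS = 45

# Rebalance-to-quarter-end gap: uniform on [0, 91] days, so its mean is half
# the quarter. Total staleness adds the filing lag on top.
expected_gap = QUARTER_DAYS / 2                        # 45.5 days
expected_staleness = expected_gap + FILING_LAG_DAYS    # ~90 days on average
worst_case_staleness = QUARTER_DAYS + FILING_LAG_DAYS  # 136 days

print(expected_staleness, worst_case_staleness)
```

So by the time a filing becomes public, the copycat can be replicating positions the fund exited more than four months earlier.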
Let \(Q_i\) denote the fund’s disclosed portfolio at the end of quarter \(i\), with quarter-end date \(T_i\) and filing date \(F_i \approx T_i + 45\) days. We model the fund as switching from \(Q_{i-1}\) to \(Q_i\) on a single trading day \(s_i\) drawn uniformly from quarter \(i\).4 The copycat is deterministic: it starts when the first filing becomes public, holding \(Q_0\) from \(F_0\) until \(F_1\), then switches to \(Q_j\) at each subsequent filing date \(F_j\). We compute the fund’s expected edge over the copycat, \(E[V_{\text{fund}} / V_{\text{copy}}] - 1\), over the feasible analysis window \([F_0, F_{n-1}]\), averaging over the independent switch days \(s_1, \ldots, s_{n-1}\).
Under the single-switch assumption the fund’s value decomposes into one factor per held portfolio, where \(V(P, a, b)\) is the buy-and-hold return factor for portfolio \(P\) from \(a\) to \(b\). Because \(F_0\) falls inside the first post-\(Q_0\) quarter, the first factor conditions on whether the fund had already switched from \(Q_0\) to \(Q_1\) before the copycat could start. Adjacent factors share a switch variable, so we marginalise them out via a forward chain of expectations—one switch per step—rather than a single nested sum.5
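The forward-chain algebra can be sanity-checked against brute force. The sketch below is a toy Monte Carlo under the model’s assumptions: three portfolios with constant daily growth factors (invented for illustration, not taken from the filings), a uniform switch day per quarter, and the copycat rebalancing on each filing date. Switches that land before \(F_0\) are treated as already in effect when the copycat starts, mirroring the conditioning described above.

```python
import random

random.seed(0)

# Toy setup: portfolios Q0, Q1, Q2 with constant daily growth factors
# (invented for illustration; real portfolios have daily price series).
g = [1.0010, 1.0005, 1.0015]
QUARTER, LAG = 91, 45
# Quarter-end days T_i (day 0 = end of the quarter that produced Q0) and
# filing days F_i = T_i + 45.
T = [0, QUARTER, 2 * QUARTER]
F = [t + LAG for t in T]

def hold(factor, a, b):
    """Buy-and-hold growth factor from day a to day b."""
    return factor ** max(b - a, 0)

def fund_value(switch_days):
    """Fund over [F0, F_last]: holds Q_{i-1} until switch day s_i, then Q_i.
    Switches that happened before F0 are already in effect at the start."""
    v, t = 1.0, F[0]
    for i, s in enumerate(switch_days, start=1):
        s = max(s, t)
        v *= hold(g[i - 1], t, s)
        t = s
    return v * hold(g[len(switch_days)], t, F[-1])

def copycat_value():
    """Copycat: rebalances to Q_j on each filing day F_j."""
    v = 1.0
    for j in range(len(F) - 1):
        v *= hold(g[j], F[j], F[j + 1])
    return v

# Brute-force expectation over uniform switch days s_i within quarter i.
ratios = []
for _ in range(20000):
    s1 = random.uniform(T[0], T[0] + QUARTER)  # Q0 -> Q1 switch in quarter 1
    s2 = random.uniform(T[1], T[1] + QUARTER)  # Q1 -> Q2 switch in quarter 2
    ratios.append(fund_value([s1, s2]) / copycat_value())

edge = sum(ratios) / len(ratios) - 1  # E[V_fund / V_copy] - 1
print(f"expected fund edge over copycat: {edge:+.2%}")
```

Swapping the toy factors for real daily return series turns this into the estimator described above; the forward chain of expectations computes the same average analytically, marginalising one switch variable per step instead of sampling.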
Code
import json
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import requests
import time
import os
import warnings
warnings.filterwarnings('ignore')
# Parse data from the scraper block
parsed = json.loads(data) if isinstance(data, str) else data
filings = parsed["filings"]
# Build internal structures
filing_dates = {f["quarter"]: f["filing_date"] for f in filings}
quarter_end_dates = {f["quarter"]: f["quarter_end"] for f in filings}
quarters = [f["quarter"] for f in filings]
# Convert holdings list to dict keyed by quarter.
# Multiple positions in the same ticker with different types are aggregated
# by value per (ticker, type) pair.
holdings = {}
for f in filings:
positions = {}
for h in f["holdings"]:
ticker = h["ticker"]
pos_type = h["type"]
value = h["value"]
key = (ticker, pos_type)
positions[key] = positions.get(key, 0) + value
holdings[f["quarter"]] = positions
def _extract_close_series(df, ticker):
"""Extract a single close-price series from a yfinance result."""
if df.empty:
return pd.Series(dtype=float)
if isinstance(df.columns, pd.MultiIndex):
if 'Close' not in df.columns.get_level_values(0):
return pd.Series(dtype=float)
close = df['Close']
if isinstance(close, pd.DataFrame):
if ticker in close.columns:
series = close[ticker]
elif len(close.columns) == 1:
series = close.iloc[:, 0]
else:
return pd.Series(dtype=float)
else:
series = close
elif 'Close' in df.columns:
series = df['Close']
if isinstance(series, pd.DataFrame):
series = series.iloc[:, 0]
else:
return pd.Series(dtype=float)
return pd.to_numeric(series, errors='coerce').dropna()
def _download_close_series(ticker, start, end):
"""Download one ticker's close series; used to repair flaky batch misses."""
df = yf.download(ticker, start=start, end=end, progress=False,
auto_adjust=True)
return _extract_close_series(df, ticker)
def get_prices(tickers, dates):
"""Fetch close prices for tickers on specific dates."""
unique_tickers = sorted(set(tickers))
all_dates = [datetime.strptime(d, '%Y-%m-%d') for d in dates]
start = min(all_dates) - timedelta(days=5)
end = max(all_dates) + timedelta(days=5)
df = yf.download(unique_tickers, start=start, end=end, progress=False, auto_adjust=True)
# yf.download returns MultiIndex columns (metric, ticker) for multiple tickers
if df.empty:
close = pd.DataFrame()
elif isinstance(df.columns, pd.MultiIndex) and 'Close' in df.columns.get_level_values(0):
close = df['Close'].copy()
elif 'Close' in df.columns:
close = df[['Close']].copy()
close.columns = unique_tickers
else:
close = pd.DataFrame()
prices = {}
for ticker in unique_tickers:
if ticker in close.columns:
series = pd.to_numeric(close[ticker], errors='coerce').dropna()
else:
series = pd.Series(dtype=float)
if series.empty:
series = _download_close_series(ticker, start, end)
if series.empty:
continue
prices[ticker] = {}
for date_str in dates:
target = pd.Timestamp(datetime.strptime(date_str, '%Y-%m-%d'))
after = series[series.index >= target]
if not after.empty:
prices[ticker][date_str] = float(after.iloc[0])
else:
before = series[series.index <= target]
if not before.empty:
prices[ticker][date_str] = float(before.iloc[-1])
return prices
def _price_on_or_after(px_by_date, target_date):
"""Return (date, price) for the first available price on/after target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d >= target_date)
if not dates:
return None
d = dates[0]
return d, px_by_date[d]
def _price_on_or_before(px_by_date, target_date):
"""Return (date, price) for the last available price on/before target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d <= target_date)
if not dates:
return None
d = dates[-1]
return d, px_by_date[d]
def _period_price_pair(px_by_date, start_date, end_date):
"""Return start/end prices for a period using sensible boundary alignment."""
start = _price_on_or_after(px_by_date, start_date)
end = _price_on_or_before(px_by_date, end_date)
if start is None or end is None:
return None
start_actual, p0 = start
end_actual, p1 = end
if end_actual < start_actual:
return None
return start_actual, end_actual, p0, p1
def _option_position_key(ticker, pos_type):
return (ticker, pos_type)
def _linear_underlying_sign(pos_type):
"""Direction when option rows are converted to underlying equity exposure."""
return -1 if pos_type == 'put' else 1
def compute_return(positions, prices, start_date, end_date, mode='equity_only',
option_prices=None):
"""Compute portfolio return between two dates.
The 13F value for an option row is treated as underlying notional, not
option premium. Option contracts are sized from that notional, but the
portfolio denominator is estimated deployed capital: stock value plus option
premium cost. This avoids treating the gap between option notional and
option premium as cash. In 'full' mode, every option row requires a
MarketData price series; missing data raises rather than falling back.
"""
total_cost = 0
portfolio_pnl = 0
for (ticker, pos_type), value in positions.items():
is_option = pos_type in ('call', 'put')
stock_px = prices.get(ticker)
if mode == 'equity_only':
if pos_type not in ('long', 'call', 'put'):
continue
pair = _period_price_pair(stock_px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
stock_ret = (p1 - p0) / p0
total_cost += value
portfolio_pnl += value * _linear_underlying_sign(pos_type) * stock_ret
continue
if is_option:
opt_key = _option_position_key(ticker, pos_type)
opt_px = option_prices.get(opt_key) if option_prices else None
if not opt_px:
raise RuntimeError(
f"No MarketData option prices for {opt_key} in period "
f"{start_date}..{end_date}")
pair = _period_price_pair(opt_px, start_date, end_date)
if pair is None:
raise RuntimeError(
f"MarketData option price series for {opt_key} does not "
f"cover {start_date}..{end_date}")
start_actual, end_actual, opt_p0, opt_p1 = pair
stock_start = _price_on_or_after(stock_px, start_actual)
if stock_start is None or stock_start[1] <= 0:
stock_start = _price_on_or_after(stock_px, start_date)
if stock_start is None or stock_start[1] <= 0:
raise RuntimeError(
f"No underlying price for {ticker} at {start_date}")
p0, p1 = opt_p0, opt_p1
underlying_p0 = stock_start[1]
if p0 <= 0 or underlying_p0 <= 0:
continue
position_cost = value * (p0 / underlying_p0)
position_pnl = value * ((p1 - p0) / underlying_p0)
else:
pair = _period_price_pair(stock_px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
stock_ret = (p1 - p0) / p0
position_cost = value
position_pnl = value * stock_ret
if position_cost <= 0:
continue
total_cost += position_cost
portfolio_pnl += position_pnl
return portfolio_pnl / total_cost if total_cost else None
def annualize(ret, days):
"""Annualize a return over a given number of calendar days."""
if ret is None or days <= 0:
return None
return (1 + ret) ** (365.25 / days) - 1
def fmt(ret):
return f"{ret * 100:+.2f}%" if ret is not None else "N/A"
# Collect all tickers and dates
all_tickers = set()
for positions in holdings.values():
for (ticker, _) in positions:
all_tickers.add(ticker)
all_tickers.add('SPY')
all_tickers.add('AIS')
today = datetime.now().strftime('%Y-%m-%d')
first_date = filing_dates[quarters[0]]
all_dates = set(filing_dates.values()) | set(quarter_end_dates.values()) | {today}
prices = get_prices(sorted(all_tickers), sorted(all_dates))
# Resolve `today` to the actual last available closing date.
# yfinance may not have data for today (market still open or holiday),
# so we look up what date SPY's price actually corresponds to.
def _resolve_price_date(prices, requested_date):
"""Return the actual trading date of the price stored under requested_date."""
ref = 'SPY' if 'SPY' in prices else next(iter(prices), None)
if not ref or requested_date not in prices[ref]:
return requested_date
target_price = prices[ref][requested_date]
# Re-download a small window to find the real date of this price
start = datetime.strptime(requested_date, '%Y-%m-%d') - timedelta(days=10)
end = datetime.strptime(requested_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(ref, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return requested_date
if isinstance(df.columns, pd.MultiIndex):
close = df['Close'][ref].dropna()
elif 'Close' in df.columns:
close = df['Close'].dropna()
else:
close = df.iloc[:, 0].dropna()
for dt, px in close.items():
val = float(px.iloc[0]) if isinstance(px, pd.Series) else float(px)
if abs(val - target_price) < 0.01:
ts = dt[0] if isinstance(dt, tuple) else dt
return pd.Timestamp(ts).strftime('%Y-%m-%d')
return requested_date
today_resolved = _resolve_price_date(prices, today)
if today_resolved != today:
for ticker in prices:
if today in prices[ticker]:
prices[ticker][today_resolved] = prices[ticker].pop(today)
today = today_resolved
def download_daily(tickers, start_date, end_date):
"""Download daily close prices from yfinance, handling MultiIndex.
Dates are 'YYYY-MM-DD' strings. Adds a small buffer for trading-day alignment."""
tickers_sorted = sorted(tickers)
start = datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=5)
end = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(tickers_sorted, start=start, end=end,
progress=False, auto_adjust=True)
if df.empty:
close = pd.DataFrame()
elif isinstance(df.columns, pd.MultiIndex) and 'Close' in df.columns.get_level_values(0):
close = df['Close'].copy()
elif 'Close' in df.columns:
close = df[['Close']].copy()
close.columns = tickers_sorted
else:
close = pd.DataFrame()
for ticker in tickers_sorted:
if ticker in close.columns and not close[ticker].dropna().empty:
continue
series = _download_close_series(ticker, start, end)
if not series.empty:
close[ticker] = series
return close.sort_index()
# -- Historical option prices via MarketData --------------------------------
OPTION_CACHE_DIR = os.path.expanduser('~/My Drive/notes/.sa-lp-option-cache')
_MD_BASE = 'https://api.marketdata.app/v1'
_MD_RATE_DELAY = 0.15
OPTION_CACHE_COLUMNS = [
'date', 'selected_on', 'option_type', 'symbol', 'strike', 'expiry',
'delta', 'price']
# Default contract selection parameters. The option proxy picks a contract
# matching option type, with expiry between min_days and max_days of the
# period start, and |delta| closest to delta_target. When the chain is
# sparse, the achieved |delta| may be far from the target; the sensitivity
# block reports achieved |delta| so this is visible rather than silent.
OPTION_DELTA = 0.15
EXPIRY_MIN_DAYS = 270 # ~9 months
EXPIRY_MAX_DAYS = 456 # ~15 months
def _normalize_option_type(option_type):
option_type = str(option_type).lower()
if option_type not in ('call', 'put'):
raise ValueError(f"Unsupported option type: {option_type}")
return option_type
def _empty_option_cache():
return pd.DataFrame(columns=OPTION_CACHE_COLUMNS)
def _option_cache_path(ticker, option_type, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Return the cache CSV path for (ticker, type, delta_target, window).
When the parameter triple equals the baseline (0.15, 270-456 days), the
historical filename ``TICKER-TYPE.csv`` is used so the main-backtest
cache is reused automatically. Any non-baseline combo lives in a
separate ``TICKER-TYPE-d<delta>-e<min>-<max>.csv`` file so a sensitivity
sweep never pollutes the baseline cache (which the portfolio calculator
reads to pick the representative contract for the current filing).
"""
option_type = _normalize_option_type(option_type)
is_baseline = (
abs(delta_target - OPTION_DELTA) < 1e-9
and min_days == EXPIRY_MIN_DAYS
and max_days == EXPIRY_MAX_DAYS)
if is_baseline:
return os.path.join(OPTION_CACHE_DIR, f'{ticker}-{option_type}.csv')
return os.path.join(
OPTION_CACHE_DIR,
f'{ticker}-{option_type}-d{delta_target:g}-e{min_days}-{max_days}.csv')
def _load_option_cache(ticker, option_type, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Load cached MarketData rows for a ticker/type/target/window. Returns DataFrame or empty."""
option_type = _normalize_option_type(option_type)
path = _option_cache_path(ticker, option_type, delta_target,
min_days, max_days)
if not os.path.exists(path):
return _empty_option_cache()
df = pd.read_csv(path)
if df.empty:
return _empty_option_cache()
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
for col in ('date', 'selected_on'):
df[col] = pd.to_datetime(
df[col], errors='coerce').dt.strftime('%Y-%m-%d')
df['option_type'] = df['option_type'].fillna(option_type).str.lower()
cache = df[OPTION_CACHE_COLUMNS].copy()
cache = cache[cache['option_type'] == option_type].copy()
cache.dropna(subset=['date'], inplace=True)
for col in ('strike', 'delta', 'price'):
cache[col] = pd.to_numeric(cache[col], errors='coerce')
cache.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
return cache[OPTION_CACHE_COLUMNS]
def _save_option_cache(ticker, option_type, df, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Persist typed option cache to CSV."""
option_type = _normalize_option_type(option_type)
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
path = _option_cache_path(ticker, option_type, delta_target,
min_days, max_days)
if df.empty:
df = _empty_option_cache()
else:
df = df.copy()
df['option_type'] = option_type
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
df.sort_values(['date', 'expiry', 'strike'], inplace=True)
df.to_csv(path, index=False)
def _contract_window(ref_date_str, min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
return ref + timedelta(days=min_days), ref + timedelta(days=max_days)
def _contract_from_cache_row(row, ref_date_str, option_type,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
option_type = _normalize_option_type(option_type)
if str(row.get('option_type', option_type)).lower() != option_type:
return None
lo, hi = _contract_window(ref_date_str, min_days, max_days)
try:
exp = datetime.strptime(str(row['expiry']), '%Y-%m-%d')
except (KeyError, TypeError, ValueError):
return None
if not (lo <= exp <= hi):
return None
strike = _safe_float(row.get('strike'))
delta = _safe_float(row.get('delta'))
price = _safe_float(row.get('price'))
if strike is None or delta is None or price is None or price <= 0:
return None
return {
'selected_on': row.get('selected_on'),
'option_type': option_type,
'symbol': row.get('symbol'),
'strike': strike,
'expiry': str(row['expiry']),
'delta': delta,
'price': price,
}
def _select_cached_contract(cache, option_type, ref_date_str,
delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS,
require_selected=False):
rows = cache[(cache['date'] == ref_date_str)
& (cache['option_type'] == option_type)]
selected_rows = rows[rows['selected_on'] == ref_date_str]
if not selected_rows.empty:
rows = selected_rows
elif require_selected:
rows = selected_rows
candidates = []
for _, row in rows.iterrows():
contract = _contract_from_cache_row(row, ref_date_str, option_type,
min_days, max_days)
if contract:
candidates.append(contract)
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - delta_target))
return candidates[0]
def _parse_option_price(contract):
"""Extract a mark price from an option contract record."""
mid = _safe_float(contract.get('mid'))
if mid and mid > 0:
return mid
bid = _safe_float(contract.get('bid'))
ask = _safe_float(contract.get('ask'))
last = _safe_float(contract.get('last'))
if bid and ask and bid > 0 and ask > 0:
return (bid + ask) / 2
if last and last > 0:
return last
return None
def _safe_float(val):
try:
out = float(val)
if np.isnan(out):
return None
return out
except (TypeError, ValueError):
return None
def _marketdata_key():
"""Return the MarketData API key, or None if unavailable.
Resolution order:
1. ``MARKETDATA_KEY`` / ``MARKETDATA_API_KEY`` environment variables.
2. ``pass env/marketdata-token`` (local ``pass`` store).
    The result is memoised on the function object so repeated lookups
    during a sweep do not shell out again. The fetch helpers raise their
    own errors when called without a key, so a fully cached run still
    succeeds without requiring either source.
"""
if hasattr(_marketdata_key, '_cached'):
return _marketdata_key._cached
key = (os.environ.get('MARKETDATA_KEY', '')
or os.environ.get('MARKETDATA_API_KEY', ''))
if not key:
try:
import subprocess
out = subprocess.run(
['pass', 'show', 'env/marketdata-token'],
capture_output=True, text=True, timeout=5, check=False)
if out.returncode == 0:
key = out.stdout.strip().splitlines()[0] if out.stdout else ''
except (FileNotFoundError, subprocess.TimeoutExpired):
key = ''
_marketdata_key._cached = key or None
return _marketdata_key._cached
def _marketdata_get(path, params, api_key):
"""Fetch a MarketData endpoint, returning normalized row dictionaries.
Raises on HTTP errors or a non-'ok' status. 'no_data' is returned as
an empty list so that callers can distinguish 'nothing available' from
'request failed'.
"""
headers = {'Accept': 'application/json', 'Authorization': f'Bearer {api_key}'}
resp = requests.get(_MD_BASE + path, params=params, headers=headers,
timeout=30)
resp.raise_for_status()
body = resp.json()
status = body.get('s')
if status == 'no_data':
return []
if status != 'ok':
raise RuntimeError(
f"MarketData {path} returned status={status!r}: "
f"{body.get('errmsg') or body}")
lengths = [len(v) for v in body.values() if isinstance(v, list)]
n = max(lengths) if lengths else 0
rows = []
for i in range(n):
row = {}
for key, val in body.items():
if isinstance(val, list):
row[key] = val[i] if i < len(val) else None
else:
row[key] = val
rows.append(row)
return rows
def _marketdata_date(timestamp):
try:
return datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d')
except (TypeError, ValueError, OSError):
return None
def _occ_symbol(ticker, option_type, strike, expiry):
"""Build a standard OCC option symbol from contract fields."""
cp = 'C' if _normalize_option_type(option_type) == 'call' else 'P'
exp = datetime.strptime(str(expiry), '%Y-%m-%d').strftime('%y%m%d')
strike_int = int(round(float(strike) * 1000))
root = ticker.upper().replace('.', '')
return f'{root}{exp}{cp}{strike_int:08d}'
# Chains are always fetched with a broad expiry window so they can be cached
# and reused for in-memory selection across any (delta_target, expiry window)
# combination in the sensitivity sweep.
CHAIN_FETCH_MIN_DAYS = 30
CHAIN_FETCH_MAX_DAYS = 760
def _fetch_marketdata_chain(ticker, date_str, option_type, api_key,
min_days=CHAIN_FETCH_MIN_DAYS,
max_days=CHAIN_FETCH_MAX_DAYS):
lo, hi = _contract_window(date_str, min_days, max_days)
params = {
'date': date_str,
'from': lo.strftime('%Y-%m-%d'),
'to': hi.strftime('%Y-%m-%d'),
'side': _normalize_option_type(option_type),
'expiration': 'all',
}
return _marketdata_get(f'/options/chain/{ticker}/', params, api_key)
# Chain cache: one CSV per (ticker, type, date) storing the broad-window chain.
# Lets the sensitivity sweep re-select contracts for different delta targets
# and expiry windows without refetching.
CHAIN_CACHE_DIR = os.path.join(OPTION_CACHE_DIR, 'chains')
def _chain_cache_path(ticker, option_type, date_str):
option_type = _normalize_option_type(option_type)
return os.path.join(CHAIN_CACHE_DIR,
f'{ticker}-{option_type}-{date_str}.csv')
def _load_chain_cache(ticker, option_type, date_str):
path = _chain_cache_path(ticker, option_type, date_str)
if not os.path.exists(path):
return None
df = pd.read_csv(path)
if df.empty:
return []
return df.to_dict('records')
def _save_chain_cache(ticker, option_type, date_str, chain):
if not chain:
return
os.makedirs(CHAIN_CACHE_DIR, exist_ok=True)
path = _chain_cache_path(ticker, option_type, date_str)
pd.DataFrame(chain).to_csv(path, index=False)
def _get_or_fetch_chain(ticker, date_str, option_type, api_key,
fetched_counter=None):
"""Return the cached broad chain for (ticker, type, date), fetching if absent.
Requires ``api_key`` only when a fetch is actually needed.
"""
chain = _load_chain_cache(ticker, option_type, date_str)
if chain is not None:
return chain
if not api_key:
raise RuntimeError(
"MARKETDATA_KEY is not set but a chain fetch is required for "
f"{ticker} {option_type} on {date_str}.")
time.sleep(_MD_RATE_DELAY)
chain = _fetch_marketdata_chain(ticker, date_str, option_type, api_key)
if fetched_counter is not None:
fetched_counter['marketdata_chains'] += 1
_save_chain_cache(ticker, option_type, date_str, chain)
return chain
def _fetch_marketdata_quotes(symbol, start_date, end_date, api_key):
to_date = (datetime.strptime(end_date, '%Y-%m-%d')
+ timedelta(days=1)).strftime('%Y-%m-%d')
rows = _marketdata_get(f'/options/quotes/{symbol}/',
{'from': start_date, 'to': to_date}, api_key)
prices = {}
for row in rows:
date_str = _marketdata_date(row.get('updated'))
if not date_str:
continue
price = _parse_option_price(row)
if price is not None and price > 0:
prices[date_str] = price
return prices
def _implied_vol_from_price(S, K, T, option_price, option_type):
"""Infer Black-Scholes volatility from an observed option mid price."""
if any(x is None for x in (S, K, T, option_price)):
return None
if S <= 0 or K <= 0 or T <= 0 or option_price <= 0:
return None
intrinsic = max(S - K, 0) if option_type == 'call' else max(K - S, 0)
upper = S if option_type == 'call' else K
if option_price < intrinsic - 1e-6 or option_price > upper * 1.5:
return None
lo, hi = 1e-4, 5.0
try:
if (option_price < bs_price(S, K, T, lo, option_type) - 1e-4
or option_price > bs_price(S, K, T, hi, option_type) + 1e-4):
return None
for _ in range(80):
mid = (lo + hi) / 2
if bs_price(S, K, T, mid, option_type) < option_price:
lo = mid
else:
hi = mid
return (lo + hi) / 2
except (FloatingPointError, ValueError, ZeroDivisionError):
return None
def _marketdata_delta(row, ref_date_str, expiry, option_type, price):
"""Use vendor delta when present; otherwise infer it from the quote."""
native = _safe_float(row.get('delta'))
if native is not None and native != 0:
return native
S = _safe_float(row.get('underlyingPrice'))
K = _safe_float(row.get('strike'))
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
exp = datetime.strptime(expiry, '%Y-%m-%d')
T = max((exp - ref).days / 365.25, 1e-6)
sigma = _safe_float(row.get('iv'))
if sigma is None or sigma <= 0:
sigma = _implied_vol_from_price(S, K, T, price, option_type)
if S is None or K is None or sigma is None or sigma <= 0:
return None
return bs_delta(S, K, T, sigma, option_type)
def _select_marketdata_contract(chain, ref_date_str, option_type,
delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
option_type = _normalize_option_type(option_type)
lo, hi = _contract_window(ref_date_str, min_days, max_days)
candidates = []
for c in chain:
if str(c.get('side', '')).lower() != option_type:
continue
expiry = _marketdata_date(c.get('expiration'))
if not expiry:
continue
exp = datetime.strptime(expiry, '%Y-%m-%d')
if not (lo <= exp <= hi):
continue
price = _parse_option_price(c)
if price is None or price <= 0:
continue
delta = _marketdata_delta(c, ref_date_str, expiry, option_type, price)
if delta is None or delta == 0:
continue
strike = _safe_float(c.get('strike'))
symbol = c.get('optionSymbol')
if strike is None or not symbol:
continue
candidates.append({
'option_type': option_type,
'symbol': symbol,
'strike': strike,
'expiry': expiry,
'delta': delta,
'price': price,
})
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - delta_target))
return candidates[0]
def download_option_prices(option_positions, quarters, holdings, filing_dates,
today, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
"""Download historical representative option prices from MarketData.
    MarketData is the sole supported provider. MARKETDATA_KEY must be set
    unless every required chain and quote series is already cached.
For each (ticker, option_type) and each filing period in which that
position is held:
1. On the first trading day, select a contract matching type, with
expiry between ``min_days`` and ``max_days`` of the period start, and
|delta| closest to ``delta_target``. MarketData's Starter plan often
returns null Greeks, so delta is inferred from the observed mid price
via Black-Scholes when the vendor delta is missing.
2. Lock in that contract for the period.
3. Track its historical mid price through the period.
The broad option chain for each (ticker, type, first_day) is cached to
disk so that sensitivity sweeps over (delta_target, expiry window) reuse
a single fetch.
Raises ``RuntimeError`` if no suitable contract can be selected for any
required (ticker, type, period), or if MarketData returns no price series
for the selected contract.
Parameters
----------
delta_target : float
Target |delta| for contract selection (default ``OPTION_DELTA``).
min_days, max_days : int
Contract expiry window in days from period start (default 270-456,
i.e. 9-15 months).
Returns
-------
per_period : dict {quarter_str: {(ticker, type): {date_str: float}}}
Option prices keyed by filing period then option position. Each
period has its own contract's prices.
"""
option_positions = sorted({
(ticker, _normalize_option_type(pos_type))
for ticker, pos_type in option_positions})
md_key = _marketdata_key()
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
per_period = {} # {q: {(ticker, type): {date_str: price}}}
fetched = {'marketdata_chains': 0, 'marketdata_quotes': 0}
for ticker, option_type in option_positions:
opt_key = _option_position_key(ticker, option_type)
cache = _load_option_cache(ticker, option_type, delta_target,
min_days, max_days)
new_rows = []
for i, q in enumerate(quarters):
# Skip quarters where this exact option position is absent.
if opt_key not in holdings[q]:
continue
period_start = filing_dates[q]
period_end = (filing_dates[quarters[i + 1]]
if i < len(quarters) - 1 else today)
trading_days = pd.bdate_range(period_start, period_end)
if len(trading_days) == 0:
continue
first_day = trading_days[0].strftime('%Y-%m-%d')
# -- Select contract on first trading day --
contract = _select_cached_contract(
cache, option_type, first_day,
delta_target=delta_target,
min_days=min_days, max_days=max_days,
require_selected=True)
if contract is None:
chain = _get_or_fetch_chain(
ticker, first_day, option_type, md_key, fetched)
contract = _select_marketdata_contract(
chain, first_day, option_type,
delta_target=delta_target,
min_days=min_days, max_days=max_days)
if contract is None:
raise RuntimeError(
f"MarketData returned no usable {option_type} contract "
f"for {ticker} on {first_day} (period {q}) at "
f"delta={delta_target}, "
f"expiry {min_days}-{max_days}d")
new_rows.append({
'date': first_day,
'selected_on': first_day,
'option_type': option_type,
'symbol': contract.get('symbol'),
'strike': contract['strike'],
'expiry': contract['expiry'],
'delta': contract['delta'],
'price': contract['price'],
})
strike = contract['strike']
expiry = contract['expiry']
symbol = contract.get('symbol') or _occ_symbol(
ticker, option_type, strike, expiry)
# -- Collect prices for this period (fresh dict per period) --
period_prices = {}
# Fast path: read matching prices from cache.
rows = cache[
(cache['date'] >= period_start)
& (cache['date'] <= period_end)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))
& pd.notna(cache['price'])]
selected_rows = rows[rows['selected_on'] == first_day]
if not selected_rows.empty:
rows = selected_rows
for _, row in rows.iterrows():
period_prices[row['date']] = float(row['price'])
# Decide whether to refresh quotes. With a key, refresh whenever
# the cached series does not reach period_end. Without a key,
# only fail if the cached series is empty; a slightly stale
# tail is acceptable for cache-only runs (e.g. sensitivity
# sweeps replaying the baseline contract).
has_partial = bool(period_prices)
reaches_end = has_partial and max(period_prices) >= period_end
if md_key and not reaches_end:
time.sleep(_MD_RATE_DELAY)
quote_prices = _fetch_marketdata_quotes(
symbol, period_start, period_end, md_key)
fetched['marketdata_quotes'] += 1
for day_str, price in quote_prices.items():
if period_start <= day_str <= period_end:
period_prices[day_str] = price
new_rows.append({
'date': day_str,
'selected_on': first_day,
'option_type': option_type,
'symbol': symbol,
'strike': strike,
'expiry': expiry,
'delta': contract['delta'],
'price': price,
})
if contract.get('price') and first_day not in period_prices:
period_prices[first_day] = contract['price']
elif not md_key and not has_partial:
raise RuntimeError(
"MARKETDATA_KEY is not set and no cached quotes exist "
f"for {symbol} in {period_start}..{period_end}.")
if not period_prices:
raise RuntimeError(
f"MarketData returned no quotes for {symbol} "
f"({opt_key}) in {period_start}..{period_end}")
per_period.setdefault(q, {})[opt_key] = period_prices
# Persist new data to cache
if new_rows:
new_df = pd.DataFrame(new_rows)
cache = pd.concat([cache, new_df], ignore_index=True)
cache.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
_save_option_cache(ticker, option_type, cache, delta_target,
min_days, max_days)
if any(fetched.values()):
import sys
parts = []
if fetched['marketdata_chains']:
parts.append(f"{fetched['marketdata_chains']} MarketData chains")
if fetched['marketdata_quotes']:
parts.append(f"{fetched['marketdata_quotes']} MarketData quote series")
print(f"[options] Fetched {', '.join(parts)}", file=sys.stderr)
return per_period
# -- Black-Scholes helpers (used only to infer delta when MarketData's
# Starter-plan historical Greeks are null; never to reprice returns) -----
from scipy.stats import norm as _norm
def bs_price(S, K, T, sigma, option_type='call'):
"""Black-Scholes option price (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return max(S - K, 0)
return max(K - S, 0)
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
d2 = d1 - sigma * np.sqrt(T)
if option_type == 'call':
return S * _norm.cdf(d1) - K * _norm.cdf(d2)
return K * _norm.cdf(-d2) - S * _norm.cdf(-d1)
def bs_delta(S, K, T, sigma, option_type='call'):
"""Black-Scholes delta (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return 1.0 if S > K else 0.0
return -1.0 if S < K else 0.0
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
if option_type == 'call':
return _norm.cdf(d1)
return _norm.cdf(d1) - 1
def daily_cumulative(holdings, quarters, filing_dates, close, today, mode,
per_period_opt=None):
"""Build a daily series of cumulative growth factors for a given mode.
For each filing period, stock shares and option contracts are fixed. In
equity-proxy mode, option rows are converted to linear underlying exposure:
calls are long underlying and puts are short underlying. In option-proxy
mode, option rows are sized by 13F underlying notional and returns come
from MarketData quotes; returns are divided by deployed capital (stock
value plus option premium cost). Option-proxy mode raises if MarketData
prices are missing for any required position.
"""
cum_growth = 1.0
dates_out = []
values_out = []
for i, q in enumerate(quarters):
period_start = filing_dates[q]
period_end = filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today
ps = pd.Timestamp(period_start)
pe = pd.Timestamp(period_end)
# Trading days in this period
mask = (close.index >= ps) & (close.index <= pe)
period_close = close[mask]
if period_close.empty:
continue
# Option prices for this period (keyed by (ticker, type) → prices)
quarter_opt = per_period_opt.get(q, {}) if per_period_opt else {}
# Determine starting prices, fixed exposure, and deployed capital.
positions = holdings[q]
exposure = {}
costs = {}
start_prices = {}
start_underlying = {}
use_opt_px = {} # track which positions use option prices
total_cost = 0
for (ticker, pos_type), value in positions.items():
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
if mode == 'equity_only':
if pos_type not in ('long', 'call', 'put'):
continue
if ticker not in close.columns:
continue
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
stock_start = float(avail.iloc[0])
if stock_start <= 0:
continue
start_prices[(ticker, pos_type)] = stock_start
start_underlying[(ticker, pos_type)] = stock_start
costs[(ticker, pos_type)] = value
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = False
total_cost += value
continue
# mode == 'full' (option proxy)
if is_option:
if opt_key not in quarter_opt:
raise RuntimeError(
f"No MarketData option prices for {opt_key} in "
f"period {q}")
ticker_opt = quarter_opt[opt_key]
opt_dates = sorted(d for d in ticker_opt if d >= period_start)
if not opt_dates:
raise RuntimeError(
f"MarketData option prices for {opt_key} in period "
f"{q} contain no dates at or after {period_start}")
if ticker not in close.columns:
raise RuntimeError(
f"No underlying close series for {ticker}")
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
raise RuntimeError(
f"No underlying price for {ticker} at {period_start}")
opt_start = ticker_opt[opt_dates[0]]
underlying_start = float(avail.iloc[0])
if opt_start <= 0 or underlying_start <= 0:
raise RuntimeError(
f"Non-positive starting price for {opt_key} in "
f"period {q}")
start_prices[(ticker, pos_type)] = opt_start
start_underlying[(ticker, pos_type)] = underlying_start
costs[(ticker, pos_type)] = value * opt_start / underlying_start
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = True
total_cost += costs[(ticker, pos_type)]
continue
# Plain stock in full mode
if ticker not in close.columns:
continue
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
stock_start = float(avail.iloc[0])
if stock_start <= 0:
continue
start_prices[(ticker, pos_type)] = stock_start
start_underlying[(ticker, pos_type)] = stock_start
costs[(ticker, pos_type)] = value
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = False
total_cost += value
if total_cost == 0:
continue
# Daily P&L relative to period start.
# Skip first day of subsequent periods (already recorded as last day
# of the prior period) to avoid duplicate boundary dates.
start_idx = 1 if i > 0 else 0
# Forward-fill: track last known option price so that gaps in
# option data don't cause positions to vanish mid-period.
last_opt = {k: v for k, v in start_prices.items()
if use_opt_px.get(k)}
for day_idx in range(start_idx, len(period_close)):
day = period_close.index[day_idx]
day_str = day.strftime('%Y-%m-%d')
period_pnl = 0
for (ticker, pos_type), value in exposure.items():
p0 = start_prices[(ticker, pos_type)]
if p0 == 0:
continue
if use_opt_px[(ticker, pos_type)]:
opt_key = _option_position_key(ticker, pos_type)
p1_val = quarter_opt.get(opt_key, {}).get(day_str)
if p1_val is not None:
last_opt[(ticker, pos_type)] = p1_val
else:
p1_val = last_opt.get((ticker, pos_type))
if p1_val is None:
continue
underlying_p0 = start_underlying.get((ticker, pos_type))
if not underlying_p0 or underlying_p0 <= 0:
continue
position_pnl = value * (float(p1_val) - p0) / underlying_p0
else:
if ticker not in period_close.columns:
continue
p1_val = period_close[ticker].iloc[day_idx]
if pd.isna(p1_val):
continue
stock_ret = (float(p1_val) - p0) / p0
if mode == 'equity_only':
position_pnl = (
value * _linear_underlying_sign(pos_type) * stock_ret)
else:
position_pnl = value * stock_ret
period_pnl += position_pnl
dates_out.append(day)
values_out.append(cum_growth * (1 + period_pnl / total_cost))
# Chain: next period starts from the last day's growth factor
if values_out:
cum_growth = values_out[-1]
return dates_out, values_out
first_qe = quarter_end_dates[quarters[0]]
last_fd = filing_dates[quarters[-1]]
dc = download_daily(sorted(all_tickers), first_qe, last_fd)
close_arr = {t: dc[t].to_numpy() for t in dc.columns}
def _date_idx(date_str):
return dc.index.get_loc(pd.Timestamp(date_str))
def hold_factor(positions, i0, i1):
"""Return 1 + buy-and-hold return of `positions` from index i0 to i1.
Weights follow 13F-reported values, i.e. rebalanced to those
proportions at i0. Missing or zero-valuation entries contribute 0.
"""
if i0 >= i1:
return 1.0
total_value = 0.0
weighted_return = 0.0
for (ticker, pos_type), value in positions.items():
if pos_type not in ('long', 'call', 'put'):
continue
if ticker not in close_arr:
continue
p0 = close_arr[ticker][i0]
p1 = close_arr[ticker][i1]
if np.isnan(p0) or np.isnan(p1) or p0 == 0:
continue
r = (float(p1) - float(p0)) / float(p0)
sign = _linear_underlying_sign(pos_type)
total_value += value
weighted_return += value * sign * r
return 1.0 + (weighted_return / total_value if total_value else 0.0)
# ── Analytical end-to-end expected delay cost ───────────────────────
# The feasible copycat starts at the first public filing date F_0, not at
# T_0. Since F_0 falls inside the quarter when the fund may switch from
# Q_0 to Q_1, the left boundary integrates over whether s_1 has already
# happened by F_0. Later adjacent factors share a switch variable, so we
# marginalise via a forward chain of expectations, one switch per step.
n = len(quarters)
i_T = [_date_idx(quarter_end_dates[q]) for q in quarters]
i_F = [_date_idx(filing_dates[q]) for q in quarters]
# s_k uniform over trading days in quarter k, for k = 1..n-1.
s_doms = [list(range(i_T[k - 1], i_T[k])) for k in range(1, n)]
analysis_start = i_F[0]
if n < 2:
E_V_fund = 1.0
elif n == 2:
factors = []
for s1 in s_doms[0]:
if analysis_start <= s1:
factor = (
hold_factor(holdings[quarters[0]], analysis_start, s1)
* hold_factor(holdings[quarters[1]], s1, i_F[-1]))
else:
factor = hold_factor(
holdings[quarters[1]], analysis_start, i_F[-1])
factors.append(factor)
E_V_fund = float(np.mean(factors))
else:
# Left boundary: message over s_2 after integrating out s_1.
msg = []
for s2 in s_doms[1]:
factors = []
for s1 in s_doms[0]:
if analysis_start <= s1:
factor = (
hold_factor(holdings[quarters[0]], analysis_start, s1)
* hold_factor(holdings[quarters[1]], s1, s2))
else:
factor = hold_factor(
holdings[quarters[1]], analysis_start, s2)
factors.append(factor)
msg.append(float(np.mean(factors)))
msg = np.array(msg)
# Interior factors Q_k(s_k, s_{k+1}) for 2 <= k <= n-2.
for k in range(2, n - 1):
prev_s = s_doms[k - 1]
curr_s = s_doms[k]
F_k = np.array([[hold_factor(holdings[quarters[k]], sa, sb)
for sb in curr_s]
for sa in prev_s])
msg = (msg @ F_k) / len(prev_s)
# Right boundary.
F_last = np.array([hold_factor(holdings[quarters[-1]], s, i_F[-1])
for s in s_doms[-1]])
E_V_fund = float((msg * F_last).sum() / len(s_doms[-1]))
# Copycat: Q_0 on [F_0, F_1], Q_j on [F_j, F_{j+1}] for j = 1..n-2.
V_copy = hold_factor(holdings[quarters[0]], analysis_start, i_F[1])
for j in range(1, n - 1):
V_copy *= hold_factor(holdings[quarters[j]], i_F[j], i_F[j + 1])
delay_cost = E_V_fund / V_copy - 1.0
window_days = i_F[-1] - analysis_start
print("COPYCAT DELAY COST (equity proxy, uniform single-switch model)")
print("=" * 62)
print(f"Window: {filing_dates[quarters[0]]} to "
f"{filing_dates[quarters[-1]]} ({window_days} trading days)")
print(f"Quarters modelled: {n - 1} transitions")
print("-" * 62)
print(f"E[V_fund] - 1 {fmt(E_V_fund - 1):>10}")
print(f"V_copy - 1 {fmt(V_copy - 1):>10}")
print(f"Expected delay cost {fmt(delay_cost):>10}")
print()
print("Positive cost = delay hurts the copycat")
COPYCAT DELAY COST (equity proxy, uniform single-switch model)
==============================================================
Window: 2025-02-12 to 2026-02-11 (250 trading days)
Quarters modelled: 4 transitions
--------------------------------------------------------------
E[V_fund] - 1 +88.27%
V_copy - 1 +41.20%
Expected delay cost +33.34%
Positive cost = delay hurts the copycat
For evaluating the copycat strategy in isolation, this analysis adds little: the historical returns already price in these delay costs. It matters more when comparing the copycat against investing in the fund itself. The estimated delay cost, combined with the copycat's other inherent limitations (undisclosed short positions, foreign-listed securities, and non-equity assets), makes it plausible that the fund returned materially more to its investors than the copycat's already high returns reported above. This is suggestive rather than conclusive, since the fund's actual intra-quarter trades are unobserved.
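The uniform single-switch model can be seen in miniature on a toy two-quarter case. The sketch below is self-contained and uses synthetic daily returns (not the fund's actual holdings): a hypothetical fund switches from portfolio Q0 to Q1 on a day drawn uniformly from the quarter, while the copycat holds Q0 until the filing date. The `hold` helper mirrors the script's `hold_factor`: a growth factor compounded over a range of trading days.

```python
import numpy as np

rng = np.random.default_rng(0)
n_days = 63                                # trading days in one toy quarter
r_q0 = rng.normal(0.0005, 0.01, n_days)    # synthetic daily returns of Q0
r_q1 = rng.normal(0.0030, 0.01, n_days)    # synthetic daily returns of Q1

def hold(returns, i0, i1):
    """Growth factor from holding over trading days [i0, i1)."""
    return float(np.prod(1.0 + returns[i0:i1]))

# Fund: expectation over a uniform switch day s of
#   V = hold(Q0, 0, s) * hold(Q1, s, n_days).
e_v_fund = np.mean([hold(r_q0, 0, s) * hold(r_q1, s, n_days)
                    for s in range(n_days)])

# Copycat: learns about Q1 only at the next filing, so holds Q0 throughout.
v_copy = hold(r_q0, 0, n_days)

delay_cost = e_v_fund / v_copy - 1.0
print(f"E[V_fund]={e_v_fund:.4f}  V_copy={v_copy:.4f}  cost={delay_cost:+.2%}")
```

With more than two quarters, adjacent factors share a switch day, which is why the full script marginalises with a forward chain of matrix products instead of this flat average.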
Portfolio calculator
The calculator below converts the most recent 13F filing into a concrete trade list. In equity-proxy mode, stock rows are bought as shares, call rows are bought as underlying shares, and put rows are shorted as underlying shares, all in proportion to reported underlying notional. In option-proxy mode, the bankroll is treated as deployed capital: stock rows consume capital directly, while option rows target the 13F underlying notional and consume the estimated premium for the cached representative contract. An optional cutoff drops positions below a given capital percentage and redistributes their weight among the rest. You can also exclude individual rows, or include rows below the cutoff, by ticking the relevant checkboxes.
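The cutoff-and-redistribute step can be sketched in a few lines. This is an illustrative stand-alone function with made-up weights, not the calculator's actual code: positions below the cutoff are dropped and the surviving weights are rescaled proportionally so they again sum to one.

```python
def apply_cutoff(weights, cutoff):
    """Drop positions whose capital fraction is below `cutoff` and
    redistribute their weight proportionally among the survivors.

    weights: {position: fraction of capital}, expected to sum to ~1.
    """
    kept = {k: w for k, w in weights.items() if w >= cutoff}
    total = sum(kept.values())
    if not total:
        return {}
    return {k: w / total for k, w in kept.items()}

# Hypothetical portfolio: 'D' falls below a 10% cutoff and is dropped;
# A, B, C are rescaled by 1/0.95 so the remaining weights sum to 1.
weights = {'A': 0.50, 'B': 0.30, 'C': 0.15, 'D': 0.05}
print(apply_cutoff(weights, 0.10))
```

The excluded-row checkboxes work the same way: removing a row and renormalising is equivalent to setting its weight to zero before the rescale.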
Code
import json
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import requests
import time
import os
import warnings
warnings.filterwarnings('ignore')
# Parse data from the scraper block
parsed = json.loads(data) if isinstance(data, str) else data
filings = parsed["filings"]
# Build internal structures
filing_dates = {f["quarter"]: f["filing_date"] for f in filings}
quarter_end_dates = {f["quarter"]: f["quarter_end"] for f in filings}
quarters = [f["quarter"] for f in filings]
# Convert holdings list to dict keyed by quarter.
# Multiple positions in the same ticker with different types are aggregated
# by value per (ticker, type) pair.
holdings = {}
for f in filings:
positions = {}
for h in f["holdings"]:
ticker = h["ticker"]
pos_type = h["type"]
value = h["value"]
key = (ticker, pos_type)
positions[key] = positions.get(key, 0) + value
holdings[f["quarter"]] = positions
def _extract_close_series(df, ticker):
"""Extract a single close-price series from a yfinance result."""
if df.empty:
return pd.Series(dtype=float)
if isinstance(df.columns, pd.MultiIndex):
if 'Close' not in df.columns.get_level_values(0):
return pd.Series(dtype=float)
close = df['Close']
if isinstance(close, pd.DataFrame):
if ticker in close.columns:
series = close[ticker]
elif len(close.columns) == 1:
series = close.iloc[:, 0]
else:
return pd.Series(dtype=float)
else:
series = close
elif 'Close' in df.columns:
series = df['Close']
if isinstance(series, pd.DataFrame):
series = series.iloc[:, 0]
else:
return pd.Series(dtype=float)
return pd.to_numeric(series, errors='coerce').dropna()
def _download_close_series(ticker, start, end):
"""Download one ticker's close series; used to repair flaky batch misses."""
df = yf.download(ticker, start=start, end=end, progress=False,
auto_adjust=True)
return _extract_close_series(df, ticker)
def get_prices(tickers, dates):
"""Fetch close prices for tickers on specific dates."""
unique_tickers = sorted(set(tickers))
all_dates = [datetime.strptime(d, '%Y-%m-%d') for d in dates]
start = min(all_dates) - timedelta(days=5)
end = max(all_dates) + timedelta(days=5)
df = yf.download(unique_tickers, start=start, end=end, progress=False, auto_adjust=True)
# yf.download returns MultiIndex columns (metric, ticker) for multiple tickers
if df.empty:
close = pd.DataFrame()
elif isinstance(df.columns, pd.MultiIndex) and 'Close' in df.columns.get_level_values(0):
close = df['Close'].copy()
elif 'Close' in df.columns:
close = df[['Close']].copy()
close.columns = unique_tickers
else:
close = pd.DataFrame()
prices = {}
for ticker in unique_tickers:
if ticker in close.columns:
series = pd.to_numeric(close[ticker], errors='coerce').dropna()
else:
series = pd.Series(dtype=float)
if series.empty:
series = _download_close_series(ticker, start, end)
if series.empty:
continue
prices[ticker] = {}
for date_str in dates:
target = pd.Timestamp(datetime.strptime(date_str, '%Y-%m-%d'))
after = series[series.index >= target]
if not after.empty:
prices[ticker][date_str] = float(after.iloc[0])
else:
before = series[series.index <= target]
if not before.empty:
prices[ticker][date_str] = float(before.iloc[-1])
return prices
def _price_on_or_after(px_by_date, target_date):
"""Return (date, price) for the first available price on/after target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d >= target_date)
if not dates:
return None
d = dates[0]
return d, px_by_date[d]
def _price_on_or_before(px_by_date, target_date):
"""Return (date, price) for the last available price on/before target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d <= target_date)
if not dates:
return None
d = dates[-1]
return d, px_by_date[d]
def _period_price_pair(px_by_date, start_date, end_date):
"""Return start/end prices for a period using sensible boundary alignment."""
start = _price_on_or_after(px_by_date, start_date)
end = _price_on_or_before(px_by_date, end_date)
if start is None or end is None:
return None
start_actual, p0 = start
end_actual, p1 = end
if end_actual < start_actual:
return None
return start_actual, end_actual, p0, p1
def _option_position_key(ticker, pos_type):
return (ticker, pos_type)
def _linear_underlying_sign(pos_type):
"""Direction when option rows are converted to underlying equity exposure."""
return -1 if pos_type == 'put' else 1
def compute_return(positions, prices, start_date, end_date, mode='equity_only',
option_prices=None):
"""Compute portfolio return between two dates.
The 13F value for an option row is treated as underlying notional, not
option premium. Option contracts are sized from that notional, but the
portfolio denominator is estimated deployed capital: stock value plus option
premium cost. This avoids treating the gap between option notional and
option premium as cash. In 'full' mode, every option row requires a
MarketData price series; missing data raises rather than falling back.
"""
total_cost = 0
portfolio_pnl = 0
for (ticker, pos_type), value in positions.items():
is_option = pos_type in ('call', 'put')
stock_px = prices.get(ticker)
if mode == 'equity_only':
if pos_type not in ('long', 'call', 'put'):
continue
pair = _period_price_pair(stock_px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
stock_ret = (p1 - p0) / p0
total_cost += value
portfolio_pnl += value * _linear_underlying_sign(pos_type) * stock_ret
continue
if is_option:
opt_key = _option_position_key(ticker, pos_type)
opt_px = option_prices.get(opt_key) if option_prices else None
if not opt_px:
raise RuntimeError(
f"No MarketData option prices for {opt_key} in period "
f"{start_date}..{end_date}")
pair = _period_price_pair(opt_px, start_date, end_date)
if pair is None:
raise RuntimeError(
f"MarketData option price series for {opt_key} does not "
f"cover {start_date}..{end_date}")
start_actual, end_actual, opt_p0, opt_p1 = pair
stock_start = _price_on_or_after(stock_px, start_actual)
if stock_start is None or stock_start[1] <= 0:
stock_start = _price_on_or_after(stock_px, start_date)
if stock_start is None or stock_start[1] <= 0:
raise RuntimeError(
f"No underlying price for {ticker} at {start_date}")
p0, p1 = opt_p0, opt_p1
underlying_p0 = stock_start[1]
if p0 <= 0 or underlying_p0 <= 0:
continue
position_cost = value * (p0 / underlying_p0)
position_pnl = value * ((p1 - p0) / underlying_p0)
else:
pair = _period_price_pair(stock_px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
stock_ret = (p1 - p0) / p0
position_cost = value
position_pnl = value * stock_ret
if position_cost <= 0:
continue
total_cost += position_cost
portfolio_pnl += position_pnl
return portfolio_pnl / total_cost if total_cost else None
def annualize(ret, days):
"""Annualize a return over a given number of calendar days."""
if ret is None or days <= 0:
return None
return (1 + ret) ** (365.25 / days) - 1
def fmt(ret):
return f"{ret * 100:+.2f}%" if ret is not None else "N/A"
# Collect all tickers and dates
all_tickers = set()
for positions in holdings.values():
for (ticker, _) in positions:
all_tickers.add(ticker)
all_tickers.add('SPY')
all_tickers.add('AIS')
today = datetime.now().strftime('%Y-%m-%d')
first_date = filing_dates[quarters[0]]
all_dates = set(filing_dates.values()) | set(quarter_end_dates.values()) | {today}
prices = get_prices(sorted(all_tickers), sorted(all_dates))
# Resolve `today` to the actual last available closing date.
# yfinance may not have data for today (market still open or holiday),
# so we look up what date SPY's price actually corresponds to.
def _resolve_price_date(prices, requested_date):
"""Return the actual trading date of the price stored under requested_date."""
ref = 'SPY' if 'SPY' in prices else next(iter(prices), None)
if not ref or requested_date not in prices[ref]:
return requested_date
target_price = prices[ref][requested_date]
# Re-download a small window to find the real date of this price
start = datetime.strptime(requested_date, '%Y-%m-%d') - timedelta(days=10)
end = datetime.strptime(requested_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(ref, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return requested_date
if isinstance(df.columns, pd.MultiIndex):
close = df['Close'][ref].dropna()
elif 'Close' in df.columns:
close = df['Close'].dropna()
else:
close = df.iloc[:, 0].dropna()
for dt, px in close.items():
val = float(px.iloc[0]) if isinstance(px, pd.Series) else float(px)
if abs(val - target_price) < 0.01:
ts = dt[0] if isinstance(dt, tuple) else dt
return pd.Timestamp(ts).strftime('%Y-%m-%d')
return requested_date
today_resolved = _resolve_price_date(prices, today)
if today_resolved != today:
for ticker in prices:
if today in prices[ticker]:
prices[ticker][today_resolved] = prices[ticker].pop(today)
today = today_resolved
def download_daily(tickers, start_date, end_date):
"""Download daily close prices from yfinance, handling MultiIndex.
Dates are 'YYYY-MM-DD' strings. Adds a small buffer for trading-day alignment."""
tickers_sorted = sorted(tickers)
start = datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=5)
end = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(tickers_sorted, start=start, end=end,
progress=False, auto_adjust=True)
if df.empty:
close = pd.DataFrame()
elif isinstance(df.columns, pd.MultiIndex) and 'Close' in df.columns.get_level_values(0):
close = df['Close'].copy()
elif 'Close' in df.columns:
close = df[['Close']].copy()
close.columns = tickers_sorted
else:
close = pd.DataFrame()
for ticker in tickers_sorted:
if ticker in close.columns and not close[ticker].dropna().empty:
continue
series = _download_close_series(ticker, start, end)
if not series.empty:
close[ticker] = series
return close.sort_index()
# -- Historical option prices via MarketData --------------------------------
OPTION_CACHE_DIR = os.path.expanduser('~/My Drive/notes/.sa-lp-option-cache')
_MD_BASE = 'https://api.marketdata.app/v1'
_MD_RATE_DELAY = 0.15
OPTION_CACHE_COLUMNS = [
'date', 'selected_on', 'option_type', 'symbol', 'strike', 'expiry',
'delta', 'price']
# Default contract selection parameters. The option proxy picks a contract
# matching option type, with expiry between min_days and max_days of the
# period start, and |delta| closest to delta_target. When the chain is
# sparse, the achieved |delta| may be far from the target; the sensitivity
# block reports achieved |delta| so this is visible rather than silent.
OPTION_DELTA = 0.15
EXPIRY_MIN_DAYS = 270 # ~9 months
EXPIRY_MAX_DAYS = 456 # ~15 months
def _normalize_option_type(option_type):
option_type = str(option_type).lower()
if option_type not in ('call', 'put'):
raise ValueError(f"Unsupported option type: {option_type}")
return option_type
def _empty_option_cache():
return pd.DataFrame(columns=OPTION_CACHE_COLUMNS)
def _option_cache_path(ticker, option_type, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Return the cache CSV path for (ticker, type, delta_target, window).
When the parameter triple equals the baseline (0.15, 270-456 days), the
historical filename ``TICKER-TYPE.csv`` is used so the main-backtest
cache is reused automatically. Any non-baseline combo lives in a
separate ``TICKER-TYPE-d<delta>-e<min>-<max>.csv`` file so a sensitivity
sweep never pollutes the baseline cache (which the portfolio calculator
reads to pick the representative contract for the current filing).
"""
option_type = _normalize_option_type(option_type)
is_baseline = (
abs(delta_target - OPTION_DELTA) < 1e-9
and min_days == EXPIRY_MIN_DAYS
and max_days == EXPIRY_MAX_DAYS)
if is_baseline:
return os.path.join(OPTION_CACHE_DIR, f'{ticker}-{option_type}.csv')
return os.path.join(
OPTION_CACHE_DIR,
f'{ticker}-{option_type}-d{delta_target:g}-e{min_days}-{max_days}.csv')
def _load_option_cache(ticker, option_type, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Load cached MarketData rows for a ticker/type/target/window. Returns DataFrame or empty."""
option_type = _normalize_option_type(option_type)
path = _option_cache_path(ticker, option_type, delta_target,
min_days, max_days)
if not os.path.exists(path):
return _empty_option_cache()
df = pd.read_csv(path)
if df.empty:
return _empty_option_cache()
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
for col in ('date', 'selected_on'):
df[col] = pd.to_datetime(
df[col], errors='coerce').dt.strftime('%Y-%m-%d')
df['option_type'] = df['option_type'].fillna(option_type).str.lower()
cache = df[OPTION_CACHE_COLUMNS].copy()
cache = cache[cache['option_type'] == option_type].copy()
cache.dropna(subset=['date'], inplace=True)
for col in ('strike', 'delta', 'price'):
cache[col] = pd.to_numeric(cache[col], errors='coerce')
cache.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
return cache[OPTION_CACHE_COLUMNS]
def _save_option_cache(ticker, option_type, df, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS, max_days=EXPIRY_MAX_DAYS):
"""Persist typed option cache to CSV."""
option_type = _normalize_option_type(option_type)
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
path = _option_cache_path(ticker, option_type, delta_target,
min_days, max_days)
if df.empty:
df = _empty_option_cache()
else:
df = df.copy()
df['option_type'] = option_type
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
df.sort_values(['date', 'expiry', 'strike'], inplace=True)
df.to_csv(path, index=False)
def _contract_window(ref_date_str, min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
return ref + timedelta(days=min_days), ref + timedelta(days=max_days)
def _contract_from_cache_row(row, ref_date_str, option_type,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
option_type = _normalize_option_type(option_type)
if str(row.get('option_type', option_type)).lower() != option_type:
return None
lo, hi = _contract_window(ref_date_str, min_days, max_days)
try:
exp = datetime.strptime(str(row['expiry']), '%Y-%m-%d')
except (KeyError, TypeError, ValueError):
return None
if not (lo <= exp <= hi):
return None
strike = _safe_float(row.get('strike'))
delta = _safe_float(row.get('delta'))
price = _safe_float(row.get('price'))
if strike is None or delta is None or price is None or price <= 0:
return None
return {
'selected_on': row.get('selected_on'),
'option_type': option_type,
'symbol': row.get('symbol'),
'strike': strike,
'expiry': str(row['expiry']),
'delta': delta,
'price': price,
}
def _select_cached_contract(cache, option_type, ref_date_str,
delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS,
require_selected=False):
rows = cache[(cache['date'] == ref_date_str)
& (cache['option_type'] == option_type)]
selected_rows = rows[rows['selected_on'] == ref_date_str]
if not selected_rows.empty:
rows = selected_rows
elif require_selected:
rows = selected_rows
candidates = []
for _, row in rows.iterrows():
contract = _contract_from_cache_row(row, ref_date_str, option_type,
min_days, max_days)
if contract:
candidates.append(contract)
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - delta_target))
return candidates[0]
def _parse_option_price(contract):
"""Extract a mark price from an option contract record."""
mid = _safe_float(contract.get('mid'))
if mid and mid > 0:
return mid
bid = _safe_float(contract.get('bid'))
ask = _safe_float(contract.get('ask'))
last = _safe_float(contract.get('last'))
if bid and ask and bid > 0 and ask > 0:
return (bid + ask) / 2
if last and last > 0:
return last
return None
def _safe_float(val):
try:
out = float(val)
if np.isnan(out):
return None
return out
except (TypeError, ValueError):
return None
def _marketdata_key():
"""Return the MarketData API key, or None if unavailable.
Resolution order:
1. ``MARKETDATA_KEY`` / ``MARKETDATA_API_KEY`` environment variables.
2. ``pass env/marketdata-token`` (local ``pass`` store).
The result is memoised on the function object so repeated lookups
during a sweep do not reshell. Fetch helpers raise themselves when
called without a key, so a fully cached run still succeeds without
requiring either source.
"""
if hasattr(_marketdata_key, '_cached'):
return _marketdata_key._cached
key = (os.environ.get('MARKETDATA_KEY', '')
or os.environ.get('MARKETDATA_API_KEY', ''))
if not key:
try:
import subprocess
out = subprocess.run(
['pass', 'show', 'env/marketdata-token'],
capture_output=True, text=True, timeout=5, check=False)
if out.returncode == 0:
key = out.stdout.strip().splitlines()[0] if out.stdout else ''
except (FileNotFoundError, subprocess.TimeoutExpired):
key = ''
_marketdata_key._cached = key or None
return _marketdata_key._cached
def _marketdata_get(path, params, api_key):
"""Fetch a MarketData endpoint, returning normalized row dictionaries.
Raises on HTTP errors or a non-'ok' status. 'no_data' is returned as
an empty list so that callers can distinguish 'nothing available' from
'request failed'.
"""
headers = {'Accept': 'application/json', 'Authorization': f'Bearer {api_key}'}
resp = requests.get(_MD_BASE + path, params=params, headers=headers,
timeout=30)
resp.raise_for_status()
body = resp.json()
status = body.get('s')
if status == 'no_data':
return []
if status != 'ok':
raise RuntimeError(
f"MarketData {path} returned status={status!r}: "
f"{body.get('errmsg') or body}")
lengths = [len(v) for v in body.values() if isinstance(v, list)]
n = max(lengths) if lengths else 0
rows = []
for i in range(n):
row = {}
for key, val in body.items():
if isinstance(val, list):
row[key] = val[i] if i < len(val) else None
else:
row[key] = val
rows.append(row)
return rows
def _marketdata_date(timestamp):
try:
return datetime.utcfromtimestamp(int(timestamp)).strftime('%Y-%m-%d')
except (TypeError, ValueError, OSError):
return None
def _occ_symbol(ticker, option_type, strike, expiry):
"""Build a standard OCC option symbol from contract fields."""
cp = 'C' if _normalize_option_type(option_type) == 'call' else 'P'
exp = datetime.strptime(str(expiry), '%Y-%m-%d').strftime('%y%m%d')
strike_int = int(round(float(strike) * 1000))
root = ticker.upper().replace('.', '')
return f'{root}{exp}{cp}{strike_int:08d}'
# Chains are always fetched with a broad expiry window so they can be cached
# and reused for in-memory selection across any (delta_target, expiry window)
# combination in the sensitivity sweep.
CHAIN_FETCH_MIN_DAYS = 30
CHAIN_FETCH_MAX_DAYS = 760
def _fetch_marketdata_chain(ticker, date_str, option_type, api_key,
min_days=CHAIN_FETCH_MIN_DAYS,
max_days=CHAIN_FETCH_MAX_DAYS):
lo, hi = _contract_window(date_str, min_days, max_days)
params = {
'date': date_str,
'from': lo.strftime('%Y-%m-%d'),
'to': hi.strftime('%Y-%m-%d'),
'side': _normalize_option_type(option_type),
'expiration': 'all',
}
return _marketdata_get(f'/options/chain/{ticker}/', params, api_key)
# Chain cache: one CSV per (ticker, type, date) storing the broad-window chain.
# Lets the sensitivity sweep re-select contracts for different delta targets
# and expiry windows without refetching.
CHAIN_CACHE_DIR = os.path.join(OPTION_CACHE_DIR, 'chains')
def _chain_cache_path(ticker, option_type, date_str):
option_type = _normalize_option_type(option_type)
return os.path.join(CHAIN_CACHE_DIR,
f'{ticker}-{option_type}-{date_str}.csv')
def _load_chain_cache(ticker, option_type, date_str):
path = _chain_cache_path(ticker, option_type, date_str)
if not os.path.exists(path):
return None
df = pd.read_csv(path)
if df.empty:
return []
return df.to_dict('records')
def _save_chain_cache(ticker, option_type, date_str, chain):
if not chain:
return
os.makedirs(CHAIN_CACHE_DIR, exist_ok=True)
path = _chain_cache_path(ticker, option_type, date_str)
pd.DataFrame(chain).to_csv(path, index=False)
def _get_or_fetch_chain(ticker, date_str, option_type, api_key,
fetched_counter=None):
"""Return the cached broad chain for (ticker, type, date), fetching if absent.
Requires ``api_key`` only when a fetch is actually needed.
"""
chain = _load_chain_cache(ticker, option_type, date_str)
if chain is not None:
return chain
if not api_key:
raise RuntimeError(
"MARKETDATA_KEY is not set but a chain fetch is required for "
f"{ticker} {option_type} on {date_str}.")
time.sleep(_MD_RATE_DELAY)
chain = _fetch_marketdata_chain(ticker, date_str, option_type, api_key)
if fetched_counter is not None:
fetched_counter['marketdata_chains'] += 1
_save_chain_cache(ticker, option_type, date_str, chain)
return chain
def _fetch_marketdata_quotes(symbol, start_date, end_date, api_key):
to_date = (datetime.strptime(end_date, '%Y-%m-%d')
+ timedelta(days=1)).strftime('%Y-%m-%d')
rows = _marketdata_get(f'/options/quotes/{symbol}/',
{'from': start_date, 'to': to_date}, api_key)
prices = {}
for row in rows:
date_str = _marketdata_date(row.get('updated'))
if not date_str:
continue
price = _parse_option_price(row)
if price is not None and price > 0:
prices[date_str] = price
return prices
def _implied_vol_from_price(S, K, T, option_price, option_type):
"""Infer Black-Scholes volatility from an observed option mid price."""
if any(x is None for x in (S, K, T, option_price)):
return None
if S <= 0 or K <= 0 or T <= 0 or option_price <= 0:
return None
intrinsic = max(S - K, 0) if option_type == 'call' else max(K - S, 0)
upper = S if option_type == 'call' else K
if option_price < intrinsic - 1e-6 or option_price > upper * 1.5:
return None
lo, hi = 1e-4, 5.0
try:
if (option_price < bs_price(S, K, T, lo, option_type) - 1e-4
or option_price > bs_price(S, K, T, hi, option_type) + 1e-4):
return None
for _ in range(80):
mid = (lo + hi) / 2
if bs_price(S, K, T, mid, option_type) < option_price:
lo = mid
else:
hi = mid
return (lo + hi) / 2
except (FloatingPointError, ValueError, ZeroDivisionError):
return None
def _marketdata_delta(row, ref_date_str, expiry, option_type, price):
"""Use vendor delta when present; otherwise infer it from the quote."""
native = _safe_float(row.get('delta'))
if native is not None and native != 0:
return native
S = _safe_float(row.get('underlyingPrice'))
K = _safe_float(row.get('strike'))
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
exp = datetime.strptime(expiry, '%Y-%m-%d')
T = max((exp - ref).days / 365.25, 1e-6)
sigma = _safe_float(row.get('iv'))
if sigma is None or sigma <= 0:
sigma = _implied_vol_from_price(S, K, T, price, option_type)
if S is None or K is None or sigma is None or sigma <= 0:
return None
return bs_delta(S, K, T, sigma, option_type)
def _select_marketdata_contract(chain, ref_date_str, option_type,
delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
option_type = _normalize_option_type(option_type)
lo, hi = _contract_window(ref_date_str, min_days, max_days)
candidates = []
for c in chain:
if str(c.get('side', '')).lower() != option_type:
continue
expiry = _marketdata_date(c.get('expiration'))
if not expiry:
continue
exp = datetime.strptime(expiry, '%Y-%m-%d')
if not (lo <= exp <= hi):
continue
price = _parse_option_price(c)
if price is None or price <= 0:
continue
delta = _marketdata_delta(c, ref_date_str, expiry, option_type, price)
if delta is None or delta == 0:
continue
strike = _safe_float(c.get('strike'))
symbol = c.get('optionSymbol')
if strike is None or not symbol:
continue
candidates.append({
'option_type': option_type,
'symbol': symbol,
'strike': strike,
'expiry': expiry,
'delta': delta,
'price': price,
})
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - delta_target))
return candidates[0]
def download_option_prices(option_positions, quarters, holdings, filing_dates,
today, delta_target=OPTION_DELTA,
min_days=EXPIRY_MIN_DAYS,
max_days=EXPIRY_MAX_DAYS):
"""Download historical representative option prices from MarketData.
MarketData is the sole supported provider. MARKETDATA_KEY must be set.
For each (ticker, option_type) and each filing period in which that
position is held:
1. On the first trading day, select a contract matching type, with
expiry between ``min_days`` and ``max_days`` of the period start, and
|delta| closest to ``delta_target``. MarketData's Starter plan often
returns null Greeks, so delta is inferred from the observed mid price
via Black-Scholes when the vendor delta is missing.
2. Lock in that contract for the period.
3. Track its historical mid price through the period.
The broad option chain for each (ticker, type, first_day) is cached to
disk so that sensitivity sweeps over (delta_target, expiry window) reuse
a single fetch.
Raises ``RuntimeError`` if no suitable contract can be selected for any
required (ticker, type, period), or if MarketData returns no price series
for the selected contract.
Parameters
----------
delta_target : float
Target |delta| for contract selection (default ``OPTION_DELTA``).
min_days, max_days : int
Contract expiry window in days from period start (default 270-456,
i.e. 9-15 months).
Returns
-------
per_period : dict {quarter_str: {(ticker, type): {date_str: float}}}
Option prices keyed by filing period then option position. Each
period has its own contract's prices.
"""
option_positions = sorted({
(ticker, _normalize_option_type(pos_type))
for ticker, pos_type in option_positions})
md_key = _marketdata_key()
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
per_period = {} # {q: {(ticker, type): {date_str: price}}}
fetched = {'marketdata_chains': 0, 'marketdata_quotes': 0}
for ticker, option_type in option_positions:
opt_key = _option_position_key(ticker, option_type)
cache = _load_option_cache(ticker, option_type, delta_target,
min_days, max_days)
new_rows = []
for i, q in enumerate(quarters):
# Skip quarters where this exact option position is absent.
if opt_key not in holdings[q]:
continue
period_start = filing_dates[q]
period_end = (filing_dates[quarters[i + 1]]
if i < len(quarters) - 1 else today)
trading_days = pd.bdate_range(period_start, period_end)
if len(trading_days) == 0:
continue
first_day = trading_days[0].strftime('%Y-%m-%d')
# -- Select contract on first trading day --
contract = _select_cached_contract(
cache, option_type, first_day,
delta_target=delta_target,
min_days=min_days, max_days=max_days,
require_selected=True)
if contract is None:
chain = _get_or_fetch_chain(
ticker, first_day, option_type, md_key, fetched)
contract = _select_marketdata_contract(
chain, first_day, option_type,
delta_target=delta_target,
min_days=min_days, max_days=max_days)
if contract is None:
raise RuntimeError(
f"MarketData returned no usable {option_type} contract "
f"for {ticker} on {first_day} (period {q}) at "
f"delta={delta_target}, "
f"expiry {min_days}-{max_days}d")
new_rows.append({
'date': first_day,
'selected_on': first_day,
'option_type': option_type,
'symbol': contract.get('symbol'),
'strike': contract['strike'],
'expiry': contract['expiry'],
'delta': contract['delta'],
'price': contract['price'],
})
strike = contract['strike']
expiry = contract['expiry']
symbol = contract.get('symbol') or _occ_symbol(
ticker, option_type, strike, expiry)
# -- Collect prices for this period (fresh dict per period) --
period_prices = {}
# Fast path: read matching prices from cache.
rows = cache[
(cache['date'] >= period_start)
& (cache['date'] <= period_end)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))
& pd.notna(cache['price'])]
selected_rows = rows[rows['selected_on'] == first_day]
if not selected_rows.empty:
rows = selected_rows
for _, row in rows.iterrows():
period_prices[row['date']] = float(row['price'])
# Decide whether to refresh quotes. With a key, refresh whenever
# the cached series does not reach period_end. Without a key,
# only fail if the cached series is empty; a slightly stale
# tail is acceptable for cache-only runs (e.g. sensitivity
# sweeps replaying the baseline contract).
has_partial = bool(period_prices)
reaches_end = has_partial and max(period_prices) >= period_end
if md_key and not reaches_end:
time.sleep(_MD_RATE_DELAY)
quote_prices = _fetch_marketdata_quotes(
symbol, period_start, period_end, md_key)
fetched['marketdata_quotes'] += 1
for day_str, price in quote_prices.items():
if period_start <= day_str <= period_end:
period_prices[day_str] = price
new_rows.append({
'date': day_str,
'selected_on': first_day,
'option_type': option_type,
'symbol': symbol,
'strike': strike,
'expiry': expiry,
'delta': contract['delta'],
'price': price,
})
if contract.get('price') and first_day not in period_prices:
period_prices[first_day] = contract['price']
elif not md_key and not has_partial:
raise RuntimeError(
"MARKETDATA_KEY is not set and no cached quotes exist "
f"for {symbol} in {period_start}..{period_end}.")
if not period_prices:
raise RuntimeError(
f"MarketData returned no quotes for {symbol} "
f"({opt_key}) in {period_start}..{period_end}")
per_period.setdefault(q, {})[opt_key] = period_prices
# Persist new data to cache
if new_rows:
new_df = pd.DataFrame(new_rows)
cache = pd.concat([cache, new_df], ignore_index=True)
cache.drop_duplicates(
subset=['date', 'selected_on', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
_save_option_cache(ticker, option_type, cache, delta_target,
min_days, max_days)
if any(fetched.values()):
import sys
parts = []
if fetched['marketdata_chains']:
parts.append(f"{fetched['marketdata_chains']} MarketData chains")
if fetched['marketdata_quotes']:
parts.append(f"{fetched['marketdata_quotes']} MarketData quote series")
print(f"[options] Fetched {', '.join(parts)}", file=sys.stderr)
return per_period
# -- Black-Scholes helpers (used only to infer delta when MarketData's
# Starter-plan historical Greeks are null; never to reprice returns) -----
from scipy.stats import norm as _norm
def bs_price(S, K, T, sigma, option_type='call'):
"""Black-Scholes option price (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return max(S - K, 0)
return max(K - S, 0)
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
d2 = d1 - sigma * np.sqrt(T)
if option_type == 'call':
return S * _norm.cdf(d1) - K * _norm.cdf(d2)
return K * _norm.cdf(-d2) - S * _norm.cdf(-d1)
def bs_delta(S, K, T, sigma, option_type='call'):
"""Black-Scholes delta (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return 1.0 if S > K else 0.0
return -1.0 if S < K else 0.0
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
if option_type == 'call':
return _norm.cdf(d1)
return _norm.cdf(d1) - 1
def daily_cumulative(holdings, quarters, filing_dates, close, today, mode,
per_period_opt=None):
"""Build a daily series of cumulative growth factors for a given mode.
For each filing period, stock shares and option contracts are fixed. In
equity-proxy mode, option rows are converted to linear underlying exposure:
calls are long underlying and puts are short underlying. In option-proxy
mode, option rows are sized by 13F underlying notional and returns come
from MarketData quotes; returns are divided by deployed capital (stock
value plus option premium cost). Option-proxy mode raises if MarketData
prices are missing for any required position.
"""
cum_growth = 1.0
dates_out = []
values_out = []
for i, q in enumerate(quarters):
period_start = filing_dates[q]
period_end = filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today
ps = pd.Timestamp(period_start)
pe = pd.Timestamp(period_end)
# Trading days in this period
mask = (close.index >= ps) & (close.index <= pe)
period_close = close[mask]
if period_close.empty:
continue
# Option prices for this period (keyed by (ticker, type) → prices)
quarter_opt = per_period_opt.get(q, {}) if per_period_opt else {}
# Determine starting prices, fixed exposure, and deployed capital.
positions = holdings[q]
exposure = {}
costs = {}
start_prices = {}
start_underlying = {}
use_opt_px = {} # track which positions use option prices
total_cost = 0
for (ticker, pos_type), value in positions.items():
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
if mode == 'equity_only':
if pos_type not in ('long', 'call', 'put'):
continue
if ticker not in close.columns:
continue
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
stock_start = float(avail.iloc[0])
if stock_start <= 0:
continue
start_prices[(ticker, pos_type)] = stock_start
start_underlying[(ticker, pos_type)] = stock_start
costs[(ticker, pos_type)] = value
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = False
total_cost += value
continue
# mode == 'full' (option proxy)
if is_option:
if opt_key not in quarter_opt:
raise RuntimeError(
f"No MarketData option prices for {opt_key} in "
f"period {q}")
ticker_opt = quarter_opt[opt_key]
opt_dates = sorted(d for d in ticker_opt if d >= period_start)
if not opt_dates:
raise RuntimeError(
f"MarketData option prices for {opt_key} in period "
f"{q} contain no dates at or after {period_start}")
if ticker not in close.columns:
raise RuntimeError(
f"No underlying close series for {ticker}")
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
raise RuntimeError(
f"No underlying price for {ticker} at {period_start}")
opt_start = ticker_opt[opt_dates[0]]
underlying_start = float(avail.iloc[0])
if opt_start <= 0 or underlying_start <= 0:
raise RuntimeError(
f"Non-positive starting price for {opt_key} in "
f"period {q}")
start_prices[(ticker, pos_type)] = opt_start
start_underlying[(ticker, pos_type)] = underlying_start
costs[(ticker, pos_type)] = value * opt_start / underlying_start
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = True
total_cost += costs[(ticker, pos_type)]
continue
# Plain stock in full mode
if ticker not in close.columns:
continue
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
stock_start = float(avail.iloc[0])
if stock_start <= 0:
continue
start_prices[(ticker, pos_type)] = stock_start
start_underlying[(ticker, pos_type)] = stock_start
costs[(ticker, pos_type)] = value
exposure[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = False
total_cost += value
if total_cost == 0:
continue
# Daily P&L relative to period start.
# Skip first day of subsequent periods (already recorded as last day
# of the prior period) to avoid duplicate boundary dates.
start_idx = 1 if i > 0 else 0
# Forward-fill: track last known option price so that gaps in
# option data don't cause positions to vanish mid-period.
last_opt = {k: v for k, v in start_prices.items()
if use_opt_px.get(k)}
for day_idx in range(start_idx, len(period_close)):
day = period_close.index[day_idx]
day_str = day.strftime('%Y-%m-%d')
period_pnl = 0
for (ticker, pos_type), value in exposure.items():
p0 = start_prices[(ticker, pos_type)]
if p0 == 0:
continue
if use_opt_px[(ticker, pos_type)]:
opt_key = _option_position_key(ticker, pos_type)
p1_val = quarter_opt.get(opt_key, {}).get(day_str)
if p1_val is not None:
last_opt[(ticker, pos_type)] = p1_val
else:
p1_val = last_opt.get((ticker, pos_type))
if p1_val is None:
continue
underlying_p0 = start_underlying.get((ticker, pos_type))
if not underlying_p0 or underlying_p0 <= 0:
continue
position_pnl = value * (float(p1_val) - p0) / underlying_p0
else:
if ticker not in period_close.columns:
continue
p1_val = period_close[ticker].iloc[day_idx]
if pd.isna(p1_val):
continue
stock_ret = (float(p1_val) - p0) / p0
if mode == 'equity_only':
position_pnl = (
value * _linear_underlying_sign(pos_type) * stock_ret)
else:
position_pnl = value * stock_ret
period_pnl += position_pnl
dates_out.append(day)
values_out.append(cum_growth * (1 + period_pnl / total_cost))
# Chain: next period starts from the last day's growth factor
if values_out:
cum_growth = values_out[-1]
return dates_out, values_out
import os
HUGO_BASE = os.path.expanduser('~/My Drive/repos/stafforini.com')
# -- Build position data for both modes --------------------------------
latest = parsed["filings"][-1]
pos = {}
for h in latest["holdings"]:
key = (h["ticker"], h["type"])
pos[key] = pos.get(key, 0) + h["value"]
eq_pos = pos
# Fetch current underlying prices for all rows
calc_tickers = sorted({t for (t, _) in pos})
current = get_prices(calc_tickers, [today])
# Load option contract info for the latest quarter.
#
# The baseline cache may legitimately hold more than one contract per
# (ticker, selected_on) when the sensitivity sweep's expiry windows overlap
# with the baseline 9-15m window. Pick the one whose |delta| is closest
# to OPTION_DELTA (matching _select_cached_contract's tie-breaking logic)
# so the calculator is deterministic and stays consistent with the
# main backtest's representative selection.
latest_fd = latest["filing_date"]
opt_contracts = {}
for h in latest["holdings"]:
if h["type"] in ('call', 'put'):
key = (h["ticker"], h["type"])
if key in opt_contracts:
continue
cache = _load_option_cache(h["ticker"], h["type"])
selected_rows = cache[(cache['selected_on'] == latest_fd)
& (cache['option_type'] == h["type"])
& pd.notna(cache['delta'])
& pd.notna(cache['strike'])
& pd.notna(cache['price'])]
if selected_rows.empty:
continue
# Identify the canonical contract (closest to baseline delta), then
# pull its most recent price from the cache.
canonical = selected_rows.iloc[
(selected_rows['delta'].abs() - OPTION_DELTA).abs().argsort()
].iloc[0]
strike = float(canonical['strike'])
expiry = str(canonical['expiry'])
price_rows = cache[(cache['option_type'] == h["type"])
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == expiry)
& pd.notna(cache['price'])
& (cache['date'] >= latest_fd)]
if price_rows.empty:
continue
latest_row = price_rows.sort_values('date').iloc[-1]
opt_contracts[key] = {
'strike': strike,
'expiry': expiry,
'price': round(float(latest_row['price']), 2),
'price_as_of': str(latest_row['date']),
}
# Build JSON data for both modes. In equity-proxy mode, option rows become
# linear underlying exposure. In option-proxy mode, reported option value is
# underlying notional; capital_basis estimates the deployed premium.
def build_mode_data(positions, option_proxy=False):
rows = []
for (ticker, pos_type), value in sorted(positions.items(),
key=lambda x: -x[1]):
underlying_price = None
row = {"ticker": ticker, "type": pos_type,
"reported_value": round(value, 2)}
if ticker in current and today in current[ticker]:
underlying_price = round(current[ticker][today], 2)
if pos_type == 'long' or (pos_type in ('call', 'put')
and not option_proxy):
direction = 'short' if pos_type == 'put' else 'long'
row.update({"instrument": "stock", "price": underlying_price,
"underlying_price": underlying_price,
"multiplier": 1, "direction": direction})
capital_basis = value
elif pos_type in ('call', 'put'):
row.update({"instrument": "option", "price": None,
"underlying_price": underlying_price,
"multiplier": 100, "direction": "long"})
if (ticker, pos_type) in opt_contracts:
row.update(opt_contracts[(ticker, pos_type)])
option_price = row.get("price")
missing = []
if (ticker, pos_type) not in opt_contracts:
missing.append("cached representative contract")
if option_price is None or option_price <= 0:
missing.append("option price")
if underlying_price is None or underlying_price <= 0:
missing.append("underlying price")
if missing:
raise RuntimeError(
f"Cannot size option-proxy row {ticker} {pos_type}: "
f"missing {', '.join(missing)}")
capital_basis = value * option_price / underlying_price
else:
row.update({"instrument": "stock", "price": underlying_price,
"underlying_price": underlying_price,
"multiplier": 1, "direction": "long"})
capital_basis = value
row["capital_basis"] = (
round(capital_basis, 6) if capital_basis is not None else None)
rows.append(row)
total_basis = sum(r["capital_basis"] for r in rows
if r["capital_basis"] is not None)
for row in rows:
if row["capital_basis"] is not None and total_basis > 0:
row["weight"] = round(row["capital_basis"] / total_basis, 6)
else:
row["weight"] = 0
return rows
eq_data = build_mode_data(eq_pos, option_proxy=False)
full_data = build_mode_data(pos, option_proxy=True)
quarter = latest["quarter"].replace("_", " ")
filing_date = latest["filing_date"]
# -- Generate self-contained HTML --------------------------------------
CSS = (
'/* reset */ * { margin: 0; padding: 0; box-sizing: border-box; }\n'
'body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto,\n'
' sans-serif; font-size: 14px; background: transparent;\n'
' color: #333; padding: 16px 0; }\n'
'.controls { display: flex; gap: 16px; align-items: center;\n'
' flex-wrap: wrap; margin-bottom: 12px; }\n'
'.controls label { font-weight: 600; font-size: 13px; }\n'
'.controls input, .controls select {\n'
' padding: 6px 10px; border: 1px solid #ccc; border-radius: 4px;\n'
' font-size: 14px; background: #fff; color: #333; }\n'
'.controls input { width: 140px; }\n'
'.meta { font-size: 12px; color: #888; margin-bottom: 12px; }\n'
'.muted { color: #888; font-size: 11px; }\n'
'.table-wrap { overflow-x: auto; }\n'
'table { min-width: 100%; border-collapse: collapse; font-size: 13px;\n'
' font-variant-numeric: tabular-nums; }\n'
'th { text-align: left; padding: 6px 6px; border-bottom: 2px solid #ddd;\n'
' font-weight: 600; font-size: 12px; text-transform: uppercase;\n'
' letter-spacing: 0.03em; color: #666; }\n'
'th.r, td.r { text-align: right; }\n'
'td { padding: 5px 6px; border-bottom: 1px solid #eee; }\n'
'tr:hover td { background: rgba(0,0,0,0.02); }\n'
'.tag { display: inline-block; padding: 1px 6px; border-radius: 3px;\n'
' font-size: 11px; font-weight: 600; }\n'
'.tag-long { background: #dcfce7; color: #166534; }\n'
'.tag-call { background: #dbeafe; color: #1e40af; }\n'
'.tag-put { background: #fee2e2; color: #991b1b; }\n'
'.summary { margin-top: 12px; font-size: 13px; display: flex;\n'
' gap: 24px; font-weight: 500; }\n'
'.summary span { color: #666; font-weight: 400; }\n'
'td.cb { width: 24px; text-align: center; }\n'
'td.cb input { margin: 0; cursor: pointer; }\n'
'tr.excluded td { opacity: 0.35; }\n'
'tr.excluded td.cb { opacity: 1; }\n'
'body.dark { color: #d4d4d4; }\n'
'body.dark .controls input, body.dark .controls select {\n'
' background: #2a2a2a; color: #d4d4d4; border-color: #555; }\n'
'body.dark th { color: #999; border-bottom-color: #444; }\n'
'body.dark td { border-bottom-color: #333; }\n'
'body.dark tr:hover td { background: rgba(255,255,255,0.03); }\n'
'body.dark .tag-long { background: #14532d; color: #86efac; }\n'
'body.dark .tag-call { background: #1e3a5f; color: #93c5fd; }\n'
'body.dark .tag-put { background: #450a0a; color: #fca5a5; }\n'
'body.dark .meta { color: #777; }\n'
'body.dark tr.excluded td { opacity: 0.3; }\n'
'body.dark .summary span { color: #888; }\n'
)
JS = r"""
var DATA = {
equity_only: %s,
full: %s
};
var excluded = {};
function posKey(r) { return r.ticker + '_' + r.type; }
function validBasis(r) {
return typeof r.capital_basis === 'number' &&
isFinite(r.capital_basis) &&
r.capital_basis > 0;
}
function esc(value) {
return String(value == null ? '' : value).replace(/[&<>"']/g, function(ch) {
return {
'&': '&amp;', '<': '&lt;', '>': '&gt;',
'"': '&quot;', "'": '&#39;'
}[ch];
});
}
function syncCutoff() {
var cutoff = (parseFloat(document.getElementById('cutoff').value) || 0) / 100;
var mode = document.getElementById('mode').value;
var rows = DATA[mode];
excluded = {};
rows.forEach(function(r) {
if (r.weight < cutoff) excluded[posKey(r)] = true;
});
}
function render() {
var bankroll = parseFloat(document.getElementById('bankroll').value) || 0;
var mode = document.getElementById('mode').value;
var rows = DATA[mode];
var showOptionDetails = mode === 'full';
// Show mode description
var descEl = document.getElementById('mode-desc');
if (mode === 'equity_only') {
descEl.textContent = 'Uses shares only; calls become long underlying and puts become short underlying.';
} else {
descEl.textContent = 'Uses deployed capital; option rows target 13F underlying notional and spend estimated premium.';
}
// All rows shown; excluded rows are greyed out
var active = rows.filter(function(r) {
return !excluded[posKey(r)] && validBasis(r);
});
var totalBasis = active.reduce(function(s, r) {
return s + r.capital_basis;
}, 0);
var allocated = 0;
var unsizedCapital = 0;
var computed = rows.map(function(r) {
var key = posKey(r);
var isExcl = !!excluded[key];
var hasBasis = validBasis(r);
var adjWeight = (!isExcl && hasBasis && totalBasis > 0) ?
r.capital_basis / totalBasis : 0;
var scale = (!isExcl && hasBasis && totalBasis > 0) ?
bankroll / totalBasis : 0;
var target = (!isExcl && hasBasis) ? r.reported_value * scale : null;
var capitalTarget = (!isExcl && hasBasis) ?
r.capital_basis * scale : 0;
var multiplier = r.multiplier || 1;
var isOption = r.instrument === 'option';
var direction = r.direction || 'long';
var sizingPrice = isOption ? r.underlying_price : r.price;
if (!sizingPrice || !r.price || isExcl || !hasBasis) {
if (!isExcl && hasBasis) unsizedCapital += capitalTarget;
return { ticker: r.ticker, type: r.type, weight: r.weight,
adjWeight: adjWeight, target: target,
excluded: isExcl, key: key,
instrument: r.instrument || 'stock',
strike: r.strike || null, expiry: r.expiry || null,
underlyingPrice: r.underlying_price || null,
priceAsOf: r.price_as_of || null,
direction: direction,
price: r.price, units: null, cost: null };
}
var units = Math.floor(target / (sizingPrice * multiplier));
var signedUnits = direction === 'short' ? -units : units;
var cost = units * r.price * multiplier;
allocated += cost;
return { ticker: r.ticker, type: r.type, weight: r.weight,
adjWeight: adjWeight, target: target,
excluded: isExcl, key: key, instrument: r.instrument || 'stock',
strike: r.strike || null, expiry: r.expiry || null,
underlyingPrice: r.underlying_price || null,
priceAsOf: r.price_as_of || null,
direction: direction,
price: r.price, units: signedUnits, cost: cost };
});
var html = '<div class="table-wrap"><table><thead><tr>';
html += '<th></th><th>Ticker</th>';
html += '<th>Type</th>';
if (showOptionDetails) html += '<th class="r">Strike</th><th class="r">Expiry</th>';
html += '<th class="r">Weight</th>';
html += '<th class="r">Target</th>';
html += '<th class="r">Price</th>';
html += '<th class="r">Units</th><th class="r">Cost</th></tr></thead><tbody>';
computed.forEach(function(c) {
html += '<tr' + (c.excluded ? ' class="excluded"' : '') + '>';
html += '<td class="cb"><input type="checkbox" data-key="' + esc(c.key) + '"' + (c.excluded ? '' : ' checked') + '></td>';
html += '<td><strong>' + esc(c.ticker) + '</strong></td>';
var cls = c.type === 'put' ? 'tag-put' : c.type === 'call' ? 'tag-call' : 'tag-long';
var typeText = c.type;
if (mode === 'equity_only' && c.type === 'call') typeText = 'call as long';
if (mode === 'equity_only' && c.type === 'put') typeText = 'put as short';
html += '<td><span class="tag ' + cls + '">' + esc(typeText) + '</span></td>';
if (showOptionDetails) {
if (c.strike) {
html += '<td class="r">$' + c.strike.toFixed(0) + '</td>';
html += '<td class="r">' + esc(c.expiry || '\u2014') + '</td>';
} else {
html += '<td class="r">\u2014</td><td class="r">\u2014</td>';
}
}
html += '<td class="r">' + (c.excluded ? '0.0' : (c.adjWeight * 100).toFixed(1)) + '%%</td>';
html += '<td class="r">' + (c.excluded ? '$0.00' : (c.target == null ? 'N/A' : '$' + c.target.toFixed(2))) + '</td>';
var priceText = c.price != null ? '$' + c.price.toFixed(2) : 'N/A';
if (showOptionDetails && c.instrument === 'option' && c.priceAsOf) {
priceText += '<br><span class="muted">' + esc(c.priceAsOf) + '</span>';
}
html += '<td class="r">' + priceText + '</td>';
html += '<td class="r">' + (c.units != null ? c.units.toLocaleString() : 'N/A') + '</td>';
html += '<td class="r">' + (c.cost != null ? '$' + c.cost.toFixed(2) : 'N/A') + '</td>';
html += '</tr>';
});
html += '</tbody></table></div>';
html += '<div class="summary">';
html += '<div><span>' + (mode === 'full' ? 'Allocated' : 'Gross exposure') + ':</span> $' + allocated.toFixed(2) + '</div>';
html += '<div><span>Unsized capital:</span> $' + unsizedCapital.toFixed(2) + '</div>';
html += '<div><span>Residual:</span> $' + (bankroll - allocated - unsizedCapital).toFixed(2) + '</div>';
html += '</div>';
document.getElementById('output').innerHTML = html;
// Auto-resize iframe to fit content
try {
var el = window.frameElement;
if (el) el.style.height = document.body.scrollHeight + 'px';
} catch(e) {}
}
document.getElementById('bankroll').addEventListener('input', render);
document.getElementById('mode').addEventListener('change', function() { syncCutoff(); render(); });
document.getElementById('cutoff').addEventListener('input', function() { syncCutoff(); render(); });
document.getElementById('output').addEventListener('change', function(e) {
if (e.target.type === 'checkbox' && e.target.dataset.key) {
if (e.target.checked) {
delete excluded[e.target.dataset.key];
} else {
excluded[e.target.dataset.key] = true;
}
render();
}
});
// Dark mode
function isDark() {
try { return parent.document.documentElement.getAttribute('data-theme') === 'dark'; }
catch(e) { return window.matchMedia('(prefers-color-scheme: dark)').matches; }
}
function applyTheme() {
document.body.classList.toggle('dark', isDark());
}
applyTheme();
try {
new MutationObserver(applyTheme).observe(
parent.document.documentElement,
{ attributes: true, attributeFilter: ['data-theme'] });
} catch(e) {}
render();
""" % (json.dumps(eq_data), json.dumps(full_data))
BODY = (
'<div class="controls">\n'
' <label for="bankroll">Bankroll ($)</label>\n'
' <input type="number" id="bankroll" value="100000" min="0" step="100">\n'
' <label for="mode">Mode</label>\n'
' <select id="mode">\n'
' <option value="equity_only" selected>Equity proxy</option>\n'
' <option value="full">Option proxy</option>\n'
' </select>\n'
' <label for="cutoff">Cutoff (%%)</label>\n'
' <input type="number" id="cutoff" value="0" min="0" max="100"'
' step="0.5" style="width:80px">\n'
'</div>\n'
'<div id="mode-desc" class="meta" style="font-style:italic"></div>\n'
'<div class="meta">\n'
' %s filing (filed %s) · underlying prices as of %s\n'
'</div>\n'
'<div id="output"></div>\n'
) % (quarter, filing_date, today)
html = (
'<!DOCTYPE html>\n<html>\n<head>\n'
'<meta charset="utf-8">\n'
'<meta http-equiv="Cache-Control" content="no-cache, no-store, must-revalidate">\n'
'<style>\n' + CSS + '</style>\n'
'</head>\n<body>\n'
+ BODY
+ '<script>\n' + JS + '\n</script>\n'
'</body>\n</html>'
)
outpath = os.path.join(HUGO_BASE, 'static', 'images', 'sa-lp-calculator.html')
with open(outpath, 'w') as f:
f.write(html)
Based on the five filings to date, the fund files on the 45-day deadline or up to three days early:
| Quarter end | 45-day deadline | Actual filing | Days early |
|---|---|---|---|
| 2024-12-31 | Feb 14 | Feb 12 | 2 |
| 2025-03-31 | May 15 | May 14 | 1 |
| 2025-06-30 | Aug 14 | Aug 14 | 0 |
| 2025-09-30 | Nov 14 | Nov 14 | 0 |
| 2025-12-31 | Feb 14 | Feb 11 | 3 |
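As a sanity check, the "days early" column can be recomputed from the dates alone. A minimal sketch (dates hardcoded from the table above; it ignores the business-day adjustment the SEC applies when a deadline falls on a weekend or holiday):

```python
from datetime import date, timedelta

# Quarter-end and actual filing dates, copied from the table above.
filings = [
    (date(2024, 12, 31), date(2025, 2, 12)),
    (date(2025, 3, 31), date(2025, 5, 14)),
    (date(2025, 6, 30), date(2025, 8, 14)),
    (date(2025, 9, 30), date(2025, 11, 14)),
    (date(2025, 12, 31), date(2026, 2, 11)),
]

# The 13F deadline is 45 days after quarter-end; "days early" is the gap
# between that deadline and the actual filing date.
days_early = [((quarter_end + timedelta(days=45)) - filed).days
              for quarter_end, filed in filings]
print(days_early)  # -> [2, 1, 0, 0, 3]
```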
To be notified when a new filing lands, subscribe by email or RSS. A daily job checks SEC EDGAR and pushes a notification within hours of the filing appearing. If you prefer a standing reminder, this Google Calendar has a recurring event every quarter, around five days before each deadline. (Email or RSS has the advantage of firing when the filing actually appears, rather than on a fixed calendar date that may not coincide with it.) All the relevant code blocks re-evaluate daily, so the portfolio composition, backtest results, and copycat-delay analysis should stay current, but please let me know if anything looks dated.
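The daily check is easy to replicate. A minimal sketch using EDGAR's atom feed of a filer's 13F-HR filings — the CIK below is a placeholder (look up the fund's actual CIK on EDGAR), the function names are my own, and the SEC asks for a descriptive `User-Agent` on every request:

```python
import re
import urllib.request

def edgar_13f_feed_url(cik: str) -> str:
    """Atom feed of a filer's 13F-HR filings on EDGAR."""
    return ("https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany"
            f"&CIK={cik}&type=13F-HR&count=5&output=atom")

def latest_accession(atom_xml: str):
    """EDGAR accession numbers look like 0001234567-25-000123."""
    m = re.search(r"\d{10}-\d{2}-\d{6}", atom_xml)
    return m.group(0) if m else None

def check_once(cik: str, last_seen: str):
    """Return the newest accession number if it differs from last_seen."""
    req = urllib.request.Request(
        edgar_13f_feed_url(cik),
        headers={"User-Agent": "filing-monitor you@example.com"})
    with urllib.request.urlopen(req, timeout=30) as resp:
        newest = latest_accession(resp.read().decode())
    return newest if newest and newest != last_seen else None
```

Run it from a daily cron job, persist the last accession number, and send the notification whenever `check_once` returns a new one.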
With thanks to Johannes Treutlein, Bastian Stern and Jonas Vollmer for discussion.
The code in the blocks that follow was written by Claude Opus 4.6 and audited by GPT-5.4. In the source org-mode file, reusable helpers live in separate noweb blocks; in the published note, those helpers are expanded into each block so the code can run independently. ↩︎
The displayed option-proxy results come entirely from MarketData historical option chains and quotes. `MARKETDATA_KEY` must be set; with it, the code selects representative contracts from historical chains and tracks their historical mid prices. The tested Starter-plan responses returned null historical Greek fields, so the code uses the vendor delta when present and otherwise infers delta from the observed option mid-price, underlying price, strike, and expiration. This affects only contract selection; returns still come from observed option quotes. ↩︎

The single-switch assumption is a simplification: the fund likely makes multiple trades throughout the quarter. But since we only observe quarter-end snapshots, a uniform single-switch model is the most we can extract from the available data. ↩︎
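For concreteness, the delta-inference step described above can be sketched as follows: bisect the zero-rate Black-Scholes price for the implied volatility that reproduces the observed mid, then report the delta at that volatility. This is an illustrative, stdlib-only sketch, not the script's actual code, and the helper names are made up:

```python
import math

def _cdf(x):
    """Standard normal CDF via the error function."""
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))

def delta_from_mid(S, K, T, mid, option_type='call'):
    """Delta implied by an observed option mid (zero rate, zero dividends)."""
    def price(sigma):
        d1 = (math.log(S / K) + sigma * sigma * T / 2) / (sigma * math.sqrt(T))
        d2 = d1 - sigma * math.sqrt(T)
        if option_type == 'call':
            return S * _cdf(d1) - K * _cdf(d2)
        return K * _cdf(-d2) - S * _cdf(-d1)

    # Option price is monotone increasing in sigma, so bisection converges.
    lo, hi = 1e-4, 5.0
    for _ in range(100):
        sigma = (lo + hi) / 2
        if price(sigma) < mid:
            lo = sigma
        else:
            hi = sigma
    sigma = (lo + hi) / 2
    d1 = (math.log(S / K) + sigma * sigma * T / 2) / (sigma * math.sqrt(T))
    return _cdf(d1) if option_type == 'call' else _cdf(d1) - 1.0
```

For an at-the-money one-year call with S = K = 100 and a mid of about 7.97, this recovers σ ≈ 0.20 and a delta of about 0.54.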
The last two paragraphs in this section were written with considerable assistance from Opus 4.6. Claude assures me that a Monte Carlo simulation with 3000 samples yields essentially the same results as the analytical approach described here. ↩︎