Situational Awareness LP
Situational Awareness LP is a hedge fund founded in 2024 by Leopold Aschenbrenner, with Carl Shulman as director of research. It is backed by Patrick and John Collison, Daniel Gross, and Nat Friedman. The fund’s thesis is explicitly AGI-focused: it pursues an opportunistic approach to public equities alongside strategic investments in semiconductor companies and energy infrastructure.
The copycat approach
Investing directly in the fund requires a $25M minimum, a two-year lockup followed by quarterly redemptions over two more years, and Qualified Purchaser status. The fund also charges the standard “2 and 20” fee structure (a 2% annual management fee plus 20% of profits). If those barriers are too high for you, there is an alternative. Institutional investment managers with over $100M in qualifying assets must file Form 13F with the United States Securities and Exchange Commission (SEC) each quarter, disclosing their long equity positions, options, and convertible bonds. Filings are due within 45 days after quarter-end and are published on the SEC’s EDGAR system.
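Because the deadline is fixed by rule, a copycat can anticipate when each new portfolio snapshot becomes public. A minimal sketch of the deadline arithmetic (45 days after quarter-end; note that when the statutory deadline falls on a weekend or federal holiday, the actual due date rolls to the next business day):

```python
from datetime import date, timedelta

def filing_deadline(quarter_end: date) -> date:
    # Form 13F is due within 45 days of the end of the calendar quarter.
    return quarter_end + timedelta(days=45)

# Q2 2025 ends June 30, so that quarter's portfolio surfaces in mid-August.
print(filing_deadline(date(2025, 6, 30)))  # 2025-08-14
```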
This approach has significant limitations, however: the copycat sees each portfolio only after it is disclosed, not when the fund actually trades into it;1 13F filings exclude short positions, foreign-listed securities, and non-equity assets; and, for options, filings report only the notional value of the underlying shares, omitting strike prices or expirations. Still, the loss of fidelity may be acceptable if one is sufficiently bullish on the fund’s strategy and wants a simple way to get exposure to its public equity bets.
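To make the option limitation concrete: the reported “value” of an option position is the market value of the underlying shares, so strike and expiry never enter the number, and two economically very different bets can be indistinguishable in the filing. A toy illustration with hypothetical figures:

```python
# Hypothetical positions: 10,000-share calls on a $50 stock.
shares, spot = 10_000, 50.0
deep_itm_call = {"strike": 30.0, "reported_value": shares * spot}
far_otm_call = {"strike": 80.0, "reported_value": shares * spot}

# Both appear in the 13F as a $500,000 option position, despite very
# different premium at risk and leverage.
print(deep_itm_call["reported_value"] == far_otm_call["reported_value"])  # True
```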
Backtesting the strategy
A trader interested in pursuing the strategy would proceed as follows. Each time a 13F filing is published (the “filing date”, typically ~45 days after quarter-end), they would buy the disclosed portfolio at that day’s closing prices, weighting each position by its reported dollar value. They would hold that portfolio unchanged until the next filing is published, at which point they would rebalance to match the new disclosure.
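The rebalancing step above reduces to simple proportions: each position’s weight is its share of the disclosed total. A sketch with hypothetical tickers and dollar values (not actual fund holdings):

```python
def portfolio_weights(holdings):
    # Weight each position by its reported dollar value.
    total = sum(h["value"] for h in holdings)
    return {h["ticker"]: h["value"] / total for h in holdings}

# Hypothetical disclosure for illustration only.
disclosed = [
    {"ticker": "NVDA", "value": 60_000_000},
    {"ticker": "TSM", "value": 30_000_000},
    {"ticker": "VRT", "value": 10_000_000},
]
print(portfolio_weights(disclosed))  # {'NVDA': 0.6, 'TSM': 0.3, 'VRT': 0.1}
```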
We can backtest the strategy by taking all existing filings and computing the returns to date. The script below computes compounded returns in two modes: equity-only (long stock positions reweighted to 100%, ignoring options) and full exposure (calls treated as long stock, puts as short stock, a rough proxy since, as noted, strikes and expirations are undisclosed). The last period always runs from the most recent filing date to today, so re-running the script updates that partial-period figure automatically.
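Before the full script, here is the core arithmetic in miniature: per-period portfolio returns are chained multiplicatively, and in full-exposure mode each position’s stock return is signed by its type (puts count as short). The tickers, values, and returns below are made up for illustration:

```python
def chain(returns):
    # Compound per-period returns into a single cumulative return.
    growth = 1.0
    for r in returns:
        growth *= 1 + r
    return growth - 1

def full_exposure_return(positions, stock_returns):
    # positions: {(ticker, type): dollar value}. Calls proxy long stock,
    # puts proxy short stock, matching the rough full-exposure mode.
    total = sum(positions.values())
    out = 0.0
    for (ticker, pos_type), value in positions.items():
        sign = -1 if pos_type == "put" else 1
        out += (value / total) * sign * stock_returns[ticker]
    return out

r = full_exposure_return(
    {("NVDA", "long"): 50, ("NVDA", "call"): 30, ("INTC", "put"): 20},
    {"NVDA": 0.10, "INTC": -0.05},
)
print(round(r, 4))  # 0.09
print(round(chain([r, 0.02]), 4))  # two chained periods
```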
Code
# ── SA LP 13F data fetcher ─────────────────────────────────────────
# Fetches all 13F-HR filings from SEC EDGAR for Situational Awareness LP,
# parses the infotable XML, and resolves CUSIPs to tickers.
# Output: JSON with one entry per quarterly filing, each containing
# the filing date, quarter label, and a list of holdings.
# Caches results in CACHE_DIR to avoid redundant SEC requests.
import urllib.request, re, json, sys, time, os, xml.etree.ElementTree as ET
from datetime import datetime
# ── Configuration (update these for your environment) ──────────────
SEC_UA = os.environ.get(
'SEC_USER_AGENT',
'stafforini.com situational-awareness-lp research; contact via stafforini.com')
CACHE_DIR = os.path.expanduser('~/.cache')
CIK = '2045724'
BASE = f'https://www.sec.gov/Archives/edgar/data/{CIK}'
NS = {'ns': 'http://www.sec.gov/edgar/document/thirteenf/informationtable'}
CACHE = os.path.join(CACHE_DIR, 'sa-lp-13f.json')
CUSIP_TICKER = {
'038169207': 'APLD', '05614L209': 'BW', '09173B107': 'BITF',
'093712107': 'BE', '093712AH0': 'BE', '11135F101': 'AVGO',
'12514G108': 'CIFR', '17253J106': 'CIFR', '17253JAA4': 'CIFR',
'18452B209': 'CLSK', '19247G107': 'COHR', '21037T109': 'CEG',
'21873S108': 'CRWV', '21874A106': 'CORZ', '26884L109': 'EQT',
'36168Q104': 'GLXY', '36317J209': 'GLXY', '44282L109': 'HUT',
'44812J104': 'HUT', '456788108': 'INFY', '458140100': 'INTC',
'49338L103': 'KRC', '49427F108': 'KRC', '53115L104': 'LBRT',
'55024U109': 'LITE', '55024UAD1': 'LITE', '573874104': 'MRVL',
'577933104': 'MRVL', '593787101': 'MU', '593787105': 'MU',
'595112103': 'MU', '607828100': 'MOD', '67066G104': 'NVDA',
'683344105': 'ONTO', '68340J108': 'ONTO', '73933G202': 'PSIX',
'73933H100': 'PSIX', '743344109': 'PUMP', '74347M108': 'PUMP',
'76754A103': 'RIOT', '767292105': 'RIOT', '80004C200': 'SNDK',
'80106M109': 'SNDK', '83418M103': 'SEI', '87422Q109': 'TLN',
'87425V106': 'TLN', '874039100': 'TSM', '89854H102': 'TSEM',
'92189F106': 'SMH', '92189F676': 'SMH', '92535P101': 'VRT',
'92537N108': 'VRT', '92840M102': 'VST', '958102105': 'WDC',
'958102AT2': 'WDC', '98321C108': 'WYFI',
'G1110V104': 'BITF', 'G1189L107': 'BTDR', 'G11448100': 'BTDR',
'G7945J104': 'STX', 'G7997R103': 'STX', 'G96115103': 'WYFI',
'M87915274': 'TSEM', 'Q4982L109': 'IREN',
}
def fetch(url, timeout=10):
time.sleep(0.5)
req = urllib.request.Request(url, headers={'User-Agent': SEC_UA})
with urllib.request.urlopen(req, timeout=timeout) as resp:
return resp.read()
def find_infotable_filename(acc):
"""Discover the infotable XML filename for a filing via EFTS, then -index.htm."""
# EFTS search (fast, reliable)
try:
efts = f'https://efts.sec.gov/LATEST/search-index?q=%22{acc}%22'
data = json.loads(fetch(efts))
for hit in data.get('hits', {}).get('hits', []):
doc_id = hit['_id'] # format: "accession:filename"
filename = doc_id.split(':', 1)[1] if ':' in doc_id else ''
if filename.endswith('.xml') and 'primary_doc' not in filename:
return filename
except Exception:
pass
# Fallback: filing index page
acc_path = acc.replace('-', '')
try:
html = fetch(f'{BASE}/{acc_path}/{acc}-index.htm').decode()
for href in re.findall(r'href="([^"]*\.xml)"', html):
fn = href.split('/')[-1]
if fn != 'primary_doc.xml' and 'xslForm' not in href:
return fn
except Exception:
pass
return None
def parse_infotable(xml_data):
root = ET.fromstring(xml_data)
holdings = []
for info in root.findall('.//ns:infoTable', NS):
cusip = info.findtext('ns:cusip', '', NS).strip()
value = int(info.findtext('ns:value', '0', NS))
putcall = info.findtext('ns:putCall', '', NS).strip().lower()
ticker = CUSIP_TICKER.get(cusip, '')
if not ticker:
issuer = info.findtext('ns:nameOfIssuer', '', NS)
print(f"WARNING: Unknown CUSIP {cusip} ({issuer})", file=sys.stderr)
ticker = f"UNKNOWN_{cusip}"
pos_type = 'put' if putcall == 'put' else 'call' if putcall == 'call' else 'long'
holdings.append({"ticker": ticker, "type": pos_type, "value": value})
return holdings
def quarter_from_filing_date(fdate):
d = datetime.strptime(fdate, '%Y-%m-%d')
m, y = d.month, d.year
if m <= 3: return f'Q4_{y-1}', f'{y-1}-12-31'
elif m <= 6: return f'Q1_{y}', f'{y}-03-31'
elif m <= 9: return f'Q2_{y}', f'{y}-06-30'
else: return f'Q3_{y}', f'{y}-09-30'
def load_cache():
if os.path.exists(CACHE):
with open(CACHE) as f:
return json.load(f)
return {"filings": []}
def save_cache(data):
os.makedirs(os.path.dirname(CACHE), exist_ok=True)
with open(CACHE, 'w') as f:
json.dump(data, f)
cached = load_cache()
cached_quarters = {f["quarter"] for f in cached["filings"]}
result = {"filings": list(cached["filings"])}
try:
subs_url = f'https://data.sec.gov/submissions/CIK{CIK.zfill(10)}.json'
subs = json.loads(fetch(subs_url))
recent = subs['filings']['recent']
accessions = sorted(
[(recent['filingDate'][i], recent['accessionNumber'][i])
for i in range(len(recent['form'])) if recent['form'][i] == '13F-HR'])
for fdate, acc in accessions:
quarter, quarter_end = quarter_from_filing_date(fdate)
if quarter in cached_quarters:
continue
filename = find_infotable_filename(acc)
if not filename:
print(f"Could not find infotable for {acc}", file=sys.stderr)
continue
acc_path = acc.replace('-', '')
xml = fetch(f'{BASE}/{acc_path}/{filename}')
holdings = parse_infotable(xml)
result["filings"].append({
"quarter": quarter, "quarter_end": quarter_end,
"filing_date": fdate, "holdings": holdings})
result["filings"].sort(key=lambda f: f["filing_date"])
save_cache(result)
except Exception as e:
if result["filings"]:
print(f"SEC fetch error ({e}); using cache", file=sys.stderr)
else:
raise
# A bare return is invalid at module level; bind the JSON for the
# analysis code below instead.
data = json.dumps(result)
import json
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import requests
import time
import os
import warnings
warnings.filterwarnings('ignore')
# Parse data from the scraper block
parsed = json.loads(data) if isinstance(data, str) else data
filings = parsed["filings"]
# Build internal structures
filing_dates = {f["quarter"]: f["filing_date"] for f in filings}
quarter_end_dates = {f["quarter"]: f["quarter_end"] for f in filings}
quarters = [f["quarter"] for f in filings]
# Convert holdings list to dict keyed by quarter.
# Multiple positions in the same ticker with different types are aggregated
# by value per (ticker, type) pair.
holdings = {}
for f in filings:
positions = {}
for h in f["holdings"]:
ticker = h["ticker"]
pos_type = h["type"]
value = h["value"]
key = (ticker, pos_type)
positions[key] = positions.get(key, 0) + value
holdings[f["quarter"]] = positions
def get_prices(tickers, dates):
"""Fetch close prices for tickers on specific dates."""
unique_tickers = sorted(set(tickers))
all_dates = [datetime.strptime(d, '%Y-%m-%d') for d in dates]
start = min(all_dates) - timedelta(days=5)
end = max(all_dates) + timedelta(days=5)
df = yf.download(unique_tickers, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return {}
# yf.download returns MultiIndex columns (metric, ticker) for multiple tickers
if isinstance(df.columns, pd.MultiIndex):
close = df['Close']
else:
close = df[['Close']]
close.columns = unique_tickers
prices = {}
for ticker in unique_tickers:
if ticker not in close.columns:
continue
series = close[ticker].dropna()
if series.empty:
continue
prices[ticker] = {}
for date_str in dates:
target = pd.Timestamp(datetime.strptime(date_str, '%Y-%m-%d'))
after = series[series.index >= target]
if not after.empty:
prices[ticker][date_str] = float(after.iloc[0])
else:
before = series[series.index <= target]
if not before.empty:
prices[ticker][date_str] = float(before.iloc[-1])
return prices
def _price_on_or_after(px_by_date, target_date):
"""Return (date, price) for the first available price on/after target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d >= target_date)
if not dates:
return None
d = dates[0]
return d, px_by_date[d]
def _price_on_or_before(px_by_date, target_date):
"""Return (date, price) for the last available price on/before target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d <= target_date)
if not dates:
return None
d = dates[-1]
return d, px_by_date[d]
def _period_price_pair(px_by_date, start_date, end_date):
"""Return start/end prices for a period using sensible boundary alignment."""
start = _price_on_or_after(px_by_date, start_date)
end = _price_on_or_before(px_by_date, end_date)
if start is None or end is None:
return None
start_actual, p0 = start
end_actual, p1 = end
if end_actual < start_actual:
return None
return start_actual, end_actual, p0, p1
def _option_position_key(ticker, pos_type):
return (ticker, pos_type)
def compute_return(positions, prices, start_date, end_date, mode='equity_only',
option_prices=None, option_params=None):
"""Compute weighted portfolio return between two dates.
For option positions (call/put): if option_prices contains actual
historical option prices for the ticker, compute returns directly from
those prices. Otherwise fall back to Black-Scholes repricing using
the parameters in option_params.
"""
total_value = 0
weighted_return = 0
for (ticker, pos_type), value in positions.items():
if mode == 'equity_only' and pos_type != 'long':
continue
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
opt_px = option_prices.get(opt_key) if option_prices else None
use_option_px = bool(is_option and opt_px)
use_bs = False
if use_option_px:
pair = _period_price_pair(opt_px, start_date, end_date)
if pair is None:
use_option_px = False
else:
start_actual, end_actual, p0, p1 = pair
if not use_option_px:
px = prices.get(ticker)
pair = _period_price_pair(px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
ret = (p1 - p0) / p0
# Fallback: BS repricing when no option price data
if is_option and not use_option_px:
params = option_params.get(ticker) if option_params else None
if params:
K_call, K_put, sigma = params
K_over_S = K_call if pos_type == 'call' else K_put
dt = days_between(start_actual, end_actual) / 365.25
ret = bs_option_return(ret, K_over_S, OPTION_T, dt, sigma,
pos_type)
use_bs = True
total_value += value
# When using actual option/BS prices the return already reflects
# the directional bet, so sign is +1. The -1 for puts applies
# only to the raw stock-price proxy path.
sign = 1 if (use_option_px or use_bs) else (
-1 if pos_type == 'put' else 1)
weighted_return += value * sign * ret
return weighted_return / total_value if total_value else None
def annualize(ret, days):
"""Annualize a return over a given number of calendar days."""
if ret is None or days <= 0:
return None
return (1 + ret) ** (365.25 / days) - 1
def days_between(d1, d2):
return (datetime.strptime(d2, '%Y-%m-%d') - datetime.strptime(d1, '%Y-%m-%d')).days
def fmt(ret):
return f"{ret * 100:+.2f}%" if ret is not None else "N/A"
# Collect all tickers and dates
all_tickers = set()
for positions in holdings.values():
for (ticker, _) in positions:
all_tickers.add(ticker)
all_tickers.add('SPY')
today = datetime.now().strftime('%Y-%m-%d')
first_date = filing_dates[quarters[0]]
all_dates = set(filing_dates.values()) | set(quarter_end_dates.values()) | {today}
prices = get_prices(sorted(all_tickers), sorted(all_dates))
# Resolve `today` to the actual last available closing date.
# yfinance may not have data for today (market still open or holiday),
# so we look up what date SPY's price actually corresponds to.
def _resolve_price_date(prices, requested_date):
"""Return the actual trading date of the price stored under requested_date."""
ref = 'SPY' if 'SPY' in prices else next(iter(prices), None)
if not ref or requested_date not in prices[ref]:
return requested_date
target_price = prices[ref][requested_date]
# Re-download a small window to find the real date of this price
start = datetime.strptime(requested_date, '%Y-%m-%d') - timedelta(days=10)
end = datetime.strptime(requested_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(ref, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return requested_date
if isinstance(df.columns, pd.MultiIndex):
close = df['Close'][ref].dropna()
elif 'Close' in df.columns:
close = df['Close'].dropna()
else:
close = df.iloc[:, 0].dropna()
for dt, px in close.items():
val = float(px.iloc[0]) if isinstance(px, pd.Series) else float(px)
if abs(val - target_price) < 0.01:
ts = dt[0] if isinstance(dt, tuple) else dt
return pd.Timestamp(ts).strftime('%Y-%m-%d')
return requested_date
today_resolved = _resolve_price_date(prices, today)
if today_resolved != today:
for ticker in prices:
if today in prices[ticker]:
prices[ticker][today_resolved] = prices[ticker].pop(today)
today = today_resolved
def download_daily(tickers, start_date, end_date):
"""Download daily close prices from yfinance, handling MultiIndex.
Dates are 'YYYY-MM-DD' strings. Adds a small buffer for trading-day alignment."""
tickers_sorted = sorted(tickers)
start = datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=5)
end = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(tickers_sorted, start=start, end=end,
progress=False, auto_adjust=True)
if df.empty:
return pd.DataFrame()
if isinstance(df.columns, pd.MultiIndex):
return df['Close']
close = df[['Close']]
close.columns = tickers_sorted
return close
# -- Historical option prices via Alpha Vantage ----------------------------
OPTION_CACHE_DIR = os.path.expanduser('~/My Drive/notes/.sa-lp-option-cache')
_AV_BASE = 'https://www.alphavantage.co/query'
_AV_RATE_DELAY = 0.85 # seconds between requests (75 req/min limit)
OPTION_CACHE_COLUMNS = [
'date', 'option_type', 'strike', 'expiry', 'delta', 'price']
def _normalize_option_type(option_type):
option_type = str(option_type).lower()
if option_type not in ('call', 'put'):
raise ValueError(f"Unsupported option type: {option_type}")
return option_type
def _empty_option_cache():
return pd.DataFrame(columns=OPTION_CACHE_COLUMNS)
def _option_cache_path(ticker, option_type, legacy=False):
if legacy:
return os.path.join(OPTION_CACHE_DIR, f'{ticker}.csv')
option_type = _normalize_option_type(option_type)
return os.path.join(OPTION_CACHE_DIR, f'{ticker}-{option_type}.csv')
def _load_option_cache(ticker, option_type):
"""Load cached option data for a ticker/type. Returns DataFrame or empty."""
option_type = _normalize_option_type(option_type)
paths = [_option_cache_path(ticker, option_type)]
# Pre-fix caches were call-only and named TICKER.csv. They are safe to
# reuse for calls but must not be reused for puts.
if option_type == 'call':
legacy_path = _option_cache_path(ticker, option_type, legacy=True)
if legacy_path not in paths:
paths.append(legacy_path)
frames = []
for path in paths:
if not os.path.exists(path):
continue
df = pd.read_csv(path)
if df.empty:
continue
if 'option_type' not in df.columns:
df['option_type'] = 'call'
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df['date'] = pd.to_datetime(df['date'], errors='coerce').dt.strftime(
'%Y-%m-%d')
df['option_type'] = df['option_type'].fillna(option_type).str.lower()
frames.append(df[OPTION_CACHE_COLUMNS])
if not frames:
return _empty_option_cache()
cache = pd.concat(frames, ignore_index=True)
cache = cache[cache['option_type'] == option_type].copy()
cache.dropna(subset=['date'], inplace=True)
for col in ('strike', 'delta', 'price'):
cache[col] = pd.to_numeric(cache[col], errors='coerce')
cache.drop_duplicates(
subset=['date', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
return cache[OPTION_CACHE_COLUMNS]
def _save_option_cache(ticker, option_type, df):
"""Persist typed option cache to CSV."""
option_type = _normalize_option_type(option_type)
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
path = _option_cache_path(ticker, option_type)
if df.empty:
df = _empty_option_cache()
else:
df = df.copy()
df['option_type'] = option_type
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df.drop_duplicates(
subset=['date', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
df.sort_values(['date', 'expiry', 'strike'], inplace=True)
df.to_csv(path, index=False)
def _fetch_option_chain(ticker, date_str, api_key):
"""Fetch the full option chain for ticker on a given date from Alpha Vantage."""
params = {
'function': 'HISTORICAL_OPTIONS',
'symbol': ticker,
'date': date_str,
'apikey': api_key,
}
try:
resp = requests.get(_AV_BASE, params=params, timeout=30)
resp.raise_for_status()
body = resp.json()
return body.get('data', [])
except Exception:
return []
def _contract_window(ref_date_str):
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
return ref + timedelta(days=270), ref + timedelta(days=456)
def _contract_from_cache_row(row, ref_date_str, option_type):
option_type = _normalize_option_type(option_type)
if str(row.get('option_type', option_type)).lower() != option_type:
return None
lo, hi = _contract_window(ref_date_str)
try:
exp = datetime.strptime(str(row['expiry']), '%Y-%m-%d')
except (KeyError, TypeError, ValueError):
return None
if not (lo <= exp <= hi):
return None
strike = _safe_float(row.get('strike'))
delta = _safe_float(row.get('delta'))
price = _safe_float(row.get('price'))
if strike is None or delta is None or price is None or price <= 0:
return None
return {
'option_type': option_type,
'strike': strike,
'expiry': str(row['expiry']),
'delta': delta,
'price': price,
}
def _select_cached_contract(cache, option_type, ref_date_str):
rows = cache[(cache['date'] == ref_date_str)
& (cache['option_type'] == option_type)]
candidates = []
for _, row in rows.iterrows():
contract = _contract_from_cache_row(row, ref_date_str, option_type)
if contract:
candidates.append(contract)
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - OPTION_DELTA))
return candidates[0]
def _select_best_contract(chain, ref_date_str, option_type):
"""Select the best-matching call/put contract from an option chain.
Criteria: matching option type, expiry 9-15 months from ref_date,
absolute delta closest to 0.15. Returns dict with type, strike, expiry,
delta, price or None.
"""
option_type = _normalize_option_type(option_type)
lo, hi = _contract_window(ref_date_str)
candidates = []
for c in chain:
if c.get('type', '').lower() != option_type:
continue
try:
exp = datetime.strptime(c['expiration'], '%Y-%m-%d')
except (KeyError, ValueError):
continue
if not (lo <= exp <= hi):
continue
delta = _safe_float(c.get('delta'))
if delta is None:
continue
abs_delta = abs(delta)
if abs_delta == 0:
continue
# Price: prefer mid if available, else last
price = _parse_option_price(c)
if price is None or price <= 0:
continue
candidates.append({
'option_type': option_type,
'strike': float(c['strike']),
'expiry': c['expiration'],
'delta': delta,
'price': price,
})
if not candidates:
return None
# Pick contract with absolute delta closest to 0.15.
candidates.sort(key=lambda x: abs(abs(x['delta']) - OPTION_DELTA))
return candidates[0]
def _parse_option_price(contract):
"""Extract a price from an Alpha Vantage option contract record."""
bid = _safe_float(contract.get('bid'))
ask = _safe_float(contract.get('ask'))
last = _safe_float(contract.get('last'))
if bid and ask and bid > 0 and ask > 0:
return (bid + ask) / 2
if last and last > 0:
return last
return None
def _safe_float(val):
try:
return float(val)
except (TypeError, ValueError):
return None
def _cached_contract_price(cache, option_type, date_str, strike, expiry):
rows = cache[(cache['date'] == date_str)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))]
for _, row in rows.iterrows():
price = _safe_float(row.get('price'))
if price is not None and price > 0:
return price
return None
def _extract_contract_price(chain, strike, expiry, option_type):
"""Find the price of a specific option contract."""
option_type = _normalize_option_type(option_type)
for c in chain:
if c.get('type', '').lower() != option_type:
continue
try:
if (abs(float(c['strike']) - strike) < 0.01
and c.get('expiration') == expiry):
return _parse_option_price(c)
except (KeyError, TypeError, ValueError):
continue
return None
def download_option_prices(option_positions, quarters, holdings, filing_dates,
today):
"""Download historical option prices from Alpha Vantage.
For each (ticker, option_type) and each filing period:
1. On the first trading day, fetch the chain and select the best contract
(matching type, expiry 9-15 months out, |delta| closest to 0.15).
2. Lock in that contract for the period.
3. Fetch the price of that contract on each subsequent trading day.
Returns
-------
per_period : dict {quarter_str: {(ticker, type): {date_str: float}}}
Option prices keyed by filing period then option position. Each period
has its own contract's prices, avoiding cross-contract mixing at
boundary dates where one period ends and the next begins.
fallback_positions : set
Option positions where no option data was found (need BS fallback).
"""
option_positions = sorted({
(ticker, _normalize_option_type(pos_type))
for ticker, pos_type in option_positions})
api_key = os.environ.get('ALPHA_VANTAGE_KEY', '')
cache_only = not bool(api_key)
if cache_only:
print("WARNING: ALPHA_VANTAGE_KEY not set; using cached option "
"prices where available and BS repricing elsewhere.")
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
per_period = {} # {q: {(ticker, type): {date_str: price}}}
fallback = set()
fetched = 0
for ticker, option_type in option_positions:
opt_key = _option_position_key(ticker, option_type)
cache = _load_option_cache(ticker, option_type)
position_has_data = False
new_rows = []
for i, q in enumerate(quarters):
# Skip quarters where this exact option position is absent.
has_opts = opt_key in holdings[q]
if not has_opts:
continue
period_start = filing_dates[q]
period_end = (filing_dates[quarters[i + 1]]
if i < len(quarters) - 1 else today)
trading_days = pd.bdate_range(period_start, period_end)
if len(trading_days) == 0:
continue
first_day = trading_days[0].strftime('%Y-%m-%d')
# -- Select contract on first trading day --
from_cache = False
contract = _select_cached_contract(cache, option_type, first_day)
if contract:
from_cache = True
else:
if not cache_only:
time.sleep(_AV_RATE_DELAY)
chain = _fetch_option_chain(ticker, first_day, api_key)
fetched += 1
contract = _select_best_contract(
chain, first_day, option_type)
if contract:
new_rows.append({
'date': first_day,
'option_type': option_type,
'strike': contract['strike'],
'expiry': contract['expiry'],
'delta': contract['delta'],
'price': contract['price'],
})
if contract is None:
fallback.add(opt_key)
continue
strike = contract['strike']
expiry = contract['expiry']
# -- Collect prices for this period (fresh dict per period) --
period_prices = {}
if from_cache:
# Fast path: read all matching prices from cache (no API calls).
rows = cache[
(cache['date'] >= period_start)
& (cache['date'] <= period_end)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))
& pd.notna(cache['price'])]
for _, row in rows.iterrows():
period_prices[row['date']] = float(row['price'])
else:
# Slow path: fetch each trading day from API
if contract.get('price'):
period_prices[first_day] = contract['price']
for day in trading_days[1:]:
day_str = day.strftime('%Y-%m-%d')
cached_price = _cached_contract_price(
cache, option_type, day_str, strike, expiry)
if cached_price is not None:
period_prices[day_str] = cached_price
continue
time.sleep(_AV_RATE_DELAY)
chain = _fetch_option_chain(ticker, day_str, api_key)
fetched += 1
price = _extract_contract_price(
chain, strike, expiry, option_type)
if price is not None:
period_prices[day_str] = price
new_rows.append({
'date': day_str,
'option_type': option_type,
'strike': strike,
'expiry': expiry,
'delta': contract['delta'],
'price': price,
})
# Accumulate per-period prices
if period_prices:
per_period.setdefault(q, {})[opt_key] = period_prices
position_has_data = True
# Persist new data to cache
if new_rows:
new_df = pd.DataFrame(new_rows)
cache = pd.concat([cache, new_df], ignore_index=True)
cache.drop_duplicates(
subset=['date', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
_save_option_cache(ticker, option_type, cache)
if not position_has_data:
fallback.add(opt_key)
if fetched:
import sys
print(f"[options] Fetched {fetched} chain snapshots from Alpha Vantage",
file=sys.stderr)
return per_period, fallback
# -- Fallback: Black-Scholes repricing for tickers without option data -----
from scipy.stats import norm as _norm
def compute_realized_vol(tickers, download_daily_fn, today_str):
"""Compute annualized realized vol from trailing 1-year daily returns."""
vol_start = (datetime.strptime(today_str, '%Y-%m-%d')
- timedelta(days=400)).strftime('%Y-%m-%d')
vol_df = download_daily_fn(tickers, vol_start, today_str)
result = {}
for ticker in tickers:
if ticker in vol_df.columns:
series = vol_df[ticker].dropna()
if len(series) > 20:
log_rets = np.log(series / series.shift(1)).dropna().tail(252)
result[ticker] = float(log_rets.std() * np.sqrt(252))
return result
def bs_price(S, K, T, sigma, option_type='call'):
"""Black-Scholes option price (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return max(S - K, 0)
return max(K - S, 0)
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
d2 = d1 - sigma * np.sqrt(T)
if option_type == 'call':
return S * _norm.cdf(d1) - K * _norm.cdf(d2)
return K * _norm.cdf(-d2) - S * _norm.cdf(-d1)
def bs_option_return(stock_return, K_over_S, T, delta_t, sigma,
option_type='call'):
"""Compute option return from stock return using Black-Scholes repricing.
Normalizes S_0 = 1, so S_1 = 1 + stock_return and K = K_over_S.
T is time to expiry at period start; delta_t is time elapsed.
"""
V0 = bs_price(1.0, K_over_S, T, sigma, option_type)
V1 = bs_price(1.0 + stock_return, K_over_S, max(T - delta_t, 0),
sigma, option_type)
if V0 <= 0:
return stock_return
return V1 / V0 - 1
OPTION_DELTA = 0.15
OPTION_T = 1.0
def build_option_params(option_tickers, ticker_vol):
"""Build {ticker: (K_over_S_call, K_over_S_put, sigma)} for BS fallback."""
result = {}
for t in option_tickers:
if t not in ticker_vol:
continue
sigma = ticker_vol[t]
srt = sigma * OPTION_T ** 0.5
# Call: delta = N(d1) = OPTION_DELTA
d1_call = _norm.ppf(OPTION_DELTA)
K_call = np.exp(-d1_call * srt + sigma ** 2 * OPTION_T / 2)
# Put: delta = N(d1) - 1 = -OPTION_DELTA, so N(d1) = 1 - OPTION_DELTA
d1_put = _norm.ppf(1 - OPTION_DELTA)
K_put = np.exp(-d1_put * srt + sigma ** 2 * OPTION_T / 2)
result[t] = (K_call, K_put, sigma)
return result
def daily_cumulative(holdings, quarters, filing_dates, close, today, mode,
per_period_opt=None, option_params=None):
"""Build a daily series of cumulative growth factors for a given mode.
For each filing period the portfolio weights are fixed. Each trading
day's weighted return is computed relative to the period's starting
prices, then chained with the prior period's cumulative growth.
For option positions: uses per-period option prices from per_period_opt
when available; otherwise falls back to Black-Scholes repricing using
option_params.
"""
cum_growth = 1.0
dates_out = []
values_out = []
for i, q in enumerate(quarters):
period_start = filing_dates[q]
period_end = filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today
ps = pd.Timestamp(period_start)
pe = pd.Timestamp(period_end)
# Trading days in this period
mask = (close.index >= ps) & (close.index <= pe)
period_close = close[mask]
if period_close.empty:
continue
# Option prices for this period (keyed by (ticker, type) → prices)
quarter_opt = per_period_opt.get(q, {}) if per_period_opt else {}
# Determine portfolio weights and starting prices
positions = holdings[q]
weights = {}
start_prices = {}
use_opt_px = {} # track which positions use option prices
total_value = 0
for (ticker, pos_type), value in positions.items():
if mode == 'equity_only' and pos_type != 'long':
continue
# Select price source for this position
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
has_opt = is_option and opt_key in quarter_opt
if has_opt:
ticker_opt = quarter_opt[opt_key]
opt_dates = sorted(d for d in ticker_opt if d >= period_start)
if not opt_dates:
continue
start_prices[(ticker, pos_type)] = ticker_opt[opt_dates[0]]
elif ticker in close.columns:
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
start_prices[(ticker, pos_type)] = float(avail.iloc[0])
else:
continue
weights[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = has_opt
total_value += value
if total_value == 0:
continue
# Daily weighted return relative to period start
# Skip first day of subsequent periods (already recorded as last day
# of the prior period) to avoid duplicate boundary dates.
start_idx = 1 if i > 0 else 0
# Forward-fill: track last known option price so that gaps in
# option data don't cause positions to vanish mid-period.
last_opt = {k: v for k, v in start_prices.items()
if use_opt_px.get(k)}
for day_idx in range(start_idx, len(period_close)):
day = period_close.index[day_idx]
day_str = day.strftime('%Y-%m-%d')
weighted_return = 0
for (ticker, pos_type), value in weights.items():
p0 = start_prices[(ticker, pos_type)]
if p0 == 0:
continue
if use_opt_px[(ticker, pos_type)]:
opt_key = _option_position_key(ticker, pos_type)
p1_val = quarter_opt.get(opt_key, {}).get(day_str)
if p1_val is not None:
last_opt[(ticker, pos_type)] = p1_val
else:
p1_val = last_opt.get((ticker, pos_type))
if p1_val is None:
continue
elif ticker in period_close.columns:
p1_val = period_close[ticker].iloc[day_idx]
if pd.isna(p1_val):
continue
else:
continue
ret = (float(p1_val) - p0) / p0
# Fallback: BS repricing for option positions without option data
use_bs = False
if (pos_type in ('call', 'put')
and not use_opt_px[(ticker, pos_type)]):
params = option_params.get(ticker) if option_params else None
if params:
K_call, K_put, sigma = params
K_over_S = K_call if pos_type == 'call' else K_put
dt = (day - ps).days / 365.25
ret = bs_option_return(ret, K_over_S, OPTION_T, dt,
sigma, pos_type)
use_bs = True
sign = 1 if (use_opt_px[(ticker, pos_type)] or use_bs) else (
-1 if pos_type == 'put' else 1)
weighted_return += (value / total_value) * sign * ret
dates_out.append(day)
values_out.append(cum_growth * (1 + weighted_return))
# Chain: next period starts from the last day's growth factor
if values_out:
cum_growth = values_out[-1]
return dates_out, values_out
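# Worked example of the growth-factor chaining above (hypothetical period
# returns): two periods returning +10% and -5% chain multiplicatively into
# a cumulative growth factor of 1.10 * 0.95 = 1.045, i.e. +4.5% overall.
assert abs(1.10 * 0.95 - 1.045) < 1e-12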
# Compute copycat returns
header = (f"{'Period':<16} {'Dates':<24} "
f"{'Eq. only':>9} {'Full exp.':>9} {'SPY':>9}")
print("COPYCAT STRATEGY RETURNS")
print("=" * 72)
print(header)
print("-" * 72)
cum_eq = 1.0
cum_full = 1.0
cum_spy = 1.0
for i, q in enumerate(quarters):
start = filing_dates[q]
end = filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today
suffix = " †" if i == len(quarters) - 1 else ""
ret_eq = compute_return(holdings[q], prices, start, end, 'equity_only')
ret_full = compute_return(holdings[q], prices, start, end, 'full')
ret_spy = None
if 'SPY' in prices and start in prices['SPY'] and end in prices['SPY']:
spy_p0, spy_p1 = prices['SPY'][start], prices['SPY'][end]
if spy_p0 != 0:
ret_spy = (spy_p1 - spy_p0) / spy_p0
if ret_eq is not None:
cum_eq *= (1 + ret_eq)
if ret_full is not None:
cum_full *= (1 + ret_full)
if ret_spy is not None:
cum_spy *= (1 + ret_spy)
dates_str = f"{start} to {end}"
print(f"{q + suffix:<16} {dates_str:<24} "
f"{fmt(ret_eq):>9} {fmt(ret_full):>9} {fmt(ret_spy):>9}")
print("-" * 72)
cum_eq_ret = cum_eq - 1
cum_full_ret = cum_full - 1
cum_spy_ret = cum_spy - 1
dates_str = f"{first_date} to {today}"
print(f"{'Cumulative':<16} {dates_str:<24} "
f"{fmt(cum_eq_ret):>9} {fmt(cum_full_ret):>9} {fmt(cum_spy_ret):>9}")
print()
print("† = partial period (still holding; updates on re-evaluation)")
print("Eq. only = long equity positions only")
print("Full exp. = options treated as underlying exposure proxy")
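# Worked example of the put proxy sign used in the table above (hypothetical
# move): under the raw stock-price proxy, a put position books the negative
# of the stock return, so a -10% move in the underlying contributes +10%.
_put_contrib = -1 * -0.10
assert _put_contrib == 0.10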
# ── Risk-adjusted returns ──────────────────────────────────────────
daily_close = download_daily(all_tickers, first_date, today)
def daily_returns_series(holdings, quarters, filing_dates, daily_close,
today, mode, per_period_opt=None, option_params=None):
"""Compute daily portfolio return series for a given mode.
Uses per-period option prices when available; falls back to
Black-Scholes repricing using option_params otherwise.
"""
if daily_close.empty:
return pd.Series(dtype=float)
returns, dates = [], []
for i, q in enumerate(quarters):
ps = pd.Timestamp(filing_dates[q])
pe = pd.Timestamp(
filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today)
period = daily_close[(daily_close.index >= ps) &
(daily_close.index <= pe)]
if len(period) < 2:
continue
quarter_opt = per_period_opt.get(q, {}) if per_period_opt else {}
weights = {}
use_opt = {}
total_value = 0
for (ticker, pos_type), value in holdings[q].items():
if mode == 'equity_only' and pos_type != 'long':
continue
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
has_opt = is_option and opt_key in quarter_opt
if not has_opt and ticker not in daily_close.columns:
continue
weights[(ticker, pos_type)] = value
use_opt[(ticker, pos_type)] = has_opt
total_value += value
if total_value == 0:
continue
# Forward-fill: track last known option price per position
last_opt = {}
for day_idx in range(1, len(period)):
daily_ret = 0
day_curr = period.index[day_idx]
day_prev = period.index[day_idx - 1]
d_curr = day_curr.strftime('%Y-%m-%d')
d_prev = day_prev.strftime('%Y-%m-%d')
for (ticker, pos_type), value in weights.items():
if use_opt[(ticker, pos_type)]:
opt_key = _option_position_key(ticker, pos_type)
ticker_opt = quarter_opt.get(opt_key, {})
p0 = ticker_opt.get(d_prev, last_opt.get(
(ticker, pos_type)))
raw_p1 = ticker_opt.get(d_curr)
p1 = raw_p1 if raw_p1 is not None else p0
if raw_p1 is not None:
last_opt[(ticker, pos_type)] = raw_p1
elif p0 is not None:
last_opt[(ticker, pos_type)] = p0
if p0 is None or p1 is None:
continue
else:
if ticker not in period.columns:
continue
p0 = period[ticker].iloc[day_idx - 1]
p1 = period[ticker].iloc[day_idx]
if pd.isna(p0) or pd.isna(p1) or p0 == 0:
continue
ret = (float(p1) - float(p0)) / float(p0)
use_bs = False
if (pos_type in ('call', 'put')
and not use_opt[(ticker, pos_type)]):
params = option_params.get(ticker) if option_params else None
if params:
K_call, K_put, sigma = params
K_over_S = K_call if pos_type == 'call' else K_put
dt = (day_curr - ps).days / 365.25
# Reconstruct cumulative stock return for BS repricing
if ticker in period.columns:
s0 = period[ticker].iloc[0]
if s0 and not pd.isna(s0) and s0 != 0:
cum_stock_ret = (float(period[ticker].iloc[day_idx]) - float(s0)) / float(s0)
cum_opt_ret = bs_option_return(cum_stock_ret, K_over_S, OPTION_T, dt, sigma, pos_type)
dt_prev = (day_prev - ps).days / 365.25
cum_stock_ret_prev = (float(period[ticker].iloc[day_idx - 1]) - float(s0)) / float(s0)
cum_opt_ret_prev = bs_option_return(cum_stock_ret_prev, K_over_S, OPTION_T, dt_prev, sigma, pos_type)
ret = (1 + cum_opt_ret) / (1 + cum_opt_ret_prev) - 1 if (1 + cum_opt_ret_prev) != 0 else ret
use_bs = True
sign = 1 if (use_opt[(ticker, pos_type)] or use_bs) else (
-1 if pos_type == 'put' else 1)
daily_ret += (value / total_value) * sign * ret
returns.append(daily_ret)
dates.append(period.index[day_idx])
return pd.Series(returns, index=dates)
ret_eq_d = daily_returns_series(holdings, quarters, filing_dates,
daily_close, today, 'equity_only')
ret_full_d = daily_returns_series(holdings, quarters, filing_dates,
daily_close, today, 'full')
if 'SPY' in daily_close.columns:
spy_close = daily_close['SPY'].dropna()
spy_period = spy_close[spy_close.index >= pd.Timestamp(first_date)]
ret_spy_d = spy_period.pct_change().dropna()
else:
ret_spy_d = pd.Series(dtype=float)
def sharpe(daily_rets, rf_annual=0.04):
if daily_rets.empty:
return float('nan')
rf_daily = (1 + rf_annual) ** (1 / 252) - 1
excess = daily_rets - rf_daily
    sd = excess.std()
    if sd == 0 or pd.isna(sd):
        return float('nan')
    return float(excess.mean() / sd * 252 ** 0.5)
def max_drawdown(daily_rets):
if daily_rets.empty:
return float('nan')
cum = (1 + daily_rets).cumprod()
return float(((cum - cum.cummax()) / cum.cummax()).min() * 100)
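# Worked example of the drawdown calculation above (hypothetical returns):
# daily returns of +10%, -20%, +5% produce an equity curve of 1.10, 0.88,
# 0.924; the running peak stays at 1.10, so the deepest drawdown is
# 0.88 / 1.10 - 1 = -20%.
import pandas as pd  # no-op here; already imported in this session
_dd_rets = pd.Series([0.10, -0.20, 0.05])
_dd_curve = (1 + _dd_rets).cumprod()
_dd_min = float(((_dd_curve - _dd_curve.cummax())
                 / _dd_curve.cummax()).min() * 100)
assert abs(_dd_min - (-20.0)) < 1e-9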
print()
print("RISK-ADJUSTED RETURNS")
print("=" * 55)
print(f"{'Metric':<25} {'Eq. only':>9} {'Full exp.':>9} {'SPY':>9}")
print("-" * 55)
vol_eq = float(ret_eq_d.std() * 252 ** 0.5 * 100)
vol_full = float(ret_full_d.std() * 252 ** 0.5 * 100)
vol_spy = float(ret_spy_d.std() * 252 ** 0.5 * 100)
print(f"{'Ann. volatility':<25} {vol_eq:>8.1f}% {vol_full:>8.1f}% {vol_spy:>8.1f}%")
sh_eq = sharpe(ret_eq_d)
sh_full = sharpe(ret_full_d)
sh_spy = sharpe(ret_spy_d)
print(f"{'Sharpe (rf=4%)':<25} {sh_eq:>9.2f} {sh_full:>9.2f} {sh_spy:>9.2f}")
mdd_eq = max_drawdown(ret_eq_d)
mdd_full = max_drawdown(ret_full_d)
mdd_spy = max_drawdown(ret_spy_d)
print(f"{'Max drawdown':<25} {mdd_eq:>8.1f}% {mdd_full:>8.1f}% {mdd_spy:>8.1f}%")
import json
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import requests
import time
import os
import warnings
warnings.filterwarnings('ignore')
# Parse data from the scraper block
parsed = json.loads(data) if isinstance(data, str) else data
filings = parsed["filings"]
# Build internal structures
filing_dates = {f["quarter"]: f["filing_date"] for f in filings}
quarter_end_dates = {f["quarter"]: f["quarter_end"] for f in filings}
quarters = [f["quarter"] for f in filings]
# Convert holdings list to dict keyed by quarter.
# Multiple positions in the same ticker with different types are aggregated
# by value per (ticker, type) pair.
holdings = {}
for f in filings:
positions = {}
for h in f["holdings"]:
ticker = h["ticker"]
pos_type = h["type"]
value = h["value"]
key = (ticker, pos_type)
positions[key] = positions.get(key, 0) + value
holdings[f["quarter"]] = positions
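# Worked example of the aggregation above (hypothetical filing rows): two
# call positions in the same ticker merge into a single (ticker, type)
# entry, while the long position stays separate.
_rows = [('NVDA', 'call', 100), ('NVDA', 'call', 50), ('NVDA', 'long', 200)]
_agg = {}
for _t, _p, _v in _rows:
    _agg[(_t, _p)] = _agg.get((_t, _p), 0) + _v
assert _agg == {('NVDA', 'call'): 150, ('NVDA', 'long'): 200}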
def get_prices(tickers, dates):
"""Fetch close prices for tickers on specific dates."""
unique_tickers = sorted(set(tickers))
all_dates = [datetime.strptime(d, '%Y-%m-%d') for d in dates]
start = min(all_dates) - timedelta(days=5)
end = max(all_dates) + timedelta(days=5)
df = yf.download(unique_tickers, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return {}
# yf.download returns MultiIndex columns (metric, ticker) for multiple tickers
if isinstance(df.columns, pd.MultiIndex):
close = df['Close']
else:
close = df[['Close']]
close.columns = unique_tickers
prices = {}
for ticker in unique_tickers:
if ticker not in close.columns:
continue
series = close[ticker].dropna()
if series.empty:
continue
prices[ticker] = {}
for date_str in dates:
target = pd.Timestamp(datetime.strptime(date_str, '%Y-%m-%d'))
after = series[series.index >= target]
if not after.empty:
prices[ticker][date_str] = float(after.iloc[0])
else:
before = series[series.index <= target]
if not before.empty:
prices[ticker][date_str] = float(before.iloc[-1])
return prices
def _price_on_or_after(px_by_date, target_date):
"""Return (date, price) for the first available price on/after target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d >= target_date)
if not dates:
return None
d = dates[0]
return d, px_by_date[d]
def _price_on_or_before(px_by_date, target_date):
"""Return (date, price) for the last available price on/before target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d <= target_date)
if not dates:
return None
d = dates[-1]
return d, px_by_date[d]
def _period_price_pair(px_by_date, start_date, end_date):
"""Return start/end prices for a period using sensible boundary alignment."""
start = _price_on_or_after(px_by_date, start_date)
end = _price_on_or_before(px_by_date, end_date)
if start is None or end is None:
return None
start_actual, p0 = start
end_actual, p1 = end
if end_actual < start_actual:
return None
return start_actual, end_actual, p0, p1
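# Worked example of the boundary alignment above (hypothetical quotes):
# the period start snaps forward to the first available quote date and the
# period end snaps backward to the last one, so a period requested as
# [Jan 1, Jan 5] with quotes only on Jan 2 and Jan 6 collapses to Jan 2.
_px = {'2024-01-02': 10.0, '2024-01-06': 12.0}
_snap_start = min(d for d in _px if d >= '2024-01-01')
_snap_end = max(d for d in _px if d <= '2024-01-05')
assert (_snap_start, _snap_end) == ('2024-01-02', '2024-01-02')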
def _option_position_key(ticker, pos_type):
return (ticker, pos_type)
def compute_return(positions, prices, start_date, end_date, mode='equity_only',
option_prices=None, option_params=None):
"""Compute weighted portfolio return between two dates.
For option positions (call/put): if option_prices contains actual
historical option prices for the ticker, compute returns directly from
those prices. Otherwise fall back to Black-Scholes repricing using
the parameters in option_params.
"""
total_value = 0
weighted_return = 0
for (ticker, pos_type), value in positions.items():
if mode == 'equity_only' and pos_type != 'long':
continue
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
opt_px = option_prices.get(opt_key) if option_prices else None
use_option_px = bool(is_option and opt_px)
use_bs = False
if use_option_px:
pair = _period_price_pair(opt_px, start_date, end_date)
if pair is None:
use_option_px = False
else:
start_actual, end_actual, p0, p1 = pair
if not use_option_px:
px = prices.get(ticker)
pair = _period_price_pair(px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
ret = (p1 - p0) / p0
# Fallback: BS repricing when no option price data
if is_option and not use_option_px:
params = option_params.get(ticker) if option_params else None
if params:
K_call, K_put, sigma = params
K_over_S = K_call if pos_type == 'call' else K_put
dt = days_between(start_actual, end_actual) / 365.25
ret = bs_option_return(ret, K_over_S, OPTION_T, dt, sigma,
pos_type)
use_bs = True
total_value += value
# When using actual option/BS prices the return already reflects
# the directional bet, so sign is +1. The -1 for puts applies
# only to the raw stock-price proxy path.
sign = 1 if (use_option_px or use_bs) else (
-1 if pos_type == 'put' else 1)
weighted_return += value * sign * ret
return weighted_return / total_value if total_value else None
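# Worked example of the value weighting above (hypothetical positions):
# $300 in a stock returning +10% and $100 in one returning -2% give a
# portfolio return of (300 * 0.10 + 100 * -0.02) / 400 = +7%.
assert abs((300 * 0.10 + 100 * -0.02) / 400 - 0.07) < 1e-12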
def annualize(ret, days):
"""Annualize a return over a given number of calendar days."""
if ret is None or days <= 0:
return None
return (1 + ret) ** (365.25 / days) - 1
def days_between(d1, d2):
    """Number of calendar days between two 'YYYY-MM-DD' date strings."""
    date_fmt = '%Y-%m-%d'
    return (datetime.strptime(d2, date_fmt)
            - datetime.strptime(d1, date_fmt)).days
def fmt(ret):
return f"{ret * 100:+.2f}%" if ret is not None else "N/A"
# Collect all tickers and dates
all_tickers = set()
for positions in holdings.values():
for (ticker, _) in positions:
all_tickers.add(ticker)
all_tickers.add('SPY')
today = datetime.now().strftime('%Y-%m-%d')
first_date = filing_dates[quarters[0]]
all_dates = set(filing_dates.values()) | set(quarter_end_dates.values()) | {today}
prices = get_prices(sorted(all_tickers), sorted(all_dates))
# Resolve `today` to the actual last available closing date.
# yfinance may not have data for today (market still open or holiday),
# so we look up what date SPY's price actually corresponds to.
def _resolve_price_date(prices, requested_date):
"""Return the actual trading date of the price stored under requested_date."""
ref = 'SPY' if 'SPY' in prices else next(iter(prices), None)
if not ref or requested_date not in prices[ref]:
return requested_date
target_price = prices[ref][requested_date]
# Re-download a small window to find the real date of this price
start = datetime.strptime(requested_date, '%Y-%m-%d') - timedelta(days=10)
end = datetime.strptime(requested_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(ref, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return requested_date
if isinstance(df.columns, pd.MultiIndex):
close = df['Close'][ref].dropna()
elif 'Close' in df.columns:
close = df['Close'].dropna()
else:
close = df.iloc[:, 0].dropna()
for dt, px in close.items():
val = float(px.iloc[0]) if isinstance(px, pd.Series) else float(px)
if abs(val - target_price) < 0.01:
ts = dt[0] if isinstance(dt, tuple) else dt
return pd.Timestamp(ts).strftime('%Y-%m-%d')
return requested_date
today_resolved = _resolve_price_date(prices, today)
if today_resolved != today:
for ticker in prices:
if today in prices[ticker]:
prices[ticker][today_resolved] = prices[ticker].pop(today)
today = today_resolved
def download_daily(tickers, start_date, end_date):
"""Download daily close prices from yfinance, handling MultiIndex.
Dates are 'YYYY-MM-DD' strings. Adds a small buffer for trading-day alignment."""
tickers_sorted = sorted(tickers)
start = datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=5)
end = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(tickers_sorted, start=start, end=end,
progress=False, auto_adjust=True)
if df.empty:
return pd.DataFrame()
if isinstance(df.columns, pd.MultiIndex):
return df['Close']
close = df[['Close']]
close.columns = tickers_sorted
return close
# -- Historical option prices via Alpha Vantage ----------------------------
OPTION_CACHE_DIR = os.path.expanduser('~/My Drive/notes/.sa-lp-option-cache')
_AV_BASE = 'https://www.alphavantage.co/query'
_AV_RATE_DELAY = 0.85 # seconds between requests (75 req/min limit)
OPTION_CACHE_COLUMNS = [
'date', 'option_type', 'strike', 'expiry', 'delta', 'price']
def _normalize_option_type(option_type):
option_type = str(option_type).lower()
if option_type not in ('call', 'put'):
raise ValueError(f"Unsupported option type: {option_type}")
return option_type
def _empty_option_cache():
return pd.DataFrame(columns=OPTION_CACHE_COLUMNS)
def _option_cache_path(ticker, option_type, legacy=False):
if legacy:
return os.path.join(OPTION_CACHE_DIR, f'{ticker}.csv')
option_type = _normalize_option_type(option_type)
return os.path.join(OPTION_CACHE_DIR, f'{ticker}-{option_type}.csv')
def _load_option_cache(ticker, option_type):
"""Load cached option data for a ticker/type. Returns DataFrame or empty."""
option_type = _normalize_option_type(option_type)
paths = [_option_cache_path(ticker, option_type)]
# Pre-fix caches were call-only and named TICKER.csv. They are safe to
# reuse for calls but must not be reused for puts.
if option_type == 'call':
legacy_path = _option_cache_path(ticker, option_type, legacy=True)
if legacy_path not in paths:
paths.append(legacy_path)
frames = []
for path in paths:
if not os.path.exists(path):
continue
df = pd.read_csv(path)
if df.empty:
continue
if 'option_type' not in df.columns:
df['option_type'] = 'call'
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df['date'] = pd.to_datetime(df['date'], errors='coerce').dt.strftime(
'%Y-%m-%d')
df['option_type'] = df['option_type'].fillna(option_type).str.lower()
frames.append(df[OPTION_CACHE_COLUMNS])
if not frames:
return _empty_option_cache()
cache = pd.concat(frames, ignore_index=True)
cache = cache[cache['option_type'] == option_type].copy()
cache.dropna(subset=['date'], inplace=True)
for col in ('strike', 'delta', 'price'):
cache[col] = pd.to_numeric(cache[col], errors='coerce')
cache.drop_duplicates(
subset=['date', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
return cache[OPTION_CACHE_COLUMNS]
def _save_option_cache(ticker, option_type, df):
"""Persist typed option cache to CSV."""
option_type = _normalize_option_type(option_type)
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
path = _option_cache_path(ticker, option_type)
if df.empty:
df = _empty_option_cache()
else:
df = df.copy()
df['option_type'] = option_type
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df.drop_duplicates(
subset=['date', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
df.sort_values(['date', 'expiry', 'strike'], inplace=True)
df.to_csv(path, index=False)
def _fetch_option_chain(ticker, date_str, api_key):
"""Fetch the full option chain for ticker on a given date from Alpha Vantage."""
params = {
'function': 'HISTORICAL_OPTIONS',
'symbol': ticker,
'date': date_str,
'apikey': api_key,
}
try:
resp = requests.get(_AV_BASE, params=params, timeout=30)
resp.raise_for_status()
body = resp.json()
return body.get('data', [])
except Exception:
return []
def _contract_window(ref_date_str):
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
return ref + timedelta(days=270), ref + timedelta(days=456)
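# Worked example of the expiry window above: from a reference date of
# 2024-01-01 the eligible expiries run from 2024-09-27 (270 days out) to
# 2025-04-01 (456 days out), i.e. roughly 9 to 15 months ahead.
from datetime import datetime as _dt, timedelta as _td
_lo = (_dt(2024, 1, 1) + _td(days=270)).strftime('%Y-%m-%d')
_hi = (_dt(2024, 1, 1) + _td(days=456)).strftime('%Y-%m-%d')
assert (_lo, _hi) == ('2024-09-27', '2025-04-01')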
def _contract_from_cache_row(row, ref_date_str, option_type):
option_type = _normalize_option_type(option_type)
if str(row.get('option_type', option_type)).lower() != option_type:
return None
lo, hi = _contract_window(ref_date_str)
try:
exp = datetime.strptime(str(row['expiry']), '%Y-%m-%d')
except (KeyError, TypeError, ValueError):
return None
if not (lo <= exp <= hi):
return None
strike = _safe_float(row.get('strike'))
delta = _safe_float(row.get('delta'))
price = _safe_float(row.get('price'))
if strike is None or delta is None or price is None or price <= 0:
return None
return {
'option_type': option_type,
'strike': strike,
'expiry': str(row['expiry']),
'delta': delta,
'price': price,
}
def _select_cached_contract(cache, option_type, ref_date_str):
rows = cache[(cache['date'] == ref_date_str)
& (cache['option_type'] == option_type)]
candidates = []
for _, row in rows.iterrows():
contract = _contract_from_cache_row(row, ref_date_str, option_type)
if contract:
candidates.append(contract)
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - OPTION_DELTA))
return candidates[0]
def _select_best_contract(chain, ref_date_str, option_type):
"""Select the best-matching call/put contract from an option chain.
Criteria: matching option type, expiry 9-15 months from ref_date,
absolute delta closest to 0.15. Returns dict with type, strike, expiry,
delta, price or None.
"""
option_type = _normalize_option_type(option_type)
lo, hi = _contract_window(ref_date_str)
candidates = []
for c in chain:
if c.get('type', '').lower() != option_type:
continue
try:
exp = datetime.strptime(c['expiration'], '%Y-%m-%d')
except (KeyError, ValueError):
continue
if not (lo <= exp <= hi):
continue
delta = _safe_float(c.get('delta'))
if delta is None:
continue
abs_delta = abs(delta)
if abs_delta == 0:
continue
# Price: prefer mid if available, else last
price = _parse_option_price(c)
if price is None or price <= 0:
continue
candidates.append({
'option_type': option_type,
'strike': float(c['strike']),
'expiry': c['expiration'],
'delta': delta,
'price': price,
})
if not candidates:
return None
# Pick contract with absolute delta closest to 0.15.
candidates.sort(key=lambda x: abs(abs(x['delta']) - OPTION_DELTA))
return candidates[0]
def _parse_option_price(contract):
"""Extract a price from an Alpha Vantage option contract record."""
bid = _safe_float(contract.get('bid'))
ask = _safe_float(contract.get('ask'))
last = _safe_float(contract.get('last'))
if bid and ask and bid > 0 and ask > 0:
return (bid + ask) / 2
if last and last > 0:
return last
return None
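# Worked example of the mid-price preference above (hypothetical quotes):
# with bid 1.00 / ask 1.50 the mid-price 1.25 is used; only when a usable
# two-sided quote is missing does the last trade serve as the fallback.
_bid, _ask, _last = 1.00, 1.50, 1.10
_mid = (_bid + _ask) / 2 if _bid > 0 and _ask > 0 else _last
assert _mid == 1.25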
def _safe_float(val):
try:
return float(val)
except (TypeError, ValueError):
return None
def _cached_contract_price(cache, option_type, date_str, strike, expiry):
rows = cache[(cache['date'] == date_str)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))]
for _, row in rows.iterrows():
price = _safe_float(row.get('price'))
if price is not None and price > 0:
return price
return None
def _extract_contract_price(chain, strike, expiry, option_type):
"""Find the price of a specific option contract."""
option_type = _normalize_option_type(option_type)
for c in chain:
if c.get('type', '').lower() != option_type:
continue
try:
if (abs(float(c['strike']) - strike) < 0.01
and c.get('expiration') == expiry):
return _parse_option_price(c)
except (KeyError, TypeError, ValueError):
continue
return None
def download_option_prices(option_positions, quarters, holdings, filing_dates,
today):
"""Download historical option prices from Alpha Vantage.
For each (ticker, option_type) and each filing period:
1. On the first trading day, fetch the chain and select the best contract
(matching type, expiry 9-15 months out, |delta| closest to 0.15).
2. Lock in that contract for the period.
3. Fetch the price of that contract on each subsequent trading day.
Returns
-------
per_period : dict {quarter_str: {(ticker, type): {date_str: float}}}
Option prices keyed by filing period then option position. Each period
has its own contract's prices, avoiding cross-contract mixing at
boundary dates where one period ends and the next begins.
fallback_positions : set
Option positions where no option data was found (need BS fallback).
"""
option_positions = sorted({
(ticker, _normalize_option_type(pos_type))
for ticker, pos_type in option_positions})
api_key = os.environ.get('ALPHA_VANTAGE_KEY', '')
cache_only = not bool(api_key)
if cache_only:
print("WARNING: ALPHA_VANTAGE_KEY not set; using cached option "
"prices where available and BS repricing elsewhere.")
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
per_period = {} # {q: {(ticker, type): {date_str: price}}}
fallback = set()
fetched = 0
for ticker, option_type in option_positions:
opt_key = _option_position_key(ticker, option_type)
cache = _load_option_cache(ticker, option_type)
position_has_data = False
new_rows = []
for i, q in enumerate(quarters):
# Skip quarters where this exact option position is absent.
has_opts = opt_key in holdings[q]
if not has_opts:
continue
period_start = filing_dates[q]
period_end = (filing_dates[quarters[i + 1]]
if i < len(quarters) - 1 else today)
trading_days = pd.bdate_range(period_start, period_end)
if len(trading_days) == 0:
continue
first_day = trading_days[0].strftime('%Y-%m-%d')
# -- Select contract on first trading day --
from_cache = False
contract = _select_cached_contract(cache, option_type, first_day)
if contract:
from_cache = True
else:
if not cache_only:
time.sleep(_AV_RATE_DELAY)
chain = _fetch_option_chain(ticker, first_day, api_key)
fetched += 1
contract = _select_best_contract(
chain, first_day, option_type)
if contract:
new_rows.append({
'date': first_day,
'option_type': option_type,
'strike': contract['strike'],
'expiry': contract['expiry'],
'delta': contract['delta'],
'price': contract['price'],
})
if contract is None:
fallback.add(opt_key)
continue
strike = contract['strike']
expiry = contract['expiry']
# -- Collect prices for this period (fresh dict per period) --
period_prices = {}
if from_cache:
# Fast path: read all matching prices from cache (no API calls).
rows = cache[
(cache['date'] >= period_start)
& (cache['date'] <= period_end)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))
& pd.notna(cache['price'])]
for _, row in rows.iterrows():
period_prices[row['date']] = float(row['price'])
else:
# Slow path: fetch each trading day from API
if contract.get('price'):
period_prices[first_day] = contract['price']
for day in trading_days[1:]:
day_str = day.strftime('%Y-%m-%d')
cached_price = _cached_contract_price(
cache, option_type, day_str, strike, expiry)
if cached_price is not None:
period_prices[day_str] = cached_price
continue
time.sleep(_AV_RATE_DELAY)
chain = _fetch_option_chain(ticker, day_str, api_key)
fetched += 1
price = _extract_contract_price(
chain, strike, expiry, option_type)
if price is not None:
period_prices[day_str] = price
new_rows.append({
'date': day_str,
'option_type': option_type,
'strike': strike,
'expiry': expiry,
'delta': contract['delta'],
'price': price,
})
# Accumulate per-period prices
if period_prices:
per_period.setdefault(q, {})[opt_key] = period_prices
position_has_data = True
# Persist new data to cache
if new_rows:
new_df = pd.DataFrame(new_rows)
cache = pd.concat([cache, new_df], ignore_index=True)
cache.drop_duplicates(
subset=['date', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
_save_option_cache(ticker, option_type, cache)
if not position_has_data:
fallback.add(opt_key)
if fetched:
import sys
print(f"[options] Fetched {fetched} chain snapshots from Alpha Vantage",
file=sys.stderr)
return per_period, fallback
# -- Fallback: Black-Scholes repricing for tickers without option data -----
from scipy.stats import norm as _norm
def compute_realized_vol(tickers, download_daily_fn, today_str):
"""Compute annualized realized vol from trailing 1-year daily returns."""
vol_start = (datetime.strptime(today_str, '%Y-%m-%d')
- timedelta(days=400)).strftime('%Y-%m-%d')
vol_df = download_daily_fn(tickers, vol_start, today_str)
result = {}
for ticker in tickers:
if ticker in vol_df.columns:
series = vol_df[ticker].dropna()
if len(series) > 20:
log_rets = np.log(series / series.shift(1)).dropna().tail(252)
result[ticker] = float(log_rets.std() * np.sqrt(252))
return result
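# Worked example of the vol annualization above: a 2% daily standard
# deviation of log returns annualizes to 0.02 * sqrt(252) ~= 31.7%,
# using the usual 252-trading-day convention.
assert abs(0.02 * 252 ** 0.5 - 0.3175) < 1e-3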
def bs_price(S, K, T, sigma, option_type='call'):
"""Black-Scholes option price (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return max(S - K, 0)
return max(K - S, 0)
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
d2 = d1 - sigma * np.sqrt(T)
if option_type == 'call':
return S * _norm.cdf(d1) - K * _norm.cdf(d2)
return K * _norm.cdf(-d2) - S * _norm.cdf(-d1)
def bs_option_return(stock_return, K_over_S, T, delta_t, sigma,
option_type='call'):
"""Compute option return from stock return using Black-Scholes repricing.
Normalizes S_0 = 1, so S_1 = 1 + stock_return and K = K_over_S.
T is time to expiry at period start; delta_t is time elapsed.
"""
V0 = bs_price(1.0, K_over_S, T, sigma, option_type)
V1 = bs_price(1.0 + stock_return, K_over_S, max(T - delta_t, 0),
sigma, option_type)
if V0 <= 0:
return stock_return
return V1 / V0 - 1
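# Worked example of the zero-rate Black-Scholes price above: an at-the-money
# call (S = K = 1) with sigma = 0.5 and T = 1 has d1 = sigma * sqrt(T) / 2
# = 0.25 and d2 = -0.25, so its price is N(0.25) - N(-0.25) ~= 0.197, about
# 20% of spot. The ~0.15-|delta| contracts targeted here are much further
# out of the money and correspondingly cheaper, hence more leveraged.
from scipy.stats import norm as _sn
_atm_call = _sn.cdf(0.25) - _sn.cdf(-0.25)
assert 0.19 < _atm_call < 0.20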
OPTION_DELTA = 0.15
OPTION_T = 1.0
def build_option_params(option_tickers, ticker_vol):
"""Build {ticker: (K_over_S_call, K_over_S_put, sigma)} for BS fallback."""
result = {}
for t in option_tickers:
if t not in ticker_vol:
continue
sigma = ticker_vol[t]
srt = sigma * OPTION_T ** 0.5
# Call: delta = N(d1) = OPTION_DELTA
d1_call = _norm.ppf(OPTION_DELTA)
K_call = np.exp(-d1_call * srt + sigma ** 2 * OPTION_T / 2)
# Put: delta = N(d1) - 1 = -OPTION_DELTA, so N(d1) = 1 - OPTION_DELTA
d1_put = _norm.ppf(1 - OPTION_DELTA)
K_put = np.exp(-d1_put * srt + sigma ** 2 * OPTION_T / 2)
result[t] = (K_call, K_put, sigma)
return result
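# Worked example of the strike inversion above (hypothetical sigma = 0.5,
# T = 1): for a 0.15-delta call, d1 = ppf(0.15) ~= -1.036, so K/S =
# exp(-d1 * sigma + sigma**2 / 2) ~= 1.90 -- struck about 90% above spot.
# Plugging that strike back into the d1 formula recovers delta = 0.15.
import numpy as _np
from scipy.stats import norm as _sn2
_sig = 0.5
_K = _np.exp(-_sn2.ppf(0.15) * _sig + _sig ** 2 / 2)
_d1 = (_np.log(1.0 / _K) + _sig ** 2 / 2) / _sig
assert abs(float(_sn2.cdf(_d1)) - 0.15) < 1e-9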
def daily_cumulative(holdings, quarters, filing_dates, close, today, mode,
per_period_opt=None, option_params=None):
"""Build a daily series of cumulative growth factors for a given mode.
For each filing period the portfolio weights are fixed. Each trading
day's weighted return is computed relative to the period's starting
prices, then chained with the prior period's cumulative growth.
For option positions: uses per-period option prices from per_period_opt
when available; otherwise falls back to Black-Scholes repricing using
option_params.
"""
cum_growth = 1.0
dates_out = []
values_out = []
for i, q in enumerate(quarters):
period_start = filing_dates[q]
period_end = filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today
ps = pd.Timestamp(period_start)
pe = pd.Timestamp(period_end)
# Trading days in this period
mask = (close.index >= ps) & (close.index <= pe)
period_close = close[mask]
if period_close.empty:
continue
# Option prices for this period (keyed by (ticker, type) → prices)
quarter_opt = per_period_opt.get(q, {}) if per_period_opt else {}
# Determine portfolio weights and starting prices
positions = holdings[q]
weights = {}
start_prices = {}
use_opt_px = {} # track which positions use option prices
total_value = 0
for (ticker, pos_type), value in positions.items():
if mode == 'equity_only' and pos_type != 'long':
continue
# Select price source for this position
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
has_opt = is_option and opt_key in quarter_opt
if has_opt:
ticker_opt = quarter_opt[opt_key]
opt_dates = sorted(d for d in ticker_opt if d >= period_start)
if not opt_dates:
continue
start_prices[(ticker, pos_type)] = ticker_opt[opt_dates[0]]
elif ticker in close.columns:
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
start_prices[(ticker, pos_type)] = float(avail.iloc[0])
else:
continue
weights[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = has_opt
total_value += value
if total_value == 0:
continue
# Daily weighted return relative to period start
# Skip first day of subsequent periods (already recorded as last day
# of the prior period) to avoid duplicate boundary dates.
start_idx = 1 if i > 0 else 0
# Forward-fill: track last known option price so that gaps in
# option data don't cause positions to vanish mid-period.
last_opt = {k: v for k, v in start_prices.items()
if use_opt_px.get(k)}
for day_idx in range(start_idx, len(period_close)):
day = period_close.index[day_idx]
day_str = day.strftime('%Y-%m-%d')
weighted_return = 0
for (ticker, pos_type), value in weights.items():
p0 = start_prices[(ticker, pos_type)]
if p0 == 0:
continue
if use_opt_px[(ticker, pos_type)]:
opt_key = _option_position_key(ticker, pos_type)
p1_val = quarter_opt.get(opt_key, {}).get(day_str)
if p1_val is not None:
last_opt[(ticker, pos_type)] = p1_val
else:
p1_val = last_opt.get((ticker, pos_type))
if p1_val is None:
continue
elif ticker in period_close.columns:
p1_val = period_close[ticker].iloc[day_idx]
if pd.isna(p1_val):
continue
else:
continue
ret = (float(p1_val) - p0) / p0
# Fallback: BS repricing for option positions without option data
use_bs = False
if (pos_type in ('call', 'put')
and not use_opt_px[(ticker, pos_type)]):
params = option_params.get(ticker) if option_params else None
if params:
K_call, K_put, sigma = params
K_over_S = K_call if pos_type == 'call' else K_put
dt = (day - ps).days / 365.25
ret = bs_option_return(ret, K_over_S, OPTION_T, dt,
sigma, pos_type)
use_bs = True
sign = 1 if (use_opt_px[(ticker, pos_type)] or use_bs) else (
-1 if pos_type == 'put' else 1)
weighted_return += (value / total_value) * sign * ret
dates_out.append(day)
values_out.append(cum_growth * (1 + weighted_return))
# Chain: next period starts from the last day's growth factor
if values_out:
cum_growth = values_out[-1]
return dates_out, values_out
import plotly.graph_objects as go
HUGO_BASE = os.path.expanduser('~/My Drive/repos/stafforini.com')
# ── Fetch daily prices ────────────────────────────────────────────
close = download_daily(all_tickers, first_date, today)
dates_eq, vals_eq = daily_cumulative(
holdings, quarters, filing_dates, close, today, 'equity_only')
dates_full, vals_full = daily_cumulative(
holdings, quarters, filing_dates, close, today, 'full')
# ── Compute SPY benchmark ─────────────────────────────────────────
spy_series = close['SPY'].dropna()
spy_start = spy_series[spy_series.index >= pd.Timestamp(first_date)]
if not spy_start.empty:
spy_p0 = float(spy_start.iloc[0])
spy_dates = spy_start.index.tolist()
spy_vals = [float(p) / spy_p0 for p in spy_start.values]
else:
spy_dates, spy_vals = [], []
# ── Plot with Plotly ───────────────────────────────────────────────
eq_pct = [round((v - 1) * 100, 1) for v in vals_eq]
full_pct = [round((v - 1) * 100, 1) for v in vals_full]
spy_pct = [round((v - 1) * 100, 1) for v in spy_vals]
fig = go.Figure()
fig.add_trace(go.Scatter(
x=dates_eq, y=eq_pct, mode='lines',
name='Equity only',
line=dict(color='#2563eb', width=2)))
fig.add_trace(go.Scatter(
x=dates_full, y=full_pct, mode='lines',
name='Full exposure proxy',
line=dict(color='#dc2626', width=2)))
fig.add_trace(go.Scatter(
x=spy_dates, y=spy_pct, mode='lines',
name='S&P 500 (SPY)',
line=dict(color='#16a34a', width=2, dash='dot')))
# Vertical lines at filing dates (rebalancing points)
for fd in filing_dates.values():
fig.add_vline(x=fd, line=dict(color='gray', width=0.5), opacity=0.4)
fig.add_hline(y=0, line=dict(color='gray', width=0.8))
fig.update_layout(
title=dict(text='SA LP copycat: cumulative returns',
font=dict(size=15)),
yaxis=dict(title='Cumulative return', hoverformat='+.1f',
ticksuffix='%'),
hovermode='x unified',
xaxis=dict(spikemode='across', spikethickness=0.5,
spikedash='solid', spikecolor='gray'),
template='plotly_white',
legend=dict(x=0.02, y=0.98, bgcolor='rgba(255,255,255,0.8)'),
margin=dict(l=60, r=20, t=50, b=40),
height=500,
)
# ── Generate HTML with dark-mode support ──────────────────────────
import re
chart_html = fig.to_html(full_html=False, include_plotlyjs='cdn',
config={'responsive': True, 'displayModeBar': False})
div_id = re.search(r'id="([^"]+)"', chart_html).group(1)
dark_script = """
<script>
(function() {
var gd = document.getElementById('%s');
function isDark() {
try { return parent.document.documentElement.getAttribute('data-theme') === 'dark'; }
catch(e) { return window.matchMedia('(prefers-color-scheme: dark)').matches; }
}
function apply() {
var dk = isDark();
Plotly.relayout(gd, {
paper_bgcolor: 'rgba(0,0,0,0)',
plot_bgcolor: dk ? 'rgba(30,30,30,0.5)' : 'rgba(255,255,255,0.8)',
font: {color: dk ? '#d4d4d4' : '#333'},
'title.font.color': dk ? '#d4d4d4' : '#333',
'xaxis.gridcolor': dk ? 'rgba(255,255,255,0.1)' : 'rgba(0,0,0,0.1)',
'yaxis.gridcolor': dk ? 'rgba(255,255,255,0.1)' : 'rgba(0,0,0,0.1)',
'legend.bgcolor': dk ? 'rgba(30,30,30,0.8)' : 'rgba(255,255,255,0.8)',
'legend.font.color': dk ? '#d4d4d4' : '#333',
});
}
apply();
new MutationObserver(function() { apply(); }).observe(
parent.document.documentElement, {attributes: true, attributeFilter: ['data-theme']});
})();
</script>""" % div_id
outpath = os.path.join(HUGO_BASE, 'static', 'images', 'sa-lp-returns.html')
with open(outpath, 'w') as f:
f.write('<!DOCTYPE html>\n<html>\n<head><meta charset="utf-8">\n'
'<meta http-equiv="Cache-Control" content="no-cache, no-store, must-revalidate">\n'
'<style>body { margin: 0; background: transparent; }</style>\n'
'</head>\n<body>\n' + chart_html + dark_script +
'\n</body>\n</html>')
The “equity only” line uses only the fund’s long equity positions, ignoring its options book.
The “full exposure” line is a directional exposure proxy, not a true fund-return estimate. For option rows, SEC guidance says that most Form 13F information-table entries are reported in terms of the underlying security rather than the option contract itself; the filings omit the contracts’ strike prices, expiration dates, and premiums. The model therefore treats calls as long exposure, and puts as short exposure, to the underlying stock, weighted by the reported 13F value. This is useful for estimating the direction and rough size of the disclosed bets, but it is not an executable options backtest.2
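The weighting scheme can be sketched in a few lines. This is a minimal illustration of the sign convention, not the article’s actual backtest code; the tickers and dollar values are hypothetical:

```python
# Directional-exposure proxy: weight each disclosed 13F row by its reported
# dollar value, with sign +1 for long stock and calls, -1 for puts.
SIGN = {'long': 1, 'call': 1, 'put': -1}

def proxy_return(positions, returns):
    """Value-weighted directional return of a disclosed portfolio.

    positions: {(ticker, type): reported 13F dollar value}
    returns:   {ticker: underlying stock return over the period}
    """
    total = sum(positions.values())
    return sum(
        (value / total) * SIGN[pos_type] * returns[ticker]
        for (ticker, pos_type), value in positions.items()
    )

# Hypothetical example: $2M of NVDA stock, $1M notional of NVDA calls,
# $1M notional of INTC puts; NVDA +10%, INTC +5% over the period.
positions = {('NVDA', 'long'): 2e6, ('NVDA', 'call'): 1e6, ('INTC', 'put'): 1e6}
returns = {'NVDA': 0.10, 'INTC': 0.05}
proxy_return(positions, returns)  # 0.5*0.10 + 0.25*0.10 - 0.25*0.05 = 0.0625
```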
Copycat delays
Between one filing and the next (~90 days), the copycat holds a fixed portfolio while the fund’s actual portfolio evolves continuously. We observe the fund’s positions only as quarter-end snapshots; its actual holdings between snapshots are unknown. Furthermore, these snapshots are not published immediately, but roughly 45 days later. These two delays, one between the fund’s trades and quarter-end and one between quarter-end and the filing date, create a gap in which the copycat’s holdings are stale relative to the fund’s actual positions, which depresses copycat returns to the extent the fund’s trades are profitable.
Let \(Q_i\) denote the fund’s disclosed portfolio at the end of quarter \(i\). To estimate this cost, we can model the fund as switching from \(Q_{i-1}\) to \(Q_i\) at a single (unknown) point, uniformly distributed over the trading days in quarter \(i\).3 Let \(R(P, s, t)\) denote the return of portfolio \(P\) from date \(s\) to date \(t\), and let \(T_i\) denote the last day of quarter \(i\). For each possible switch day \(d\), the delay cost is \(R(Q_i, d, T_i) - R(Q_{i-1}, d, T_i)\): the return the fund earned on its new positions that the copycat missed. Averaging over all \(d\) gives the expected intra-quarter delay cost.
The same logic extends to the ~45-day gap between quarter-end and filing date. During this period, the copycat still holds \(Q_{i-1}\), while the fund may continue holding \(Q_i\) or may already have started trading toward \(Q_{i+1}\). We apply the same uniform-switch model over the full span of quarter \(i+1\): for each possible switch day during the gap, the fund earns a blend of \(Q_i\) and \(Q_{i+1}\) returns. We average over all scenarios, including those where the switch occurs after the gap, weighted by their probability.4
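The intra-quarter piece of this model can be sketched directly from the formula above. This is a simplified stand-in for the full analysis in the code section, assuming each portfolio is summarized by a series of daily returns over quarter \(i\); the gap cost follows the same averaging pattern:

```python
import numpy as np

def intra_quarter_delay_cost(new_daily, old_daily):
    """Expected intra-quarter delay cost under a uniform-switch model.

    new_daily, old_daily: daily returns of Q_i and Q_{i-1} over quarter i.
    For each candidate switch day d, the cost is R(Q_i, d, T) - R(Q_{i-1}, d, T),
    the return on the new positions that the copycat (still holding Q_{i-1})
    misses; averaging over all d gives the expected cost.
    """
    new_daily = np.asarray(new_daily)
    old_daily = np.asarray(old_daily)
    n = len(new_daily)
    costs = []
    for d in range(n):
        r_new = np.prod(1 + new_daily[d:]) - 1   # R(Q_i, d, T)
        r_old = np.prod(1 + old_daily[d:]) - 1   # R(Q_{i-1}, d, T)
        costs.append(r_new - r_old)
    return float(np.mean(costs))

# Hypothetical quarter of 60 trading days: the new portfolio earns
# 20 bp/day, the old one 5 bp/day.
cost = intra_quarter_delay_cost([0.002] * 60, [0.0005] * 60)
```

If the two portfolios perform identically, the cost is zero; the model only penalizes the copycat for the performance gap between the new and old positions after the (unknown) switch day.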
Code
import json
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import requests
import time
import os
import warnings
warnings.filterwarnings('ignore')
# Parse data from the scraper block
parsed = json.loads(data) if isinstance(data, str) else data
filings = parsed["filings"]
# Build internal structures
filing_dates = {f["quarter"]: f["filing_date"] for f in filings}
quarter_end_dates = {f["quarter"]: f["quarter_end"] for f in filings}
quarters = [f["quarter"] for f in filings]
# Convert holdings list to dict keyed by quarter.
# Multiple positions in the same ticker with different types are aggregated
# by value per (ticker, type) pair.
holdings = {}
for f in filings:
positions = {}
for h in f["holdings"]:
ticker = h["ticker"]
pos_type = h["type"]
value = h["value"]
key = (ticker, pos_type)
positions[key] = positions.get(key, 0) + value
holdings[f["quarter"]] = positions
def get_prices(tickers, dates):
"""Fetch close prices for tickers on specific dates."""
unique_tickers = sorted(set(tickers))
all_dates = [datetime.strptime(d, '%Y-%m-%d') for d in dates]
start = min(all_dates) - timedelta(days=5)
end = max(all_dates) + timedelta(days=5)
df = yf.download(unique_tickers, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return {}
# yf.download returns MultiIndex columns (metric, ticker) for multiple tickers
if isinstance(df.columns, pd.MultiIndex):
close = df['Close']
else:
close = df[['Close']]
close.columns = unique_tickers
prices = {}
for ticker in unique_tickers:
if ticker not in close.columns:
continue
series = close[ticker].dropna()
if series.empty:
continue
prices[ticker] = {}
for date_str in dates:
target = pd.Timestamp(datetime.strptime(date_str, '%Y-%m-%d'))
after = series[series.index >= target]
if not after.empty:
prices[ticker][date_str] = float(after.iloc[0])
else:
before = series[series.index <= target]
if not before.empty:
prices[ticker][date_str] = float(before.iloc[-1])
return prices
def _price_on_or_after(px_by_date, target_date):
"""Return (date, price) for the first available price on/after target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d >= target_date)
if not dates:
return None
d = dates[0]
return d, px_by_date[d]
def _price_on_or_before(px_by_date, target_date):
"""Return (date, price) for the last available price on/before target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d <= target_date)
if not dates:
return None
d = dates[-1]
return d, px_by_date[d]
def _period_price_pair(px_by_date, start_date, end_date):
"""Return start/end prices for a period using sensible boundary alignment."""
start = _price_on_or_after(px_by_date, start_date)
end = _price_on_or_before(px_by_date, end_date)
if start is None or end is None:
return None
start_actual, p0 = start
end_actual, p1 = end
if end_actual < start_actual:
return None
return start_actual, end_actual, p0, p1
def _option_position_key(ticker, pos_type):
return (ticker, pos_type)
def compute_return(positions, prices, start_date, end_date, mode='equity_only',
option_prices=None, option_params=None):
"""Compute weighted portfolio return between two dates.
For option positions (call/put): if option_prices contains actual
historical option prices for the ticker, compute returns directly from
those prices. Otherwise fall back to Black-Scholes repricing using
the parameters in option_params.
"""
total_value = 0
weighted_return = 0
for (ticker, pos_type), value in positions.items():
if mode == 'equity_only' and pos_type != 'long':
continue
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
opt_px = option_prices.get(opt_key) if option_prices else None
use_option_px = bool(is_option and opt_px)
use_bs = False
if use_option_px:
pair = _period_price_pair(opt_px, start_date, end_date)
if pair is None:
use_option_px = False
else:
start_actual, end_actual, p0, p1 = pair
if not use_option_px:
px = prices.get(ticker)
pair = _period_price_pair(px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
ret = (p1 - p0) / p0
# Fallback: BS repricing when no option price data
if is_option and not use_option_px:
params = option_params.get(ticker) if option_params else None
if params:
K_call, K_put, sigma = params
K_over_S = K_call if pos_type == 'call' else K_put
dt = days_between(start_actual, end_actual) / 365.25
ret = bs_option_return(ret, K_over_S, OPTION_T, dt, sigma,
pos_type)
use_bs = True
total_value += value
# When using actual option/BS prices the return already reflects
# the directional bet, so sign is +1. The -1 for puts applies
# only to the raw stock-price proxy path.
sign = 1 if (use_option_px or use_bs) else (
-1 if pos_type == 'put' else 1)
weighted_return += value * sign * ret
return weighted_return / total_value if total_value else None
def annualize(ret, days):
"""Annualize a return over a given number of calendar days."""
if ret is None or days <= 0:
return None
return (1 + ret) ** (365.25 / days) - 1
def days_between(d1, d2):
return (datetime.strptime(d2, '%Y-%m-%d') - datetime.strptime(d1, '%Y-%m-%d')).days
def fmt(ret):
return f"{ret * 100:+.2f}%" if ret is not None else "N/A"
# Collect all tickers and dates
all_tickers = set()
for positions in holdings.values():
for (ticker, _) in positions:
all_tickers.add(ticker)
all_tickers.add('SPY')
today = datetime.now().strftime('%Y-%m-%d')
first_date = filing_dates[quarters[0]]
all_dates = set(filing_dates.values()) | set(quarter_end_dates.values()) | {today}
prices = get_prices(sorted(all_tickers), sorted(all_dates))
# Resolve `today` to the actual last available closing date.
# yfinance may not have data for today (market still open or holiday),
# so we look up what date SPY's price actually corresponds to.
def _resolve_price_date(prices, requested_date):
"""Return the actual trading date of the price stored under requested_date."""
ref = 'SPY' if 'SPY' in prices else next(iter(prices), None)
if not ref or requested_date not in prices[ref]:
return requested_date
target_price = prices[ref][requested_date]
# Re-download a small window to find the real date of this price
start = datetime.strptime(requested_date, '%Y-%m-%d') - timedelta(days=10)
end = datetime.strptime(requested_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(ref, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return requested_date
if isinstance(df.columns, pd.MultiIndex):
close = df['Close'][ref].dropna()
elif 'Close' in df.columns:
close = df['Close'].dropna()
else:
close = df.iloc[:, 0].dropna()
for dt, px in close.items():
val = float(px.iloc[0]) if isinstance(px, pd.Series) else float(px)
if abs(val - target_price) < 0.01:
ts = dt[0] if isinstance(dt, tuple) else dt
return pd.Timestamp(ts).strftime('%Y-%m-%d')
return requested_date
today_resolved = _resolve_price_date(prices, today)
if today_resolved != today:
for ticker in prices:
if today in prices[ticker]:
prices[ticker][today_resolved] = prices[ticker].pop(today)
today = today_resolved
def download_daily(tickers, start_date, end_date):
"""Download daily close prices from yfinance, handling MultiIndex.
Dates are 'YYYY-MM-DD' strings. Adds a small buffer for trading-day alignment."""
tickers_sorted = sorted(tickers)
start = datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=5)
end = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(tickers_sorted, start=start, end=end,
progress=False, auto_adjust=True)
if df.empty:
return pd.DataFrame()
if isinstance(df.columns, pd.MultiIndex):
return df['Close']
close = df[['Close']]
close.columns = tickers_sorted
return close
# -- Historical option prices via Alpha Vantage ----------------------------
OPTION_CACHE_DIR = os.path.expanduser('~/My Drive/notes/.sa-lp-option-cache')
_AV_BASE = 'https://www.alphavantage.co/query'
_AV_RATE_DELAY = 0.85 # seconds between requests (75 req/min limit)
OPTION_CACHE_COLUMNS = [
'date', 'option_type', 'strike', 'expiry', 'delta', 'price']
def _normalize_option_type(option_type):
option_type = str(option_type).lower()
if option_type not in ('call', 'put'):
raise ValueError(f"Unsupported option type: {option_type}")
return option_type
def _empty_option_cache():
return pd.DataFrame(columns=OPTION_CACHE_COLUMNS)
def _option_cache_path(ticker, option_type, legacy=False):
if legacy:
return os.path.join(OPTION_CACHE_DIR, f'{ticker}.csv')
option_type = _normalize_option_type(option_type)
return os.path.join(OPTION_CACHE_DIR, f'{ticker}-{option_type}.csv')
def _load_option_cache(ticker, option_type):
"""Load cached option data for a ticker/type. Returns DataFrame or empty."""
option_type = _normalize_option_type(option_type)
paths = [_option_cache_path(ticker, option_type)]
# Pre-fix caches were call-only and named TICKER.csv. They are safe to
# reuse for calls but must not be reused for puts.
if option_type == 'call':
legacy_path = _option_cache_path(ticker, option_type, legacy=True)
if legacy_path not in paths:
paths.append(legacy_path)
frames = []
for path in paths:
if not os.path.exists(path):
continue
df = pd.read_csv(path)
if df.empty:
continue
if 'option_type' not in df.columns:
df['option_type'] = 'call'
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df['date'] = pd.to_datetime(df['date'], errors='coerce').dt.strftime(
'%Y-%m-%d')
df['option_type'] = df['option_type'].fillna(option_type).str.lower()
frames.append(df[OPTION_CACHE_COLUMNS])
if not frames:
return _empty_option_cache()
cache = pd.concat(frames, ignore_index=True)
cache = cache[cache['option_type'] == option_type].copy()
cache.dropna(subset=['date'], inplace=True)
for col in ('strike', 'delta', 'price'):
cache[col] = pd.to_numeric(cache[col], errors='coerce')
cache.drop_duplicates(
subset=['date', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
return cache[OPTION_CACHE_COLUMNS]
def _save_option_cache(ticker, option_type, df):
"""Persist typed option cache to CSV."""
option_type = _normalize_option_type(option_type)
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
path = _option_cache_path(ticker, option_type)
if df.empty:
df = _empty_option_cache()
else:
df = df.copy()
df['option_type'] = option_type
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df.drop_duplicates(
subset=['date', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
df.sort_values(['date', 'expiry', 'strike'], inplace=True)
df.to_csv(path, index=False)
def _fetch_option_chain(ticker, date_str, api_key):
"""Fetch the full option chain for ticker on a given date from Alpha Vantage."""
params = {
'function': 'HISTORICAL_OPTIONS',
'symbol': ticker,
'date': date_str,
'apikey': api_key,
}
try:
resp = requests.get(_AV_BASE, params=params, timeout=30)
resp.raise_for_status()
body = resp.json()
return body.get('data', [])
except Exception:
return []
def _contract_window(ref_date_str):
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
return ref + timedelta(days=270), ref + timedelta(days=456)
def _contract_from_cache_row(row, ref_date_str, option_type):
option_type = _normalize_option_type(option_type)
if str(row.get('option_type', option_type)).lower() != option_type:
return None
lo, hi = _contract_window(ref_date_str)
try:
exp = datetime.strptime(str(row['expiry']), '%Y-%m-%d')
except (KeyError, TypeError, ValueError):
return None
if not (lo <= exp <= hi):
return None
strike = _safe_float(row.get('strike'))
delta = _safe_float(row.get('delta'))
price = _safe_float(row.get('price'))
if strike is None or delta is None or price is None or price <= 0:
return None
return {
'option_type': option_type,
'strike': strike,
'expiry': str(row['expiry']),
'delta': delta,
'price': price,
}
def _select_cached_contract(cache, option_type, ref_date_str):
rows = cache[(cache['date'] == ref_date_str)
& (cache['option_type'] == option_type)]
candidates = []
for _, row in rows.iterrows():
contract = _contract_from_cache_row(row, ref_date_str, option_type)
if contract:
candidates.append(contract)
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - OPTION_DELTA))
return candidates[0]
def _select_best_contract(chain, ref_date_str, option_type):
"""Select the best-matching call/put contract from an option chain.
Criteria: matching option type, expiry 9-15 months from ref_date,
absolute delta closest to 0.15. Returns dict with type, strike, expiry,
delta, price or None.
"""
option_type = _normalize_option_type(option_type)
lo, hi = _contract_window(ref_date_str)
candidates = []
for c in chain:
if c.get('type', '').lower() != option_type:
continue
try:
exp = datetime.strptime(c['expiration'], '%Y-%m-%d')
except (KeyError, ValueError):
continue
if not (lo <= exp <= hi):
continue
delta = _safe_float(c.get('delta'))
if delta is None:
continue
abs_delta = abs(delta)
if abs_delta == 0:
continue
# Price: prefer mid if available, else last
price = _parse_option_price(c)
if price is None or price <= 0:
continue
candidates.append({
'option_type': option_type,
'strike': float(c['strike']),
'expiry': c['expiration'],
'delta': delta,
'price': price,
})
if not candidates:
return None
# Pick contract with absolute delta closest to 0.15.
candidates.sort(key=lambda x: abs(abs(x['delta']) - OPTION_DELTA))
return candidates[0]
def _parse_option_price(contract):
"""Extract a price from an Alpha Vantage option contract record."""
bid = _safe_float(contract.get('bid'))
ask = _safe_float(contract.get('ask'))
last = _safe_float(contract.get('last'))
if bid and ask and bid > 0 and ask > 0:
return (bid + ask) / 2
if last and last > 0:
return last
return None
def _safe_float(val):
try:
return float(val)
except (TypeError, ValueError):
return None
def _cached_contract_price(cache, option_type, date_str, strike, expiry):
rows = cache[(cache['date'] == date_str)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))]
for _, row in rows.iterrows():
price = _safe_float(row.get('price'))
if price is not None and price > 0:
return price
return None
def _extract_contract_price(chain, strike, expiry, option_type):
"""Find the price of a specific option contract."""
option_type = _normalize_option_type(option_type)
for c in chain:
if c.get('type', '').lower() != option_type:
continue
try:
if (abs(float(c['strike']) - strike) < 0.01
and c.get('expiration') == expiry):
return _parse_option_price(c)
except (KeyError, TypeError, ValueError):
continue
return None
def download_option_prices(option_positions, quarters, holdings, filing_dates,
today):
"""Download historical option prices from Alpha Vantage.
For each (ticker, option_type) and each filing period:
1. On the first trading day, fetch the chain and select the best contract
(matching type, expiry 9-15 months out, |delta| closest to 0.15).
2. Lock in that contract for the period.
3. Fetch the price of that contract on each subsequent trading day.
Returns
-------
per_period : dict {quarter_str: {(ticker, type): {date_str: float}}}
Option prices keyed by filing period then option position. Each period
has its own contract's prices, avoiding cross-contract mixing at
boundary dates where one period ends and the next begins.
fallback_positions : set
Option positions where no option data was found (need BS fallback).
"""
option_positions = sorted({
(ticker, _normalize_option_type(pos_type))
for ticker, pos_type in option_positions})
api_key = os.environ.get('ALPHA_VANTAGE_KEY', '')
cache_only = not bool(api_key)
if cache_only:
print("WARNING: ALPHA_VANTAGE_KEY not set; using cached option "
"prices where available and BS repricing elsewhere.")
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
per_period = {} # {q: {(ticker, type): {date_str: price}}}
fallback = set()
fetched = 0
for ticker, option_type in option_positions:
opt_key = _option_position_key(ticker, option_type)
cache = _load_option_cache(ticker, option_type)
position_has_data = False
new_rows = []
for i, q in enumerate(quarters):
# Skip quarters where this exact option position is absent.
has_opts = opt_key in holdings[q]
if not has_opts:
continue
period_start = filing_dates[q]
period_end = (filing_dates[quarters[i + 1]]
if i < len(quarters) - 1 else today)
trading_days = pd.bdate_range(period_start, period_end)
if len(trading_days) == 0:
continue
first_day = trading_days[0].strftime('%Y-%m-%d')
# -- Select contract on first trading day --
from_cache = False
contract = _select_cached_contract(cache, option_type, first_day)
if contract:
from_cache = True
else:
if not cache_only:
time.sleep(_AV_RATE_DELAY)
chain = _fetch_option_chain(ticker, first_day, api_key)
fetched += 1
contract = _select_best_contract(
chain, first_day, option_type)
if contract:
new_rows.append({
'date': first_day,
'option_type': option_type,
'strike': contract['strike'],
'expiry': contract['expiry'],
'delta': contract['delta'],
'price': contract['price'],
})
if contract is None:
fallback.add(opt_key)
continue
strike = contract['strike']
expiry = contract['expiry']
# -- Collect prices for this period (fresh dict per period) --
period_prices = {}
if from_cache:
# Fast path: read all matching prices from cache (no API calls).
rows = cache[
(cache['date'] >= period_start)
& (cache['date'] <= period_end)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))
& pd.notna(cache['price'])]
for _, row in rows.iterrows():
period_prices[row['date']] = float(row['price'])
else:
# Slow path: fetch each trading day from API
if contract.get('price'):
period_prices[first_day] = contract['price']
for day in trading_days[1:]:
day_str = day.strftime('%Y-%m-%d')
cached_price = _cached_contract_price(
cache, option_type, day_str, strike, expiry)
if cached_price is not None:
period_prices[day_str] = cached_price
continue
time.sleep(_AV_RATE_DELAY)
chain = _fetch_option_chain(ticker, day_str, api_key)
fetched += 1
price = _extract_contract_price(
chain, strike, expiry, option_type)
if price is not None:
period_prices[day_str] = price
new_rows.append({
'date': day_str,
'option_type': option_type,
'strike': strike,
'expiry': expiry,
'delta': contract['delta'],
'price': price,
})
# Accumulate per-period prices
if period_prices:
per_period.setdefault(q, {})[opt_key] = period_prices
position_has_data = True
# Persist new data to cache
if new_rows:
new_df = pd.DataFrame(new_rows)
cache = pd.concat([cache, new_df], ignore_index=True)
cache.drop_duplicates(
subset=['date', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
_save_option_cache(ticker, option_type, cache)
if not position_has_data:
fallback.add(opt_key)
if fetched:
import sys
print(f"[options] Fetched {fetched} chain snapshots from Alpha Vantage",
file=sys.stderr)
return per_period, fallback
# -- Fallback: Black-Scholes repricing for tickers without option data -----
from scipy.stats import norm as _norm
def compute_realized_vol(tickers, download_daily_fn, today_str):
"""Compute annualized realized vol from trailing 1-year daily returns."""
vol_start = (datetime.strptime(today_str, '%Y-%m-%d')
- timedelta(days=400)).strftime('%Y-%m-%d')
vol_df = download_daily_fn(tickers, vol_start, today_str)
result = {}
for ticker in tickers:
if ticker in vol_df.columns:
series = vol_df[ticker].dropna()
if len(series) > 20:
log_rets = np.log(series / series.shift(1)).dropna().tail(252)
result[ticker] = float(log_rets.std() * np.sqrt(252))
return result
def bs_price(S, K, T, sigma, option_type='call'):
"""Black-Scholes option price (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return max(S - K, 0)
return max(K - S, 0)
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
d2 = d1 - sigma * np.sqrt(T)
if option_type == 'call':
return S * _norm.cdf(d1) - K * _norm.cdf(d2)
return K * _norm.cdf(-d2) - S * _norm.cdf(-d1)
def bs_option_return(stock_return, K_over_S, T, delta_t, sigma,
option_type='call'):
"""Compute option return from stock return using Black-Scholes repricing.
Normalizes S_0 = 1, so S_1 = 1 + stock_return and K = K_over_S.
T is time to expiry at period start; delta_t is time elapsed.
"""
V0 = bs_price(1.0, K_over_S, T, sigma, option_type)
V1 = bs_price(1.0 + stock_return, K_over_S, max(T - delta_t, 0),
sigma, option_type)
if V0 <= 0:
return stock_return
return V1 / V0 - 1
OPTION_DELTA = 0.15
OPTION_T = 1.0
def build_option_params(option_tickers, ticker_vol):
"""Build {ticker: (K_over_S_call, K_over_S_put, sigma)} for BS fallback."""
result = {}
for t in option_tickers:
if t not in ticker_vol:
continue
sigma = ticker_vol[t]
srt = sigma * OPTION_T ** 0.5
# Call: delta = N(d1) = OPTION_DELTA
d1_call = _norm.ppf(OPTION_DELTA)
K_call = np.exp(-d1_call * srt + sigma ** 2 * OPTION_T / 2)
# Put: delta = N(d1) - 1 = -OPTION_DELTA, so N(d1) = 1 - OPTION_DELTA
d1_put = _norm.ppf(1 - OPTION_DELTA)
K_put = np.exp(-d1_put * srt + sigma ** 2 * OPTION_T / 2)
result[t] = (K_call, K_put, sigma)
return result
def daily_cumulative(holdings, quarters, filing_dates, close, today, mode,
per_period_opt=None, option_params=None):
"""Build a daily series of cumulative growth factors for a given mode.
For each filing period the portfolio weights are fixed. Each trading
day's weighted return is computed relative to the period's starting
prices, then chained with the prior period's cumulative growth.
For option positions: uses per-period option prices from per_period_opt
when available; otherwise falls back to Black-Scholes repricing using
option_params.
"""
cum_growth = 1.0
dates_out = []
values_out = []
for i, q in enumerate(quarters):
period_start = filing_dates[q]
period_end = filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today
ps = pd.Timestamp(period_start)
pe = pd.Timestamp(period_end)
# Trading days in this period
mask = (close.index >= ps) & (close.index <= pe)
period_close = close[mask]
if period_close.empty:
continue
# Option prices for this period (keyed by (ticker, type) → prices)
quarter_opt = per_period_opt.get(q, {}) if per_period_opt else {}
# Determine portfolio weights and starting prices
positions = holdings[q]
weights = {}
start_prices = {}
use_opt_px = {} # track which positions use option prices
total_value = 0
for (ticker, pos_type), value in positions.items():
if mode == 'equity_only' and pos_type != 'long':
continue
# Select price source for this position
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
has_opt = is_option and opt_key in quarter_opt
if has_opt:
ticker_opt = quarter_opt[opt_key]
opt_dates = sorted(d for d in ticker_opt if d >= period_start)
if not opt_dates:
continue
start_prices[(ticker, pos_type)] = ticker_opt[opt_dates[0]]
elif ticker in close.columns:
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
start_prices[(ticker, pos_type)] = float(avail.iloc[0])
else:
continue
weights[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = has_opt
total_value += value
if total_value == 0:
continue
# Daily weighted return relative to period start
# Skip first day of subsequent periods (already recorded as last day
# of the prior period) to avoid duplicate boundary dates.
start_idx = 1 if i > 0 else 0
# Forward-fill: track last known option price so that gaps in
# option data don't cause positions to vanish mid-period.
last_opt = {k: v for k, v in start_prices.items()
if use_opt_px.get(k)}
for day_idx in range(start_idx, len(period_close)):
day = period_close.index[day_idx]
day_str = day.strftime('%Y-%m-%d')
weighted_return = 0
for (ticker, pos_type), value in weights.items():
p0 = start_prices[(ticker, pos_type)]
if p0 == 0:
continue
if use_opt_px[(ticker, pos_type)]:
opt_key = _option_position_key(ticker, pos_type)
p1_val = quarter_opt.get(opt_key, {}).get(day_str)
if p1_val is not None:
last_opt[(ticker, pos_type)] = p1_val
else:
p1_val = last_opt.get((ticker, pos_type))
if p1_val is None:
continue
elif ticker in period_close.columns:
p1_val = period_close[ticker].iloc[day_idx]
if pd.isna(p1_val):
continue
else:
continue
ret = (float(p1_val) - p0) / p0
# Fallback: BS repricing for option positions without option data
use_bs = False
if (pos_type in ('call', 'put')
and not use_opt_px[(ticker, pos_type)]):
params = option_params.get(ticker) if option_params else None
if params:
K_call, K_put, sigma = params
K_over_S = K_call if pos_type == 'call' else K_put
dt = (day - ps).days / 365.25
ret = bs_option_return(ret, K_over_S, OPTION_T, dt,
sigma, pos_type)
use_bs = True
sign = 1 if (use_opt_px[(ticker, pos_type)] or use_bs) else (
-1 if pos_type == 'put' else 1)
weighted_return += (value / total_value) * sign * ret
dates_out.append(day)
values_out.append(cum_growth * (1 + weighted_return))
# Chain: next period starts from the last day's growth factor
if values_out:
cum_growth = values_out[-1]
return dates_out, values_out
first_qe = quarter_end_dates[quarters[0]]
last_fd = filing_dates[quarters[-1]]
dc = download_daily(sorted(all_tickers), first_qe, last_fd)
def pf_ret(positions, close_df, i0, i1, mode='equity_only'):
"""Weighted portfolio return between index positions i0 and i1."""
if i0 >= i1:
return 0.0
total_value = 0
weighted_return = 0
for (ticker, pos_type), value in positions.items():
if mode == 'equity_only' and pos_type != 'long':
continue
if ticker not in close_df.columns:
continue
p0 = close_df[ticker].iloc[i0]
p1 = close_df[ticker].iloc[i1]
if pd.isna(p0) or pd.isna(p1) or float(p0) == 0:
continue
ret = (float(p1) - float(p0)) / float(p0)
sign = -1 if pos_type == 'put' else 1
total_value += value
weighted_return += value * sign * ret
return weighted_return / total_value if total_value else None
def slice_dc(start_date, end_date):
"""Slice the daily close DataFrame to a date range (inclusive)."""
return dc[(dc.index >= pd.Timestamp(start_date))
& (dc.index <= pd.Timestamp(end_date))]
# ── Unified delay cost analysis ──────────────────────────────────────
# The copycat holds Q_{i-1} from filing_{i-1} to filing_i. We model
# the fund as switching from Q_{i-1} to Q_i uniformly during quarter i,
# and from Q_i to Q_{i+1} uniformly during quarter i+1.
print("COPYCAT DELAY COST (equity-only, uniform switch model)")
print("=" * 57)
print(f"{'Transition':<19} {'Intra-Q':>10} {'Gap':>10} {'Total':>10}")
print("-" * 57)
cum_intra = 1.0
cum_gap = 1.0
cum_total = 1.0
for i in range(1, len(quarters)):
prev_q = quarters[i - 1]
curr_q = quarters[i]
next_q = quarters[i + 1] if i + 1 < len(quarters) else None
qe_prev = quarter_end_dates[prev_q]
qe_curr = quarter_end_dates[curr_q]
fd_curr = filing_dates[curr_q]
# ── Intra-quarter: qe[i-1] to qe[i] ─────────────────────────
# For each trading day d, compute R(Q_i, d, T) - R(Q_{i-1}, d, T).
pc_q = slice_dc(qe_prev, qe_curr)
N_q = len(pc_q) - 1
intra_costs = []
for d in range(N_q):
r_new = pf_ret(holdings[curr_q], pc_q, d, N_q)
r_old = pf_ret(holdings[prev_q], pc_q, d, N_q)
if r_new is not None and r_old is not None:
intra_costs.append(r_new - r_old)
avg_intra = sum(intra_costs) / len(intra_costs) if intra_costs else None
# ── Gap: qe[i] to filing[i] ─────────────────────────────────
# The fund may hold Q_i or may have already switched to Q_{i+1}.
pc_g = slice_dc(qe_curr, fd_curr)
M = len(pc_g) - 1 # trading days in the gap
if M <= 0:
avg_gap = 0.0
elif next_q is not None:
# Count trading days in quarter i+1 for probabilities
qe_next = quarter_end_dates[next_q]
pc_full = slice_dc(qe_curr, qe_next)
N_full = len(pc_full) - 1 if len(pc_full) > 1 else 63
r_copy = pf_ret(holdings[prev_q], pc_g, 0, M)
if r_copy is None:
avg_gap = None
else:
# No-switch: fund holds Q_i for entire gap (switch after gap)
r_qi = pf_ret(holdings[curr_q], pc_g, 0, M)
no_switch = (r_qi - r_copy) if r_qi is not None else None
# Switch on day d: fund holds Q_i for [0, d], Q_{i+1} for [d, M]
switch_costs = []
for d in range(1, M + 1):
r_a = pf_ret(holdings[curr_q], pc_g, 0, d)
r_b = pf_ret(holdings[next_q], pc_g, d, M)
if r_a is not None and r_b is not None:
r_fund = (1 + r_a) * (1 + r_b) - 1
switch_costs.append(r_fund - r_copy)
p_no = (N_full - M) / N_full
p_each = 1 / N_full
if no_switch is not None:
avg_gap = p_no * no_switch + p_each * sum(switch_costs)
elif switch_costs:
avg_gap = p_each * sum(switch_costs)
else:
avg_gap = None
else:
# Q_{i+1} not available: simple Q_i vs Q_{i-1}
r_qi = pf_ret(holdings[curr_q], pc_g, 0, M)
r_old = pf_ret(holdings[prev_q], pc_g, 0, M)
if r_qi is not None and r_old is not None:
avg_gap = r_qi - r_old
else:
avg_gap = None
# ── Total ────────────────────────────────────────────────────
if avg_intra is not None and avg_gap is not None:
total = (1 + avg_intra) * (1 + avg_gap) - 1
elif avg_intra is not None:
total = avg_intra
elif avg_gap is not None:
total = avg_gap
else:
total = None
if avg_intra is not None:
cum_intra *= (1 + avg_intra)
if avg_gap is not None:
cum_gap *= (1 + avg_gap)
if total is not None:
cum_total *= (1 + total)
suffix = " \u2020" if next_q is None else ""
label = f"{prev_q} \u2192 {curr_q}"
print(f"{label + suffix:<19} {fmt(avg_intra):>10} {fmt(avg_gap):>10} {fmt(total):>10}")
print("-" * 57)
print(f"{'Cumulative':<19} {fmt(cum_intra - 1):>10} {fmt(cum_gap - 1):>10} {fmt(cum_total - 1):>10}")
print()
print("Intra-Q = avg cost of fund switching to Q_i during quarter i")
print("Gap = cost during ~45-day gap, with possible Q_i \u2192 Q_{i+1} switch")
print("\u2020 = Q_{i+1} not yet available; gap uses simple Q_i vs Q_{i-1}")
print("Positive = delay hurts the copycat")
For evaluating the copycat strategy in isolation, this analysis adds little: the historical returns already price in these delay costs. It becomes relevant when comparing the copycat against the fund itself. The delay cost estimate captures one observable component of the tracking gap; the other limitations discussed above remain unobserved and could move the realized gap in either direction.
As noted, the compounded delay cost over four quarterly transitions (approximately one year) is 85.97%. This estimate uses only equity positions. The true gap between the copycat and the fund could be larger or smaller depending on the fund’s undisclosed option contracts, short positions, foreign-listed securities, non-equity assets, and actual trading path, so I would treat the figure as evidence that disclosure lag matters rather than as a literal estimate of the copycat’s shortfall relative to the fund’s net investor return.
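The compounding here is the same chaining used in the table’s cumulative row: each transition’s cost is applied as a growth factor and the factors are multiplied. A minimal sketch, using made-up per-transition costs rather than the backtest’s actual figures:

```python
# Hypothetical per-transition delay costs (illustrative only, one entry
# per quarterly transition; NOT the backtest's actual numbers).
costs = [0.04, -0.01, 0.03, 0.02]

# Chain the costs as growth factors, exactly as the cumulative row does.
cum = 1.0
for c in costs:
    cum *= 1 + c

print(f"Compounded delay cost: {cum - 1:+.2%}")  # +8.17% for these inputs
```

Note that compounding is not additive: the four costs sum to +8.00%, but chaining them gives +8.17%, and the difference grows with the size of the per-transition costs.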
Portfolio calculator
The calculator below converts the most recent 13F filing into a concrete trade list. In equity-only mode, it allocates whole shares in proportion to the fund’s reported equity weights. In full-exposure mode, option rows are treated as underlying notional targets; when a cached representative contract is available, the calculator estimates contract counts and premium cost, but the exact option contracts remain undisclosed. An optional cutoff drops positions below a given portfolio percentage and redistributes their weight among the rest. You can also exclude individual rows, or include rows below the cutoff, by selecting the relevant checkboxes.
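The cutoff-and-redistribute step can be sketched as follows. This is a minimal standalone version of the equity-only logic: drop sub-cutoff rows, renormalize the survivors, then allocate whole shares. The weights, prices, cutoff, and budget are made up for illustration; the real calculator works from the filing’s reported values and live quotes.

```python
# Illustrative inputs only (not actual filing data).
weights = {'NVDA': 0.40, 'TSM': 0.30, 'AVGO': 0.25, 'INTC': 0.05}
prices = {'NVDA': 120.0, 'TSM': 160.0, 'AVGO': 170.0, 'INTC': 22.0}
cutoff = 0.10    # drop positions under 10% of the portfolio
budget = 10_000  # dollars to deploy

# Drop positions below the cutoff, then renormalize so the dropped
# weight is redistributed pro rata among the survivors.
kept = {t: w for t, w in weights.items() if w >= cutoff}
total = sum(kept.values())
kept = {t: w / total for t, w in kept.items()}

# Whole-share allocation: floor each dollar target to an integer
# share count, leaving a small cash remainder.
for ticker, w in kept.items():
    shares = int(budget * w // prices[ticker])
    print(f"{ticker}: {shares} shares (target weight {w:.1%})")
```

Flooring to whole shares means small positions and high share prices leave proportionally more cash unallocated, which is one reason the cutoff is useful in the first place.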
Code
import json
import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import requests
import time
import os
import warnings
warnings.filterwarnings('ignore')
# Parse data from the scraper block
parsed = json.loads(data) if isinstance(data, str) else data
filings = parsed["filings"]
# Build internal structures
filing_dates = {f["quarter"]: f["filing_date"] for f in filings}
quarter_end_dates = {f["quarter"]: f["quarter_end"] for f in filings}
quarters = [f["quarter"] for f in filings]
# Convert holdings list to dict keyed by quarter.
# Multiple positions in the same ticker with different types are aggregated
# by value per (ticker, type) pair.
holdings = {}
for f in filings:
positions = {}
for h in f["holdings"]:
ticker = h["ticker"]
pos_type = h["type"]
value = h["value"]
key = (ticker, pos_type)
positions[key] = positions.get(key, 0) + value
holdings[f["quarter"]] = positions
def get_prices(tickers, dates):
"""Fetch close prices for tickers on specific dates."""
unique_tickers = sorted(set(tickers))
all_dates = [datetime.strptime(d, '%Y-%m-%d') for d in dates]
start = min(all_dates) - timedelta(days=5)
end = max(all_dates) + timedelta(days=5)
df = yf.download(unique_tickers, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return {}
# yf.download returns MultiIndex columns (metric, ticker) for multiple tickers
if isinstance(df.columns, pd.MultiIndex):
close = df['Close']
else:
close = df[['Close']]
close.columns = unique_tickers
prices = {}
for ticker in unique_tickers:
if ticker not in close.columns:
continue
series = close[ticker].dropna()
if series.empty:
continue
prices[ticker] = {}
for date_str in dates:
target = pd.Timestamp(datetime.strptime(date_str, '%Y-%m-%d'))
after = series[series.index >= target]
if not after.empty:
prices[ticker][date_str] = float(after.iloc[0])
else:
before = series[series.index <= target]
if not before.empty:
prices[ticker][date_str] = float(before.iloc[-1])
return prices
def _price_on_or_after(px_by_date, target_date):
"""Return (date, price) for the first available price on/after target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d >= target_date)
if not dates:
return None
d = dates[0]
return d, px_by_date[d]
def _price_on_or_before(px_by_date, target_date):
"""Return (date, price) for the last available price on/before target."""
if not px_by_date:
return None
dates = sorted(d for d in px_by_date if d <= target_date)
if not dates:
return None
d = dates[-1]
return d, px_by_date[d]
def _period_price_pair(px_by_date, start_date, end_date):
"""Return start/end prices for a period using sensible boundary alignment."""
start = _price_on_or_after(px_by_date, start_date)
end = _price_on_or_before(px_by_date, end_date)
if start is None or end is None:
return None
start_actual, p0 = start
end_actual, p1 = end
if end_actual < start_actual:
return None
return start_actual, end_actual, p0, p1
def _option_position_key(ticker, pos_type):
return (ticker, pos_type)
def compute_return(positions, prices, start_date, end_date, mode='equity_only',
option_prices=None, option_params=None):
"""Compute weighted portfolio return between two dates.
For option positions (call/put): if option_prices contains actual
historical option prices for the ticker, compute returns directly from
those prices. Otherwise fall back to Black-Scholes repricing using
the parameters in option_params.
"""
total_value = 0
weighted_return = 0
for (ticker, pos_type), value in positions.items():
if mode == 'equity_only' and pos_type != 'long':
continue
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
opt_px = option_prices.get(opt_key) if option_prices else None
use_option_px = bool(is_option and opt_px)
use_bs = False
if use_option_px:
pair = _period_price_pair(opt_px, start_date, end_date)
if pair is None:
use_option_px = False
else:
start_actual, end_actual, p0, p1 = pair
if not use_option_px:
px = prices.get(ticker)
pair = _period_price_pair(px, start_date, end_date)
if pair is None:
continue
start_actual, end_actual, p0, p1 = pair
if p0 == 0:
continue
ret = (p1 - p0) / p0
# Fallback: BS repricing when no option price data
if is_option and not use_option_px:
params = option_params.get(ticker) if option_params else None
if params:
K_call, K_put, sigma = params
K_over_S = K_call if pos_type == 'call' else K_put
dt = days_between(start_actual, end_actual) / 365.25
ret = bs_option_return(ret, K_over_S, OPTION_T, dt, sigma,
pos_type)
use_bs = True
total_value += value
# When using actual option/BS prices the return already reflects
# the directional bet, so sign is +1. The -1 for puts applies
# only to the raw stock-price proxy path.
sign = 1 if (use_option_px or use_bs) else (
-1 if pos_type == 'put' else 1)
weighted_return += value * sign * ret
return weighted_return / total_value if total_value else None
def annualize(ret, days):
"""Annualize a return over a given number of calendar days."""
if ret is None or days <= 0:
return None
return (1 + ret) ** (365.25 / days) - 1
def days_between(d1, d2):
return (datetime.strptime(d2, '%Y-%m-%d') - datetime.strptime(d1, '%Y-%m-%d')).days
def fmt(ret):
return f"{ret * 100:+.2f}%" if ret is not None else "N/A"
# Collect all tickers and dates
all_tickers = set()
for positions in holdings.values():
for (ticker, _) in positions:
all_tickers.add(ticker)
all_tickers.add('SPY')
today = datetime.now().strftime('%Y-%m-%d')
first_date = filing_dates[quarters[0]]
all_dates = set(filing_dates.values()) | set(quarter_end_dates.values()) | {today}
prices = get_prices(sorted(all_tickers), sorted(all_dates))
# Resolve `today` to the actual last available closing date.
# yfinance may not have data for today (market still open or holiday),
# so we look up what date SPY's price actually corresponds to.
def _resolve_price_date(prices, requested_date):
"""Return the actual trading date of the price stored under requested_date."""
ref = 'SPY' if 'SPY' in prices else next(iter(prices), None)
if not ref or requested_date not in prices[ref]:
return requested_date
target_price = prices[ref][requested_date]
# Re-download a small window to find the real date of this price
start = datetime.strptime(requested_date, '%Y-%m-%d') - timedelta(days=10)
end = datetime.strptime(requested_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(ref, start=start, end=end, progress=False, auto_adjust=True)
if df.empty:
return requested_date
if isinstance(df.columns, pd.MultiIndex):
close = df['Close'][ref].dropna()
elif 'Close' in df.columns:
close = df['Close'].dropna()
else:
close = df.iloc[:, 0].dropna()
for dt, px in close.items():
val = float(px.iloc[0]) if isinstance(px, pd.Series) else float(px)
if abs(val - target_price) < 0.01:
ts = dt[0] if isinstance(dt, tuple) else dt
return pd.Timestamp(ts).strftime('%Y-%m-%d')
return requested_date
today_resolved = _resolve_price_date(prices, today)
if today_resolved != today:
for ticker in prices:
if today in prices[ticker]:
prices[ticker][today_resolved] = prices[ticker].pop(today)
today = today_resolved
def download_daily(tickers, start_date, end_date):
"""Download daily close prices from yfinance, handling MultiIndex.
Dates are 'YYYY-MM-DD' strings. Adds a small buffer for trading-day alignment."""
tickers_sorted = sorted(tickers)
start = datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=5)
end = datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=5)
df = yf.download(tickers_sorted, start=start, end=end,
progress=False, auto_adjust=True)
if df.empty:
return pd.DataFrame()
if isinstance(df.columns, pd.MultiIndex):
return df['Close']
close = df[['Close']]
close.columns = tickers_sorted
return close
# -- Historical option prices via Alpha Vantage ----------------------------
OPTION_CACHE_DIR = os.path.expanduser('~/My Drive/notes/.sa-lp-option-cache')
_AV_BASE = 'https://www.alphavantage.co/query'
_AV_RATE_DELAY = 0.85 # seconds between requests (75 req/min limit)
OPTION_CACHE_COLUMNS = [
'date', 'option_type', 'strike', 'expiry', 'delta', 'price']
def _normalize_option_type(option_type):
option_type = str(option_type).lower()
if option_type not in ('call', 'put'):
raise ValueError(f"Unsupported option type: {option_type}")
return option_type
def _empty_option_cache():
return pd.DataFrame(columns=OPTION_CACHE_COLUMNS)
def _option_cache_path(ticker, option_type, legacy=False):
if legacy:
return os.path.join(OPTION_CACHE_DIR, f'{ticker}.csv')
option_type = _normalize_option_type(option_type)
return os.path.join(OPTION_CACHE_DIR, f'{ticker}-{option_type}.csv')
def _load_option_cache(ticker, option_type):
"""Load cached option data for a ticker/type. Returns DataFrame or empty."""
option_type = _normalize_option_type(option_type)
paths = [_option_cache_path(ticker, option_type)]
# Pre-fix caches were call-only and named TICKER.csv. They are safe to
# reuse for calls but must not be reused for puts.
if option_type == 'call':
legacy_path = _option_cache_path(ticker, option_type, legacy=True)
if legacy_path not in paths:
paths.append(legacy_path)
frames = []
for path in paths:
if not os.path.exists(path):
continue
df = pd.read_csv(path)
if df.empty:
continue
if 'option_type' not in df.columns:
df['option_type'] = 'call'
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df['date'] = pd.to_datetime(df['date'], errors='coerce').dt.strftime(
'%Y-%m-%d')
df['option_type'] = df['option_type'].fillna(option_type).str.lower()
frames.append(df[OPTION_CACHE_COLUMNS])
if not frames:
return _empty_option_cache()
cache = pd.concat(frames, ignore_index=True)
cache = cache[cache['option_type'] == option_type].copy()
cache.dropna(subset=['date'], inplace=True)
for col in ('strike', 'delta', 'price'):
cache[col] = pd.to_numeric(cache[col], errors='coerce')
cache.drop_duplicates(
subset=['date', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
return cache[OPTION_CACHE_COLUMNS]
def _save_option_cache(ticker, option_type, df):
"""Persist typed option cache to CSV."""
option_type = _normalize_option_type(option_type)
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
path = _option_cache_path(ticker, option_type)
if df.empty:
df = _empty_option_cache()
else:
df = df.copy()
df['option_type'] = option_type
for col in OPTION_CACHE_COLUMNS:
if col not in df.columns:
df[col] = np.nan
df.drop_duplicates(
subset=['date', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
df.sort_values(['date', 'expiry', 'strike'], inplace=True)
df.to_csv(path, index=False)
def _fetch_option_chain(ticker, date_str, api_key):
"""Fetch the full option chain for ticker on a given date from Alpha Vantage."""
params = {
'function': 'HISTORICAL_OPTIONS',
'symbol': ticker,
'date': date_str,
'apikey': api_key,
}
try:
resp = requests.get(_AV_BASE, params=params, timeout=30)
resp.raise_for_status()
body = resp.json()
return body.get('data', [])
except Exception:
return []
def _contract_window(ref_date_str):
ref = datetime.strptime(ref_date_str, '%Y-%m-%d')
return ref + timedelta(days=270), ref + timedelta(days=456)
def _contract_from_cache_row(row, ref_date_str, option_type):
option_type = _normalize_option_type(option_type)
if str(row.get('option_type', option_type)).lower() != option_type:
return None
lo, hi = _contract_window(ref_date_str)
try:
exp = datetime.strptime(str(row['expiry']), '%Y-%m-%d')
except (KeyError, TypeError, ValueError):
return None
if not (lo <= exp <= hi):
return None
strike = _safe_float(row.get('strike'))
delta = _safe_float(row.get('delta'))
price = _safe_float(row.get('price'))
if strike is None or delta is None or price is None or price <= 0:
return None
return {
'option_type': option_type,
'strike': strike,
'expiry': str(row['expiry']),
'delta': delta,
'price': price,
}
def _select_cached_contract(cache, option_type, ref_date_str):
rows = cache[(cache['date'] == ref_date_str)
& (cache['option_type'] == option_type)]
candidates = []
for _, row in rows.iterrows():
contract = _contract_from_cache_row(row, ref_date_str, option_type)
if contract:
candidates.append(contract)
if not candidates:
return None
candidates.sort(key=lambda x: abs(abs(x['delta']) - OPTION_DELTA))
return candidates[0]
def _select_best_contract(chain, ref_date_str, option_type):
"""Select the best-matching call/put contract from an option chain.
Criteria: matching option type, expiry 9-15 months from ref_date,
absolute delta closest to 0.15. Returns dict with type, strike, expiry,
delta, price or None.
"""
option_type = _normalize_option_type(option_type)
lo, hi = _contract_window(ref_date_str)
candidates = []
for c in chain:
if c.get('type', '').lower() != option_type:
continue
try:
exp = datetime.strptime(c['expiration'], '%Y-%m-%d')
except (KeyError, ValueError):
continue
if not (lo <= exp <= hi):
continue
delta = _safe_float(c.get('delta'))
if delta is None:
continue
abs_delta = abs(delta)
if abs_delta == 0:
continue
# Price: prefer mid if available, else last
price = _parse_option_price(c)
if price is None or price <= 0:
continue
candidates.append({
'option_type': option_type,
'strike': float(c['strike']),
'expiry': c['expiration'],
'delta': delta,
'price': price,
})
if not candidates:
return None
# Pick contract with absolute delta closest to 0.15.
candidates.sort(key=lambda x: abs(abs(x['delta']) - OPTION_DELTA))
return candidates[0]
def _parse_option_price(contract):
"""Extract a price from an Alpha Vantage option contract record."""
bid = _safe_float(contract.get('bid'))
ask = _safe_float(contract.get('ask'))
last = _safe_float(contract.get('last'))
if bid and ask and bid > 0 and ask > 0:
return (bid + ask) / 2
if last and last > 0:
return last
return None
def _safe_float(val):
try:
return float(val)
except (TypeError, ValueError):
return None
def _cached_contract_price(cache, option_type, date_str, strike, expiry):
rows = cache[(cache['date'] == date_str)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))]
for _, row in rows.iterrows():
price = _safe_float(row.get('price'))
if price is not None and price > 0:
return price
return None
def _extract_contract_price(chain, strike, expiry, option_type):
"""Find the price of a specific option contract."""
option_type = _normalize_option_type(option_type)
for c in chain:
if c.get('type', '').lower() != option_type:
continue
try:
if (abs(float(c['strike']) - strike) < 0.01
and c.get('expiration') == expiry):
return _parse_option_price(c)
except (KeyError, TypeError, ValueError):
continue
return None
def download_option_prices(option_positions, quarters, holdings, filing_dates,
today):
"""Download historical option prices from Alpha Vantage.
For each (ticker, option_type) and each filing period:
1. On the first trading day, fetch the chain and select the best contract
(matching type, expiry 9-15 months out, |delta| closest to 0.15).
2. Lock in that contract for the period.
3. Fetch the price of that contract on each subsequent trading day.
Returns
-------
per_period : dict {quarter_str: {(ticker, type): {date_str: float}}}
Option prices keyed by filing period then option position. Each period
has its own contract's prices, avoiding cross-contract mixing at
boundary dates where one period ends and the next begins.
fallback_positions : set
Option positions where no option data was found (need BS fallback).
"""
option_positions = sorted({
(ticker, _normalize_option_type(pos_type))
for ticker, pos_type in option_positions})
api_key = os.environ.get('ALPHA_VANTAGE_KEY', '')
cache_only = not bool(api_key)
if cache_only:
print("WARNING: ALPHA_VANTAGE_KEY not set; using cached option "
"prices where available and BS repricing elsewhere.")
os.makedirs(OPTION_CACHE_DIR, exist_ok=True)
per_period = {} # {q: {(ticker, type): {date_str: price}}}
fallback = set()
fetched = 0
for ticker, option_type in option_positions:
opt_key = _option_position_key(ticker, option_type)
cache = _load_option_cache(ticker, option_type)
position_has_data = False
new_rows = []
for i, q in enumerate(quarters):
# Skip quarters where this exact option position is absent.
has_opts = opt_key in holdings[q]
if not has_opts:
continue
period_start = filing_dates[q]
period_end = (filing_dates[quarters[i + 1]]
if i < len(quarters) - 1 else today)
trading_days = pd.bdate_range(period_start, period_end)
if len(trading_days) == 0:
continue
first_day = trading_days[0].strftime('%Y-%m-%d')
# -- Select contract on first trading day --
from_cache = False
contract = _select_cached_contract(cache, option_type, first_day)
if contract:
from_cache = True
else:
if not cache_only:
time.sleep(_AV_RATE_DELAY)
chain = _fetch_option_chain(ticker, first_day, api_key)
fetched += 1
contract = _select_best_contract(
chain, first_day, option_type)
if contract:
new_rows.append({
'date': first_day,
'option_type': option_type,
'strike': contract['strike'],
'expiry': contract['expiry'],
'delta': contract['delta'],
'price': contract['price'],
})
if contract is None:
fallback.add(opt_key)
continue
strike = contract['strike']
expiry = contract['expiry']
# -- Collect prices for this period (fresh dict per period) --
period_prices = {}
if from_cache:
# Fast path: read all matching prices from cache (no API calls).
rows = cache[
(cache['date'] >= period_start)
& (cache['date'] <= period_end)
& (cache['option_type'] == option_type)
& (abs(cache['strike'] - strike) < 0.01)
& (cache['expiry'].astype(str) == str(expiry))
& pd.notna(cache['price'])]
for _, row in rows.iterrows():
period_prices[row['date']] = float(row['price'])
else:
# Slow path: fetch each trading day from API
if contract.get('price'):
period_prices[first_day] = contract['price']
for day in trading_days[1:]:
day_str = day.strftime('%Y-%m-%d')
cached_price = _cached_contract_price(
cache, option_type, day_str, strike, expiry)
if cached_price is not None:
period_prices[day_str] = cached_price
continue
time.sleep(_AV_RATE_DELAY)
chain = _fetch_option_chain(ticker, day_str, api_key)
fetched += 1
price = _extract_contract_price(
chain, strike, expiry, option_type)
if price is not None:
period_prices[day_str] = price
new_rows.append({
'date': day_str,
'option_type': option_type,
'strike': strike,
'expiry': expiry,
'delta': contract['delta'],
'price': price,
})
# Accumulate per-period prices
if period_prices:
per_period.setdefault(q, {})[opt_key] = period_prices
position_has_data = True
# Persist new data to cache
if new_rows:
new_df = pd.DataFrame(new_rows)
cache = pd.concat([cache, new_df], ignore_index=True)
cache.drop_duplicates(
subset=['date', 'option_type', 'strike', 'expiry'],
keep='last', inplace=True)
cache.sort_values(['date', 'expiry', 'strike'], inplace=True)
_save_option_cache(ticker, option_type, cache)
if not position_has_data:
fallback.add(opt_key)
if fetched:
import sys
print(f"[options] Fetched {fetched} chain snapshots from Alpha Vantage",
file=sys.stderr)
return per_period, fallback
# -- Fallback: Black-Scholes repricing for tickers without option data -----
from scipy.stats import norm as _norm
def compute_realized_vol(tickers, download_daily_fn, today_str):
"""Compute annualized realized vol from trailing 1-year daily returns."""
vol_start = (datetime.strptime(today_str, '%Y-%m-%d')
- timedelta(days=400)).strftime('%Y-%m-%d')
vol_df = download_daily_fn(tickers, vol_start, today_str)
result = {}
for ticker in tickers:
if ticker in vol_df.columns:
series = vol_df[ticker].dropna()
if len(series) > 20:
log_rets = np.log(series / series.shift(1)).dropna().tail(252)
result[ticker] = float(log_rets.std() * np.sqrt(252))
return result
def bs_price(S, K, T, sigma, option_type='call'):
"""Black-Scholes option price (assumes zero risk-free rate and dividends)."""
if T <= 0 or sigma <= 0:
if option_type == 'call':
return max(S - K, 0)
return max(K - S, 0)
d1 = (np.log(S / K) + (sigma ** 2 / 2) * T) / (sigma * np.sqrt(T))
d2 = d1 - sigma * np.sqrt(T)
if option_type == 'call':
return S * _norm.cdf(d1) - K * _norm.cdf(d2)
return K * _norm.cdf(-d2) - S * _norm.cdf(-d1)
def bs_option_return(stock_return, K_over_S, T, delta_t, sigma,
option_type='call'):
"""Compute option return from stock return using Black-Scholes repricing.
Normalizes S_0 = 1, so S_1 = 1 + stock_return and K = K_over_S.
T is time to expiry at period start; delta_t is time elapsed.
"""
V0 = bs_price(1.0, K_over_S, T, sigma, option_type)
V1 = bs_price(1.0 + stock_return, K_over_S, max(T - delta_t, 0),
sigma, option_type)
if V0 <= 0:
return stock_return
return V1 / V0 - 1
OPTION_DELTA = 0.15
OPTION_T = 1.0
def build_option_params(option_tickers, ticker_vol):
"""Build {ticker: (K_over_S_call, K_over_S_put, sigma)} for BS fallback."""
result = {}
for t in option_tickers:
if t not in ticker_vol:
continue
sigma = ticker_vol[t]
srt = sigma * OPTION_T ** 0.5
# Call: delta = N(d1) = OPTION_DELTA
d1_call = _norm.ppf(OPTION_DELTA)
K_call = np.exp(-d1_call * srt + sigma ** 2 * OPTION_T / 2)
# Put: delta = N(d1) - 1 = -OPTION_DELTA, so N(d1) = 1 - OPTION_DELTA
d1_put = _norm.ppf(1 - OPTION_DELTA)
K_put = np.exp(-d1_put * srt + sigma ** 2 * OPTION_T / 2)
result[t] = (K_call, K_put, sigma)
return result
def daily_cumulative(holdings, quarters, filing_dates, close, today, mode,
per_period_opt=None, option_params=None):
"""Build a daily series of cumulative growth factors for a given mode.
For each filing period the portfolio weights are fixed. Each trading
day's weighted return is computed relative to the period's starting
prices, then chained with the prior period's cumulative growth.
For option positions: uses per-period option prices from per_period_opt
when available; otherwise falls back to Black-Scholes repricing using
option_params.
"""
cum_growth = 1.0
dates_out = []
values_out = []
for i, q in enumerate(quarters):
period_start = filing_dates[q]
period_end = filing_dates[quarters[i + 1]] if i < len(quarters) - 1 else today
ps = pd.Timestamp(period_start)
pe = pd.Timestamp(period_end)
# Trading days in this period
mask = (close.index >= ps) & (close.index <= pe)
period_close = close[mask]
if period_close.empty:
continue
# Option prices for this period (keyed by (ticker, type) → prices)
quarter_opt = per_period_opt.get(q, {}) if per_period_opt else {}
# Determine portfolio weights and starting prices
positions = holdings[q]
weights = {}
start_prices = {}
use_opt_px = {} # track which positions use option prices
total_value = 0
for (ticker, pos_type), value in positions.items():
if mode == 'equity_only' and pos_type != 'long':
continue
# Select price source for this position
is_option = pos_type in ('call', 'put')
opt_key = _option_position_key(ticker, pos_type)
has_opt = is_option and opt_key in quarter_opt
if has_opt:
ticker_opt = quarter_opt[opt_key]
opt_dates = sorted(d for d in ticker_opt if d >= period_start)
if not opt_dates:
continue
start_prices[(ticker, pos_type)] = ticker_opt[opt_dates[0]]
elif ticker in close.columns:
src = close[ticker].dropna()
avail = src[src.index >= ps]
if avail.empty:
continue
start_prices[(ticker, pos_type)] = float(avail.iloc[0])
else:
continue
weights[(ticker, pos_type)] = value
use_opt_px[(ticker, pos_type)] = has_opt
total_value += value
if total_value == 0:
continue
# Daily weighted return relative to period start
# Skip first day of subsequent periods (already recorded as last day
# of the prior period) to avoid duplicate boundary dates.
start_idx = 1 if i > 0 else 0
# Forward-fill: track last known option price so that gaps in
# option data don't cause positions to vanish mid-period.
last_opt = {k: v for k, v in start_prices.items()
if use_opt_px.get(k)}
for day_idx in range(start_idx, len(period_close)):
day = period_close.index[day_idx]
day_str = day.strftime('%Y-%m-%d')
weighted_return = 0
for (ticker, pos_type), value in weights.items():
p0 = start_prices[(ticker, pos_type)]
if p0 == 0:
continue
if use_opt_px[(ticker, pos_type)]:
opt_key = _option_position_key(ticker, pos_type)
p1_val = quarter_opt.get(opt_key, {}).get(day_str)
if p1_val is not None:
last_opt[(ticker, pos_type)] = p1_val
else:
p1_val = last_opt.get((ticker, pos_type))
if p1_val is None:
continue
elif ticker in period_close.columns:
p1_val = period_close[ticker].iloc[day_idx]
if pd.isna(p1_val):
continue
else:
continue
ret = (float(p1_val) - p0) / p0
# Fallback: BS repricing for option positions without option data
use_bs = False
if (pos_type in ('call', 'put')
and not use_opt_px[(ticker, pos_type)]):
params = option_params.get(ticker) if option_params else None
if params:
K_call, K_put, sigma = params
K_over_S = K_call if pos_type == 'call' else K_put
dt = (day - ps).days / 365.25
ret = bs_option_return(ret, K_over_S, OPTION_T, dt,
sigma, pos_type)
use_bs = True
sign = 1 if (use_opt_px[(ticker, pos_type)] or use_bs) else (
-1 if pos_type == 'put' else 1)
weighted_return += (value / total_value) * sign * ret
dates_out.append(day)
values_out.append(cum_growth * (1 + weighted_return))
# Chain: next period starts from the last day's growth factor
if values_out:
cum_growth = values_out[-1]
return dates_out, values_out
import os
HUGO_BASE = os.path.expanduser('~/My Drive/repos/stafforini.com')
# -- Build position data for both modes --------------------------------
latest = parsed["filings"][-1]
pos = {}
for h in latest["holdings"]:
key = (h["ticker"], h["type"])
pos[key] = pos.get(key, 0) + h["value"]
eq_pos = {k: v for k, v in pos.items() if k[1] == 'long'}
eq_total = sum(eq_pos.values())
full_total = sum(pos.values())
# Fetch current underlying prices for all rows
calc_tickers = sorted({t for (t, _) in pos})
current = get_prices(calc_tickers, [today])
# Load option contract info for the latest quarter
latest_fd = latest["filing_date"]
opt_contracts = {}
for h in latest["holdings"]:
    if h["type"] in ('call', 'put'):
        key = (h["ticker"], h["type"])
        if key in opt_contracts:
            continue
        cache = _load_option_cache(h["ticker"], h["type"])
        period_rows = cache[(cache['date'] >= latest_fd)
                            & pd.notna(cache['price'])]
        if not period_rows.empty:
            row = period_rows.sort_values('date').iloc[-1]
            if pd.notna(row['strike']) and pd.notna(row['price']):
                opt_contracts[key] = {
                    'strike': float(row['strike']),
                    'expiry': str(row['expiry']),
                    'price': round(float(row['price']), 2),
                    'price_as_of': str(row['date']),
                }
# Build JSON data for both modes
def build_mode_data(positions, total_value):
    rows = []
    for (ticker, pos_type), value in sorted(positions.items(),
                                            key=lambda x: -x[1]):
        weight = value / total_value
        underlying_price = None
        row = {"ticker": ticker, "type": pos_type,
               "weight": round(weight, 6)}
        if ticker in current and today in current[ticker]:
            underlying_price = round(current[ticker][today], 2)
        if pos_type == 'long':
            row.update({"instrument": "stock", "price": underlying_price,
                        "underlying_price": underlying_price,
                        "multiplier": 1})
        elif pos_type in ('call', 'put'):
            row.update({"instrument": "option", "price": None,
                        "underlying_price": underlying_price,
                        "multiplier": 100})
            if (ticker, pos_type) in opt_contracts:
                row.update(opt_contracts[(ticker, pos_type)])
        else:
            row.update({"instrument": "stock", "price": underlying_price,
                        "underlying_price": underlying_price,
                        "multiplier": 1})
        rows.append(row)
    return rows
eq_data = build_mode_data(eq_pos, eq_total)
full_data = build_mode_data(pos, full_total)
quarter = latest["quarter"].replace("_", " ")
filing_date = latest["filing_date"]
# -- Generate self-contained HTML --------------------------------------
CSS = (
'/* reset */ * { margin: 0; padding: 0; box-sizing: border-box; }\n'
'body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto,\n'
' sans-serif; font-size: 14px; background: transparent;\n'
' color: #333; padding: 16px 0; }\n'
'.controls { display: flex; gap: 16px; align-items: center;\n'
' flex-wrap: wrap; margin-bottom: 12px; }\n'
'.controls label { font-weight: 600; font-size: 13px; }\n'
'.controls input, .controls select {\n'
' padding: 6px 10px; border: 1px solid #ccc; border-radius: 4px;\n'
' font-size: 14px; background: #fff; color: #333; }\n'
'.controls input { width: 140px; }\n'
'.meta { font-size: 12px; color: #888; margin-bottom: 12px; }\n'
'.muted { color: #888; font-size: 11px; }\n'
'table { width: 100%; border-collapse: collapse; font-size: 13px;\n'
' font-variant-numeric: tabular-nums; }\n'
'th { text-align: left; padding: 6px 10px; border-bottom: 2px solid #ddd;\n'
' font-weight: 600; font-size: 12px; text-transform: uppercase;\n'
' letter-spacing: 0.03em; color: #666; }\n'
'th.r, td.r { text-align: right; }\n'
'td { padding: 5px 10px; border-bottom: 1px solid #eee; }\n'
'tr:hover td { background: rgba(0,0,0,0.02); }\n'
'.tag { display: inline-block; padding: 1px 6px; border-radius: 3px;\n'
' font-size: 11px; font-weight: 600; }\n'
'.tag-long { background: #dcfce7; color: #166534; }\n'
'.tag-call { background: #dbeafe; color: #1e40af; }\n'
'.tag-put { background: #fee2e2; color: #991b1b; }\n'
'.summary { margin-top: 12px; font-size: 13px; display: flex;\n'
' gap: 24px; font-weight: 500; }\n'
'.summary span { color: #666; font-weight: 400; }\n'
'td.cb { width: 24px; text-align: center; }\n'
'td.cb input { margin: 0; cursor: pointer; }\n'
'tr.excluded td { opacity: 0.35; }\n'
'tr.excluded td.cb { opacity: 1; }\n'
'body.dark { color: #d4d4d4; }\n'
'body.dark .controls input, body.dark .controls select {\n'
' background: #2a2a2a; color: #d4d4d4; border-color: #555; }\n'
'body.dark th { color: #999; border-bottom-color: #444; }\n'
'body.dark td { border-bottom-color: #333; }\n'
'body.dark tr:hover td { background: rgba(255,255,255,0.03); }\n'
'body.dark .tag-long { background: #14532d; color: #86efac; }\n'
'body.dark .tag-call { background: #1e3a5f; color: #93c5fd; }\n'
'body.dark .tag-put { background: #450a0a; color: #fca5a5; }\n'
'body.dark .meta { color: #777; }\n'
'body.dark tr.excluded td { opacity: 0.3; }\n'
'body.dark .summary span { color: #888; }\n'
)
JS = r"""
var DATA = {
  equity_only: %s,
  full: %s
};
var excluded = {};

function posKey(r) { return r.ticker + '_' + r.type; }

function syncCutoff() {
  var cutoff = (parseFloat(document.getElementById('cutoff').value) || 0) / 100;
  var mode = document.getElementById('mode').value;
  var rows = DATA[mode];
  excluded = {};
  rows.forEach(function(r) {
    if (r.weight < cutoff) excluded[posKey(r)] = true;
  });
}

function render() {
  var bankroll = parseFloat(document.getElementById('bankroll').value) || 0;
  var mode = document.getElementById('mode').value;
  var rows = DATA[mode];
  var showType = mode === 'full';
  // Show mode description
  var descEl = document.getElementById('mode-desc');
  if (mode === 'equity_only') {
    descEl.textContent = 'Long equity positions only. Options are excluded.';
  } else {
    descEl.textContent = 'Uses option rows as underlying notional exposure targets; exact contracts are inferred.';
  }
  // All rows shown; excluded rows are greyed out
  var active = rows.filter(function(r) { return !excluded[posKey(r)]; });
  var totalWeight = active.reduce(function(s, r) { return s + r.weight; }, 0);
  var allocated = 0;
  var unsizedTarget = 0;
  var unknownCostTarget = 0;
  var computed = rows.map(function(r) {
    var key = posKey(r);
    var isExcl = !!excluded[key];
    var adjWeight = (!isExcl && totalWeight > 0) ? r.weight / totalWeight : 0;
    var dollarAlloc = bankroll * adjWeight;
    var multiplier = r.multiplier || 1;
    var isOption = r.instrument === 'option';
    var sizingPrice = isOption ? r.underlying_price : r.price;
    if (!isExcl && !sizingPrice) unsizedTarget += dollarAlloc;
    if (!sizingPrice || isExcl) {
      return { ticker: r.ticker, type: r.type, weight: r.weight,
               adjWeight: adjWeight, target: dollarAlloc,
               excluded: isExcl, key: key,
               instrument: r.instrument || 'stock',
               strike: r.strike || null, expiry: r.expiry || null,
               underlyingPrice: r.underlying_price || null,
               priceAsOf: r.price_as_of || null,
               price: r.price, units: null, cost: null };
    }
    var units = Math.floor(dollarAlloc / (sizingPrice * multiplier));
    var cost = r.price ? units * r.price * multiplier : null;
    if (cost != null) allocated += cost;
    if (!isExcl && isOption && !r.price && units > 0) unknownCostTarget += dollarAlloc;
    return { ticker: r.ticker, type: r.type, weight: r.weight,
             adjWeight: adjWeight, target: dollarAlloc,
             excluded: isExcl, key: key, instrument: r.instrument || 'stock',
             strike: r.strike || null, expiry: r.expiry || null,
             underlyingPrice: r.underlying_price || null,
             priceAsOf: r.price_as_of || null,
             price: r.price, units: units, cost: cost };
  });
  var html = '<table><thead><tr>';
  html += '<th></th><th>Ticker</th>';
  if (showType) html += '<th>Type</th><th class="r">Strike</th><th class="r">Expiry</th>';
  html += '<th class="r">Weight</th>';
  html += '<th class="r">Target</th>';
  html += '<th class="r">Price</th>';
  html += '<th class="r">Units</th><th class="r">Cost</th></tr></thead><tbody>';
  computed.forEach(function(c) {
    html += '<tr' + (c.excluded ? ' class="excluded"' : '') + '>';
    html += '<td class="cb"><input type="checkbox" data-key="' + c.key + '"' + (c.excluded ? '' : ' checked') + '></td>';
    html += '<td><strong>' + c.ticker + '</strong></td>';
    if (showType) {
      var cls = c.type === 'put' ? 'tag-put' : c.type === 'call' ? 'tag-call' : 'tag-long';
      html += '<td><span class="tag ' + cls + '">' + c.type + '</span></td>';
      if (c.strike) {
        html += '<td class="r">$' + c.strike.toFixed(0) + '</td>';
        html += '<td class="r">' + (c.expiry || '\u2014') + '</td>';
      } else {
        html += '<td class="r">\u2014</td><td class="r">\u2014</td>';
      }
    }
    html += '<td class="r">' + (c.excluded ? '0.0' : (c.adjWeight * 100).toFixed(1)) + '%%</td>';
    html += '<td class="r">' + (c.excluded ? '$0.00' : '$' + c.target.toFixed(2)) + '</td>';
    var priceText = c.price != null ? '$' + c.price.toFixed(2) : 'N/A';
    if (c.instrument === 'option' && c.priceAsOf) {
      priceText += '<br><span class="muted">' + c.priceAsOf + '</span>';
    }
    html += '<td class="r">' + priceText + '</td>';
    html += '<td class="r">' + (c.units != null ? c.units.toLocaleString() : 'N/A') + '</td>';
    html += '<td class="r">' + (c.cost != null ? '$' + c.cost.toFixed(2) : 'N/A') + '</td>';
    html += '</tr>';
  });
  html += '</tbody></table>';
  html += '<div class="summary">';
  html += '<div><span>Allocated:</span> $' + allocated.toFixed(2) + '</div>';
  html += '<div><span>Unknown-cost target:</span> $' + unknownCostTarget.toFixed(2) + '</div>';
  html += '<div><span>Unsized target:</span> $' + unsizedTarget.toFixed(2) + '</div>';
  html += '<div><span>Known residual:</span> $' + (bankroll - allocated - unsizedTarget).toFixed(2) + '</div>';
  html += '</div>';
  document.getElementById('output').innerHTML = html;
  // Auto-resize iframe to fit content
  try {
    var el = window.frameElement;
    if (el) el.style.height = document.body.scrollHeight + 'px';
  } catch(e) {}
}

document.getElementById('bankroll').addEventListener('input', render);
document.getElementById('mode').addEventListener('change', function() { syncCutoff(); render(); });
document.getElementById('cutoff').addEventListener('input', function() { syncCutoff(); render(); });
document.getElementById('output').addEventListener('change', function(e) {
  if (e.target.type === 'checkbox' && e.target.dataset.key) {
    if (e.target.checked) {
      delete excluded[e.target.dataset.key];
    } else {
      excluded[e.target.dataset.key] = true;
    }
    render();
  }
});

// Dark mode
function isDark() {
  try { return parent.document.documentElement.getAttribute('data-theme') === 'dark'; }
  catch(e) { return window.matchMedia('(prefers-color-scheme: dark)').matches; }
}
function applyTheme() {
  document.body.classList.toggle('dark', isDark());
}
applyTheme();
try {
  new MutationObserver(applyTheme).observe(
    parent.document.documentElement,
    { attributes: true, attributeFilter: ['data-theme'] });
} catch(e) {}

render();
""" % (json.dumps(eq_data), json.dumps(full_data))
BODY = (
'<div class="controls">\n'
' <label for="bankroll">Bankroll ($)</label>\n'
' <input type="number" id="bankroll" value="10000" min="0" step="100">\n'
' <label for="mode">Mode</label>\n'
' <select id="mode">\n'
' <option value="equity_only" selected>Equity only</option>\n'
' <option value="full">Full exposure</option>\n'
' </select>\n'
' <label for="cutoff">Cutoff (%%)</label>\n'
' <input type="number" id="cutoff" value="0" min="0" max="100"'
' step="0.5" style="width:80px">\n'
'</div>\n'
'<div id="mode-desc" class="meta" style="font-style:italic"></div>\n'
'<div class="meta">\n'
' %s filing (filed %s) · underlying prices as of %s\n'
'</div>\n'
'<div id="output"></div>\n'
) % (quarter, filing_date, today)
html = (
'<!DOCTYPE html>\n<html>\n<head>\n'
'<meta charset="utf-8">\n'
'<meta http-equiv="Cache-Control" content="no-cache, no-store, must-revalidate">\n'
'<style>\n' + CSS + '</style>\n'
'</head>\n<body>\n'
+ BODY
+ '<script>\n' + JS + '\n</script>\n'
'</body>\n</html>'
)
outpath = os.path.join(HUGO_BASE, 'static', 'images', 'sa-lp-calculator.html')
with open(outpath, 'w') as f:
    f.write(html)
Across the five filings to date, the fund has filed zero to three days before the 45-day deadline:
| Quarter end | 45-day deadline | Actual filing | Days early |
|---|---|---|---|
| 2024-12-31 | Feb 14 | Feb 12 | 2 |
| 2025-03-31 | May 15 | May 14 | 1 |
| 2025-06-30 | Aug 14 | Aug 14 | 0 |
| 2025-09-30 | Nov 14 | Nov 14 | 0 |
| 2025-12-31 | Feb 14 | Feb 11 | 3 |
So expect new disclosures around February 14, May 15, August 14, and November 14. If you decide to implement the copycat strategy, consider setting a calendar reminder. I’ll try to keep this note updated, but please let me know if anything looks outdated.
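The four dates follow mechanically from the 45-day rule, so a reminder can be generated rather than hard-coded. A small sketch (the helper name is mine; when the 45th day falls on a weekend or federal holiday, the SEC deadline actually rolls forward to the next business day, a refinement omitted here):

```python
from datetime import date, timedelta

def nominal_13f_deadlines(year):
    """Quarter-end + 45 days for the four 13F filings due during `year`."""
    quarter_ends = [date(year - 1, 12, 31), date(year, 3, 31),
                    date(year, 6, 30), date(year, 9, 30)]
    return [qe + timedelta(days=45) for qe in quarter_ends]

deadlines = nominal_13f_deadlines(2025)
# [datetime.date(2025, 2, 14), datetime.date(2025, 5, 15),
#  datetime.date(2025, 8, 14), datetime.date(2025, 11, 14)]
```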
With thanks to Bastian Stern and Jonas Vollmer for comments.
The code still contains utilities for caching representative option contracts, keyed by ticker and option type, because the calculator can use them to estimate contract counts and premium costs. Those estimates are separate from the return table, where the options book is modeled only as underlying directional exposure. ↩︎
The single-switch assumption is a simplification: the fund likely makes multiple trades throughout the quarter. But since we only observe quarter-end snapshots, the uniform single-switch model is the most we can extract from the data. There is also a one-time boundary cost—the return of the initial portfolio during the first ~45 days before the copycat sees the first filing—but this is a startup effect, not a recurring feature of the delay, so we omit it from the analysis. ↩︎
For the most recent transition, where \(Q_{i+1}\) is not yet available, the gap estimate falls back to a simple comparison of \(Q_i\) with \(Q_{i-1}\). A more accurate approach would be to compute the historical returns under this simple comparison as well, measure how they differ from the returns as currently calculated, and adjust the most recent transition's returns by that difference. The adjustment would raise the cumulative delay cost somewhat, but probably not by enough to matter: it affects only ~45/135 ≈ 33% of the days in the last quarter. ↩︎