实时期权分析数据会不断变化:隐含波动率会发生变化,各种衍生参数也会出现变动,甚至几分钟后,这些数据的呈现方式也可能有所不同。
然而,许多团队仍然将这些数据视为可以随意查看一次的信息——比如在演示文稿中添加一张截图,或者在会议前快速在用户界面中查看一下而已。
但当你需要回答在实际工作中会出现的一些基本问题时,这种处理方式就会遇到麻烦:
比如:TSLA的期权分析数据在10:32时是什么样的?偏度开始加剧是在什么时候?这种变化是由于“翼部”参数的变化还是“平价期权价格”参数的变化引起的?
如果你没有及时保存这些数据,就无法重新查看它们、进行对比或进行审计。你只能依赖当时看到的那些数据。
在本次教程中,我们将构建一个简单但实用的工具:一个内部数据库。这个数据库会持续记录SpiderRock MLink为TSLA提供的实时期权分析数据,将每一份分析结果保存为可供查询的历史数据,并同时维护一个“最新视图”表格,这样你就可以直接获取当前的数据分析结果,而无需浏览全部历史数据。
我们的目标并不是构建一个交易系统,而是建立一个可靠的内部数据集,以便你可以对其进行监控和查询。
注意:SpiderRock MLink提供的实时期权分析服务是需要付费使用的,其中还包括用于生成这些数据的底层市场数据的交易所费用。
目录
先决条件
在开始执行本教程中的任何代码之前,你需要确保满足以下几项要求。
在API方面,您需要一个能够访问LiveImpliedQuote数据源的SpiderRock MLink账户。示例中使用了REST接口,因此不需要进行WebSocket配置,但您确实需要一个有效的API密钥。如果您还没有这样的密钥,可以直接联系SpiderRock来获取访问权限。
在Python方面,所需的环境非常简单。由于某个函数签名中使用了元组类型提示语法,因此您需要Python 3.10或更高版本。所需的外部包包括requests、pandas、numpy和matplotlib,其余的sqlite3、time、datetime等都属于标准库。您可以通过以下命令安装这些外部依赖项:
pip install requests pandas numpy matplotlib
除了需要一个可写的本地路径外,不需要进行任何数据库配置。SQLite会在程序首次运行时自动创建相关文件,因此无需另行安装或配置任何内容。
最后,本示例以TSLA作为目标标的,因为该标的的期权交易非常活跃且流动性良好。如果您想使用其他标的,只需修改配置文件中的标的代码即可。
我们使用的数据
本次开发所依赖的数据来自SpiderRock MLink提供的某种OptAnalytics消息类型:LiveImpliedQuote。

每条消息都代表一个期权合约,并包含了进行监控所需的各项分析数据:
-
期权合约的标识信息(标的代码、到期日、执行价格、看涨或看跌期权类型)
-
表面波动率(sVol)及相关数值字段
-
希腊字母值(delta、gamma、theta、vega)
-
标的资产价格(uPrc)、到期时间(以年为单位)以及利率等背景信息
-
时间戳信息以及计算过程中的标记数据,这些在将实时数据导入数据库时非常有用
在本文中,我们将sVol视为主要的波动率指标,并将其称为“表面波动率”。这样,在后续重新构建期权价格曲线或根据历史数据计算偏度代理值时,工作流程能够保持一致性。
之所以选择TSLA作为示例标的,是因为该标的的期权交易非常活跃,因此即使在较短的时间窗口内,数据库中的数据和查询结果也会显得非常有参考价值。同样的分析方法也适用于其他任何标的,只需更改相应的标的代码即可。
环境搭建:导入所需包
在开始操作数据库或API之前,我们首先会搭建一个简单且可重复使用的开发环境。这一部分的配置内容非常简洁,因为我们只需要导入那些用于进行REST请求、将数据存储到SQLite数据库中以及进行基本数据分析与绘图所需的库而已。
import requests
import sqlite3
import pandas as pd
import numpy as np
import time
from datetime import datetime, timezone
import matplotlib.pyplot as plt
plt.style.use('ggplot')
-
requests用于调用MLink的REST接口。 -
sqlite3提供了一个轻量级的数据库,我们可以直接在本地对其进行写入操作,而无需进行额外的配置。 -
pandas和numpy主要用于在数据返回后对它们进行整理和筛选。 -
time和datetime帮助我们实现轮询机制,并为每份数据添加时间戳,从而使数据库中的数据能够反映实时变化情况。
数据库设计
如果我们的目标是让实时分析结果能够被查询到,那么数据库的设计就必须满足两种不同的需求。
首先,我们需要保留所有的数据记录,这样就可以在需要时还原出特定时间点的市场状况。
其次,我们还需要一种快速的方法来获取“当前的市场情况”,而无需遍历所有存储的数据。
因此,我们使用了两个表格:
-
implied_quote_history:这种表格仅支持数据的追加操作,每次轮询都会生成一份完整的数据快照。 -
implied_quote_latest:每个期权合约对应一行数据。每次轮询都会更新这个表格,使其始终反映最新的数据状态。
这两个表格的核心是使用一个稳定的标识符来唯一区分不同的期权合约。在数据流中,期权的各种信息是以嵌套的形式存在的,因此我们将其规范化为一个个option_key字符串,其中包含了符号、到期时间、执行价格以及交易类型等信息。这个option_key既是implied_quote_latest表格的主键,也是进行数据查询时的主要关联键。
#config
api_key = "YOUR SPIDERROCK API KEY"
mlink_url = "https://mlink-live.nms.saturn.spiderrockconnect.com/rest/json"
msg_type = "LiveImpliedQuote"
symbol = "TSLA"
poll_interval_s = 10
poll_duration_s = 120
limit = 2000
#create db connection
db_path = "/mnt/data/optanalytics_iv_greeks.db"
def get_conn(path: str = db_path):
conn = sqlite3.connect(path)
conn.execute("PRAGMA journal_mode=WAL;")
conn.execute("PRAGMA synchronous=NORMAL;"
return conn
-create db schema
def setup_db(path: str = db_path):
conn = get_conn(path)
cur = conn.cursor()
cur.execute("""
create table if not exists implied_quote_history (
id integer primary key autoincrement,
asof_ts text not null,
option_key text not null,
symbol text not null,
expiry text not null,
strike real not null,
cp text not null,
calc_source text,
u_prc real,
years real,
rate real,
s_vol real,
atm_vol real,
s_mark real,
o_bid real,
o_ask real,
oBid_iv real,
oAsk_iv real,
delta real,
gamma real,
theta real,
vega real,
src_ts text
);
""")
cur.execute("""
create index if not exists idx_hist_symbolexpiry_asof
on implied_quote_history(symbol, expiry, asof_ts);
""")
cur.execute:「
create index if not exists idx_hist_option_asof
on implied_quote_history(option_key, asof_ts);
""")
cur.execute:「
create table if not exists implied_quote_latest (
option_key text primary key,
last_asof_ts text not null,
symbol text not null,
expiry text not null,
strike real not null,
cp text not null,
calc_source text,
u_prc real,
years real,
rate real,
s_vol real,
atm_vol real,
s_mark real,
o_bid real,
o_ask real,
oBid_iv real,
oAsk_iv real,
delta real,
gamma real,
theta real,
vega real,
src_ts text
);
""")
cur.execute:「
create index if not exists idx_latest_symbolexpiry
on implied_quote_latest(symbol, expiry);
""")
conn.commit()
conn.close()
setup_db()
这会创建SQLite数据库文件以及两个表格。其中,“历史记录表”仅支持追加数据操作,并且为后续要执行的两种查询进行了索引优化:一种是根据到期时间和具体时间提取数据快照,另一种则是通过option_key获取特定选项的时间线信息。而另一个表格则以option_key作为键值字段,这样我们就可以进行插入和更新操作,从而保持“当前视图”的一致性。
我们选择存储的这些字段都是具有代表性的数据。我们会记录表面隐含波动率(s_vol)、表面标记(s_mark)、希腊字母数值以及一些上下文相关信息。同时,我们还会保存时间戳,以便后续能够确定某个数值是在何时生成的。
获取实时隐含报价信息
现在我们来进行第一次实时数据获取操作。这样做的目的并不是为了构建一个完美的过滤系统,而是为了确认我们确实能够获取到有意义的TSLA期权分析数据,并且确认返回的数据结构符合我们的预期。
我们通过where子句按照符号来筛选数据。响应结果会是一个列表,其中大部分行实际上都是实时隐含报价信息,而列表的最后一行则包含查询结果的摘要信息。
def fetch_live_implied_quote(symbol: str, limit: int = 2000):
where = f"okey.tk:eq:{symbol」
params = {
"apiKey": api_key,
"cmd": "getmsgs",
"msgType": msg_type,
"where": where,
"limit": limit
}
r = requests.get(mlink_url, params=params)
r.raise_for_status()
return r.json()
raw = fetch_live_implied_quote(symbol, limit=limit)
print("原始数据信息:", len(raw))
print("第一条记录的信息:", raw[0].get("header", "").get("mTyp") if raw else None)
这实际上是一个简单的REST请求,使用的是getmsgs方法。我们传入了API密钥、消息类型以及符号筛选条件。limit参数非常重要,它决定了每次请求能够返回多少条数据。对于那些交易活跃的标的资产来说,不同时间点查询得到的执行价格和到期时间可能会有所不同。不过对于本教程而言,这种差异并不重要,因为我们的目的是展示数据库的结构以及这类监控查询所能实现的功能。
你应该会看到如下输出结果:

将响应结果转换为表格格式
目前,raw变量中保存的是一系列嵌套的消息对象。这种格式在数据传输过程中没有问题,但直接用它来存储数据或进行查询是不现实的。因此,我们现在需要将每一条实时隐含报价信息转换成结构统一的表格记录。
def make_option_key(okey: dict) -> str:
return "|".join([
str(okey.get("tk")),
str(okey.get("dt')),
str(okey.get("xx"));
str(okey.get("cp"));
str(okey.get("at"));
str(okey.get("ts"));
])
def normalize_liq(raw: list, asof_ts: str, keep_calc_source: str = "Loop") -> pd.DataFrame:
rows = []
for row in raw:
if row.get("header", "").get("mTyp") != "LiveImpliedQuote":
continue
m = row.get("message", "")
if keep_calc_source and m.get("calcSource") != keep_calc_source:
continue
pkey = m.get("pkey", "")
okey = pkey.get("okey", "")
if not okey:
continue
s_vol = m.get("sVol")
if s_vol is None or s_vol == 0:
continue
o_bid = m.get("oBid", 0) or 0
o_ask = m.get("oAsk", 0) or 0
quote_ok = int(not (o_bid == 0 and o_ask == 0))
rows.append({
"asof_ts": asof_ts,
"option_key": make_option_key(okey),
"symbol": okey.get("tk"),
"expiry": okey.get("dt"),
"strike": okey.get("xx"),
"cp": okey.get("cp"),
"calc_source": m.get("calcSource"),
"u_prc": m.get("uPrc"),
"years": m.get("years"),
"rate": m.get("rate),
"s_vol": s_vol,
"atm_vol": m.get("atmVol"),
"s_mark": m.get("sMark"),
"o_bid": oBid,
"o_ask": oAsk,
"o_bid_iv": m.get("oBidIv"),
"o_ask_iv": m.get("oAskIv"),
"quote_ok": quote_ok,
"delta": m.get("de"),
"gamma": m.get("ga"),
"theta": m.get("th"),
"vega": m.get("ve),
"src_ts": m.get("timestamp"),
})
df = pd.DataFrame(rows)
if df.empty:
return df
df = (
df.sort_values("src_ts")
.drop_duplicates(subset=["option_key"], keep="last")
.reset_index(drop=True)
)
return df
asof_ts = datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
snapshot_df = normalize_liq(raw, asof_ts)
print("数据快照记录数:", len(snapshot_df))
print("quote_ok值分布情况:", snapshot_df["quote_ok"].value_counts().to_dict() if not snapshot_df.empty else{})
snapshot_df.head()
在这一步规范化处理中,实际上包含了三个关键的决策:
-
首先,我们会根据选项标识符生成一个稳定的
option_key,这样最新的数据表就能拥有一个统一的主键。 -
其次,我们只保留
calcSource="Loop"这种类型的记录。 -
第三,我们避免使用过于严格的过滤条件。在这个数据集中,即使分析字段已经填充了数据,买卖报价字段也可能显示为0。因此,我们不会直接删除这些记录,而是设置一个
quote_ok标志来保留这些记录。这样既能保证数据管道的正常运行,也能在后续清楚地识别出哪些记录包含实时报价信息。
最终得到的结果如下:

此时,每一行数据都代表一个期权合约的快照。如果quote_ok的值为0,那就意味着这一行的买卖报价信息尚未被填充,尽管其他分析字段都是存在的。不过,这种设计对于构建监控数据库来说仍然非常有用,因为我们的目标主要是追踪各项分析指标随时间的变化情况,而不是重新构建可执行的市场模型。
将数据写入数据库
现在我们已经得到了一个结构清晰的数据帧,接下来需要将其保存到两个地方。
历史数据表:将所有数据追加进去。这个表格相当于审计日志。最新数据表:根据option_key进行插入或更新操作。这个表格可以快速反映当前的“实时状态”。
这种分离机制使得数据库具备了实际价值。通过历史数据表,我们可以还原任何过去的快照;而最新数据表则让我们能够立即了解“当前市场的情况”,而无需逐一查看时间序列数据。
def safe_add_column(table: str, col: str, col_type: str, path: str = db_path):
conn = get_conn(path)
cur = conn.cursor()
existing = [r[1] for r in cur.execute(f"PRAGMA table_info({table});").fetchall()]
if col not in existing:
cur.execute(f"ALTER TABLE {table} ADD COLUMN {col} {col_type};")
conn.commit()
conn.close()
safe_add_column("implied_quote_history", "quote_ok", "INTEGER")
safe_add_column("implied_quote_latest", "quote_ok", "INTEGER")
def write_snapshot_to_db(df: pd.DataFrame, path: str = db_path) -> tuple[int, int]:
if df.empty:
return 0, 0
conn = get_conn(path)
cur = conn.cursor()
cols = [
"asof_ts",
"option_key","symbol","expiry","strike","cp",
"calc_source","u_prc","years","rate",
"s_vol","atm_vol","s_mark",
"o_bid","o_ask","oBid_iv","oAsk_iv",
"delta","gamma","theta","vega",
"quote_ok","src_ts"
]
for c in cols:
if c not in df.columns:
df[c] = None
insert_df = df[cols].copy()
cur.executemany(
"""
insert into implied_quote_history (
asof_ts,
option_key, symbol, expiry, strike, cp,
calc_source, u_prc, years, rate,
s_vol, atm_vol, s_mark,
o_bid, o_ask, oBid_iv, oAsk_iv,
delta, gamma, theta, vega,
quote_ok, src_ts
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
insert_df.itertuples(index=False, name=None)
)
history_inserted = cur.rowcount
cur.executemany(
"""
insert into implied_quote_latest (
option_key,
last_asof_ts, symbol, expiry, strike, cp,
calc_source, u_prc, years, rate,
s_vol, atm_vol, s_mark,
o_bid, o_ask, oBid_iv, oAsk_iv,
delta, gamma, theta, vega,
quote_ok, src_ts
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
on conflict(option_key) do update set
last_asof_ts=excluded.last_asof_ts,
symbol=excluded.symbol,
expiry=excluded.expiry,
strike=excluded.strike,
cp=excluded.cp,
calc_source=excluded(calc_source),
u_prc=excluded.u_prc,
years=excluded.years,
rate=excluded.rate,
s_vol=excluded.s_vol,
atm_vol=excluded.atm_vol,
s_mark=excluded.s_mark,
o_bid=excluded.oBid,
o_ask=excluded.o_ask,
oBid_iv=excluded.oBid_iv,
oAsk_iv=excluded.oAsk_iv,
delta=excluded.delta,
gamma=excluded.gamma,
theta=excluded(theta),
vega=excluded.vega,
quote_ok=excluded.quote_ok,
src_ts=excluded.src_ts
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
on conflict(option_key) do update set
last_asof_ts=excluded.last_asof_ts,
symbol=excluded.symbol,
expiry=excluded.expiry,
strike=excluded.strike,
cp=excluded.cp,
calc_source=excluded.calc_source,
u_prc=excluded.u_prc,
years=excluded.years,
rate=excluded.rate,
s_vol=excluded.s_vol,
atm_vol=excluded.atm_vol,
s_mark=excluded.s_mark,
o_bid=excluded.oBid,
o_ask=excluded.o_ask,
oBid_iv=excluded.oBid_iv,
oAsk_iv=excluded.oAsk_iv,
delta=excluded.delta,
gamma=excluded.gamma,
theta=excluded(theta),
vega=excluded.vega,
quote_ok=excluded.quote_ok,
src_ts=excluded.src_ts
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
on conflict(option_key) do update set
last_asof_ts=excluded.last_asof_ts,
symbol=excluded.symbol,
expiry=excluded.expiry,
strike=excluded.strike,
cp=excluded.cp,
calc_source=excluded(calc_source),
u_prc=excluded.u_prc,
years=excluded.years,
rate=excluded.rate,
s_vol=excluded.s_vol,
atm_vol=excluded.atm_vol,
s_mark=excluded.s_mark,
o_bid=excluded.oBid,
o_ask=excluded.o_ask,
oBid_iv=excluded.oBid_iv,
oAsk_iv=excluded.oAsk_iv,
delta=excluded.delta,
gamma=excluded.gamma,
theta=excluded(theta),
vega=excluded.vega,
quote_ok=excluded.quote_ok,
src_ts=excluded.src_ts
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
insert_df[[
"option_key","asof_ts","symbol","expiry","strike","cp",
"calc_source","u_prc","years","rate",
"s_vol","atm_vol","s_mark",
"o_bid","o_ask","oBid_iv","oAsk_iv",
"delta","gamma","theta","vega",
"quote_ok","src_ts"
]].itertuples(index=False, name=None)
)
latest_upserted = cur.rowcount
conn.commit()
conn.close()
return history_inserted, latest_upserted
hist_n, latest_n = write_snapshot_to_db(snapshot_df)
print("历史数据插入数量:", hist_n)
print("最新数据更新数量:", latest_n)
我们使用executemany进行批量写入操作,因此即使有成千上万的选项记录,插入速度依然很快。历史数据的管理也非常简单:最新的数据会通过基于option_key键值的SQLite upsert操作进行写入;如果该合约已经存在于数据库中,其字段就会被更新为最新的数据。
您应该会看到如下结果:

在第一次写入数据后,两个表格中的记录数量将会相同。这是预期之中的结果,因为目前历史记录中只保存了一条数据。一旦我们开始获取多条历史记录,历史表格的数量就会随着每次数据采集而增加,而最新数据的表格数量则基本保持不变,会持续接收新的数据并进行更新。
执行短期数据采集操作
目前,这个数据处理流程是针对单条历史记录来运行的。不过,数据库的真正价值在于能够将实时分析数据转化为时间序列数据。因此,我们会运行一段短暂的采集周期,并连续保存多条历史记录。
这个程序并不是为生产环境设计的调度工具,它只是一个简单的循环程序:会运行几分钟,每隔几秒钟进行一次数据采集,为每条记录添加时间戳,然后将其写入两个表格中。
def poll_and_write(symbol: str, duration_s: int = poll_duration_s, interval_s: int = poll_interval_s):
start = time.time()
polls = 0
total_hist = 0
while time.time() - start < duration_s:
asof_ts = datetime.now(timezone.utc).isoformat_timespec="seconds").replace("+00:00", "Z")
raw = fetch_live_implied_quote(symbol, limit=limit)
df = normalize_liq(raw, asof_ts)
hist_n, latest_n = write_snapshot_to_db(df)
polls += 1
total_hist += hist_n
print(f"[{polls}] {asof_ts}条记录被写入历史表格;最新数据中包含{hist_n}条记录,最近一次更新添加了{latest_n}条记录")
time.sleep(interval_s)
print(f"采集完成。共进行了{polls}次数据采集,总共添加了{total_hist}条历史记录")
poll_and_write(symbol, duration_s=120, interval_s=10)
每次循环迭代都代表生成一条新的历史记录。我们会首先获取UTC时间戳,然后从LiveImpliedQuote服务中提取最新的数据,将其整理成表格格式,最后写入数据库。历史表格会逐渐积累所有这些记录;而最新数据的表格则会根据option_key进行更新,因此它始终能反映最当前的数据状态。
这里还有一个实际需要注意的细节:API调用是有限制的,因此每次数据采集时获取到的执行价格和到期时间可能并不完全相同。这就是为什么snapshot_rows在每次迭代中可能会有所不同的原因。
在实际生产环境中,通常会通过固定某些到期时间和执行价格范围,或者将隐含波动率插值到特定的平价点上来,来确保数据采集的稳定性。不过在这个教程中,我们为了简化操作流程,主要关注数据库的设计结构以及它所能支持的监控功能。
你应该能看到如下所示的每次民意调查后的数据统计结果:
[1] 2026-04-14T18:09:29Z snapshot_rows=1454 history+=1454 latest_upsert=1454
...
完成。共进行了9次民意调查,总共添加了12,806条数据记录。
这证明了数据库正在构建时间序列数据。在进行了9次民意调查后,你向历史数据表中存储了12,806条期权相关记录。最新的数据表会每次都被更新,但其数据增长方式与历史数据表不同,因为它是按照每份合约的键来覆盖原有数据的。
从下一节开始,我们将停止数据写入操作,转而开始进行数据查询。
分析:从数据库中重建波动率曲线
一旦数据被存储到implied_quote_history表中,工作流程就会发生改变。我们不再考虑“API响应”,而是开始使用“查询语句”来处理数据。这一节主要做两件事:首先,选择那些包含足够多数据的到期日;然后,针对这些到期日在多个时间点上重建波动率曲线。
选择具有良好覆盖范围的到期日
如果你选择的到期日在捕获的数据中仅偶尔出现,那么绘制的波动率曲线就会产生误导。因此,我们首先会查看历史数据表中哪些到期日的记录数量最多。
conn = get_conn()
expiry_counts = pd.read_sql_query(
"""
select expiry, count(*) as n
from implied_quote_history
where symbol = ?
group by expiry
order by n desc
limit 10
""",
conn,
params=(symbol,)
)
conn.close()
expiry_counts
这条查询语句仅扫描历史数据表,筛选出TSLA相关的数据,并统计在指定时间窗口内每个到期日对应的期权记录数量。我们会保留排名前10的到期日,然后选择其中第一个作为用于重建波动率曲线的到期日。

到期日2026-11-20对应的记录数量最多。
需要注意的是,这里的记录数量并不意味着这个到期日在交易方面就是“最佳选择”;它只是因为在捕获的数据中出现的频率最高而已。因此,选择一个这样的到期日作为比较对象是合理的。
根据不同时间点的数据重建波动率曲线
现在,我们会针对某个选定的到期日,从存储的历史数据中提取仅与看涨期权相关的数据,然后绘制多个时间点上的隐含波动率与执行价格之间的关系图。
chosenexpiry = "2026-11-20"
conn = get_conn()
smile = pd.read_sql_query(
"""
select asof_ts, strike, cp, s_vol, u_prc
from implied_quote_history
where symbol = ? and expiry = ?
""",
conn,
params=(symbol, chosenexpiry)
)
conn.close()
smile_calls = smile[smile["cp"] == "Call"].copy()
ts_list = sorted(smile_calls["asof_ts"].unique())
pick = [ts_list[0], ts_list[len(ts_list)//2], ts_list[-1]]
plt.figure(figsize=(9,5))
for ts in pick:
g = smile_calls[smile_calls["asof_ts"] == ts].sort_values("strike")
plt.plot(g["strike"], g["s_vol"], label=ts)
plt.title(f"{symbol}波动率曲线(看涨期权)| 到期日 {chosenexpiry} | 3个时间点的数据")
plt.xlabel("执行价格")
plt.ylabel("隐含波动率")
plt.grid(True)
plt.legend()
plt.show()
我们会从历史数据中提取所有在所选到期日之前的交易记录,然后筛选出仅与看涨期权相关的数据,以避免将看跌期权和看涨期权的数据混在一起。为了使图表易于阅读,我们只绘制三个时间点的数据:第一个时间点、中间时间点和最后一个时间点。

在较短的时间窗口内,这些波动率微笑曲线的形态往往会严重重叠。但这并不意味着系统出现了故障,通常只是说明在那两分钟内市场价格没有发生太大变化。重要的是,我们完全可以仅通过存储的历史数据来重新构建并分析这些图表。
放大显示特定区域
虽然全范围的图表有助于观察整体形态,但它可能会掩盖人们真正关心的那些细微变化。因此,我们会将视野缩小到围绕标的资产价格的一个特定区间内进行观察。
s0 = float(smile_calls["u_prc"].dropna().median())
low, high = s0 * 0.6, s0 * 1.4
for ts in pick:
g = smilecalls[smile_calls["asof_ts"] == ts].sort_values("strike")
g = g[(g["strike"] >= low) & (g["strike"] <= high)]
plt.plot(g["strike"], g["s_vol"], label=ts)
plt.title(f"{symbol} 波动率微笑曲线(看涨期权)| 到期日 {chosenexpiry} | 放大显示")
plt.xlabel("执行价格")
plt.ylabel("隐含波动率 (s_vol)")
plt.grid(True)
plt.legend(fontsize=8)
plt.show()
我们会从存储的u_prc数据中选取一个代表当前市场水平的数值,然后将所有执行价格保持在这个数值附近的范围内进行展示。我们的目标并不是追求绝对精确性,而是为了让图表更易于阅读,并能够清楚地看出接近平价区的波动趋势。

通过这种方式,即使是微小的变化也能被清晰地显示出来。这也正是为什么存储历史数据如此重要的原因——如果只看某个单独的时间点的数据,这些变化很容易被忽略或忽视掉。
分析:不平价区间隐含波动率及随时间的变化趋势
虽然完整的波动率微笑曲线图表很有用,但它并不总是监测市场走势的最快捷方式。在实际操作中,团队通常会为每个到期日跟踪几个关键指标,这样就能快速发现异常变化,只有在发现问题时才会进一步深入分析。
在这里,我们将每个存储的历史数据点简化为两个指标,用于反映某个特定到期日的市场情况。
-
不平价区间隐含波动率:指最接近当前市场水平的执行价格对应的隐含波动率。
-
偏度代理指标:通过使用最接近的执行价格,计算出0.9倍当前市场水平与1.1倍当前市场水平对应的隐含波动率之差,以此来反映市场的偏度情况。
chosenexpiry = "2026-11-20"
conn = get_conn()
df = pd.read_sql_query(
"""
select asof_ts, strike, s_vol, u_prc
from implied_quote_history
where symbol = ? and expiry = ? and cp = 'Call'
""",
conn,
params=(symbol, chosenexpiry)
)
conn.close()
df["strike"] = df["strike"].astype(float)
df["s_vol"] = df["s_vol"].astype(float)
def closest_iv(grp: pd.DataFrame, target_strike: float):
g = grp.iloc[(grp["strike"] - target_strike).abs().argsort()[:1]]
return float(g["s_vol"].iloc[0]), float(g["strike"].iloc[0])
rows = []
for ts, grp in df.groupby("asof_ts"):
spot = float(grp["u_prc"].dropna().median())
atm_target = spot
down_target = spot * 0.9
up_target = spot * 1.1
atm_iv, atm_k = closest_iv(grp, atm_target)
down_iv, down_k = closest_iv(grp, down_target)
up_iv, up_k = closest_iv(grp, up_target)
rows.append({
"asof_ts": ts,
"spot": spot,
"atm_strike": atm_k,
"atm_iv": atm_iv,
"k90": down_k,
"iv_90": down_iv,
"k110": up_k,
"iv_110": up_iv,
"skew_90_110": down_iv - up_iv
})
metrics = pd.DataFrame(rows).sort_values("asof_ts").reset_index(drop=True)
metrics
我们查询历史数据表,找出所有到期的期权交易记录,并仅保留相关的调用信息,然后按照快照时间戳对这些数据进行处理。对于每个快照,我们使用中位数u_prc作为“即期代理价格”,并选择最接近该即期价格的可用执行价格,从而计算出ATM隐含波动率。我们对0.9倍于即期价格和1.1倍于即期价格的情况也采用同样的方法进行计算,最后将这两种计算结果之间的差值作为“偏度代理价格”。
该表格还记录了实际使用的执行价格(包括atm_strike、k90、k110)。由于期权执行价格是离散的,因此在不同的快照之间,最接近的执行价格可能会发生变化。将所选的执行价格明确显示出来,可以使这些指标在数值变化时更加易于理解。
最终生成的表格中,每个快照时间戳对应一行数据,其中包含了计算出的各项指标值。

现在我们已经得到了一个结构清晰的时间序列表格,接下来就可以将这些指标可视化展示了。首先展示ATM隐含波动率,然后再展示偏度代理价格。
plt.plot(metrics["asof_ts"], metrics["atm_iv"])
plt.title(f"{symbol} 随时间变化的ATM隐含波动率 | 到期日 {chosenexpiry}")
plt.xticks(rotation=30, ha="right")
plt.ylabel("ATM隐含波动率 (标准差)")
plt.grid(True)
plt.show()
plt.plot(metrics["asof_ts"], metrics["skew_90_110"])
plt.title(f"{symbol} 偏度代理价格 (IV@0.9S - IV@1.1S) | 到期日 {chosenexpiry}")
plt.xticks(rotation=30, ha="right")
plt.ylabel("偏度代理价格")
plt.grid(True)
plt.show()
这是第一张图表,展示了ATM隐含波动率随时间的变化情况。

通常情况下,ATM隐含波动率在较短的时间窗口内变化幅度较小,除非发生剧烈的价格重估事件。在这次分析中,ATM隐含波动率保持相对稳定,这对于短期分析来说是一个符合实际情况的结果。数据库将这种“相对稳定”的状态转化为可以量化和比较的数据,而不是一个模糊的概念,这一点非常有用。
这是第二张图表,展示了偏度代理价格随时间的变化情况。

偏度代理价格对市场变化更为敏感,因为它的计算是基于期权价格的“翼点”进行的。当这些翼点发生变化时,通常意味着该到期日的期权价格下行幅度与上行幅度的变化情况不同。需要注意的是,在不同的快照之间,最接近的可用执行价格可能会发生变化,这可能会导致指标出现阶梯状的波动,即使市场整体价格并没有发生剧烈变动。因此,我们在指标表格中同时包含了k90和k110这两个数据,这样就可以使偏度图更加容易理解了。
警报式阈值设置
一旦为每个快照建立了相应的指标表格,添加监控功能就变得非常简单了。我们的目的并不是为了生成交易指令,而是为了在市场走势发生明显变化时发出警告,提醒相关人员进一步关注市场情况。
在这里,我们进行两项检查:
-
ATM IV变化警报:如果在不同快照之间,ATM IV的变化幅度超过了某个阈值,则触发该警报。
-
偏度变化警报:如果在不同快照之间,偏度代理的值变化幅度超过了某个阈值,则触发该警报。
alerts = metrics.copy()
alerts["atm_iv_change"] = alerts["atm_iv"].diff()
alerts["skew_change"] = alerts["skew_90_110"].diff()
atm_thresh = 0.002
skew_thresh = 0.003
alerts["atm_alert"] = alerts["atm_iv_change"].abs() >= atm_thresh
alerts["skew_alert"] = alerts["skew_change"].abs() >= skewthresh
alerts[[
"asof_ts",
"atm_iv", "atm_iv_change", "atm_alert",
"skew_90_110", "skew_change", "skew_alert",
"atm_strike", "k90", "k110"
]]
我们首先获取每个快照对应的指标数据,然后计算这些数据之间的变化幅度。接下来,我们将这些变化幅度与预设的阈值进行比较,并根据比较结果生成相应的布尔值标志。最终生成的输出表格中既包含了各项指标数据,也记录了用于计算这些指标的数据点,因此任何警报的出现都能得到合理的解释,而不会让人感到困惑。

在这次分析中,所有的ATM IV警报都属于假警报,而偏度警报只触发了一次。
偏度警报之所以会触发,是因为在两个快照之间,偏度代理的值变化幅度超过了预设的阈值。这一现象是可以理解的。如果查看相关数据表,就会发现用于计算偏度的数据点在几乎同一时间发生了变化(例如k90的值从340变为了315)。由于这些数据点是离散的,因此即使实际的变化幅度并不大,指标值也可能会发生突变。
为了让这些图表更易于阅读,我们还将这两组数据绘制在同一张图中,并在出现警报的情况下标出相应的点位。
plt.plot(alerts["asof_ts"], alerts["atm_iv"])
for i, r in alerts[alerts["atm_alert"]].iterrows():
plt.scatter(r["asof_ts"], r["atm_iv"], s=30, edgecolors="r", alpha=0.6, linewidth=2)
plt.title(f"{symbol} ATM IV指标及警报信息 | 到期时间 {chosenexpiry}")
plt.xticks(rotation=30, ha="right")
plt.grid(True)
plt.show()
plt.plot(alerts["asof_ts"], alerts["skew_90_110"])
for i, r in alerts[alerts["skew_alert"]].iterrows():
plt.scatter(r["asof_ts"], r["skew_90_110"], s=30, edgecolors="r", alpha=0.6, linewidth=2)
plt.title(f"{symbol} 偏度代理指标及警报信息 | 到期时间 {chosenexpiry}")
plt.xticks(rotation=30, ha="right")
plt.grid(True)
plt.show()
这两张图采用了相同的绘制方式:先将各项指标值以线条的形式呈现出来,然后在那些触发警报的时间点上添加标记。这样一来,就能清楚地看出哪些数据值超过了预设的阈值。
这张图表展示了偏度代理指标及其引发的警报情况。

此图表显示了一个警报标记,这一标记与我们在表格中看到的内容是一致的。
由于没有出现任何警报点,因此图中并未展示ATM隐含波动率的相关数据。
总结
在这次演示过程中,我们使用了SpiderRock MLink提供的TSLA实时隐含报价数据,并将其转化成了一个可供查询的内部数据库。我们将所有的数据快照存储在一个仅支持追加操作的历史记录表中,同时通过一个稳定的期权标识符来维护最新的数据视图。随后,我们利用这些存储的数据重新构建了波动率曲线,监测了ATM隐含波动率的变化情况,并实现了一个简单的偏度测量工具;此外,我们还添加了一条基本的警报规则。
这种做法非常适用于B2B业务场景,因为它能够将实时分析数据转化为可实际操作的工具——一种可以用来进行审计、回放和监控的数据集。无论你是正在构建内部数据看板,为交易团队提供常规的波动率监测服务,还是在进行事后的快速回顾分析,这一方法都是十分实用的。此外,这种方法也不依赖于截图或一次性运行的数据分析工具。
如果你想进一步扩展这个系统,接下来最实际的做法就是延长数据采集的时间间隔,同时跟踪更多种类的金融产品;当数据量逐渐增加时,也可以将数据库从SQLite升级为Postgres。如果指标的稳定性变得尤为重要,你还可以统一每次数据采集时所追踪的具体指标范围,或者将隐含波动率插值到固定的货币价值点上,这样当最接近执行价的期权价格发生变化时,偏度测量结果也不会出现突变。
总之,这篇文章已经讲完了。希望你们能从中学到一些新的、有用的知识。