“市场动态”界面本质上就是这样一个你愿意持续打开的页面——当你不想一整天都盯着图表看的时候,这个界面能告诉你当前哪些资产正在发生变化,哪些资产的波动性异常剧烈,以及哪些资产的价格走势开始出现联动。

这种信息的呈现方式并非适合用于研究论文,而是更适合产品应用。你可以将这样的信息推送到一个媒体平台或投资应用程序中,它就能立刻发挥实际作用。

在本教程中,我们将使用Streamlit在Python中构建这样一个界面的最小化版本。该界面由三个部分组成:

  • 一个“动态排行榜”,用于显示你关注列表中波动幅度最大的资产

  • 一个“预警信息流”,它会以事件的形式发出警报,而不会发送大量的原始数据

  • 一张“相关性图表”,它会根据当前的市场波动情况实时更新内容

该界面的数据将来源于EODHD提供的实时WebSocket数据流。

需要提前说明的是:这个工具既不是TradingView,也不是用于回测的工具。它只是一个轻量级的实时系统,能够实时传输价格数据,维护滚动缓冲区,计算一些实时指标,并将这些信息转化为适合用户界面的展示形式。

我们的目标是要构建一个真正可以投入实际使用的“市场动态”功能,而不仅仅是一个一次性的演示示例。

目录

  1. 先决条件

  2. 我们正在构建的应用程序

  3. 应用程序架构

  4. 流式传输层:一个队列,多个数据源

  5. 滚动状态缓冲区:回报数据、波动性指标与趋势分析

先决条件

在开始构建之前,请确保你已经掌握了以下基础知识。

你应该能够熟练运行Python脚本,使用pip安装包,并处理简单的多文件项目。

本教程不基于Jupyter Notebook进行讲解。我们将构建一个轻量级的实时应用,其中数据流、状态信息、事件处理逻辑以及Streamlit用户界面都由独立的文件来实现。

你需要安装Python 3.10及以上版本,以及以下这些包:

pip install streamlit pandas websockets

此外,你还需要一个EODHD API密钥,这样才能访问他们的实时WebSocket数据源,因为仪表盘的功能依赖于股票、外汇和加密货币的实时数据。

为了顺利进行操作,请在开始之前在项目文件夹中创建以下文件:

feeds.py
pulse_store.py
events.py
correlation.py
app.py

在开始之前需要说明一点:由于这个应用会使用实时市场数据,因此你看到的界面内容会受到访问时间的影响。在周末或市场关闭的时间段,加密货币的相关数据通常会占据仪表盘的主要显示位置,而股票和大多数外汇品种的数据则相对较少。这是正常现象。

我们正在构建的应用

在开始编写代码之前,让我们先来看看最终完成的仪表盘长什么样:

https://gumlet.tv/watch/69b99df9554f0fb510c28ce6/

下面我们来了解一下它的主要功能:

脉动表格

这是主界面。它会显示你关注的所有资产中表现最突出的那些。每一行代表一个资产,而列则展示了我们实时计算得出的一些关键数据:最新价格、1分钟内的价格变化幅度、5分钟内的价格变化幅度(如有的话)、15分钟内的波动率,以及一个简单的分类标签。

如果你打开这个应用,最想看到的就是这个表格。你可以快速浏览它,立刻了解哪些资产值得关注。

重要事件通知

这个功能让这个应用不再只是一个简单的实时数据展示工具,而更像是一个实用的功能模块。我们不会显示所有的更新信息,只有当某些数据达到预设的阈值时才会发出通知——比如价格在1分钟内发生了剧烈变化,或者波动率突然上升。这些通知会以“卡片”的形式出现在界面上。这样做的目的是减少干扰,而不是增加不必要的信息量。

相关性图表

这个图表的设计非常简洁实用。实时计算相关性数据往往会变得复杂,因为不同资产的价格变动频率各不相同,因此需要对其进行适当的处理。在这个版本中,我们只针对股票进行相关性分析,并且是在固定的时间间隔内计算相关性的。

这个图表并不是为了展示完整的相关性矩阵,而是用来快速查看“当前我的主要关注资产与其他资产之间有什么关联”。此外,它的回顾窗口会根据主要关注资产所处的市场状态(正常波动还是高波动)进行调整。

控制面板

在页面顶部,有一些控制选项可以让演示功能具有交互性,而不会使其变成一个设置页面。“Top movers”允许你指定“Pulse表格”中应显示多少行数据;“Correlation base”则用于选择以哪只股票作为计算相关性的基准;“Correlation bucket”用于调整用于数据对齐的时间间隔大小——当数据更新频率较低时,这个选项非常有用,因为它可以帮助使相关性计算结果更加稳定。

应用程序架构

如果你曾经尝试过构建一个实时的Streamlit应用程序,你可能会遇到同样的问题:Streamlit会不断重新执行你的脚本。每当某个组件发生变化,或者当你调用`st.rerun()`函数时,整个脚本都会从开头开始重新执行。

对于普通的仪表板来说,这种设计确实没有问题,但对于需要运行无限循环的WebSocket应用程序而言,这种机制却会导致严重的性能问题。如果在主线程中执行这样的循环,要么用户界面会冻结,要么每次重新执行脚本时都需要重新连接数据源。

多资产市场脉动应用程序架构

因此,这里的应用程序架构被有意地分成了两个部分。

其中一个后台工作进程负责处理实时数据流。它负责连接WebSocket数据源,接收数据更新,更新滚动缓冲区,计算各项指标,并生成相应的警报信息。这个进程会持续运行,并将最新的数据状态保存在内存中——这就是整个应用程序的核心运行机制。

而Streamlit本身则被设计得相对简单:每次重新执行脚本时,它只会读取后台工作进程生成的数据结果,然后渲染表格和相关的关联分析图表。在用户界面所在的循环中,不会进行任何数据采集操作,也不会进行复杂的计算处理,仅仅负责数据的展示而已。正是这种分离机制,使得应用程序能够在用户不断刷新页面或调整设置的情况下依然保持稳定运行。

在实际开发中,在Python中实现这一功能的最简单方法就是使用后台线程来执行异步循环。Streamlit会通过`st.session_state`变量来确保线程的正确启动和运行,然后用户界面代码就会根据共享的状态数据不断重新渲染页面内容。

这种设计虽然并不复杂,但却决定了应用程序是只能运行30秒钟的简单演示工具,还是能够像真正的市场监测系统一样持续运行的成熟应用。

代码文件结构

为了使代码结构更加清晰易懂,我将整个应用程序分解成了五个独立的文件。每个文件负责完成特定的功能,而Streamlit的用户界面本身并不直接参与WebSocket相关逻辑的处理。

  • feeds.py文件负责处理WebSocket连接,并将所有接收到的数据转换为统一的格式进行存储。

  • pulse_store.py文件为每只股票维护滚动缓冲区,并计算相应的脉动指标(如回报率、成交量、趋势等)。这些数据构成了应用程序的核心状态信息。

  • events.py文件负责将实时生成的指标数据转换为包含冷却时间间隔和资产特定阈值的警报信息。

  • correlation.py文件通过分组和对齐数据来生成关联分析图表,并根据不同的市场状况调整数据分析的时间窗口。

  • app.py文件则是Streamlit仪表板的主体部分。它负责启动后台工作进程,然后根据共享的状态数据不断重新渲染用户界面。

正是这种分离机制使得该应用程序能够保持稳定。后台工作进程可以持续运行下去。Streamlit可以根据需要随时重新运行,而无需重新连接数据源或从头开始重新计算所有数据。

流式处理层:一个队列,多个数据源

第一步是将实时数据点纳入系统。我们连接到EODHD提供的股票、外汇和加密货币的WebSocket数据源,订阅一个简短的观察列表,然后将所有接收到的消息统一转换为相同的格式:{symbol, asset, ts, price}

一旦完成了这一步,后续的所有处理流程就会变得可预测了。

feeds.py:

import asyncio
import json
import time
import websockets

API_KEY = "您的EODHD API密钥"

WS = {
    "stocks": "wss://ws.eodhistoricaldata.com/ws/us?api_token=",
    "forex":  "wss://ws.eodhistoricaldata.com/ws/forex?api_token=",
    "crypto": "wss://ws.eodhistoricaldata.com/ws/crypto?api_token=",
}

def _tick(symbol, asset, price):
    return {"symbol": symbol, "asset": asset, "ts": time.time(), "price": float(price)}

def _parse(asset, msg):
    s = msg.get("s")
    p = msg.get("p")
    if s is None or p is None:
        return None
    return _tick(s, asset, p)

async def _stream(asset, symbols, q):
    url = WS[asset] + API_KEY

    while True:
        try:
            async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
                sub = {"action": "subscribe", "symbols": ",".join(symbols)}
                await ws.send(json.dumps(sub))

                async for raw in ws:
                    try:
                        msg = json.loads(raw)
                    except Exception:
                        continue

                    t = _parse(asset, msg)
                    if t:
                        await q.put(t)

        except Exception:
            await asyncio.sleep(1.0)

async def start_streams(q):
    tasks = []
    tasks.append(asyncio.create_task(_stream("stocks", ["AAPL","TSLA","NVDA","AMZN","MSFT","META","GOOGL"], q)))
    tasks.append.asyncio.create_task(_stream("forex", ["EURUSD","USDINR","USDJPY","GBPUSD","AUDUSD"], q)))
    tasks.append(asyncio.create_task(_stream("crypto", ["BTC-USD","ETH-USD","BTC-USDT","ETH-USDT","SOL-USDT"], q)))
    return tasks

注意:请将“您的EODHD API密钥”替换为您实际的EODHD API密钥。如果您还没有这个密钥,可以通过注册EODHD开发者账户来获取它。

这段代码的作用非常简单:每个数据源都在独立的异步任务中进行处理,处理后的数据会被放入一个共享的队列中;如果连接中断,系统会自动重新建立连接。在这里,我们并没有尝试使用任何复杂的逻辑或技术——这一层代码仅仅起到了数据传输和存储的作用而已。

为什么观察列表需要被精心筛选

虽然包含更多的股票或资产可以让演示效果更加出色,但这也会导致调试和数据分析变得更加困难。对于这篇文章来说,我们需要的观察列表规模应该适中——既小到足以让我们轻松理解其运作机制,又足够多样化,能够反映出多种资产之间的交互关系。

有一点会影响到你看到的数据:周末时间。在市场关闭期间,股票和大多数外汇品种的价格不会发生明显变化,而加密货币则全天24小时都在交易。因此,如果你在周日运行这个应用程序,加密货币的相关数据自然会在显示结果中占据主导地位。这并不是一个漏洞,而是由于只有某一类资产在发生变化时才会产生这样的结果。

在真正的产品中,可以通过按资产类别对变化数据进行排序,或者为不同的资产类别设置单独的展示区域来解决这个问题。但对于当前这个版本来说,我们就保持简单,接受显示结果会受到运行时间影响这一事实吧。

滚动状态:缓冲区数据、收益情况、波动性及趋势分析

这是该应用程序的核心功能。我们会为每个交易品种维护一个滚动缓冲区,从中计算出一些实时数据,并将这些信息以简洁的形式呈现出来,以便用户界面和事件处理系统能够使用这些数据。

pulse_store.py:

import time
import math
import threading
from collections import deque

class PulseStore:
    def __init__(self, window_sec=3600):
        self.window_sec = window_sec
        self.buffers = {}
        self/latest = {}
        self.asset = {}
        self.vol_hist = {}
        self.lock = threading.Lock()

    def _buf(self, symbol):
        if symbol not in self.buffers:
            self.buffers[symbol] = deque()
        return self.buffers[symbol]

    def update(self, tick):
        symbol = tick["symbol"]
        ts = tick["ts"]
        px = tick["price"]

        with self.lock:
            b = self._buf(symbol)
            b.append((ts, px))
            self/latest[symbol] = px
            self.asset[symbol] = tick.get("asset")

            cutoff = ts - self.window_sec
            while b and b[0][0] < cutoff:
                b.popleft()

        return len(b)

    def _price_at_or_before(self, b, target_ts):
        with self.lock:
            data = list(b)

        for i in range(len(data) - 1, -1, -1):
            if data[i][0] <= target_ts:
                return data[i][1]
        return None

    def ret(self, symbol, window_sec):
        b = self.buffers.get(symbol)
        if not b:
            return None

        with self.lock:
            if len(b) < 2:
                return None
            now_ts, now_px = b[-1]

        px0 = self._price_at_or_before(b, now_ts - window_sec)
        if px0 is None:
            return None

        return (now.px / px0) - 1.0

    def ret_1m(self, symbol):
        return self.ret(symbol, 60)

    def ret_5m(self, symbol):
        return self(ret(symbol, 300)

    def ret_15m(self, symbol):
        return self-ret(symbol, 900)

    def _recent_prices(self, b, window_sec):
        with self.lock:
            data = list(b)

        if not data:
            return []

        cutoff = data[-1][0] - window_sec
        out = []
        for ts, px in data:
            if ts >= cutoff:
                out.append(px)
        return out

    def vol_15m(self, symbol):
        b = self.buffers.get(symbol)
        if not b:
            return None

        prices = self._recent_prices(b, 900)
        if len(prices) < 6:
            return None

        rets = []
        for i in range(1, len(prices)):
            rets.append(prices[i] / prices[i-1] - 1.0)

        if len(rets) < 3:
            return None

        m = sum(rets) / len(rets)
        var = sum((x - m) ** 2 for x in rets) / len(rets)
        return var ** 0.5

    def trend_15m(self, symbol):
        b = self.buffers.get(symbol)
        if not b:
            return None

        prices = self._recent_prices(b, 900)
        if len(prices) < 8:
            return None

        lp = []
        for p in prices:
            if p > 0:
                lp.append(math.log(p))

        if len(lp) < 8:
            return None

        n = len(lp)
        xs = list(range(n))

        xbar = sum(xs) / n
        ybar = sum(lp) / n

        num = 0.0
        den = 0.0
        for i in range(n):
            dx = xs[i] - xbar
            dy = lp[i] - ybar
            num += dx * dy
            den += dx * dx

        if den == 0:
            return None

        return num / den

    def _vh(self, symbol):
        if symbol not in self.vol_hist:
            self.vol_hist[symbol] = deque(maxlen=200)
        return self.vol_hist[symbol]

    def update_vol_history(self, symbol):
        v = self.vol_15m(symbol)
        if v is None:
            return None
        self._vh(symbol).append(v)
        return v

    def regime(self, symbol):
        h = self.vol_hist.get(symbol)
        if not h or len(h) < 30:
            return "unknown"

        cur = h[-1]
        hs = sorted(h)
        p80 = hs[int(0.8 * (len(hs) - 1))]

        if cur >= p80:
            return "high_vol"
        return "normal"

    def snapshot(self, symbol):
        last = self/latest.get(symbol)
        if last is None:
            return None

        out = {"symbol": symbol, "asset": self.asset.get(symbol), "last": last}

        r1 = self.ret_1m(symbol)
        r5 = self_ret_5m(symbol)
        r15 = self(ret_15m(symbol)
        v15 = self.vol_15m(symbol)
        tr = self.trend_15m(symbol)

        if r1 is not None:
            out["ret_1m"] = r1
        if r5 is not None:
            out["ret_5m"] = r5
        if r15 is not None:
            out["ret_15m"] = r15
        if v15 is not None:
            out["vol_15m"] = v15
        if tr is not None:
            out["trend_15m"] = tr

        v = self.update_vol_history(symbol)
        if v is not None:
            out["regime"] = self.regime(symbol)

        return out

    def snapshots(self):
        with self.lock:
            syms = list(self.buffers.keys())

        out = []
        for s in syms:
            snap = self.snapshot(s)
            if snap:
                out.append(snap)
        return out

update()是整个系统的入口点。所有新收到的数据都会被添加到该符号对应的双端队列中,而旧的数据则会被删除,这样缓冲区就不会无限制地扩大。

回报值的计算采用了一种小技巧:我们并不假设自己能够得到60秒前或300秒前的精确价格。我们会反向扫描数据,取出目标时间戳之前或当时的最新价格。这样一来,即使数据接收的频率不均匀,回报值也能保持稳定。

波动性的计算是基于过去15分钟内价格的短期变化得出的。这个数值并没有进行年化处理,仅仅是一个实时的波动指标而已。趋势指标则是通过计算同一时间段内对数价格的变化率来得出的,它能够提供一个方向性提示,而不会进行复杂的计算。

vol_hist双端队列被用来区分不同的市场状况。我们会存储每个符号近期波动性的历史数据,如果当前的波动性超过了这个历史数据的第80百分位,我们就将当前状态标记为high_vol。这种判断方法虽然简单,但已经足够用于后续的相关性分析逻辑中了。

并发问题是导致需要使用锁机制的原因。在Streamlit读取数据的同时,后台线程正在向双端队列中写入数据。如果在一个队列正在被修改的情况下对其进行遍历,Python会抛出错误。因此,在任何需要进行遍历的地方,我们都会先在锁的保护下复制该队列的内容,然后再进行遍历操作。这样就可以确保读操作的安全性,同时也不会影响写操作的效率。

将实时数据转化为事件(压力推送机制)

一旦获得了实时数据,接下来就要考虑如何使用这些数据了。如果直接将原始数据展示在用户界面中,就会让用户被过多的信息淹没。我们真正需要的是一种事件推送机制——只有当某些条件被满足时,才会显示相应的信息。

压力推送机制正是实现这一目标的工具。它会监控从PulseStore中获取的数据,并根据不同的情况发出三种类型的事件。

  • 当1分钟内的价格变化幅度达到一定标准时,会触发move_1m事件

  • 当5分钟内的价格变化幅度达到一定标准时,会触发move_5m事件

  • 当15分钟内的波动性超过某个阈值时,会触发vol_spike事件

有两个实用的功能使得这种机制能够在实际的仪表盘中得到应用。首先是冷却时间设置——如果TSLA的价格在1分钟内发生了较大的变化,我们并不希望每个数据更新都会触发50次重复的事件。其次是针对不同资产类型设置不同的阈值。由于加密货币的价格波动通常比股票更为剧烈,因此如果使用统一的阈值,BTC就会在整个系统中占据主导地位,导致压力推送信息过于频繁。

events.py

import time
from collections import deque

class EventStore:
    def __init__(self, max_events=25):
        self.max_events = max_events
        self.events = deque(maxlen=max_events)
        
    def add(self, e):
        self.events.appendleft(e)

    def latest(self):
        return list(self.events)


class StressDetector:
    def __init__(self, move_thr_1m=0.0015, moveTHR_5m=0.004, volthr=0.00025):
        self.moveThr_1m = move_thr_1m
        self.move THR_5m = moveTHR_5m
        self.vol_thr = vol_thr
        self.cooldown_sec = 30
        self.last_emit = {}
        self.thr = {
            "stocks": {"move_1m": 0.0012, "move_5m": 0.0040, "vol": 0.00006},
            "crypto": {"move_1m": 0.0025, "move_5m": 0.0080, "vol": 0.00045},
            "forex":  {"move_1m": 0.0006, "move_5m": 0.0018, "vol": 0.00015},
        }

    def _can_emit(self, symbol, etype, now):
        k = (symbol, etype)
        prev = self.last.emit.get(k)
        if prev is None:
            self.lastEmitter[k] = now
            return True
        if now - prev >= self.cooldown_sec:
            self.lastEmitter[k] = now
            return True
        return False

    def check(self, snap):
        if not snap:
            return None

        sym = snap.get("symbol")
        asset = snap.get("asset", None)
        thr = self.thr.get(asset, {"move_1m": self.move_thr_1m, "move_5m": self.moveTHR_5m, "vol": self.volthr})
        moveThr_1m = thr["move_1m"]
        move THR_5m = thr["move_5m"]
        vol_thr = thr["vol"]
        now = time.time()

        r5 = snap.get("ret_5m")
        r1 = snap.get("ret_1m")
        v15 = snap.get("vol_15m")

        if r5 is not None and abs(r5) >= moveTHR_5m:
            if self._can_emit(sym, "move_5m", now):
                return {"ts": now, "type": "move_5m", "symbol": sym, "asset": asset, "value": float(r5)}
            return None

        if r1 is not None and abs(r1) >= moveTHR_1m:
            if self._can_emit(sym, "move_1m", now):
                return {"ts": now, "type": "move_1m", "symbol": sym, "asset": asset, "value": float(r1)}
            return None

        if v15 is not None and v15 >= vol_thr:
            if self._can_emit(sym, "vol_spike", now):
                return {"ts": now, "type": "vol_spike", "symbol": sym, "asset": asset, "value": float(v15)}
            return None

        return None

EventStore实际上只是一个滚动数据流。它会保留最近N条事件记录,这样Streamlit就可以将这些数据渲染成表格形式来显示。

StressDetector.check()则起到了过滤的作用。它会查看最新的数据状态,并判断是否需要生成新的事件记录。其中的冷却机制可以有效防止垃圾信息的产生——一旦某个资产发出了move_1m事件,那么在30秒内它就不会再发出同样的事件。

针对不同类型的资产,这些阈值被有意设置成了不同的值。对于加密货币来说,无论是价格波动幅度还是波动频率都需要更大的容忍范围。否则,即使是比特币这种表现相对平稳的资产,在相对于股票而言也会被视为处于高压力状态。通过这样的调整,数据流就能显得更加平衡、更符合实际情况。

机制分类标签(虽小但很重要)

“机制分类”其实只是一个简单的上下文标记。我们会记录每个资产在过去15分钟内的波动幅度,并将当前状态划分为“高波动”或“正常”两类——如果波动幅度超过了最近80%的数据范围,就属于“高波动”状态;否则就是“正常”状态。这一分类标签后续会被我们用来调整相关性分析的窗口长度。

将这段代码添加到pulse_store.py

你在pulse_store.py文件中已经有了PulseStore类。只需在PulseStore class内部,在vol_15m()trend_15m()方法之后插入以下代码即可(具体的插入位置并不重要,只要保证文件能够正常编译运行即可)。

    def _vh(self, symbol):
        if symbol not in self.vol_hist:
            self.vol_hist[symbol] = deque(maxlen=200)
        return self.vol_hist[symbol]

    def update_vol_history(self, symbol):
        v = self.vol_15m(symbol)
        if v is None:
            return None
        self._vh(symbol).append(v)
        return v

    def regime(self, symbol):
        h = self.vol_hist.get(symbol)
        if not h or len(h) < 30:
            return "unknown"

        cur = h[-1]
        hs = sorted(h)
        p80 = hs[int(0.8 * (len(hs) - 1))]

        if cur >= p80:
            return "high_vol"
        return "normal"

pulse_store.py中的snapshot()方法内添加机制分类标签

在同一文件中,在snapshot(self, symbol)方法的末尾附近,就在返回语句之前,添加以下代码块:

    v = self.update_vol_history(symbol)
    if v is not None:
        out["regime"] = self.regime(symbol)

这样机制分类标签的设置就完成了。

为什么这一点后来会非常重要:

一旦snapshot()方法包含了这种机制分类信息,应用程序的其他部分就可以直接使用这些数据而无需重新进行计算。在后续的内容中,相关性分析功能会读取store.regime(base_symbol)这个值,并根据这个值来决定是应该回顾过去60分钟的数据还是仅回顾15分钟的数据。正是这一机制确保了在价格剧烈波动时相关性分析结果仍然有效,在市场平静时期则不会出现数据波动过大的情况。

相关性分析卡片(仅限股票,考虑市场状态窗口)

听起来相关性分析很简单,但实际操作起来却并非如此。在实时数据流中,不同的证券会在不同时间点产生交易数据。如果直接对比这些原始的交易数据,实际上就是在比较噪音和时间差异。

因此,我们采取了两种方法来使这一功能能够被有效使用。

首先,我们按时间对价格数据进行对齐。我们将交易数据分摊到固定的时间区间内(比如10秒、20秒、30秒),并将每个区间内的最后一个价格视为该区间的代表价格。这样,所有证券的价格数据就能处于相同的时间轴上进行了。

其次,我们让相关性分析窗口能够根据市场状态进行动态调整。如果基准证券所处的市场状态属于高波动性类型,我们会使用较短的时间段来计算相关性,从而使分析结果能更快地反映当前市场情况;而当市场处于正常状态时,我们会使用更长的时间跨度来进行计算,以避免分析结果在每次更新时出现剧烈波动。

在我们的应用程序中,这个相关性分析卡片也仅限于股票数据。虽然可以对多种资产进行相关性分析,但当不同资产的交易频率差异很大时,对齐数据就会变得非常困难。因此,我们目前优先开发适用于股票的这款工具——一个稳定、可靠的股票相关性分析工具,总比一个经常出问题的多资产分析工具要好得多。

correlation.py

import math

def _bucket(ts, bin_sec):
    return int/ts // bin_sec) * bin_sec

def build_price_table(store, symbols, window_sec=1800, bin_sec=10):
    table = {}
    now = None

    for s in symbols:
        b = store.buffers.get(s)
        if not b:
            continue
        if now is None:
            now = b[-1][0]
        else:
            now = max(now, b[-1][0])

    if now is None:
        return {}

    cutoff = now - window_sec

    for s in symbols:
        b = store.buffers.get(s)
        if not b:
            continue

        for ts, px in b:
            if ts < cutoff:
                continue
            k = _bucket(ts, bin_sec)
            row = table.get(k)
            if row is None:
                row = {}
                table[k] = row
            row[s] = px

    return table

def to_return_matrix(price_table, symbols):
    buckets = sorted(price_table.keys())
    if len(buckets) < 3:
        return []

    last_prices = None
    rows = []

    for bt in buckets:
        rowp = price_table[bt]
        if any(s not in rowp for s in symbols):
            continue

        prices = [float(rowp[s]) for s in symbols]

        if last_prices is None:
            last_prices = prices
            continue

        rets = []
        ok = True
        for i in range(len(symbols)):
            p0 = last_prices[i]
            p1 = prices[i]
            if p0 <= 0 or p1 <= 0:
                ok = False
                break
            rets.append(p1 / p0 - 1.0)

        last_prices = prices
        if ok:
            rows.append(rets)

    return rows

def corr(a, b):
    n = len(a)
    if n < 5:
        return None
    am = sum(a) / n
    bm = sum(b) / n
    num = 0.0
    da = 0.0
    db = 0.0
    for i in range(n):
        x = a[i] - am
        y = b[i] - bm
        num += x * y
        da += x * x
        db += y * y
    if da == 0 or db == 0:
        return None
    return num / math.sqrt(da * db)

def corr_card(store, symbols, base_symbol, bin_sec=10):
    reg = store.regime(base_symbol)
    win = 900 if reg == "high_vol" else 3600

    pt = build_price_table.store, symbols, window_sec=win, bin_sec=bin_sec)
    mat = to_return_matrix(pt, symbols)
    if not mat:
        return {"base": base_symbol, "regime": reg, "window_sec": win, "top": []}

    cols = list(zip(*mat))
    if base_symbol not in symbols:
        return {"base": base_symbol, "regime": reg, "window_sec": win, "top": []}

    bi = symbols.index(base_symbol)
    base = list(cols[bi])

    scores = []
    for i, s in enumerate(symbols):
        if s == base_symbol:
            continue
        c = corr(base, listcols[i]))
        if c is None:
            continue
        scores.append((s, c))

    scores.sort(key=lambda x: abs(x[1]), reverse=True)
    top = [{"symbol": s, "corr": float(v)} for s, v in scores[:3]]

    return {"base": base_symbol, "regime": reg, "window_sec": win, "top": top}

build_price_table()函数用于创建对齐的时间轴。它会扫描每个证券的滚动缓冲区,将时间戳分入固定的区间,并存储每个区间内的最新价格。

to_return_matrix()函数将这些按区间划分后的价格转换为收益率,但只有当所有证券在同一个区间内都有价格数据时才会进行转换。这一对齐步骤使得相关性分析的结果具有实际意义。

corr_card()函数负责生成最终的可视化结果。它会根据基础证券的类型选择合适的回顾时间窗口(高波动性证券为15分钟,普通证券为60分钟),然后计算各证券与基础证券之间的相关性,并显示最相关的结果。

接下来,我们将把所有这些功能整合到Streamlit中,从而生成最终的仪表盘。这时,整个应用程序才真正开始呈现出完整的形态。

构建Streamlit应用程序

目前,我们已经拥有了所有必要的组件:用于生成数据流的层、用于生成快照的状态管理机制、用于触发事件的检测系统,以及用于生成相关性分析结果的函数。现在我们只需要将这些组件封装到一个Streamlit应用程序中,确保整个系统能够正常运行即可。

关键在于让实时处理任务只启动一次,并在后台持续运行。由于Streamlit会不断重新执行脚本,因此用户界面代码不应该再去连接WebSocket或启动新的循环,而应该仅仅负责读取共享状态并渲染数据表格。

import asyncio
import threading
import time

import pandas as pd
import streamlit as st

from feeds import start_streams
from pulse_store import PulseStore
from events import StressDetector, EventStore
from correlation import corr_card

st.set_page_config(page_title="市场脉动", layout="wide")

st.markdown"""
<style>
html, body, [class*="css"]  { background-color: #0b0f14; color: #e6edf3; }
.stApp { background-color: #0b0f14; }
div[data-testid="stMetricValue"] { color: #e6edf3; }
div[dataTypeId="stMetricLabel"] { color: #9aa4af; }
[dataTypeId="stDataFrame"] { background-color: #0b0f14; }
</style>
""", unsafe_allow_html=True)

def _runner(state):
    async def _main():
        q = asyncio.Queue()
        await start_streams(q)

        store = PulseStore(window_sec=3600)
        detector = StressDetector()
        ev = EventStore(max_events=50)

        state["store"] = store
        state["events"] = ev
        state["detector"] = detector
        state["started_at"] = time.time()

        while True:
            t = await q.get()
            store.update(t)
            snap = store.snapshot(t["symbol"])
            e = detector.check(snap)
            if e:
                ev.add(e)

    asyncio.run(_main())

if "bg_started" not in st.session_state:
    st.session_state.bgstarted = True
    st.session_state.state = {}
    th = threading.Thread(target=_runner, args=(st.session_state.state,), daemon=True)
    th.start()

state = st.session_state.state

st.title("市场脉动")

col1, col2, col3 = st.columns([2, 2, 1])
with col1:
    stcaption("实时多资产脉动信息:价格变动、压力事件及相关性分析结果。")

with col3:
    up = 0
    if "started_at" in state:
        up = int(time.time() - state["started_at"])
    st(metric("运行时间(秒)", up)

if "store" not in state:
    st.info("正在连接数据源并初始化缓冲区...")
    st.stop()

store = state["store"]
ev = state["events"]

c1, c2, c3 = st.columns(3)
with c1:
    top_k = st.slider("排名前K的证券", 3, 10, 5)
with c2:
    base = st.selectbox("相关性分析的基础证券(股票)", ["TSLA", "AAPL"], index=0)
with c3:
    bin_sec = st.selectbox("相关性分析的区间长度(秒)", [10, 20, 30], index=2)

snaps = store.snapshots()

def score(x):
    r1 = x.get("ret_1m")
    r5 = x.get("ret_5m")
    if r1 is not None:
        return abs(r1)
    if r5 is not None:
        return abs(r5)
    return 0.0

snaps.sort(key=score, reverse=True)
top = snaps[:top_k]

pulse_df = pd.DataFrame(top)
keep_cols = ["symbol", "asset", "last", "ret_1m", "ret_5m", "vol_15m", "regime"]
pulse_df = pulse_df[[c for c in keepcols if c in pulse_df.columns]]

st.subheader("市场脉动")
st.dataframe(pulse_df, use_container_width=True, height=260)

st.subheader("压力事件")
events = ev/latest()[:15]
if events:
    ev_df = pd.DataFrame(events)
    ev_df["time"] = pd.to_datetime(ev_df["ts"], unit="s").dt.strftime("%H:%M:%S")
    ev_df = ev_df[["time", "type", "symbol", "asset", "value."]
    st.dataframe,ev_df, use_container_width=True, height=260)
else:
    stcaption("目前没有事件发生。")

st.subheader("相关性分析结果(股票)")
corr_symbols = ["AAPL", "TSLA"]
card = corr_card(store, corrsymbols, base_symbol=base, bin_sec=bin_sec)

st.write(card)

time.sleep(2.0)
st.rerun()

后台工作线程只会被启动一次,它运行在守护进程线程中。该线程负责处理异步WebSocket通信,并不断更新内存中的数据存储结构及事件信息。Streamlit本身并不会直接操作这些套接字。
Pulse数据表直接来源于`store.snapshots()`函数。当有1分钟间隔的数据可用时,我们会按照这些数据来排序展示;如果1分钟间隔的数据不存在,就会使用5分钟间隔的数据进行排序。
压力测试生成的数据会被呈现为简单的表格形式,但我们会将原始的纪元时间戳转换成易于阅读的时间字符串,这样看起来才像一个真正的用户界面。
相关性分析卡片其实是一个结构较为简单的JSON对象。它包含了基础资产代码、当前的交易状态、所使用的数据窗口范围,以及相关系数最高的资产信息。
最后,刷新循环的设计相当简单:只是让程序暂停两秒钟,然后重新运行并展示最新的数据状态。实际的数据处理工作仍然在后台工作线程中完成。

最终结果

最终的成品应用程序链接为:https://gumlet.tv/watch/69b99df9554f0fb510c28ce6/

下一步改进计划

如果你们希望将这个项目发展成更实用的应用程序,我建议从以下几个方面入手进行升级。
首先,应该按照资产类别来划分Pulse数据表。虽然使用全局排名也是一种可行的方式,但由于加密货币的交易频率非常高且价格波动幅度较大,因此为股票、外汇和加密货币分别设置不同的数据表会使得仪表盘看起来更加平衡,也更符合真实产品的呈现方式。
其次,应该实现简单的数据持久化功能。即使每隔几分钟只保存一份小的SQLite文件或Parquet格式的数据文件,也足以用来回放过去一小时内的数据并帮助排查问题,而无需让应用程序一直处于运行状态。
第三,应该为压力测试产生的事件找到合适的存储或处理方式。可以使用Webhook、队列或小型数据库表来存储这些事件。一旦这些事件脱离了用户界面,成为系统的一部分,就可以利用它们来生成警报信息、发送新闻通讯或进行内部监控。
最后,如果希望相关性分析功能真正支持多种资产类型,就需要采用更加科学的数据对齐方法。对于流动性较强的股票来说,分组统计的方法效果不错;但对于那些交易频率不固定的资产类型,就需要使用重新采样算法来处理数据,并且可能需要为不同的资产类别设置不同的分组区间大小。

结论

以上就是整个项目的完整实现过程:一个能够实时显示多种资产价格的数据界面,它能够在内存中保持数据的滚动更新状态,将那些杂乱无序的交易数据转换成有用的信息,并通过Streamlit仪表盘将这些信息呈现出来。
最重要的经验就是:要始终保持数据流、状态信息和用户界面的分离。只需计算出一小部分能够平滑更新的指标,然后将这些指标转化为产品团队真正可以使用的信息展示形式,即可完成整个开发过程。
如果你们已经在使用像EODHD这样的多资产数据源来获取价格信息和市场覆盖范围,那么这种类型的仪表盘就可以作为简单的扩展功能来使用。这并不是一个复杂的工程项目,而只是提供了一种简洁的方式来实时呈现市场动态而已。

Comments are closed.