自动化脚本通常用于验证流程是否完成,而非系统的实际运行状态。

一个Kubernetes容器可能正在运行,但其中的应用程序却无法与数据库建立连接;Terraform部署的结果可能显示一切正常,但实际上有人在云控制台中手动修改了基础设施配置;“金丝雀发布”方案可能会显示零错误,但用户在实际请求时仍需要等待数秒才能得到响应。

问题并不在于这些工具本身,而在于系统表面上看似正常,实际上却存在问题。

本手册通过五个生产环境中的自动化案例,介绍了如何使用Bash和Python来实现这些功能:

  • 在月度账单生成之前检测异常的AWS费用支出

  • 利用跟踪ID将多个服务中的日志关联起来

  • 发现那些未通过Terraform进行管理的基础设施变更

  • 在应用程序层面验证密钥轮换机制是否正常运行

  • 在用户投诉之前,自动回滚那些运行缓慢的部署任务

读完本手册后,你将能够编写一些简单的脚本,这些脚本能够帮助你在工具显示一切正常的情况下,及时发现系统中的问题。

这些脚本的设计初衷就是简洁实用。更重要的是它们所体现出的运营思维——比如脚本具体检测什么信号、能发现哪些故障类型,以及底层平台做了哪些假设。

每个案例都提供了可运行的演示环境、完整的脚本代码、对相关系统行为的详细解释,以及你可以自行触发的一些故意设置的故障场景。

如果你是初次接触这种工作流程,建议从第一个案例开始学习。后面的章节都是基于同样的原理进行讲解的:自动化工具的价值在于它们能验证现实情况,而不仅仅是确认流程是否完成。

先决条件

在开始之前,请确保满足以下要求:

  • Python 3.8或更高版本——可以通过`python3 –version`来检查

  • Python虚拟环境——在安装任何软件之前,先创建一个虚拟环境

python3 -m venv venv
source venv/bin/activate  

 # 在Windows系统中:

venv\Scripts\activate

这样做的目的是将你安装的软件包与系统自带的Python环境隔离开来,从而避免在共享机器上出现权限问题。

  • pip——Python的包管理工具,随Python一起安装

  • AWS CLI——需要配置一个有效的身份凭证才能使用。对于案例1、3和4来说,免费级别的AWS账户就足够了。可以通过以下命令验证它的功能是否正常:`aws sts get-caller-identity`

  • Docker和Docker Compose——案例2、4和5需要这些工具

  • Kind——一种在本地运行Kubernetes的工具。对于案例4和5来说,可以使用`brew install kind`在macOS上安装它,或者参考Kind快速入门指南

  • kubectl——用于与Kubernetes集群交互的命令行工具。安装完Kind后,运行`kind create cluster`,kubectl就会自动配置完成

  • Helm——Kubernetes的包管理工具,案例5需要使用它。可以在macOS上使用`brew install helm`进行安装,或者参考Helm安装指南

  • Terraform——案例3需要这个工具。在macOS上可以使用`brew install terraform`进行安装,或者在Ubuntu系统中使用`apt install terraform`。安装完成后,可以通过`terraform version`来检查其版本是否正确

  • bc

    ——一种计算器工具,用于“金丝雀监控”脚本中的浮点数比较操作。在macOS上可以使用`brew install bc`进行安装,在Ubuntu系统中则使用`apt install bc`。在开始案例5之前,请确保已经安装了这个工具,并通过`bc –version`来验证其是否可用

知识与技能

  • 你应该能够熟练地阅读Python和Bash脚本,而无需从头开始编写它们。

  • 你应该具备基本的Linux终端操作能力——能够导航目录、运行脚本、查看输出结果等。

  • 你应该对Kubernetes的Pod和Deployment有基本的了解。虽然不需要深入掌握Kubernetes的相关知识,但在后面的案例中会逐步介绍这些概念。

  • 熟悉AWS的基础知识,比如EC2、IAM和Secrets Manager,会对案例1、3和4的实施有帮助;而案例2完全在本地机器上运行,因此不需要任何AWS相关知识。

  • 对于案例3来说,了解Terraform以及状态文件的作用会很有帮助。虽然不需要自己编写Terraform代码,但理解Terraform如何跟踪和管理所创建的资源是完成这个案例的基础。

所需的AWS IAM权限

本文中的脚本会实际调用AWS的API。因此,你的IAM用户或角色需要具备以下最低权限。(如果遇到AccessDenied错误,首先检查这些权限是否满足。)

案例 所需IAM权限
1 – 成本异常检测 ce:GetCostAndUsage
3 – 变化趋势检测 ec2:DescribeSecurityGroups
4 – 秘密信息轮换 secretsmanager:GetSecretValue, secretsmanager:PutSecretValue

如果你使用的是带有AdministratorAccess权限的AWS免费账户,那么这些权限已经包含在内,可以直接跳过这一步。

如果你的IAM用户权限有限,可以按照以下步骤添加所需权限。在AWS控制台中进入IAM,点击“Users”,然后选择你的用户名。在“Permissions”选项卡下,点击“Add permissions”,再选择“Create inline policy”。

切换到“JSON”选项卡,粘贴上述表格中列出的权限配置文件,然后保存即可。

如果你的公司通过组织结构来管理AWS账户,而你没有权限编辑自己的IAM策略,请向管理员申请为你的角色添加这些权限。

配套的GitHub仓库

所有的演示项目都存储在以下地址:https://github.com/irvingtalks/devops-scripting-labs

每个案例都有一个对应的文件夹,其中包含完整的脚本文件、辅助文件、用于准备环境的setup.sh文件,以及用于引发特定故障的break_it.sh文件。

在开始之前,请先克隆该仓库:

git clone https://github.com/irvingtalks/devops-scripting-labs
cd devops-scripting-labs

在运行任何用例之前,请确保已经安装了所有所需的工具:

./preflight.sh

该脚本会检查实验室所需的所有工具,如Python、AWS CLI、Docker、Kind、Helm、Terraform以及bc,并会针对每一种工具提供具体的缺失项信息,同时还会给出相应的安装命令。

目录

用例1——成本异常检测

环境: AWS Cost Explorer API(仅限读取权限,所有账户均可使用)语言: Python

生产环境中出现的问题

一名初级工程师正在测试Kubernetes配置。他们在AWS中创建了一个托管节点组(这是一组EC2虚拟机,Kubernetes集群会利用这些机器来运行工作负载),并配置了集群自动扩展功能——这个组件会在集群需要更多计算资源时自动添加新的机器。测试进行得很顺利,但周五下午,他们忘记关闭这个测试环境。

整个周末,自动扩展功能都在持续为集群添加新节点,因为那些测试工作负载仍在运行并消耗资源。到了周一早上,这个节点组已经悄悄增长了两天半的时间,然而直到三周后收到账单时,才有人注意到这个问题。

设计这个用例的目的在于提醒大家:AWS的账单数据并不只是简单的月度数值,而是一个时间序列数据。你可以像监控应用程序指标一样来监控这些数据。每天检查一次,了解数据的基准值,这样就能在几小时内发现这类问题,而不会等到几周后才发现问题。

系统层面实际发生的情况

需要注意的一点:这并不是一个财务监控工具。它实际上是一个用于检测运营异常的工具,它关注的是成本变化情况。但真正被它检测到的,其实是那些意外的基础设施行为,比如资源被遗留下来继续运行、自动扩展功能触发了不必要的操作,或者有人忘记了关闭某个测试环境。

AWS Cost Explorer是一项存储用户账单数据并通过API提供这些数据的服务。当你使用这个服务时,你需要指定时间范围、数据粒度以及结果的分组方式,从而针对自己账户的账单记录进行查询分析。

在开始调查任何被标记为异常的费用之前,需要了解的一点是:决定将某项费用归入哪一类服务类别的其实是AWS,而不是你。例如,在不同地区进行的EBS快照复制操作,可能会被计入EC2费用项目中,而非数据传输费用;因此,EC2费用的突然增加并不一定意味着你的EC2实例出现了问题。该脚本能够正确地标记出这些异常情况,但要进一步调查的话,你应该问的是“在某一天我的基础设施发生了哪些变化”,而不是“目前EC2上运行着什么服务”。

账单分类标签只是一个起点,并不能作为诊断依据。

搭建演示环境

请进入配套仓库中的01-cost-anomaly/目录。对于这种使用场景来说,无需进行任何集群配置,因为该脚本是直接在你的AWS账户上运行的,它所依赖的唯一工具就是boto3:

cd 01-cost-anomaly
pip install boto3

在在实际账户上运行该脚本之前,请确保你的AWS登录凭据已经配置完成。该脚本会使用AWS CLI所设置的凭据进行操作。如果你还没有配置这些凭据,可以执行以下命令:

aws configure

系统会要求你输入AWS访问密钥ID、秘密访问密钥、默认区域(如果不确定的话,可以选择us-east-1),以及输出格式(选择json)。你可以通过AWS控制台的IAM → Users → 你的用户名 → Security credentials → Create access key来获取这些访问凭据。

如果你的账户是新建的,并且已经拥有了包含AdministratorAccess权限的账户,那么你的账户也需要具备ce:GetCostAndUsage权限。

如果你拥有一个已经有一定 billing 历史的AWS账户,可以直接使用真实数据来运行该脚本:

python detect_cost_anomaly.py

在在实际账户上运行该脚本之前,有兩点需要注意:首先,Cost Explorer提供的数据存在24小时的延迟,因此今天的费用数据要到明天才会显示出来,所以该脚本会自动排除最近这一天的数据,以避免生成不完整的结果。其次,该脚本使用的是未合并的费用数据,也就是你在单个账户环境下实际支付的费用金额;而对于那些在多个账户之间共享预留资源的组织来说,他们使用的则是合并后的费用数据,因此计算出来的结果会有所不同。

如果你拥有一个新账户,或者不愿意使用真实的账单数据,该脚本还提供了一个--sample选项,使用内置的模拟数据来运行脚本,此时完全不会调用任何AWS API。在阅读代码之前,可以先尝试运行这个选项,看看输出结果是什么样的:

python detect_cost_anomaly.py --sample

脚本本身

#!/usr/bin/env python3
# detect_cost_anomaly.py — 使用案例1:成本异常检测
# 文章中对每个函数都有详细说明。

import statistics
import sys
from datetime import datetime, timedelta

import boto3

def build_sample_data(days=30):
    """为过去`days`天生成虚构的成本数据(截止到昨天)。
    将EC2成本的异常波动设置在昨天,这样样本数据产生的结果总是与实时成本分析模式中的相同时间区间相匹配。"""
    last_day = datetime.today().date() - timedelta(days=1)
    first_day = last_day - timedelta(days=days - 1)
    anomaly_day_index = days - 1
    results = []
    for i in range(days):
        day = first_day + timedelta(days=i)
        d = i + 1
        results.append(
            {
                "TimePeriod": {
                    "Start": str(day),
                    "End": str(day + timedelta(days=1)),
                },
                "Groups": [
                    {
                        "Keys": ["Amazon EC2"],
                        "Metrics": {
                            "UnblendedCost": {
                                "Amount": str(
                                    round(
                                        18.50
                                        if i == anomaly_day_index
                                        else 1.10 + (d % 3) * 0.10,
                                        2,
                                    )
                                )
                            }
                        },
                    },
                    {
                        "Keys": ["Amazon S3"],
                        "Metrics": {
                            "UnblendedCost": {
                                "Amount": str(round(0.04 + (d % 5) * 0.01, 2))
                            }
                        },
                    },
                    {
                        "Keys": ["Amazon RDS"],
                        "Metrics": {
                            "UnblendedCost": {
                                "Amount": str(round(0.85 + (d % 4) * 0.05, 2))
                            }
                        },
                    },
                ],
            }
        )
    return results, str(last_day)


def get_daily_costs(days=30):
    ce = boto3.client("ce", region_name="us-east-1")
    end = datetime.today().date() - timedelta(days=1)
    start = end - timedelta(days=days)
    response = ce.get_cost_and_usage(
        TimePeriod={"Start": str(start), "End": str(end)},
        Granularity="DAILY",
        Metrics=["UnblendedCost"],
        GroupBy=[{"Type": "DIMENSION", "Key": "SERVICE"}],
    )
    return response["ResultsByTime"]


def build_service_timeseries(results):
    services = {}
    for day in results:
        date_str = day["TimePeriod"]["Start"]
        for group in day["Groups]:
            service = group["Keys"][0]
            cost = float(group["Metrics"]["UnblendedCost"]["Amount"])
            if service not in services:
                services[service] = []
            services[service].append({"date": date_str, "cost": cost})
    return services


def detect_anomalies(services, baseline_days=7, multiplier=2.0, recent_days=None):
    """标记那些成本超过过去`baseline_days`天平均值的2倍的标准差的日期。
    采用滚动基准计算方法(比较当前天数与前一周的数据)。如果设置了`recent_days`,则只返回今天及之后的异常情况。"""
    cutoff = None
    if recent_days is not None:
        cutoff = datetime.today().date() - timedelta(days=recent_days)

    anomalies = []
    for service, daily in services.items():
        if len(daily) < baseline_days + 1:
            continue
        for i in range(baseline_days, len(daily)):
            day = daily[i]
            day_date = datetime.strptime(day["date"], "%Y-%m-%d").date()
            if cutoff is not None and day_date < cutoff:
                continue
            baseline_costs = [d["cost"] for d in daily[i - baseline_days : i]]
            avg = statistics.mean(baseline Costs)
            if avg < 0.01:
                continue
            try:
                std = statistics.stdev(baseline_costs)
            except statistics.StatisticsError:
                continue
            threshold = avg + (multiplier * std)
            if day["cost"] > threshold:
                anomalies.append(
                    {
                        "service": service,
                        "date": day["date"],
                        "actual": round(day["cost"], 4),
                        "baseline_avg": round(avg, 4),
                        "threshold": round(threshold, 4),
                        "pct_above": round(((day["cost"] - avg) / avg) * 100, 1),
                    }
                )
    return sorted(anomalies, key=lambda x: x["date"])


def parse_args argv):
    use_sample = "--sample" in argv
    recent_days = None
    for arg in argv[1:]:
        if arg.startswith("--recent-days="):
            recent_days = int(arg.split("=", 1)[1])
    return use_sample, recent_days


def run/use_sample=False, recent_days=None):
    if use_sample:
        results, anomaly_date = build_sample_data()
        print("正在使用样本数据运行 (--sample模式)。")
        print(
            f"这些数据代表了截至昨天的30天账单信息,"
            f"其中在{anomaly_date}这一天出现了EC2成本异常。\n"
        )
    else:
        print("正在获取过去30天内各服务的每日成本数据...")
        print("注意:今天不会被包含在内——Cost Explorer的数据统计有24小时的延迟。")
        results = get_daily_costs(days=30)

    if recent_days is not None:
        since = datetime.today().date() - timedelta(days=recent_days)
        print(
            f"仅检查过去{recent_days}天内的异常情况,
            "即从{since}这一天开始的数据。\n")
        )

    services = build_service_timeseries(results)
    anomalies = detect_anomalies(services, recent_days=recent_days)

    if not anomalies:
        print("未检测到任何异常情况。」
        print("\n注意:此脚本是根据您设定的基准值来识别异常数据的。」
        print("只有成本突然增加的情况才会被标记为异常,持续性的高支出不会被触发。")
        return

    print(f"{'=' * 60}")
    print(f"检测到的异常情况共有{len(anomalies)}条.")
    print(f"{'=' * 60}\n")

    for a in anomalies:
        print(f"服务名称:      {a['service']}")
        print(f"日期:         {a['date']}")
        print(f"实际成本:  ${a['actual']}")
        print(f"基准平均值: ${a['baseline_avg']}(过去7天的平均成本)")
        print(f"阈值:    ${a['threshold']}")
        print(f"超标比例:      {a['pct_above']}%超过基准值")
        print()

    print "=" * 60)
    print("关于AWS成本归属的说明:")
    print("Cost Explorer中显示的服务名称是由AWS指定的,而不是由实际产生成本的资源决定的。
            例如,EC2成本的异常增加可能是由于EBS快照复制、跨区域数据传输,
            或者自动扩展操作导致的,而AWS会在账单中将这些费用归类为EC2费用,
            并不是指控制台中所看到的正在运行的EC2实例。")
    print()
    print("在直接调查被标记为异常的服务之前,请先思考以下问题:")
    print("在标记的日期或之前,我的基础设施中发生了什么变化?")
    "应该从实际发生的操作变化入手进行分析,而不是根据账单上的服务名称来推断。")


if __name__ == "__main__":
    use_sample, recent_days = parse_args(sys.argv)
    run/use_sample=use_sample, recent_days=recent_days)

脚本的工作原理

get_daily_costs功能会获取您过去30天的AWS账单数据。

build_service_timeseries函数会接收来自AWS的原始数据,并对它们进行重新整理。AWS最初是按日期对数据进行分类,然后再按服务进行分组;而这个函数则会反过来处理这些数据,使得每项服务都能得到属于自己的每日费用列表,而这正是后续检测步骤所需要的数据。

detect_anomalies才是真正执行检查的部分。对于每一项服务,该函数会将每天的花费与前7天的数据进行对比。如果某天的费用明显高于前一周的费用,脚本就会标记出这一异常情况。它所做的就是这些而已。

--recent-days=7这个选项表示“只显示过去7天内的异常情况”。虽然脚本仍然会获取30天的数据,因为计算差异时需要这些历史数据,但最终显示的结果只会包含您关心的那段时间内的信息。这对于在周一早上快速检查情况来说非常方便。

--sample选项表示在运行脚本时不会触碰您的AWS账户。它使用内置的虚拟账单数据,在昨天的日期中人为设置了一个费用峰值,这样检测功能就会始终触发。在将脚本连接到真实数据之前,可以先使用这个选项来看看输出结果是什么样的。

输出结果的呈现方式

当使用--sample选项运行脚本时(其中设置的费用峰值会显示为实际的昨天日期,而不是一个固定的数值):

正在使用样本数据运行脚本(--sample模式)。
检测的是截至昨天的30天账单数据,其中2026-05-14这一天的费用出现了异常增长。

============================================================
检测到的异常情况:1条
============================================================

服务:Amazon EC2
日期:2026-05-14
实际费用:18.5美元
基准平均费用:1.2143美元(过去7天的平均费用)
阈值:1.3939美元
超支比例:比基准费用高出1423.4%

需要注意的是,AWS费用归属的判定是由AWS来决定的,而不是由产生费用的资源来决定的。例如,EC2服务上的费用异常增长可能是由于EBS快照复制、跨区域数据传输或自动扩展操作等原因造成的,而这些操作在AWS的账单系统中都被归类为EC2服务的相关费用——而不是控制台中所显示的正在运行的EC2实例的费用。

在直接调查被标记为异常的服务之前,请先思考以下问题:在标记的日期之前或当天,我的基础设施中发生了什么变化?应该从这些运营上的变化入手进行排查,而不是仅仅根据账单标签来推断问题原因。

由于样本数据会动态生成今天的日期,因此您实际看到的数据结果可能会与上面示例中的略有不同。费用峰值总是会出现在昨天这一天,而周围的基准数据也会根据您运行脚本的具体日期而发生变化。

脚本无法为您做出的决策

虽然检测结果显示EC2服务存在异常情况,但费用的归属是由AWS来决定的,而不是由您来确定的。因此,在面对这种异常时,您需要根据AWS提供的具体信息来进行进一步的分析和处理。

在打开EC2控制台之前,请先查看该日期下的部署记录。当时到底部署了什么?是创建了一个新的环境吗?还是自动扩展机制触发了某个操作?应该从操作变更记录开始,顺着相关线索追踪到账单数据那里,因为如果从账单信息开始反向查找,不仅效率低下,还很容易产生误解。

故意制造问题以便进行分析

# 无需AWS账户即可立即查看异常情况
python detect_cost_anomaly.py --sample

# 使用你的真实账户运行该脚本
python detect_cost_anomaly.py

# 仅显示过去7天内的异常记录,适合快速检查当周的情况
python detect_cost_anomaly.py --recent-days=7

# 同时使用这两个参数——样本数据仅限于过去7天
python detect_cost_anomaly.py --sample --recent-days=7

如果你的真实账户显示“未检测到异常”,这并不意味着程序出现了故障。这说明你的消费行为一直很稳定。一个正常的账户自然会生成正常的检测结果,这个脚本正在正常执行它的功能而已。

当你的账户中真正发生了某些异常情况,比如自动扩展机制持续运行、某个环境被遗忘或者出现了意外的数据传输时,这个脚本就能在账单生成之前及时发现这些问题。

用例2——跨服务之间的日志关联分析

环境:完全本地环境——使用Docker Compose搭建,包含三个Python服务
语言:Python

生产环境中的问题

有用户报告称他们的支付操作失败了。你打开日志分析工具进行查询:认证服务记录显示认证过程成功,账本服务也记录了交易完成的细节,但本应发送支付确认邮件的通知服务却没有任何记录。

两个服务都显示操作成功,但第三个服务却没有任何反馈。支付仍然失败了,而你手头有三份日志,却无法确定问题出在哪个环节。

系统层面实际上发生了什么

需要明确的是:这份文档并不是关于如何安装日志聚合工具的指南,而是探讨了那些使得跨服务日志关联成为可能的数据结构,以及当这种结构在某个服务的错误处理流程中出现问题时会发生什么。

在一个只包含一个服务的系统中,调试过程非常简单:只有一个服务,也就只需要一个日志文件和一条时间线即可。但当用户请求需要经过多个服务进行处理时,就需要一种方法来将这些日志关联起来。这种连接机制就是“追踪ID”。

可以把追踪ID想象成政府办公室发放的编号单。当你去办理业务时,会得到一个编号,比如A247。负责处理你业务的每个部门都会在相应的文件中记录这个编号。如果出现问题,管理员就可以通过这个编号调出所有相关记录,从而清楚地了解整个处理流程中究竟发生了什么。这就是追踪ID——一个在所有参与处理该请求的服务中都被共享的编号。

在演示中,当有一笔付款到账时,认证服务会为这笔付款生成一个唯一的ID。认证服务、账本服务和通知服务在记录与这笔付款相关的所有日志时,都会使用这个相同的ID。如果系统中出现故障,就可以使用这个ID运行`correlate.py`脚本,该脚本会从这三个服务中查找所有相关的日志,并按照时间顺序对它们进行排序:

“`bash
python correlate.py pay-abc123
“`

这些日志的具体内容如下。请注意,每一条日志都包含相同的`trace_id`:

“`json
{
“timestamp”: “2026-05-01T14:23:01.234Z”,
“trace_id”: “pay-abc123”,
“service”: “auth”,
“event”: “userAuthenticated”,
“level”: “INFO”,
“user_id”: “u_789”,
“duration_ms”: 12
},
{
“timestamp”: “2026-05-01T14:23:01.891Z”,
“trace_id”: “pay-abc123”,
“service”: “ledger”,
“event”: “transaction_recorded”,
“level”: “INFO”,
“amount”: 50.0,
“currency”: “USD”
},
{
“timestamp”: “2026-05-01T14:23:02.103Z”,
“trace_id”: “pay-abc123”,
“service”: “notification”,
“event”: “email_queued”,
“level”: “INFO”,
“recipient”: “user@example.com”
}
“`

然而,有时会出现故障。比如通知服务在尝试连接电子邮件发送服务器时会发生超时错误。编写错误处理程序的开发者忘记在日志中添加`trace_id`,因此系统中会记录这样一条错误的日志:

“`text
2026-05-01T14:23:02.415Z ERROR Connection timeout to email provider smtp.example.com:587
“`

虽然这个错误确实发生了,也有相应的日志记录,但由于没有`trace_id`,`correlate.py`无法找到与这个错误相关的其他日志。

因此,在时间线上仍然会显示“email_send_attempt”这一操作,但“email_queued”这一操作却不会被显示出来。

下面是展示这些日志的时间线示例:

“`text
时间线 — 3个服务中发生的5个事件:
[2026-05-15T21:59:00.605307+00:00] [AUTH] [INFO] payment_request_received
[2026-05-15T21:59:00.606008+00:00] [AUTH] [INFO] userAuthenticated
[2026-05-15T21:59:00.617331+00:00] [LEDGER] [INFO] transaction_recorded
[2026-05-15T21:59:00.630313+00:00] [NOTIFICATION] [INFO] email_send_attempt
[2026-05-15T21:59:00.685182+00:00] [AUTH] [INFO] payment_complete
“`

虽然“email_sendattempt”这一操作被记录下来了,但“email_queued”这一操作却缺失了。开发者只是忘记在日志中添加一个字段而已。

![日志关联尝试的终端输出结果 — 错误:连接电子邮件发送服务器超时](https://cdn.hashnode.com/uploads/covers/698d563262d4ce66226a844a/22b7d7b0-8ae5-4573-bcb0-faaf5d807e8a.png)

设置演示环境

请导航到`02-log-correlation/`目录,然后启动这三个服务:

“`bash
cd 02-log-correlation
docker compose up -d
“`

这样就可以启动认证服务、账本服务和通知服务了。接下来,可以执行`./trigger_request.sh`命令来生成一些日志记录。trigger_request.sh终端输出结果——其中也显示了traceid” height=

该脚本会输出它所使用的trace ID。请复制这个ID,然后立即运行关联分析脚本,这样我们就能在出现问题之前查看完整的处理流程了:

python correlate.py pay-5831e1bf

你应该会看到类似以下的输出(你的trace ID可能会有所不同,但结构是相同的):

从./logs/目录中加载日志文件……
共加载了6条结构化日志记录。

============================================================
Trace ID: pay-5831e1bf
============================================================

时间线——涉及3个服务的6个事件:

  [2026-05-15T21:42:28.079046+00:00] [AUTH] [INFO] 收到支付请求
    service: auth
    user_id: u_789
    amount: 50.0
  [2026-05-15T21:42:28.080718+00:00] [AUTH] [INFO] 用户身份验证成功
    service: auth
    user_id: u_789
    duration_ms: 12
  [2026-05-15T21:42:28.145528+00:00] [LEDGER] [INFO] 交易记录已生成
    service: ledger
    user_id: u_789
    amount: 50.0
    currency: USD
  [2026-05-15T21:42:28.210088+00:00] [NOTIFICATION] [INFO] 已尝试发送电子邮件
    service: notification
    recipient: user@example.com
  [2026-05-15T21:42:28.347893+00:00] [NOTIFICATION] [INFO] 电子邮件已排队等待发送
    service: notification
    recipient: user@example.com
    amount: 50.0
  [2026-05-15T21:42:28.378402+00:00] [AUTH] [INFO] 支付已完成
    service: auth
    user_id: u_789
    amount: 50.0

终端输出结果,展示了完整的支付流程” height=

以上就是按照实际发生顺序展示的完整支付流程,其中涉及auth、ledger和notification这三个服务。现在让我们来了解一下这个脚本的具体工作原理。

脚本说明

# correlate.py
import json
import os
import sys

SERVICES = ["auth", "ledger", "notification"]
LOG_DIR = "./logs"

def load_logs(log_dir):
    """
    读取每个服务的日志文件,并将每条记录解析为JSON格式。
    如果某条记录无法被解析成JSON,系统会发出警告提示。
    这些警告信息并不会被忽略——因为在一个应该生成结构化日志的服务中,如果出现了纯文本的错误记录,那么这条记录本身就非常值得关注。
    """
    all_lines = []

    for service in SERVICES:
        log_file = os.path.join(log_dir, f"{service}.log")

        if not os.path.exists(log_file):
            print(f"  警告:未找到'{service}'的日志文件。")
            continue

        with open(log_file) as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    parsed = json.loads(line)
                    parsed["_source"] = service
                    all_lines.append(parsed)
                except json.JSONDecodeError:
                    # 这条记录存在于日志文件中,但无法被正确解析。
                    print(f"  警告:{service}.log文件中的第{line_num}行不是结构化JSON格式。")
                    print(f"           {line[:100]}")
                    print(f"           这条记录不会出现在任何基于trace的查询结果中。")

    return all_lines


def correlate(trace_id, all_lines):
    """
    查找所有包含该trace ID的日志记录,并按照时间戳对它们进行排序。
    排序后的结果就是该请求的完整处理流程。
    """
    matched = [line for line in all_lines if line.get("trace_id") == trace_id]
    matched.sort(key=lambda x: x.get("timestamp", ""))
    return matched


def find_missing_services(matched):
    """
    查看哪些服务没有为这个请求生成任何带有trace标签的日志记录。
    如果有服务没有生成日志记录,那肯定说明存在问题:
    要么是请求从未到达该服务,要么是某些错误流程导致trace ID被丢失了。
    这两种情况都值得进一步调查。
    """
    services_seen = {line["_source"] for line in matched}
    return [s for s in SERVICES if s not in services_seen]


def print_timeline(trace_id, matched, missing):
    print(f"\n{'=' * 60}")
    print(f"Trace ID: {trace_id}")
    print(f"{'=' * 60}")

    if not matched:
        print("\n没有找到与该trace ID相关的结构化日志记录。」)
        print("要么是trace ID输入有误,要么就是没有任何服务为这个请求生成了结构化日志记录。」)
        return

    services_count = len({line["_source"] for line in matched})
    print(f"\n时间线——共{services_count}个服务参与了{len(matched)}个事件的处理:\n")

    for line in matched:
        ts = line.get("timestamp", "unknown")
        service = line.get="_source", "unknown").upper()
        event = line.get("event", "未知事件")
        level = line.get("level", "INFO")
        extras = {k: v for k, v in line.items()
                  if k not in ("timestamp", "trace_id", "event", "level", "_source")}

        print(f"  [{ts}] [{service}] [{level}] {event}")
        for k, v in extras.items():
            print(f"    {k}: {v}")

    if missing:
        print(f"\n{'=' * 60}")
        print("缺少相关日志记录...")
        print(f"{'=' * 60}")
        print(f"以下服务没有为trace {trace_id}生成任何日志记录:\n")
        for s in missing:
            print(f"  - {s}")
        print()
        print("这可能有三种原因:")
        print("  1. 请求从未到达这些服务。」)
        print("  2. 这些服务虽然接收到了请求,但某些错误流程导致trace ID被丢失了,"
        print("     因此只留下了纯文本的日志记录,而这些记录无法被正确解析。)")
        print("  3. 这些服务的日志文件没有包含在本次分析范围内。」)
        print()
        print("请检查原始日志文件,看看是否在同一时间戳附近有类似纯文本错误记录的存在。)"
        print("如果找到了这样的记录,那么它就是导致问题的根本原因——也是需要及时修复的环节。)"

脚本的工作原理

load_logs 会从每个服务中读取日志文件。每行数据都应该是 JSON 格式。如果某行不是 JSON 格式,程序会输出警告信息,这通常意味着某个错误日志缺少追踪 ID,因此无法被追踪。

correlate 会查找所有与给定追踪 ID相匹配的日志,并按时间顺序对它们进行排序。这样就能重建出跨各个服务的完整请求流程。

find_missing_services 会检查哪些服务没有与该追踪 ID相关的日志。这能帮助你了解请求在何处中断,或者追踪 ID是在哪里丢失的。

print_timeline 会按顺序显示完整的请求时间线。如果某些服务的日志记录有误,该命令还会显示出这些服务。

在真实的 Kubernetes 环境中使用这个脚本时,有一点需要注意:在 Kubernetes 中,kubectl logs 只能显示当前正在运行的容器的日志。如果某个 Pod 重新启动了,你可以使用以下命令:

kubectl logs  --previous

但这个命令只能查看上一次重启后的日志。除非你使用了像 Loki 或 CloudWatch 这样的日志系统,否则之前的日志就会丢失。

脚本出错时的输出结果

这一节的目的是向你展示当某个服务发生无声故障时会发生什么——也就是说,虽然错误记录在日志中,但由于开发人员遗漏了某个字段,脚本却无法找到这个错误。

break_it.sh 会强制使通知服务在尝试发送电子邮件时出现故障。由于错误处理程序没有包含追踪 ID,因此这个故障会被以纯文本形式记录下来,从而无法将其与原始请求关联起来。

运行这个脚本:

./break_it.sh

然后触发一个新的请求:

./trigger_request.sh

复制脚本输出中显示的追踪 ID,然后使用 correlate 命令进行关联分析:

python correlate.py pay-xxxxxxxx

你将会看到如下结果:

正在从 ./logs/... 中读取日志文件…
  警告:notification.log 文件的第 10 行不是有效的 JSON 格式:
           2026-05-15T21:59:00.681583+00:00 错误:尝试向 http://mock-email:80/ 发送电子邮件时出现连接超时错误
           0.001 秒后尝试失败,无法将确认信息发送到 user@example.com
           这一行内容不会出现在任何基于追踪 ID 的搜索结果中。
   共读取了 29 行有效的日志数据。

============================================================
追踪 ID:pay-6cf69a8c
============================================================

时间线——涉及 3 个服务的 5 个事件:
  [2026-05-15T21:59:00.605307+00:00] [AUTH] [INFO] 收到付款请求
  [2026-05-15T21:59:00.606008+00:00] [AUTH] [INFO> 用户身份验证成功
  [2026-05-15T21:59:00.617331+00:00] [LEDGER] [INFO] 交易记录已生成
  [2026-05-15T21:59:00.630313+00:00] [NOTIFICATION] [INFO> 尝试发送电子邮件
  [2026-05-15T21:59:00.685182+00:00] [AUTH] [INFO] 付款操作已完成

请仔细看这个例子。通知信息出现在时间线上,系统中也记录了email_send_attempt这一操作,但email_queued这一状态却没有显示出来,这意味着邮件实际上从未被发送出去,而导致失败的原因也没有被记录在时间线上。这个错误信息隐藏在最顶端的警告提示中,因为脚本在运行过程中发现有一行代码无法解析,所以才产生了这条警告。

问题就出在这里:尝试发送邮件的操作能够被看到,但失败的原因却无法显示出来。

运行命令cat logs/notification.log,然后滚动到文件底部:

{"timestamp": "2026-05-15T21:59:00.630313+00:00", "trace_id": "pay-6cf69a8c",
 "service": "notification", "event": "email_send_attempt", ...}
2026-05-15T21:59:00.681583+00:00 ERROR 连接电子邮件提供商时超时
在0.001秒后尝试再次连接,但仍然失败,无法向user@example.com发送确认邮件

需要注意这两行日志:第一行的日志中包含了追踪ID,因此脚本能够将其显示在时间线上;而第二行的日志中没有追踪ID,所以脚本只是将其标记为警告并忽略了它。错误发生在尝试发送邮件的0.075秒之后,日志文件中记录了这两条信息,但时间线上只显示了一条。

这就是在生产环境中“失败信息被隐藏”的真实情况:支付操作本身已经完成,但确认邮件却从未被发送出去。错误信息确实存在于日志文件中,显示为“连接电子邮件提供商时超时”,但在上述的时间线展示中,系统只显示了“尝试发送邮件的操作”,然后直接跳到了“支付操作完成”,中间没有任何关于失败的信息,看起来一切都很顺利。

解决问题的方法在于修改文件02-log-correlation/services/notification/main.py中的错误处理代码。下面是原来的错误处理代码:

except httpx.TimeoutException:
    emitPLAIN(f"连接电子邮件提供商时超时 {EMAIL_PROVIDER_URL}")
    return {"status": "ok"}

修改后的代码如下:将req.trace_id作为参数传递给函数emit,而不是直接使用emitPLAIN

except httpx.TimeoutException:
    emit(req.trace_id, "email_timeout", level="ERROR",
         provider=EMAIL_PROVIDER_URL)
    return {"status": "ok"}

进行这样的修改后,超时错误就会像其他错误信息一样被显示在时间线上:

  [2026-05-15T21:59:00.681583+00:00] [NOTIFICATION] [ERROR] 连接电子邮件提供商时超时
    provider: http://mock-email:80/

只需一个命令,就能获取到所有相关的信息。

脚本无法为你做出的决定

关联分析脚本将这个故障原因归结为通知流程中的某个环节出现了问题。当你查看原始的notification.log文件时,会发现其中记录了超时错误信息,说明请求确实已经到达了目标服务,认证和交易记录过程也都很成功,但邮件发送却失败了。

通知失败是否意味着支付也发生了失败,完全取决于你的系统设计方式。如果通知功能属于非强制依赖项,那么这种错误本不应该以“支付失败”的形式呈现给用户,这说明你的系统设计中存在其他问题;而如果通知功能属于强制依赖项,那么交易本身就应该被回滚。脚本虽然能够找出问题出现的具体位置,但正确的处理方式仍然取决于系统的整体设计。

故意制造故障以进行测试

  1. 运行 ./break_it.sh——这会将通知服务切换到一种模式,在这种模式下,该服务的错误处理机制会忽略跟踪ID信息。

  2. 运行 ./trigger_request.sh 以生成新的支付请求,并获取一个新的跟踪ID。

  3. 运行 python correlate.py ——此时,通知记录将会从时间线中消失。

  4. 运行 cat logsnotification.log——你会看到超时错误信息,但由于缺少跟踪ID,脚本将无法识别这些错误。

用例3:基础设施漂移检测

环境: AWS免费 tier账户(使用一个安全组)+ Terraform
语言: Python

生产环境中出现的问题

Terraform计划文件中并未显示任何变化,但你的部署环境与昨天相比已经发生了改变。当你询问相关人员时,终于有人想起来了:上周有同事在AWS控制台中手动修改了一个安全组设置,目的是为某个测试环境解除限制。他们本打算之后通过Terraform来重新应用这些更改,但却忘记了这件事。

从那以后,Terraform的状态文件与实际的AWS基础设施就出现了不一致的情况。不过并没有出现任何明显的故障或警报提示,除非有人专门运行 terraform plan 命令来检查,而在这种情况下,显然没有人这么做。

这种现象被称为“基础设施漂移”,实际上它的发生频率比大多数团队愿意承认的要高得多。

系统层面真正发生了什么

需要说明的是:这与运行 terraform plan 命令是不同的。terraform plan 只能显示Terraform“会”做出哪些更改,而这个脚本则能够揭示在Terraform不知情的情况下,AWS系统实际上已经发生了哪些变化。

该脚本本身并不会执行任何Terraform命令,它只是读取Terraform已经生成的状态文件。在演示环境中,这些状态文件是由Terraform创建的;而在实际生产环境中,这些文件本来就存在于你的日常运维流程中。

可以把Terraform的状态文件想象成一张“收据”:当Terraform创建一个安全组时,它会详细记录下所创建的内容,包括规则、端口以及CIDR地址等信息,这份记录就是状态文件。

该脚本会将这个“收据”与AWS当前的实际配置进行对比。如果有人在AWS控制台中添加了某些未记录在状态文件中的规则,脚本就会将其视为“基础设施漂移”的现象。

存在的问题在于,如果有人在控制台中创建了一个全新的安全组,但从未使用过Terraform,那么就没有任何记录可供脚本参考。由于脚本无法对比它从未见过的信息,因此它会返回“无差异”的结果,而这个安全组就会在用户的账户中存在却未被发现。

演示中会展示这两种情况。首先会破坏一个已知资源;然后,在不使用Terraform的情况下创建一个新的安全组,尽管用户的账户里多了这个安全组,脚本仍然会返回“无差异”的结果。

设置演示环境

请进入配套仓库中的03-drift-detection/目录:

cd 03-drift-detection
pip install -r requirements.txt

运行setup脚本。这个脚本使用的是真实的Terraform,而不是模拟版本:

./setup.sh

这个脚本会执行terraform initterraform apply命令,从而在AWS中创建一个真正的安全组:

AWS控制面板截图,显示已创建的安全组

同时,它还会生成一个真实的terraform.tfstate文件。如果你想了解Terraform实际生成了什么内容,可以打开这个文件查看。它是JSON格式的,内容是可以阅读的,而且确实是真实生成的文件。

IDE文件夹结构截图,显示正在创建terraform.tfstate文件

设置完成后,运行以下脚本:

python detect_drift.py terraform.tfstate

你应该会看到类似这样的输出,不过你的安全组ID实际上会有所不同:

从文件{tfstate_path}中加载Terraform状态信息…

正在检查:sg-0a1b2c3d4e5f6a7b8

  结果:未检测到任何差异。

现在,演示环境已经准备就绪,系统中的各项数据也符合预期。接下来,让我们来看看这个脚本具体在做什么。

脚本代码文件(代码文件链接

# detect_drift.py
import boto3
import json
import sys


def load_tfstate(path):
    """
    Terraform的状态信息文件是纯JSON格式的——用任何文本编辑器打开它,你都会看到一个‘resources’数组,其中列出了Terraform所了解的所有资源信息。
    这个函数会读取该文件并返回解析后的内容。
    """
    with open(path) as f:
        return json.load(f)


def get_security_groups_from_state(tfstate):
    """
    遍历‘resources’数组,提取所有的安全组信息。
    每个安全组都有‘type’、‘name’以及‘instances’数组,其中包含了Terraform在上次运行时记录的属性值。
    我们会提取安全组的ID以及其入站规则信息。
    """
    resources = {}
    for resource in tfstate.get("resources", []):
        if resource["type"] == "aws_security_group":
            for instance in resource.get("instances", []):
                sg_id = instance["attributes"]["id"]
                resources[sg_id] = {
                    "ingress": instance["attributes"].get("ingress", [])
                }
    return resources


def get_security_group_from_aws(sg_id):
    """
    调用AWS EC2 API来获取这个安全组的当前状态信息。
    实际上,boto3会构建一个经过身份验证的HTTPS请求,使用你的AWS凭证进行签名,然后发送到配置好的区域的EC2 API接口,并解析返回的结果。
    返回的结果中包含很多数据,但我们只需要提取其中的入站规则信息。
    """
    ec2 = boto3.client("ec2")
    response = ec2.describe_security_groups(GroupIds=[sg_id])
    sg = response["SecurityGroups"][0]
    return {"ingress": sg.get("IpPermissions", [])}


def normalize_state_rules/rules):
    """
    Terraform以自己特定的格式存储入站规则信息。
    我们将这些规则转换为元组形式,以便于进行比较。
    每个元组的格式为:(from_port, to_port, protocol, cidr_block)
    ```
    normalized = set()
    for rule in rules:
        for cidr in rule.get("cidr_blocks", []):
            normalized.add((
                rule.get("from_port", 0),
                rule.get("to_port", 0),
                rule.get("protocol", "-1"),
                cidr
            "))
    return normalized


def normalize_aws_rules/rules):
    """
    AWS返回的入站规则信息与Terraform使用的格式不同。
    我们将它们转换为相同的元组格式,以便进行比较。
    ```
    normalized = set()
    for rule in rules:
        from_port = rule.get("FromPort", 0)
        to_port = rule.get("ToPort", 0)
        protocol = rule.get("IpProtocol", "-1")
        for ip_range in rule.get("IpRanges", []):
            normalized.add((from_port, to_port, protocol, ip_range["CidrIp"]))
    return normalized


def detect_drift(tfstate_path):
    print(f"正在从文件{tfstate_path}中加载Terraform状态信息...")
    tfstate = load_tfstate(tfstate_path)
    state_sgs = get_security_groups_from_state(tfstate)

    if not state_sgs:
        print("在状态文件中未找到任何安全组信息,因此无法进行比较。")
        return


    drift_found = False

    for sg_id, state_data in state_sgs.items():
        print(f"正在检查安全组{sg_id}的信息...")
        
        try:
            aws_data = get_security_group_from_aws(sg_id)
        except Exception as e:
            print(f"  错误:无法从AWS获取安全组{sg_id}的信息 - 错误原因:{e}")
            print("  请检查你的IAM权限,是否具有访问ec2:DescribeSecurityGroups的权限。")
            continue

        state_rules = normalize_state_rules(state_data["ingress"])
        awsRules = normalize_aws_rules(aws_data["ingress"])

        # AWS中存在但Terraform未记录的规则
        added_in_aws = aws_rules - state_rules
        # Terraform预期存在但实际上已从AWS中删除的规则
        removed_from_aws = state_rules - aws_rules

        if added_in_aws:
            drift_found = True
            print("  存在差异:AWS中有这些规则,但在状态文件中却不存在...")
            for rule in added_in_aws:
                print(f"    端口:{rule[0]}-{rule[1]} | 协议:{rule[2]} | IP范围:{rule[3]}")
        }

        if removed_from_aws:
            drift_found = True
            print("  存在差异:状态文件中这些规则存在,但实际上已从AWS中删除...")
            for rule in removed_from_aws:
                print(f"    端口:{rule[0]}-{rule[1]} | 协议:{rule[2]} | IP范围:{rule[3]}")
        }

        if not added_in_aws and not removed_from_aws:
            print("  结果:未检测到任何差异。")

    print("\n" + "=" * 60)
    if drift_found:
        print("检测到差异,请查看上面的详细信息。")
    else:
        print("在监控的资源中未检测到任何差异。")

    print("\n重要提示:此脚本仅检查状态文件中记录的资源。")
    print("在AWS中手动创建的资源不会被此脚本检测到。")
    print("如果输出结果显示“无差异”,并不意味着你的AWS账户完全正常——这只是说明你正在监控的资源与Terraform上次记录的信息是一致的。")


if __name__ == "__main__":
    tfstate_path = sys.argv[1] if len(sys.argv) > 1 else "terraform.tfstate"
    detect_drift(tfstate_path)

脚本的工作原理

load_tfstate 会打开文件 terraform.tfstate 并读取其内容。在设置完成后运行 cat terraform.tfstate,你会发现它只是一个文本文件,Terraform 所了解的关于你的基础设施的所有信息都存储在其中。

get_security_groups_from_state 会从该文件中提取所有的安全组、AWS 分配给它们的 ID,以及 Terraform 最后记录的入站规则。这些就是预期的数据值。

get_security_group_from_aws 会调用 AWS API 来获取相同安全组的当前入站规则。这些才是实际的数据值。现在,脚本拥有了同一信息的两种版本。

normalize_state_rulesnormalize_aws_rules 的存在是因为 Terraform 和 AWS 存储规则的格式略有不同。这两个函数会将这两种格式转换成相同的格式,以便进行比较。

比较是最后一步。那些在 AWS 中存在但在状态文件中不存在的规则是手动添加的;而那些在状态文件中存在但在 AWS 中不存在的规则则是手动删除的。脚本会显示这两种情况。

输出结果的表现形式

在没有出现差异的情况下正常运行时,输出结果如下:

正在从文件 terraform.tfstate 中加载 Terraform 状态信息

检查安全组 sg-0a1b2c3d4e5f6a7b8…

  结果正常——未检测到任何差异。

============================================================
在监控的资源中未检测到任何差异。
重要提示:此脚本仅检查状态文件中记录的资源。那些通过 Terraform 以外的方式在 AWS 中创建的资源不会被此脚本检测到。因此,这里显示“结果正常”并不意味着你的 AWS 账户完全没有问题,而是意味着你正在关注的资源与 Terraform 最后记录的信息是一致的。

当出现差异时,输出结果如下:

AWS 控制面板中显示安全组入站规则变更的截图

正在从文件 terraform.tfstate 中加载 Terraform 状态信息

检查安全组 sg-0a1b2c3d4e5f6a7b8…

  存在差异——AWS 中有但这些规则在状态文件中缺失:
    端口 22-22 | 协议:tcp | IP 地址范围:0.0.0.0/0

============================================================
检测到差异。详情请参见上方内容。

在出现差异后终端输出的截图,其中显示“检测到差异”

脚本无法为你做出的决策

当脚本检测到差异,也就是发现某些 Terraform 并不知道的入站规则时,它的第一反应可能是立即运行 terraform apply 来恢复这些规则。但在这样做之前,你需要先问自己一个问题:这种变更是否属于紧急修复措施?也许有人在准备正式修复方案的过程中,为了尽快恢复某个出现故障的服务,而手动打开了某个端口。如果你自动恢复了这些规则,那么你可能会撤销那些为维持服务正常运行而特意设置的配置。

“差异检测”能告诉你某些地方发生了变化,但它无法告诉你哪个版本是正确的;确认哪个版本正确才是脚本执行后的下一步工作。

故意制造差异

  1. 运行 ./break_it.sh。该命令会通过 AWS CLI 直接添加一条 SSH 入站规则(端口 22),从而模拟手动修改配置文件的操作。

  2. 运行 python detect_drift.py terraform.tfstate,差异信息会显示在输出结果中。

  3. 运行 ./break_it.sh --invisible,创建一个完全不存在于配置文件中的新安全组,然后再运行脚本。即使你的账户中确实存在这个新资源,脚本也会显示“无差异”,从而暴露出配置覆盖上的漏洞。

  4. 运行 ./teardown.sh。该命令会执行 terraform destroy,从而删除安全组并清理所有 AWS 资源。完成这些操作后,就不会产生任何费用了。

用例 4:实现零停机时间的密钥轮换

环境: AWS Secrets Manager + 本地的 Kind 集群
语言: Python

生产环境中出现的问题

这个用例的目标是: Kubernetes 显示某个 Pod 是正常的,但用户却遇到了数据库连接错误。该脚本通过执行 Kubernetes 从未会进行的额外检查,及时发现了这一问题。

当你轮换数据库凭据后,Pod 会重新启动。kubectl get pods 命令显示 Pod 处于“运行中”状态,但十分钟后用户就无法登录了。

密钥轮换操作本身是有效的,但问题在于 Kubernetes 只检查了 HTTP 服务器是否正常运行,并没有验证它是否能够与数据库建立连接——这两者其实是不同的概念。

实际发生的情况

需要注意的一点是:这个例子并不是关于如何在 Kubernetes 中存储密钥,而是关注在密钥轮换之后会发生什么。

当一个 Pod 已经在运行时,它会保持一些已经过身份验证的数据库连接。这些连接在密码更改后仍然有效,因为它们是在密码变更之前就已经完成身份验证的,而数据库也不会主动关闭这些连接。然而,当 Pod 需要建立新的连接时,它使用的是仍然包含旧密码的凭据,因此新的连接会立即失败。

与此同时,Kubernetes 会看到该 Pod 正在响应 HTTP 请求,并将其状态标记为“运行中”,因此用户会遇到连接失败的问题,但集群本身并不会显示任何异常信息。

/healthz/db 端点的作用

/healthz 端点如果检测到 HTTP 服务器正常运行,就会返回 200 状态码。Kubernetes 就只检查这一点而已。

/healthz/db 端点会使用当前的凭据建立一个新的数据库连接,然后执行 SELECT 1 这条查询语句。如果在密钥轮换之后这条查询失败,那就说明 Pod 虽然处于“运行中”状态,但实际上无法处理数据库请求。轮换脚本会将调用这个端点作为最后的检查步骤——而这是 Kubernetes 自身从未会进行的操作。

以下是在FastAPI演示应用中这些代码的实际运行效果(代码文件):

# app.py(相关代码段)
import os
import asyncpg
from fastapi import FastAPI, HTTPException

app = FastAPI()

DB_HOST = os.environ.get("DB_HOST", "postgres")
DB_PORT = int(os.environ.get("DB_PORT", "5432"))
DB_NAME = os.environ.get("DB_NAME", "appdb")
DB_USERNAME = os.environ.get("DB_USERNAME", "appuser")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "")

@app.get("/healthz")
async def healthz():
    # 如果HTTP服务器正常运行,就会返回200状态码。
    # 这部分代码用于检测Kubernetes集群是否已准备好接受请求。
    return {"status": "ok"}

@app.get("/healthz/db")
async def healthz_db():
    # 使用当前环境配置信息建立新的数据库连接。
    # 如果密码已经更改,但相关Pod尚未重新启动,那么环境配置中仍然会使用旧密码,从而导致连接失败。
    # 在这种情况下,/healthz接口仍然会返回200状态码,但用户会看到错误信息。
    try:
        conn = await asyncpg.connect(
            host=DB_HOST, port(DB_PORT,
            database,DB_NAME, user=DB_USERNAME, password=DB_PASSWORD,
        )
        await conn.execute("SELECT 1")
        await conn.close()
        return {"status": "ok", "db": "authenticated"}

    except asyncpg.InvalidPasswordError:
        raise HTTPException(
            status_code=503,
            detail=(
                f"用户名'{DB_USERNAME}'的认证失败。"
                "可能是密码已经更改,但系统尚未更新。"
                "Kubernetes的健康检查机制并未检测到这一变化。"
            )
        )

    except Exception as e:
        raise HTTPException(status_code=503, detail=f"数据库错误:{str(e)}")

这两个接口之间的区别恰恰体现了这个使用案例的核心意义。

配置演示环境

导航到04-secrets-rotation/目录,然后运行设置脚本:

cd 04-secrets-rotation
./setup.sh

这个脚本会启动一个Kind集群,部署已经创建了appuser账户的PostgreSQL数据库,同时部署与之连接的FastAPI演示应用,并在AWS Secrets Manager中生成初始的加密密钥。

设置完成后,需要安装相应的依赖库:

pip install boto3 kubernetes

在开始执行密码更新操作之前,请确认所有服务都已正常运行:

kubectl get pods

你应该会看到myapppostgres这两个Pod都处于“Running”状态。如果有任何Pod显示为“Pending”或“Error”,请等待30秒后再检查。PostgreSQL数据库需要一些时间来完成初始化过程。

你还可以在AWS控制台确认加密密钥是否已经生成。进入AWS Secrets Manager,查找myapp/db-credentials这个密钥:

显示AWS中生成的加密密钥的截图

如果您更喜欢使用命令行界面,请执行以下操作:

aws secretsmanager get-secret-value --secret-id myapp/db-credentials

当两个Pod都运行起来且秘密信息已经存在时,运行以下命令即可完成密码更新流程并查看完整的路径:

python rotate_secret.py

如果在第一次尝试时步骤6显示“FAILED”,这通常是由于时间安排不当导致的:应用程序Pod成功重启了,但新的Pod在完成首次数据库连接之前,/healthz/db接口就已经被调用了。请等待20秒后再运行python rotate_secret.py。如果问题仍然存在,请执行kubectl logs deployment/myapp来查看应用程序报出的错误信息。

理想情况下,所有六个步骤都应该能够顺利完成,最终会显示如下结果:

密码更新完成。应用程序已验证新凭证的有效性。
  AWS Secrets Manager:凭证已更新
  PostgreSQL:凭证已更新(执行了ALTER USER命令)
  Kubernetes Secret:凭证已更新
  应用程序Pod:已重启并完成了身份验证

现在,整个密码更新流程已经能够正常运行了。接下来,让我们来了解一下这个脚本的具体工作原理。

脚本代码文件(代码文件链接

# rotate_secret.py
import boto3
import base64
import json
import subprocess
import sys
from kubernetes import client, config


def get_current_secret(secret_name):
    """
    从AWS Secrets Manager中获取当前的凭证信息。
    这些凭证信息以JSON字符串的形式存储,其中包含‘username’和‘password’字段。
    """
    sm = boto3.client("secretsmanager")
    response = sm.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])


def rotate_in_aws(secret_name, username, new_password):
    """
    将新的凭证信息写入AWS Secrets Manager。
    使用put_secret_value命令可以创建一个新的凭证版本,之前的版本不会被立即删除,
    因此你仍然有短暂的时间可以回滚到之前的配置。
    ```
    sm = boto3.client("secretsmanager")
    new_value = json.dumps({"username": username, "password": new_password})
    sm.put_secret_value(SecretId=secret_name, SecretString=new_value)
    print("  [AWS] AWS Secrets Manager中的凭证信息已更新。")


def update_kubernetes_secret(namespace, k8s_secret_name, username, new_password):
    """
    使用新的凭证信息更新Kubernetes Secret对象。
    Kubernetes要求秘密数据必须经过base64编码,这里的操作只是进行编码,并非加密;
    任何拥有该Secret对象访问权限的人都可以解码这些数据。
    如果需要真正的加密功能,还需要单独配置etcd的加密设置。
    """
    config.load_kube_config()
    v1 = client.CoreV1Api()

    secret_data = {
        "username": base64.b64encode(username.encode()).decode(),
        "password": base64.b64encode(new_password.encode()).decode()
    }

    v1.patch_namespaced_secret(
        name=k8s_secret_name,
        namespace=namespace,
        body={"data": secret_data}
    )
    print(f"  [K8s] Kubernetes Secret '{k8s_secret_name}'已更新。")


def rolling_restart(namespace, deployment_name):
    """
    触发应用程序的滚动重启流程。
    在滚动重启过程中,Kubernetes会创建一个新的Pod,等待它通过就绪检查,
    然后终止一个旧的Pod,如此反复进行,直到所有Pod都被替换完毕。
    这种方式能够保证服务的可用性不受影响,与一次性删除所有Pod的方式截然不同。
    """
    result = subprocess.run(
        ["kubectl", "rollout", "restart",
         f"deployment/{deployment_name}", "-n", namespace],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        raise RuntimeError(f"滚动重启失败:{result.stderr}")
    print(f"  [K8s] 应用程序{deployment_name}的滚动重启已触发。")


def wait_for_rollout(namespace, deployment_name, timeout=120):
    """
    等待滚动重启完成,或者直到超时为止。
    “完成”意味着所有新的Pod都已经运行起来,并且通过了就绪检查。
    但这并不意味着应用程序已经能够使用新的凭证信息进行身份验证,
    这需要通过后面的verify_credential函数来验证。
    ```
    print(f"  [K8s] 正在等待滚动重启完成(超时时间为{timeout}秒)...")
    result = subprocess.run(
        ["kubectl", "rollout", "status",
         f"deployment/{deployment_name}",
         "-n", namespace,
         f"--timeout={timeout}s"],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        raise RuntimeError(f"滚动重启未完成:{result.stderr}")
    print("  [K8s] 滚动重启已完成。所有Pod都显示为“Ready”状态。")


def verify_credential(namespace, deployment_name):
    """
    这个步骤用于验证应用程序是否能够使用新的凭证信息进行身份验证。
    我们会进入正在运行的Pod,然后执行/healthz/db命令,
    这个命令会向数据库发送一次经过身份验证的请求。
    如果这个操作能够成功完成,说明新的凭证信息在应用程序层面是有效的;
    如果在就绪检查通过之后这个步骤仍然失败,那就说明存在配置不匹配的问题。
    注意:这个步骤是在Pod运行状态下执行的。
    """
    print("  [验证] 正在执行密码更新后的验证操作...")
    result = subprocess.run(
        ["kubectl", "get", "pods", "-n", namespace,
         "-l", f"app={deployment_name}",
         "-o", "jsonpath={.items[0].metadata.name}"],
        capture_output=True, text=True
    )
    pod_name = result.stdout.strip()

    if not pod_name:
        print("  [验证] 错误:没有找到正在运行的Pod。")
        return False

    verify = subprocess.run(
        ["kubectl", "exec", pod_name, "-n", namespace,
         "--", "curl", "-sf", "http://localhost:8000/healthz/db"],
        capture_output=True, text=True
    )

    if verify.returncode != 0:
        print("  [验证] 失败:Pod正在运行,但数据库身份验证失败。")
        print("           尽管Pod已经启动,但数据库身份验证仍然无法通过。」)
        print("           这说明应用程序和数据库之间的接口存在不匹配的问题。」)
        print("           只有其中一个接口的配置是正确的。」)
        return False

    print("  [验证] 成功:应用程序已经能够使用新的凭证信息进行身份验证。)"
    return True


def rotate(secret_name, new_password, namespace, k8s_secret_name, deployment_name):
    print("\n[步骤1/6] 正在从AWS Secrets Manager中获取当前的凭证信息...")
    current = get_current_secret(secret_name)
    username = current["username"]

    print("[步骤2/6] 更新AWS Secrets Manager中的凭证信息...")
    rotate_in_aws(secret_name, username, new_password)

    print("[步骤3/6] 在数据库层面更新密码(执行ALTER USER命令)...")
    rotate_postgres_password(namespace, new_password)

    print("[步骤4/6] 更新Kubernetes Secret对象...")
    update_kubernetes_secret(namespace, k8s_secret_name, username, new_password)

    print("[步骤5/6] 触发应用程序的滚动重启...")
    rolling_restart(namespace, deployment_name)
    wait_for_rollout(namespace, deployment_name)

    print("[步骤6/6] 验证新的凭证信息在应用程序层面是否有效...")
    success = verify_credential(namespace, deployment_name)

    print("\n" + "=" * 60)
    if success:
        print("密码更新完成。应用程序已验证新凭证的有效性。」)
    else:
        print("密码更新失败。就绪检查通过了,但凭证验证未通过。)")
        print("建议采取的措施:强制重启所有Pod以清除连接池中的旧数据,"
        print("或者检查数据库会话的超时配置。)")
        sys.exit(1)


if __name__ == "__main__":
    import secrets as _secrets
    rotate(
        secret_name="myapp/db-credentials",
        new_password=_secrets.token_urlsafe(16),
        namespace="default",
        k8s_secret_name="db-credentials",
        deployment_name="myapp"
    )

脚本的工作原理

get_current_secret 从 AWS Secrets Manager 中读取当前的凭据信息,这样脚本在生成新密码之前就能知道用户名是什么。

rotate_in_aws 将新的凭据信息写入 Secrets Manager。它会创建一个新的版本而不是覆盖旧版本,因此如果出现问题,你还有短暂的时间可以回滚到之前的配置。

_pg_password_literalrotate_postgres_password 负责执行大多数密码更新脚本都会跳过的那一步,也就是实际修改 PostgreSQL 数据库中的密码。这是通过在运行的 PostgreSQL 容器上直接执行 ALTER USER appuser PASSWORD '...' 命令来完成的。在这一步之前,数据库仍然可以接受旧密码;而在这一步之后,它就无法接受旧密码了。

update_kubernetes_secret 将新密码写入 Kubernetes Secret 对象中,这样以后启动的任何新容器都会使用新的凭据信息。

rolling_restartwait_for_rollout 会依次重启应用程序的各个容器,从而确保整个部署系统能够持续正常运行。当这些步骤完成之后,所有的容器都将会处于“运行中”状态,并且它们的健康检查也会通过——不过需要注意的是,“运行中”仅仅意味着 /healthz 端口返回了 200 状态码而已,而这正是这个用例所要关注的重点。

verify_credential 是 Kubernetes 从不执行的那一步。它会进入新的容器内部,然后调用 /healthz/db 命令,以此使用容器当前环境中的凭据信息建立真实的数据库连接。如果这一操作成功,那么密码更新过程就真正完成了;但如果在健康检查通过之后这个步骤仍然失败,那就说明存在问题:虽然该容器看起来是正常的,但实际上它无法处理数据库请求。

输出结果示例

密码更新成功时,输出结果如下:

[步骤 1/6] 从 AWS Secrets Manager 中读取当前凭据信息…
[步骤 2/6> 更新 AWS Secrets Manager 中的凭据信息…
  [AWS] Secrets Manager 已更新。
[步骤 3/6> 在数据库层面修改密码(执行 ALTER USER 命令)…
  [DB] 正在 PostgreSQL 上执行 ALTER USER 命令…
  [DB] 密码已成功修改。
        新的连接请求现在需要使用新密码。
        现有的连接在关闭之前仍然有效。
[步骤 4/6> 更新 Kubernetes Secret 对象…
  [K8s] Kubernetes Secret ‘db-credentials’ 已更新。
[步骤 5/6> 触发容器的重启过程…
  [K8s> ‘myapp’ 的容器重启已开始。
  [K8s] 正在等待重启完成(超时时间:120 秒)…
  [K8s] 重启已完成。所有容器都处于“运行中”状态。
[步骤 6/6> 在应用程序层面验证新凭据是否有效…
  [验证] 正在执行密码更新后的验证操作…
  [验证] 成功——应用程序确认可以使用新凭据进行身份验证。

============================================================
密码更新完成。在应用程序层面验证成功。
  AWS Secrets Manager:已更新
  PostgreSQL:已更新(执行了 ALTER USER 命令)
  Kubernetes Secret:已更新
  应用程序容器:已重启,且身份验证通过

实验室系统运行正常,整个轮换流程也得以顺利执行。

在破坏任何系统之前,请先确认该Pod是否处于正常状态:

kubectl get pods

你应该会看到myapp这个Pod处于“运行中”状态。这说明一切正常,现在我们可以开始进行测试了。

终端截图,显示'kubectl get pods'的执行结果

故意制造故障

步骤1:使数据库与系统状态不同步

./break_it.sh

这个脚本会使用错误的密码直接在PostgreSQL数据库上执行ALTER USER命令。由于K8s的Secret配置中仍然保存着旧密码,因此Pod的环境设置与数据库的状态就出现了不一致。

步骤2:检查Kubernetes的实际检测结果

kubectl exec deployment/myapp -- curl -s http://localhost:8000/healthz

你会看到{"status":"ok"}这样的结果。在通过kubectl get pods查看时,该Pod仍然显示为“准备就绪”状态。这说明Kubernetes并没有检测到任何异常——而这种差异正是我们在终端中看到的。

步骤3:了解用户的实际使用体验

kubectl exec deployment/myapp -- curl -s http://localhost:8000/healthz/db

你会收到503错误提示,这说明新的数据库连接出现了问题,用户们已经开始遇到使用障碍了。

步骤4:观察混合出现的故障现象(可选)

./load_test.sh

有些请求能够成功执行,是因为它们使用了在系统出现故障之前就已经建立的旧连接;而有些请求则会失败,因为它们需要使用新的连接。虽然该Pod看起来运行正常,但实际上有一半的请求都会失败。

步骤5:运行轮换脚本

python rotate_secret.py

这一次,步骤6会捕获到这些故障现象。你会看到如下输出:

[Step 5/6] 正在触发滚动重启操作…
  [K8s] 轮换任务已完成,所有Pod都显示为“准备就绪”状态。
[Step 6/6> 正在验证新凭证在应用程序层面的使用效果…
  [验证] 执行完轮换后的凭证验证操作…
  [验证] 失败——Pod虽然处于运行状态,但数据库认证失败。
           可达性检测表明HTTP连接是正常的,
           但是应用程序无法使用新的凭证进行认证。
           这实际上反映了两种不同的规则或协议,其中只有一种被自动进行了检查。

============================================================
轮换操作未完成。可达性检测通过,但凭证验证失败。

该Pod正在运行中,在执行`kubectl get pods`命令时会显示“Ready”状态。然而旋转脚本提示凭证存在问题,这就是你在终端中看到的错误信息,这种问题在用户遇到之前就被检测出来了。

经验教训: `/healthz`这个路径可以用来判断HTTP服务器是否正常运行;`/healthz/db`则用于确认应用程序是否能够成功连接到数据库。除非你额外添加了针对数据库的检测脚本,否则Kubernetes只会检查前者。旋转脚本会在每次系统更新时自动添加对数据库的检测功能,这样就能在用户发现问题之前及时发现并处理错误。

脚本无法为你做出的决策

如果验证失败,但Pod仍在运行中,并且向数据库发送请求时也会出现错误,那么你有两个选择:

  1. 强制重启所有Pod,以清空连接池(这样虽然速度更快,但会导致短暂的时间内系统处理能力下降);

  2. 或者

  3. 等待旧的会话自动过期(这样可以避免系统停机,但请求仍然会间歇性地出现错误,直到连接池重新达到正常状态)。

脚本确实发现了问题,但接下来该采取什么措施,这需要由熟悉该系统的工程师来决定。

系统拆除与清理

./teardown.sh

用例5——自动化的金丝雀回滚机制

环境:完全本地环境,使用Helm部署Kind和Prometheus监控工具
语言:Bash脚本

这个用例的作用及其重要性

这个用例会运行一个脚本,该脚本会实时监控新的部署环境,一旦发现任何问题,就会立即触发回滚操作,从而避免用户大量投诉或系统出现严重故障。

在生产环境中,这一点尤为重要。因为当你发布新版本时,并不会立即将所有流量都转向新版本。通常只会将20%的流量发送到新版本,而80%的流量仍然继续使用旧版本。如果新版本出现了问题,也只有20%的用户会受到影响,因此你可以及时进行回滚操作,将损失降到最低。不过,这种回滚机制的有效性取决于你是否正确设置了监控指标。

关键要点:有两个脚本在同时监控同一个系统指标。其中一个脚本只会报告没有错误发生,而另一个脚本则会在检测到问题时立即触发回滚操作。它们的区别就在于所监测的指标不同,因此自动化系统的效果也会随之受到影响。你的自动化系统能发挥多大的作用,完全取决于你选择了哪些监控指标。

需要注意的事项:`canary_watch_v1.sh`脚本只关注错误信息,在金丝雀服务运行缓慢时也不会发出任何警告;而`canary_watch_v2.sh`脚本则会同时监测错误信息和响应时间,一旦发现异常就会立即触发回滚操作。这两者之间的区别正是我们需要了解的关键所在。

需要明确的一点:这并不是一份关于如何使用金丝雀部署机制的指南,而是旨在说明当监控系统只关注一个指标时,会遗漏哪些重要信息。

工作原理

在集群中会有三个组件在运行:稳定的应用程序(由三个Pod组成,负责处理大部分流量)、金丝雀应用程序(由一个Pod组成,负责处理少量流量),以及Prometheus监控工具(每隔15秒就会收集这两个组件的响应时间和错误统计信息)。

该监控脚本会每隔15秒向Prometheus询问:“Canary实例是否运行正常?”如果连续三次得到的回答都是“不正常”,系统就会自动恢复Canary实例的正常状态。

问题在于,“运行正常”到底意味着什么?这才是整个测试场景的核心所在。

终端截图,显示'kubectl get pods'命令的输出结果

设置演示环境

导航到05-canary-rollback/目录,然后运行以下命令:

cd 05-canary-rollback
./setup.sh

设置过程需要几分钟时间。系统会安装Prometheus,部署演示应用的两个版本,并启动一个负载生成器Pod,使其持续向这两个应用发送请求,这样Prometheus就能始终有数据可供分析。

设置完成后,请确认所有服务都已正常运行:

kubectl get pods

你应该会看到如下输出结果:

NAME                                                   READY   STATUS    RESTARTS   AGE
load-generator-68c59698b7-kws2l                        1/1     Running   0          4m54s
myapp-canary-6d6979c66f-g9lgw                          1/1     Running   0          32s
myapp-stable-6bcf994fc4-b4k9l                          1/1     Running   0          4m55s
myapp-stable-6bcf994fc4-ndhxc                          1/1     Running   0          4m55s
myapp-stable-6bcf994fc4-z97kx                          1/1     Running   0          4m55s
prometheus-kube-prometheus-operator-59b847d96c-mp72s   1/1     Running   0          5m58s
prometheus-prometheus-kube-prometheus-prometheus-0     2/2     Running   0          5m1s

这里有3个稳定运行的Pod,1个用于测试异常情况的Canary Pod,以及1个负载生成器Pod,Prometheus也在正常运行。整个实验环境已经准备就绪。

在继续进行其他操作之前,请等待60秒。Prometheus需要时间从这些Pod中收集初始数据。如果跳过这个步骤,监控脚本就会返回空数据,且不会给出任何解释。

三个终端窗口

你需要同时打开三个独立的命令提示符窗口。

在macOS上: 打开Terminal,然后按两次Cmd+T键。此时会看到三个标签页,每个标签页都代表一个独立的终端窗口。
在Linux上: 在大多数终端应用程序中按Ctrl+Shift+T键,或者右键点击菜单选择“打开新标签页”。

将这三个窗口分别命名为“Terminal 1”(用于运行监控脚本)、“Terminal 2”(用于引发故障)和“Terminal 3”(用于观察延迟情况)。

脚本代码

版本1:仅监测错误情况(代码链接在此

#!/usr/bin/env bash
# canary_watch_v1.sh

PROMETHEUS="http://localhost:9090"
DEPLOYMENT="myapp-canary"
NAMESPACE="default"
ERROR_THRESHOLD="0.05"
CHECK_INTERVAL=15
STRIKE_LIMIT=3

strikes=0

echo "Canary监控程序正在运行(v1版本——仅检测错误率)。"
echo "如果连续\({STRIKE_LIMIT}次检测的结果中错误率超过\({ERROR_THRESHOLD}\),系统将触发回滚操作。"
echo ""

while true; do
    ts=$(date '+%Y-%m-%dT%H:%M:%S')

    error_query='sum(rate(http_requests_total{app="myapp-canary",status=~"5.."}[1m])) / sum(rate(httprequests_total{app="myapp-canary"}[1m]))
    
    error_rate=\((curl -sf "\){PROMETHEUS}/api/v1/query" \
        --data-urlencode "query=${error_query}" | \
        python3 -c "
import sys, json
d = json.load(sys.stdin)
result = d['data']['result']
print(result[0]['value'][1] if result else '0')
" 2>/dev/null)

    error_rate=${error_rate:-0}
    above=\((echo "\)error_rate > $ERROR_THRESHOLD" | bc -l)

    echo "[\(ts] error_rate=\){error_rate} | threshold=\({ERROR_threshold} | breach=\)([ "$above" = "1" ] && echo YES || echo NO)"

    if [ "$above" = "1" ]; then
        strikes=$((strikes + 1))
        echo "  已发生\({strikes}/\){STRIKE_LIMIT}次违规事件"
        if [ "(strikes) >= \{STRIKE_LIMIT}\]" ]; then
            echo "  触发回滚操作"
            kubectl rollout undo deployment/"\({DEPLOYMENT}" -n "\){NAMESPACE}"
            exit 0
        fi
    else
        strikes=0
    fi

    sleep "${CHECK_INTERVAL}"
done

版本2:监控错误率与响应时间

#!/usr/bin/env bash
# canary_watch_v2.sh

PROMETHEUS="http://localhost:9090"
DEPLOYMENT="myapp-canary"
NAMESPACE="default"
ERROR_THRESHOLD="0.05"
LATENCY_threshold="2.0"
CHECK_INTERVAL=15
STRIKE_LIMIT=3

strikes=0

echo "Canary监控程序正在运行(版本2——错误率+P99延迟)。"
echo "错误阈值:\({ERROR_THRESHOLD} | 延迟P99阈值:\){LATENCY_threshold}s"
echo ""

while true; do
    ts=$(date '+%Y-%m-%dT%H:%M:%S')

    error_query='sum(rate(http_requests_total{app="myapp-canary",status=~"5.."}[1m])) / sum(rate(httprequests_total{app="myapp-canary"}[1m]))
    error_rate=\((curl -sf "\){PROMETHEUS}/api/v1/query" \
        --data-urlencode "query=${error_query}" | \
        python3 -c "
import sys, json
d = json.load(sys.stdin)
result = d['data']['result']
print(result[0]['value'][1] if result else '0')
" 2>/dev/null)

    latency_query='histogram_quantile(0.99, sum(rate(http_request_duration_seconds_bucket{app="myapp-canary"}[1m])) by (le))'
    latency=\((curl -sf "\){PROMETHEUS}/api/v1/query" \
        --data-urlencode "query=${latency_query}" | \
        python3 -c "
import sys, json
d = json.load(sys.stdin)
result = d['data']['result']
print(result[0]['value'][1] if result else '0')
" 2>/dev/null)

    error_rate=${error_rate:-0}
    latency=${latency:-0}

    error_breach=\((echo "\)错误率 > $ERROR_THRESHOLD" | bc -l)
    latency_breach=\((echo "\)延迟 > $LATENCY_threshold" | bc -l)

    triggered_by=""
    [ "\(error_breach" = "1" ] && triggered_by="error_rate(\){error_rate}")
    [ “[\(latency_breach” = “1” ] && triggered_by="\){triggered_by:+\({triggered_by}, }latency_p99(\){latency}s)"

    echo "[\(ts] 错误率=\){error_rate} | 延迟P99=\){latency}s | 是否触发异常=\){triggered_by:-none}"

    if [ “[\(error_breach” = “1” ] || [ “\)延迟_breach” = “1” ]; then
        strikes=$((strikes + 1))
        echo "  触发次数 \({strikes}/\){STRIKE_LIMIT} | 引发原因:${triggered_by}"
        if [ “[\(strikes” -ge “\)STRIKE_LIMIT” ]; then
            echo ""
            echo "  已触发回滚操作"
            echo "  触发原因:${triggered_by}"
            kubectl rollout undo deployment="\({DEPLOYMENT}" -n "\){NAMESPACE}"
            exit 0
        fi
    else
        strikes=0
    fi

    sleep "${CHECK_INTERVAL}"
done

这些脚本的工作原理

错误率查询会向Prometheus询问:“在上一分钟内,有多少比例的请求出现了错误?”如果结果为0.0,说明没有错误发生;如果结果为0.06,则意味着有6%的请求失败了,这超过了5%的阈值。在输出结果中,这一数据会显示为:

error_rate=0.06 | threshold=0.05 | breach=YES

延迟查询则会询问:“目前,最慢的1%的请求所花费的时间是多少?”如果结果为5.234,说明每100个请求中就有1个请求的耗时超过了5秒。这一数据在输出结果中的呈现方式如下:

latency_p99=5.234秒 | 违反规定 = latency_p99(5.234秒)

V1只执行第一个查询;V2则同时执行这两个查询。使用的是相同的检测工具,面临的问题相同,但得到的结果却不同。

“三次违规即触发回滚”这一规则意味着:单次检测结果异常并不会导致回滚操作,但连续三次出现异常情况时才会触发回滚。这种机制的代价是,在回滚生效之前,系统会暴露在风险中45秒(因为需要分别进行三次检测,每次检测耗时15秒)。

当出现三次违规情况时,监控脚本会自动执行以下操作:kubectl rollout undo deployment/myapp-canary -n default

就是这一行代码触发了回滚操作。该脚本位于文件canary_watch_v2.sh中,会自动运行——用户无需进行任何操作。脚本会自行检测、判断并执行相应的动作。

故意制造故障来进行测试

在终端1中,启动v1监控程序:./canary_watch_v1.sh

你会看到系统每隔15秒就会输出一次以下信息:

./break_it.sh

这样,所有发送给Canary系统的请求都会花费5秒才能完成。这些请求仍然会返回200状态码,即没有出现错误,只是响应速度变慢了。你会看到如下输出:

./check_latency.sh

你会看到如下输出:

./canary_watch_v2.sh

在终端2中,再次触发延迟测试:

./break_it.sh

观察终端1的显示情况。v2会检测到延迟现象,并在连续三次检测到异常后自动执行回滚操作:

Canary监控系统正在运行(v2版本——错误率+P99延迟值)。
错误阈值:0.05 | P99延迟阈值:2.0秒

[2026-05-15T14:30:00] 错误率=0.0 | P99延迟值=0.082秒 | 未发现异常
[2026-05-15T14:30:15] 错误率=0.0 | P99延迟值=5.234秒 | 异常原因:P99延迟值超过阈值(5.234秒)
  第一次检测到异常 | 引发异常的原因:P99延迟值超过阈值(5.234秒)
[2026-05-15T14:30:30] 错误率=0.0 | P99延迟值=5.891秒 | 异常原因:P99延迟值超过阈值(5.891秒)
  第二次检测到异常 | 引发异常的原因:P99延迟值超过阈值(5.891秒)
[2026-05-15T14:30:45] 错误率=0.0 | P99延迟值=6.102秒 | 异常原因:P99延迟值超过阈值(6.102秒)
  第三次检测到异常 | 引发异常的原因:P99延迟值超过阈值(6.102秒)

  回滚操作已触发
  触发原因:P99延迟值超过阈值(6.102秒)

deployment.apps/myapp-canary已成功回滚

虽然错误率始终为0,但由于P99延迟值超过了预设阈值,v2版本还是执行了回滚操作。正是这种额外的检测机制发挥了作用。

回滚操作完成后,需要确认Canary监控系统已经处于休眠状态,但并未被删除:

kubectl rollout history deployment/myapp-canary -n default
版本号  变更原因
1         
2         

系统进行了两次版本更新:回滚操作将版本2降级为初始状态,同时恢复了版本1。所有数据都没有被删除,如果你认为之前的回滚是误报,也可以随时重新部署该服务。

脚本无法为你做出的决策

v2版本是根据P99延迟值进行回滚操作的,而且整个过程中没有出现任何错误。在重新部署之前,你需要判断这种延迟现象是新代码中真正存在的问题,还是只是暂时的异常情况(比如数据库缓存在首次使用时需要加载数据所导致的延迟)。这两种情况都会产生相同的检测结果,但只有你才能根据实际发生的变化来判断哪种情况更有可能。

如果错误地执行了回滚操作,就会导致部署进度受阻,同时也会降低人们对自动化工具的信任。合适的阈值设置取决于你的用户需求和系统环境;脚本最终执行的操作,其实只是反映了你所配置的参数而已。

关闭监控系统

./teardown.sh

https://github.com/Osomudeya/devops-scripting-labs

<我每周都会撰写关于DevOps的文章,内容涵盖实际系统案例、面试技巧、简历编写建议以及真实发生的事件——欢迎订阅我的新闻通讯

Comments are closed.