在任何代码编写之前,每个数据管道都会面临一个根本性的选择:它是按照预定的时间表分批处理数据,还是随着数据的到来立即进行实时处理?

这个选择——是采用批量处理方式还是流式处理方式——会直接决定后续所有系统的架构设计。你使用的工具、能够对数据新鲜度做出的保证、错误处理的复杂程度,以及运行这些系统所需的基础设施,都直接源于这一决策。

如果选择了错误的处理方式,后果将会非常严重。那些本应该使用批量处理方式却采用了流式处理方式的团队,最终会为其实并不需要这种复杂架构的系统维护大量的基础设施。

而那些在实际情况确实需要实时处理数据的情况下却构建了批量处理管道的团队,则会在最糟糕的时刻发现这一缺陷——比如当有利益相关者询问为什么数据仪表盘上的信息已经滞后了六个小时时。

在本文中,你将了解什么是批量处理管道和流式处理管道,它们在架构设计及权衡取舍方面有哪些不同,以及如何使用Python来实现这两种处理方式。读完这篇文章后,你将会掌握一个清晰的框架,从而能够为任何数据工程问题选择合适的处理方案。

先决条件

为了顺利学习本文内容,请确保你已经具备以下条件:

  • 具备编写Python函数及使用模块进行开发的经验

  • 熟悉pandas DataFrame以及基本的数据操作技巧

  • 了解ETL管道的基本功能——数据提取、转换和加载

目录

什么是批量处理管道?

批量处理管道会一次性处理一组数量有限的数据记录——比如一个文件、数据库中的某个时间点的数据,或者一天内产生的所有交易记录。它按照预定的时间表运行,例如每小时、每晚或每周一次,读取该时间段内的所有数据,对其进行处理,然后将处理结果保存到指定的位置。之后,它会停止运行,直到下一次执行任务时才会再次启动。

这种思维模式很简单:先收集数据,再进行处理。在两次处理之间,不会进行任何操作。

在零售领域的ETL环境中,典型的批处理流程可能如下所示:

  1. 在午夜时分,从事务数据库中提取过去24小时内下达的所有订单。

  2. 将这些订单数据与产品目录表及客户信息表进行关联处理。

  3. 按地区和产品类别计算每日收入总额。

  4. 将处理结果加载到数据仓库中,以便后续生成报表。

当这个批处理流程运行完毕时,就会得到关于昨天业务情况的完整且一致的数据快照。等到分析师们早上来到办公室时,数据仓库中的信息就已经是最新的了。

用Python实现批处理流程

最简单的形式来说,批处理流程就是一段包含三个明确划分阶段的Python脚本:数据提取、数据处理、结果加载。

import pandas as pd
from datetime import datetime, timedelta

def extract(filepath: str) -> pd.DataFrame:
    """从每日导出的文件中读取原始订单数据。"""
    df = pd.read_csv(filepath, parse_dates=["order_timestamp"])
    return df

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """对订单数据进行清洗处理,并按地区汇总每日收入。"""
    # 仅保留已完成订单
    df = df[df["status"] == "completed"].copy()

    # 从时间戳中提取日期信息用于分组
    df["order_date"] = df["order_timestamp"].dt.date

    # 汇总数据:按地区和日期计算每日总收入及订单数量
    summary = (
        df.groupby(["order_date", "region"])
        .agg(
            total_revenue=("order_value_gbp", "sum"),
            order_count>("order_id", "count"),
            avg_order_value=("order_value_gbp", "mean"),
        )
        .reset_index()
    )
    return summary

def load(df: pd.DataFrame, output_path: str) -> None:
    """将处理结果写入数据仓库(此处为CSV文件)。"""
    df.to_csv(output_path, index=False)
    print(f"已将{len(df)}条记录加载到{output_path}中")

# 运行整个批处理流程
raw = extract("orders_2024_06_01.csv")
aggregated = transform(raw)
load(aggregated, "warehouse/daily_revenue_2024_06_01.csv")

让我们来详细看看这段代码的具体功能:

  • extract函数用于读取表示每日订单数据的CSV文件。参数parse_dates告诉pandas将order_timestamp列视为日期对象而非普通字符串——这一点在transform函数中的日期提取步骤中非常重要。

  • transform函数主要完成两件事:首先过滤掉所有未完成的订单(例如已取消的订单),然后按日期和地区对剩余订单进行分组,从而计算出各地区的每日收入总额。.agg()方法能够一次性计算出每个分组中的三项指标。

  • load函数负责将处理结果写入目标存储位置。在实际生产环境中,这个目标可能是数据库或云存储服务,但基本原理是相同的。

这三种功能被刻意分开设计。这种分离方式——提取数据、转换数据、加载数据——使得每个环节都可以独立地进行测试、替换或调试。如果转换逻辑发生了变化,你也不需要修改用于提取数据或加载数据的代码。

当批量处理效果良好时

在以下情况下,使用批量处理流程是正确的选择:

  • 数据更新频率以小时为单位,而非秒。例如,每日销售报告并不需要每分钟都进行更新,每周的营销效果分析模型同样如此。

  • 你需要处理大量的历史数据集。将两年内的交易记录导入新的数据仓库本质上就是一项批量处理任务——这些数据已经存在,数量也是固定的,因此你希望一次性高效地完成对这些数据的处理。

  • 一致性比处理速度更为重要。批量处理流程能够生成完整、实时反映数据状态的副本。输出结果中的每一行都是根据相同的输入数据计算得出的。这种一致性对于财务报告、合规性检查,以及任何需要使用稳定、可重复的数据集的下游流程来说都至关重要。

什么是流处理管道?

流处理管道会持续不断地处理数据,无论是按记录逐条处理,还是以小批量为单位进行处理,只要数据一到达就会立即被处理。这类数据集没有“末端”——管道会无限期地运行,不断从消息队列、Kafka主题或Webhook等数据源中获取数据,并在数据到达的瞬间就开始对其进行处理。

其运作原理可以概括为:“边收集数据边进行处理”。这样的管道会始终保持运行状态。

以零售行业的ETL流程为例,流处理管道可以在订单被下达的瞬间就开始处理这些订单:

  1. 用户在网站上下了订单,系统会向消息队列中发送一条订单事件。

  2. 流处理管道会在几毫秒内读取到这条事件。

  3. 它会验证这条事件的合法性,对其进行必要的处理,然后将其转发给下游系统。

  4. 欺诈检测系统、库存管理系统以及实时数据看板都会立即收到更新后的信息。

  5. 流处理与批量处理的本质区别在于:数据并不会被暂时存储在文件中等待处理,而是会持续不断地流动,而管道必须能够跟上这种数据的流动速度。

    用Python实现流处理管道

    Python中的生成器函数非常适合用来构建流处理管道。生成器会逐个产生值,在每次产生一个值后会暂停一段时间——这一机制正好符合“边接收数据边进行处理”的需求,因为这样就不需要将所有数据都加载到内存中。

    import json
    import time
    from typing import Generator, Dict
    
    def event_source(filepath: str) -> Generator[Dict, None, None]:
        """
        模拟从文件中读取订单事件流。
        在实际应用中,这个函数会从Kafka或消息队列中获取数据。
        """
        with open(filepath, "r") as f:
            for line in f:
                event = json.loads(line.strip())
                yield event
                time.sleep(0.01)  # 模拟事件之间的延迟时间
    
    def validate(event: Dict) -> bool:
        """检查事件是否包含所需的字段以及有效的数值。"""
        required_fields = ["order_id", "customer_id", "order_value_gbp", "region"]
        if not all(field in event for field in required_fields):
            return False
        if event["order_value_gbp"] <= 0:
            return False
        return True
    
    def enrich(event: Dict) -> Dict:
        """在将事件转发给下游系统之前,为其添加一些衍生字段。"""
        event["processed_at"] = time.strftime("%Y-%m-%dT%H:%M:%S")
        event["value_tier"] = (
            "high" if event["order_value_gbp"] >= 500
            else "mid" if event["order_value_gbp"] >= 100
            else "low"
        )
        return event
    
    def run_streaming_pipeline(source_file: str) -> None:
        """在数据从源文件中到达的瞬间就对其进行处理。"""
        processed = 0
        skipped = 0
    
        for raw_event in event_source(source_file):
            if not validate(raw_event):
                skipped += 1
                continue
    
            enriched_event = enrich(raw_event)
    
            # 在实际应用中:会将处理后的数据发送到下游主题或存储到指定的位置。
            print(f"[{enriched_event['processed_at']}] "
                  f"订单编号:{enriched_event['order_id']} | "
                  f"订单金额:{enriched_event['order_value_gbp']:.2f} | "
                  f"等级:{enriched_event['value_tier']}")
            processed += 1
    
        print(f"\n处理完成。共处理了{processed}条记录,其中{skipped}条被跳过。")
    
    run_streaming_pipeline("order_events.jsonl")
    

    以下是具体的处理流程:

    • event_source是一个生成器函数——请注意其中使用了yield关键字而非return。每次调用yield event时,函数都会暂停执行,并将一个事件传递给调用者。系统会先处理这个事件,然后生成器才会继续执行并获取下一个事件。这意味着无论数据流的大小如何,内存中同一时间只会保存一个事件。time.sleep(0.01)这一代码用于模拟消息队列中事件到达之间的延迟。

    • validate会在对事件进行进一步处理之前,检查其中是否包含必需的字段以及这些字段的值是否有效。在流式处理环境中,无效事件极为常见——网络问题、上游系统的错误或数据结构的变化都可能导致记录格式不正确。因此,提前验证数据并跳过无效事件,远比让它们被传递到下游系统更为安全。

    • enrich会为事件添加一些衍生字段,例如处理时间戳或值的分级信息。在生产环境中,这个步骤还可能包括与查找表进行关联操作、调用外部API或应用模型预测结果。

    • run_streaming_pipeline将所有这些步骤整合在一起。for循环会依次处理event_source中生成的每个事件,让它们依次经过validate → enrich → route这三个处理阶段,并记录下已处理的事件数量以及被跳过的事件数量。

    当流式处理效果良好时

    在以下情况下,流式处理管道是最佳选择:

    • 数据更新频率以秒或毫秒为单位。欺诈检测、实时库存更新、动态仪表盘以及警报系统等,都需要数据能够立即被处理——如果使用每小时执行一次的批处理作业,这些功能将无法正常发挥作用。

    • 数据量太大,无法先进行累积存储。高频产生的物联网传感器数据、用户点击记录以及金融交易数据,每小时可能会生成数百万条记录。如果在处理之前将这些数据全部累积起来,不仅会占用大量的存储空间,而且处理时间也会过长,从而影响系统的实用性。

    • 你需要根据实时发生的事件采取相应的行动,而不仅仅是进行数据报告。流式处理管道可以根据单个事件的发生来触发下游操作——例如发送通知、阻止某笔交易或更新推荐结果。而批处理管道则只能对已经发生的事情进行报告。

    关键差异一览

    以下是我们迄今为止讨论过的批处理与流式处理之间的主要区别:

    处理方式 批处理 流式处理
    数据规模 有限的数据集 连续不断的数据流
    处理触发条件 预定的时间表 每条数据的到达时刻
    处理延迟 几分钟到几小时 几毫秒到几秒钟
    处理效率 较高(适用于批量处理) 每条数据的处理开销较低
    复杂性 较低 较高
    状态管理 每次运行时均为无状态 通常会在多个事件之间保持状态关联
    错误处理方式 重新执行整个处理流程 针对每个错误事件设置专门的队列进行处理
    数据一致性 非常高(可获取实时数据快照) 最终能够达到一致性
    适用场景 数据报告、机器学习训练、数据补录 警报系统、实时功能展示、事件路由处理

    在批处理与流处理之间做出选择

    好吧,所有这些信息都非常有用。但那么,如何在批处理和流处理之间做出选择呢?其实这个决定取决于三个问题:

    数据需要保持多新的状态?如果相关方能够接受几小时之前的处理结果,那么批处理方式会更简单、成本也更低。但如果他们需要几秒钟内就能得到结果,那么流处理就是必不可少的。

    你的处理逻辑有多复杂?批处理作业可以处理大规模的数据集,进行复杂的聚合运算,并应用复杂的业务逻辑,而无需担心延迟问题。而流处理流程则必须快速处理每一条数据,这就限制了每条数据所能被处理的复杂程度。

    你的运营能力如何?流处理基础设施——比如Kafka集群、Flink或Spark Streaming作业、死信队列以及一次只交付一次的结果保证机制——其运维难度远高于一个简单的Python脚本。如果你的团队规模较小,或者你的应用场景并不需要实时结果,那么这种复杂性就会带来不必要的成本。

    最初可以选择批处理方式。因为它构建起来更简单,测试也更容易,调试和维护的难度也都更低。只有当出现特定的、实际的需求时,才应该转向流处理方式。大多数数据处理问题其实都属于批处理范畴;而真正需要使用流处理的方式,通常在遇到这些问题时才会被明显地意识到。

    正如你可能已经猜到的那样,对于某些数据处理系统来说,可能需要同时结合批处理和流处理两种方式。因此,才出现了混合处理模式。

    混合处理模式:Lambda架构与Kappa架构

    在实践中,许多生产环境中的数据系统都会同时使用这两种处理模式。其中最常见的两种混合架构就是Lambda架构和Kappa架构。

    Lambda架构会并行运行批处理层和流处理层。批处理层负责处理完整的历史数据,从而产生准确且一致的结果;而流处理层则负责处理实时数据,立即生成近似的结果。下游的处理系统会将这两种结果合并使用——利用流处理产生的结果来保证数据的新鲜性,同时用批处理产生的结果来确保结果的准确性。

    这种模式的缺点在于运维难度较高:你需要维护两套独立的处理代码库,而这两套代码库必须产生语义上相同的结果。

    Kappa架构则通过仅使用流处理层来简化这一过程。不过,在需要进行批处理式重新处理时,Kappa架构仍然允许你通过相同的处理流程来重新处理历史数据。当你的流处理框架——比如Apache KafkaApache Flink——支持日志保留和重放功能时,这种架构就能发挥很好的作用。你只需要一套代码库、一套处理逻辑,而且当处理流程发生变化时,也可以轻松地重新处理历史数据。

    这两种架构并没有哪一种在所有情况下都更优越。在那些最初采用批处理方式、后来逐步引入流处理技术的组织中,Lambda架构更为常见;而在那些将流处理作为主要处理模式的系统中,Kappa架构则更为普遍。

    结论

    批处理与流处理是两种具有不同权衡取舍的 처리方式,它们各自适用于不同类型的问题。批处理流程在一致性、简洁性以及大规模数据处理能力方面表现优异;而流处理流程则更擅长处理低延迟任务、实现实时响应以及进行连续性数据处理。

    在选择Apache Spark、Kafka或Flink等具体框架之前,如果能在架构层面深入理解这两种处理模式,你就能够做出正确的选择,并清楚地解释自己的选择理由。这些框架只是实现了这些处理模式,而判断哪种模式最适合你的需求,这个决定权首先属于你。

Comments are closed.