像Databricks、Snowflake和BigQuery这样的云数据分析平台让构建数据平台变得更加容易。它们为中小型团队提供了出色的扩展方案。

但这种便利并非仅仅体现在租用外部基础设施上,还包括对特定技术框架的依赖性,以及基于供应商提供的功能所构建的操作和安全体系。

在本文中,你将学习如何在开源的数据湖架构中搭建批量数据导入层,并且能够完全掌控所有组件。

我们的重点非常明确:首先会确保数据导入层能够端到端地正常运行,然后在此基础上进一步开发分析功能、管理机制和流处理能力,而不会让你陷入对任何单一工具的依赖。同时,我们还会分析一些常见的集成问题,例如配置错误、分区值被设置为NULL,或者Python版本不兼容等问题。

最终,你将掌握以下内容:

  • 一个基于Docker(使用Compose进行部署)、RustFS(对象存储系统)、Apache Iceberg(表格格式)和Project Nessie(目录管理系统)构建的单节点数据湖环境。

  • 一个利用Apache Airflow协调运行的批量处理流程,该流程会执行PySpark作业,从而生成带有版本标识且经过分区的Iceberg表格。

  • 一种实际应用中的数据导入方案:通过Redis将外部Web爬虫与Airflow解耦,使用轻量级的信号表将原始数据写入对象存储系统。

  • 对这套技术架构的全面了解,包括它的优势与局限性,以及如何进一步优化它以使其适合生产环境。

关于本文的范围:本文主要讨论ELT流程中的“数据导入”这一环节。数据转换(使用dbt或Spark SQL)和分析处理(使用Trino或Superset)虽然是很自然的发展方向,但超出了本文的范围。你在这里构建的基础架构,正是后续这些功能的基石。

我们将涵盖的内容:

数据导入问题

通过具体的使用场景,人们可以更容易地理解这种技术架构或解决方案的结构。其高层次的目标是从外部市场API中获取金融数据,以便进行趋势分析。你需要重点关注的是将这类数据导入数据仓库,以便后续进行进一步分析。

这些数据是通过网络爬虫来采集的,而且每个接口都有一定的速率限制。在批量处理过程中,基于时间的分区机制对于下游处理流程来说非常有效,同时也有助于保持数据的整洁性。

该爬虫作为一个独立的外部进程运行,并通过Redis作业队列与Airflow系统分离。这种设计使得速率限制机制以及爬取任务的生命周期管理都脱离了Airflow的编排层,从而确保各个组件能够在出现故障时独立地进行恢复。

在数据采集过程中,由于爬取任务不具备幂等性,因此确保数据的高可靠性是至关重要的。

技术架构组件

  • RustFS: 一种用Rust语言编写的、兼容S3的对象存储系统

  • Project Nessie: 用于Apache Iceberg表格的事务性目录管理系统

  • Apache Spark: 分布式计算引擎

  • Apache Airflow: 作业调度与编排工具

  • Jupyter Notebook (可选): 可用于针对Iceberg表格执行临时性的Spark查询,但本文未对此进行详细介绍

  • Scrapredis: 用于网络爬虫的作业队列系统

  • Scrapworker: 负责执行网络爬取任务的数据采集工作进程

这种架构已经在一台配置为4核x86/AMD处理器、16GB内存以及60GB硬盘的GCP虚拟机上进行了测试,该虚拟机运行的是Debian GNU/Linux 11操作系统。同时还需要使用Docker及Compose v2工具。只要具备类似或更强的配置,这种架构在任何类似的Linux环境中都应该能够正常运行。

系统概述

数据平台架构图

该爬虫作为一个独立的外部进程运行,并通过Redis作业队列与Airflow系统分离。Airflow会将包含接口地址、查询参数以及目标存储路径的作业信息推送到队列中,爬虫会从队列中获取这些信息并执行爬取任务,然后将原始数据直接写入对象存储系统中。

这种分离机制使得速率限制策略以及爬取任务的生命周期管理不会影响到Airflow的编排流程,同时也能有效隔离各种故障情况。

由于爬取任务不具备幂等性,因此一旦爬取任务失败,恢复起来会相对比较困难。但在爬取阶段之后的其他环节出现的故障,都可以独立地进行重试,而无需重新触发整个爬取过程。

快速入门

首先,需要初始化项目配置:

# 克隆仓库
git clone https://github.com/ps-mir/data-platform

# 创建共享的Docker网络
docker network create data-platform

# 创建主机目录,设置权限并下载Spark JAR文件
chmod +x init.sh && ./init.sh

按照以下顺序启动服务(关闭服务时则按相反顺序进行):

  1. RustFS
cd rustfs && docker compose up -d
  1. Nessie
cd nessie && docker compose up -d
  1. Spark — 首次运行时需要先进行构建
cd spark && docker compose build && docker compose up -d
  1. Scrapredis
cd scrapredis && docker compose up -d
  1. Airflow — 首次运行时需要先进行构建
cd airflow-docker && docker compose build && docker compose up -d

在Nessie启动后,创建相应的命名空间:

curl -X POST http://localhost:19120/iceberg/v1/main/namespaces \
  -H "Content-Type: application/json" \
  -d '{"namespace": ["default"]}'

curl -X POST http://localhost:19120/iceberg/v1/main/namespaces \
  -H "Content-Type: application/json" \
  -d '{"namespace": ["scraper"]}'

Scrapworker直接在主机上运行(没有使用Docker容器化)。它要求Python版本大于或等于3.14:

cd scrapworker
pip install -e .
CONFIG_PATH=./config/config.local.yaml RUSTFS_ACCESS_KEY=rustfsadmin RUSTFS_SECRET_KEY=rustfsadmin python -m scrapworker

在激活Airflow中的scraper_pipeline_v1之前,必须确保Scrapworker已经在运行中。如果没有Scrapworker,该管道会将任务推送到队列中,但由于没有 worker 来处理这些任务,它们会无限期地停留在wait_for_completion状态中。

Trino也在配置中包含,但目前尚未测试其与Nessie的集成情况。

运行管道

当所有服务都运行起来后,下一步就是在Airflow中激活这些管道。默认情况下,所有的DAG在创建时都会被暂停状态。这四个管道在复杂程度上是相互关联的,按照顺序依次执行它们,是确保整个系统各部分能够正确连接的最佳方法。

这四个管道都已经加载到系统中,但默认都是处于暂停状态。在触发执行之前,请先在Airflow的用户界面中解除每个管道的暂停状态。

所有Airflow管道

让我们来逐一了解一下这些管道:

spark_static_data_v1_skeleton: Hello DAG

这是一个最简单的有向无环图,其中没有使用Spark,只包含一个用于打印信息的Python任务。如果该任务的状态显示为绿色,说明Airflow的调度器和工作者都运行正常。[2026-04-09 22:00:01] INFO - 任务操作符:

spark_static_data_v2_submit:Spark提交任务

该任务通过SparkSubmitOperator来提交一个PySpark作业,该作业会将静态数据集写入到Iceberg表中。由于不进行分区处理,因此每次运行都会覆盖之前的数据。

在Nessie目录中,这个任务的记录如下:

类型:ICEBERG_TABLE
元数据存储位置:s3://warehouse/default/static_data_e7e43123-95a7-44d2-b6d5-67c9c7aa4321/metadata/00000-08a5a2db-6f12-4f21-b2a9-de3d9123fbd3.metadata.json

spark_partitioned_data_v1:Spark分区处理

这个任务在步骤2的基础上增加了基于时间的 Partitioning功能。分区的键值是根据预定的执行时间来确定的,因此每次运行都会将数据写入到对应的(ds, hr, min)分区中,而不会影响之前的数据。

在RustFS中,示例文件的路径如下:warehouse/default/static_data_partitioned_b172c66f-722b-44f3-bbee-069355753ff6/data/ds=2026-03-28/hr=23/min=15/00000-4-7a196a47-2ac0-4023-af68-ca10487fccb2-0-00001.parquet

scraper_pipeline_v1:数据抓取管道

这就是完整的数据采集流程。Airflow会向Scrapredis发送任务请求,Scrapworker会调用Binance的API并将原始数据写入到RustFS中,最后Airflow会将处理结果发布到Nessie目录中。

每次运行时,都会获取以下数据:https://api.binance.com/api/v3/trades?symbol=BTCUSDT&limit=10

环境搭建

这是一个使用Docker Compose构建的单节点开发环境。这个基础配置结构清晰,可以通过针对性的修改轻松扩展到生产环境中。

  • 在生产环境中进行部署时,需要为每个组件配置高可用性、持久化存储管理措施,并加强安全性防护。

  • 所有使用的镜像都固定为特定版本,这样可以避免在多次拉取镜像时出现意外问题。

  • 所有的容器都共享一个名为data-platform的外部Docker网络,这样各个服务就可以使用容器的名称作为主机名进行通信了。

  • 一个init.sh脚本会在数据文件夹中创建所需的目录结构,并负责初始化Docker网络。

RustFS

RustFS是这一技术架构中的对象存储层。Nessie的REST目录模式严重依赖于与S3兼容的终端节点;如果将其配置在本地文件系统上运行,那么在系统启动时Nessie的健康检查就会失败,从而导致目录初始化过程中出现错误。对于新的部署环境而言,REST目录模式是推荐的选择,因为该模式能够实现凭证的管理以及多引擎之间的协同工作。

对于那些希望自行托管且兼容S3的存储系统而言,MinIO无疑是理想的选择;但后来它采用了更为严格的许可协议。RustFS则是另一种开源替代方案,这种系统是用Rust语言编写的,并且能够利用本地磁盘进行数据存储。

在写入数据时,Spark会通过S3FileIO将Parquet格式的文件直接上传到RustFS中。同时,Nessie也会同步提交相应的表元数据,因此数据与元数据的状态要么会一起得到更新,要么根本不会发生任何变化。这就是Apache Iceberg的核心优势:它能够确保数据文件及其元数据在更新时始终保持原子性。

对于生产环境或云部署来说,像AWS S3、Google Cloud Storage或Azure Blob Storage这样的托管型对象存储服务才是更合适的选择。而在大规模应用场景中,也可以考虑使用SeaweedFSCeph/RGWGarage这类自行托管的存储解决方案。

注意事项:

  • 桶的创建:当RustFS通过健康检查后,会自动运行一个使用amazon/aws-cli工具编写的rustfs-init脚本来创建s3://warehouse这个桶。因此,你无需手动创建这个桶。

  • 权限设置:RustFS在容器内部以uid=10001的身份运行。在容器启动之前,宿主目录(data/rustfs/datadata/rustfs/applogs)必须属于这个用户账户,否则系统会默默地失败。init.sh脚本通过sudo chown -R 10001:10001命令来处理权限设置问题。

  • 镜像的固定:在构建Docker容器时,需要将rustfs/rustfs:1.0.0-alpha.85-glibc这个镜像固定下来。在升级之前,请先确认uid值是否发生了变化:可以通过docker run --rm --entrypoint id rustfs/rustfs:命令来进行验证。如果uid值发生了变化,就需要重新运行init.sh脚本或手动调整权限设置。

  • Spark的数据写入方式:Spark会通过S3FileIO将数据文件直接上传到RustFS中。而Nessie仅负责管理表元数据,并不会代理数据传输的过程。这两者之间的交互发生在数据提交时,而不是在数据写入阶段。

Nessie

Nessie用于记录仓库中所有表的列表、这些表的数据文件以及它们的结构信息。如果没有Nessie,Spark就无法准确了解仓库中实际存储了哪些数据。

Hive Metastore提供了基于Thrift的API,多年来一直被用作元数据管理的标准工具。它通过底层的数据库为元数据更新提供事务支持,但这些事务仅限于元数据层面,底层的数据文件并不参与这些事务操作,因此不同表之间的数据历史记录也无法被关联起来。

Apache Iceberg通过原子级的表提交机制解决了数据和元数据管理之间的矛盾。而Nessie在此基础上更进一步:它将元数据管理系统视为一个Git仓库,每一次对表的修改都相当于一次提交操作。用户可以对多个表进行分支操作、添加标签,或者原子性地回滚之前的更改。

Spark通过Nessie的Iceberg REST端点来读写表元数据。目录状态会被保存到Postgres中,因此即使容器重新启动,这些信息也不会丢失。

命名空间初始化

与Hive Metastore不同,Nessie不会自动创建命名空间。如果尝试将表写入一个不存在的命名空间,那么在数据已经写入RustFS之后,这个操作将会失败,从而导致一些文件没有对应的目录条目而成为“孤儿文件”。命名空间属于结构元数据,因此它们的初始化需要在一次性的配置步骤中完成,而不能通过流程化的方式来实现。

Nessie将Iceberg目录元数据存储在s3://warehouse/路径下。而Iceberg表数据则存储在由命名空间生成的路径中,例如,对于default命名空间,其数据存储路径为s3://warehouse/default/

S3凭证配置问题

Nessie的S3凭证字段不接受纯字符串作为输入值(这可能是出于安全考虑)。即使对于本地凭证,也必须使用格式为urn:nessie-secret:quarkus:的秘密URI。

此外,对于那些包含连字符的Quarkus属性名称来说,SCREAMING_SNAKE_CASE这种环境变量命名规则会导致这些属性被忽略,系统会自动使用默认值(而默认值往往会导致问题)。正确的做法是在compose环境配置块中直接使用点表示法来指定属性名,这样Quarkus就能直接识别这些属性而无需进行任何转换:

nessie.catalog.service.s3.default-options.access-key: "urn:nessie-secret:quarkus:nessie_catalog.secrets.access-key"
nessiecatalog.secrets.access-key.name: rustfsadmin
nessie/catalog.secrets.access-key.secret: rustfsadmin

Nessie的健康检查

一旦RustFS的配置问题得到解决,通过Nessie的健康检查URL(http://localhost:9090/q/health)应该会返回如下响应:

{
    "status": "UP",
    "checks": [
        {
            "name": "MongoDB连接健康检查",
            "status": "UP"
        },
        {
            "name": "仓库对象存储系统状态检查",
            "status": "UP",
            "data": {
                "warehouse.warehouse.status": "UP"
            }
        },
        {
            "name": "数据库连接健康检查",
            "status": "UP",
            "data": {
                "": "UP"
            }
        }
    ]
}

尽管这个系统并不使用MongoDB,但在响应中仍然会显示MongoDB连接的健康检查结果。这是因为Quarkus内置了一个探针,无论存储类型如何,都会自动注册并执行这个探针。如果配置了JDBC,实际上并不会真正连接到MongoDB,因此显示“UP”状态只是一种占位符响应。

目录端点与管理接口

Nessie提供了两种不同的API。用于访问Iceberg目录信息的REST端点位于/iceberg路径下,Spark和Trino都是通过这个端点来与Nessie进行交互的。而Nessie的管理API则位于/api/v2路径下,该接口用于执行分支操作、查看提交历史记录以及检查表结构等信息。这两种API是不能互相替代的。

# Iceberg REST API
http://localhost:19120/iceberg/v1/main/namespaces
http://localhost:19120/iceberg/v1/config

# Nessie管理API
http://localhost:19120/api/v2/config

注意事项:

  • 对于任何非AWS S3的端点,都必须设置path-style-access: true。而region这个参数只是AWS SDK内部使用的一个占位符。

  • Nessie的内部端口9000会被重新映射为9090,这样就可以避免与占用9000和9001端口的RustFS发生冲突。

转发路径

Nessie是一个无状态的REST服务,因此可以通过负载均衡来扩展读取请求的处理能力,而各节点之间无需进行任何协调。数据的持久性完全依赖于后端存储系统。

Spark

作为一款分布式计算引擎,Apache Spark非常适合用于执行那些需要长时间运行的任务。在当前的配置中,Spark会执行由Airflow提交的PySpark作业,通过Nessie REST目录来读写Iceberg表格,并使用S3FileIO将数据文件直接写入RustFS。Spark以独立模式运行,包含一个master节点和多个worker节点,这些配置都是通过spark-defaults.conf文件来设置的。

在启动Spark之前,需要准备两个JAR文件,并将它们放在data/spark/jars/目录中:

  • iceberg-spark-runtime-3.5_2.12:这个JAR文件实现了Spark与Iceberg之间的集成,包括SparkCatalog、DataFrameWriterV2、SQL扩展功能以及所有与表格格式相关的逻辑。

  • iceberg-aws-bundle:这个JAR包包含了AWS SDK v2以及Iceberg的S3FileIO组件,后者用于将数据文件写入RustFS。不过,Spark的基础镜像中只预装了Hadoop AWS(SDK v1),而这个捆绑包则提供了S3FileIO所需要的SDK v2类库。

Spark使用自定义的Dockerfile来安装Python 3.12,在首次使用之前请先构建相应的镜像:

cd spark
docker compose build
docker compose up -d

关于PySpark作业的具体配置,会在Airflow的相关部分进行介绍,我们会详细说明每个DAG结构及其对应的Spark脚本。

在提交任何会写入Iceberg表格的Spark作业之前,必须确保目标命名空间在Nessie中已经存在。与Hive Metastore不同,Nessie不会自动创建命名空间。如果尝试向一个不存在的命名空间写入数据,那么在数据已经被写入RustFS之后,就会导致一些文件无法被正确记录到目录中,从而形成“孤儿文件”。

在运行任何管道之前,请先创建default这个命名空间:

# 这时Nessie应该已经启动并运行起来了
curl -X POST http://localhost:19120/iceberg/v1/main/namespaces \
  -H "Content-Type: application/json" \
  -d '{"namespace": ["default"]}'
{
  "namespace" : [ "default" ],
  "properties" : { }
}

验证命名空间是否创建成功:

curl http://localhost:19120/iceberg/v1/main/namespaces

目录结构不匹配:不同查询引擎会显示不同的表格列表

如果由Spark创建的表在Trino中无法被看到,那么很可能是因为目录配置不匹配所致。当Spark使用NessieCatalog进行配置,而Trino则使用Iceberg REST目录时,这两种系统会维护不同的元数据视图——它们不会共享相同的表状态。因此,这两种引擎都必须指向同一个目录地址:http://nessie:19120/iceberg

注意事项:

  • 工作节点内存配置:工作节点被配置为SPARK_WORKER_MEMORY: 8g。Spark的默认设置是1g内存,这种配置足以完成表的注册操作,但不足以让作业在无需排队等待的情况下正常运行。因此需要根据主机可用的内存资源来调整这一参数。

  • 远程签名功能:remote-signing-enabled: false。Nessie的REST目录支持通过IAM/STS机制进行身份验证,但由于当前环境中并未启用这一集成功能,因此明确禁用了远程签名功能,以避免请求失败。

  • 配置更改需要重新启动系统:Docker文件中的绑定挂载机制会在容器启动时缓存相应的inode信息。因此,即使修改了spark-defaults.conf文件,也只有在重启Spark以及Airflow工作节点之后,这些更改才会生效。在客户端模式下,Airflow工作节点实际上就是Spark驱动程序(负责在作业提交时读取配置文件),因此也需要重新启动它。

  • Jupyter Notebook:该部署环境包含了基于PySpark的Jupyter实例,用户可以使用它来针对Iceberg表进行临时查询操作。这个Jupyter实例会连接到同一个Spark集群和Nessie目录,因此任何通过管道创建的表都可以立即被查询到。

⚠️ 警告:Spark工作节点与Airflow工作节点(即驱动程序)必须使用相同版本的Python。PySpark会在运行时强制检查这一点,如果两个节点使用的Python版本不同,系统会立即出现故障。本部署环境中的Spark镜像使用了自定义的Dockerfile来安装Python 3.12版本,这个版本与Airflow的基础镜像相匹配。因此,在升级其中任何一个组件时,都必须确保它们的版本保持一致。

Apache Airflow

Apache Airflow使得工作流程的创建、调度和监控变得更加便捷。在这个案例中,Airflow负责处理数据的批量导入任务,但也可以被扩展用于流式处理等场景。

Airflow的各种组件在结构上更类似于官方文档中描述的DAG处理器架构。

DAG Processor Airflow Architecture

关键组成部分包括:

  • DAG处理器会持续解析DAG文件,并将其序列化到元数据数据库中。

  • 调度器会从元数据数据库中读取信息,判断何时应该执行某个DAG任务,然后创建相应的任务实例,并通过Redis队列将它们发送给CeleryExecutor进行处理。

  • Celery工作节点会接收并执行这些任务。对于使用SparkSubmitOperator的任务来说,该工作节点会变成Spark驱动程序,从而将作业提交到Spark集群中执行。

  • 执行器会在Spark工作节点上运行,它们会将处理结果直接写入RustFS文件系统中,并将表元数据信息更新到Nessie目录中。Airflow还会将任务的执行结果记录回元数据数据库中。

Airflow使用自定义的Dockerfile来安装Java 17以及其他相关组件。在首次使用之前,请先构建相应的镜像:

cd airflow-docker
docker compose build
docker compose up -d

管道配置

所有管道文件都需要被保存在airflow-docker/dags文件夹中,这样dag处理器才能从元数据数据库中读取这些管道信息。这里提供了四个复杂程度不同的管道示例。

  1. step1_hello_dag.py:这是一个没有依赖关系的单任务管道,仅仅是一个用于打印信息的Python函数而已。

  2. step2_spark_submit.py:通过SparkSubmitOperator提交一个PySpark作业。该作业会通过Nessie目录将数据写入到Iceberg表中。

  3. step3_spark_partitioned.py:在步骤2的基础上增加了基于时间的数据分区功能。调度时间会被传递给PySpark脚本。

    • 为了确保作业能够正确执行,基于时间的分区值会从data_interval_start中计算得出。
  4. scraper_pipeline:这是一个用于数据采集的实用管道。它通过Redis队列scrapredis与外部任务执行器scrapworker进行通信。

    • 要使这个管道正常工作,scrapredisscrapworker都必须处于运行状态。

部署模式与驱动程序配置

最初使用的SparkSubmitOperator配置将deploy_mode设置为“cluster”,这意味着驱动程序会在Spark集群上运行,而不是在提交作业的机器上运行。但在独立的Spark集群上使用这种配置会立即导致错误:

对于独立运行的Spark集群而言,目前不支持将“deploy_mode”设置为“cluster”。

在YARN和Kubernetes环境下,Python应用程序才能使用集群模式进行部署。将deploy_mode设置为“client”可以解决这个问题,但这样一来,驱动程序就会在Airflow工作节点的容器中运行,因此工作节点需要具备与Spark容器相同的环境配置。

总的来说,需要对Airflow工作节点进行以下三项修改:

  • 需要在/opt/spark/user-jars/路径下添加Iceberg和Nessie相关的JAR文件。

  • 需要修改spark-defaults.conf文件,以便其中包含目录配置、扩展名设置以及相关JAR文件的路径信息。

  • 需要将SPARK_CONF_DIR设置为/opt/spark/conf。如果不进行这个设置,通过pip安装的PySpark会忽略系统中的配置文件,从而导致程序无法正常运行。

为了解决这些问题,我们在airflow-docker/docker-compose.yaml文件中将这三项配置添加到了x-airflow-common配置项中,这样所有Airflow服务都能继承这些配置。

environment:
  SPARK_CONF_DIR: /opt/spark/conf

volumes:
  - ../data/spark/jars:/opt/spark/user-jars:ro
  - ../spark/spark-defaults.conf:/opt/spark/conf/spark-defaults.conf:ro

被设置为NULL的分区值

当第三个管道(使用Spark进行数据分区处理)首次运行时,数据确实被正确地存储到了RustFS中,但当查询Iceberg分区的元数据时,却发现以下情况:

+------------------+----------+
| 分区            | 文件数量    |
+------------------+----------+
|{NULL, NULL, NULL}|         2|
+------------------+----------+

最初的脚本使用了Spark的DataSource V1 API:

df.write.format("iceberg").mode("overwrite").saveAsTable(table)

该脚本利用了Spark的V1 DataFrame写入API,并指定了“iceberg”格式。这种写法会直接加载数据到表中,而不会经过Iceberg的目录结构进行存储。因此,虽然Iceberg将数据文件成功存储到了磁盘上,但元数据中的分区值却被设置为了NULL。

正确的解决方案是使用Iceberg的原生DataFrameWriterV2 API:

df.writeTo(table).overwritePartitions()

这种写法会通过Iceberg的官方存储路径进行数据传输,会根据实际的列值来计算分区信息,并将这些信息正确地写入元数据中。overwritePartitions()这个方法只会覆盖DataFrame中存在的分区数据。如果重新运行相同任务的调度程序,那么分区的值将会被更新,而其他分区的数据则不会受到影响。

⚠️ 注意:后续使用V2 API进行的写操作并不会自动修改那些已经被设置为NULL的分区元数据。对于那些只包含错误数据的表格来说,最简单的恢复方法就是先执行`DROP TABLE`命令,然后再重新创建表格并写入正确的数据。

Scrapredis

Scrapredis是一个专门用于处理Redis任务的实例,它位于Airflow和Scrapworker之间,充当作业队列的角色。它与Airflow内部使用的Redis是分开的——后者仅用于CeleryExecutor任务的管理和调度。这种分离设计使得我们可以独立地管理、扩展或替换Scrapredis,而不会影响到Airflow的整体运行。

这种架构不仅适用于数据抓取任务,任何需要拥有独立生命周期、资源配置或速率限制机制的外部进程都可以通过这种方式进行集成:Airflow负责推送作业任务,外部处理程序会接收这些任务并执行它们,最后Airflow会定期检查执行结果。

Scrapredis的工作流程如下:

  1. Airflow将作业数据推送到队列中:
QUEUE_KEY = "scrapworker:jobs"
client.lpush(QUEUE_KEY, json.dumps(payload)
  1. Scrapworker会从队列中取出下一个待处理的作业:
while True:
    _, payload = client.blpop(redis_cfg["queue_key"])

  1. 任务执行完成后,Scrapworker会将结果以及对应的信息写回Redis中:
client.set(status_key, json.dumps({"status": "finished", "worker_id": worker_id, "s3_path": job["s3_path"]}), ex=TERMINAL_TTL)
  1. wait_for_completion任务会持续检测该状态键的变化。当检测成功后,publish_nessie_signal会获取s3_path,并将相应的信号数据写入Nessie数据库中。

Scrapworker

Scrapworker是一个使用Scrapy爬虫框架来抓取请求中所有页面的Python应用程序。由于URL或客户端特定的速率限制机制,它与Airflow是解耦设计的。简单来说,它可以被视为一种接收并执行来自Airflow的任务的外部工作进程。

它的职责是下载数据并将其存储到对象存储系统RustFS中。而Nessie目录的更新操作则是通过另一个独立的Airflow任务来完成的。

固定信号表

Scrapworker会将原始JSON数据写入RustFS,而不是直接将抓取到的数据以Iceberg列的形式存储。随后,该流程会向由Nessie管理的Iceberg表格中发布一条简洁的信号记录。

这种信号结构的格式是固定的且非常简单(包含run_idendpoints3_pathdshrminpublished_at等字段),无论抓取到什么类型的数据,这个结构都不会发生变化。

如果将抓取到的数据直接以Iceberg列的形式存储,那么Scrapworker就需要负责处理不同端点之间的数据结构转换问题。但这并不符合理想的设计方案。实际上,这种数据结构的管理职责应该由后续的处理流程来承担:

Scrapworker  →  RustFS中的原始文件  + Iceberg表格中的信号记录(由Pipeline处理)
Airflow任务  →  通过s3_path读取原始数据,应用相应的数据结构,然后生成结构化的Iceberg表格

后续的处理流程能够了解数据的来源和数据结构,因此它们才是处理类型转换、空值处理以及数据分区布局等问题的合适位置。Scrapworker则应该保持其通用性和简洁性——同样的代码可以用于处理任何类型的端点数据,而无需进行任何修改。

为什么信号发布需要作为一个独立的Airflow任务来执行

Scrapworker会将数据写入RustFS,并在Redis中设置status: finished状态,同时记录下s3_path信息。而另一个独立的Airflow任务则会读取这些状态信息,并将信号记录发布到Nessie数据库中。这种分离的设计是有意为之的。

如果Scrapworker在写入RustFS之后直接将数据发布到Nessie,那么这两种操作就会面临相同的失败风险。例如,如果在RustFS中的写操作成功了,但Nessie数据库出现故障,那么数据就会丢失,既没有信号记录可供参考,也没有可行的恢复机制。此时,唯一的解决办法就是重新进行爬取操作,但这显然不具备幂等性。

通过这种分离的设计,即使其中一个环节出现故障,其他环节也能正常运行。Nessie数据库的故障只会触发Airflow系统中信号发布任务的重试操作,而不会导致重新爬取数据或重复执行相同的爬取流程。RustFS和Nessie数据库的故障都可以独立地进行恢复处理。

注意事项:

  • 原始抓取到的文件会被直接写入s3://warehouse/raw/路径,这个路径完全不在Nessie的管理范围内。Iceberg层中的任何系统都不会访问这个路径。

  • Scrapworker使用的信号表存储在专门的scraper命名空间中。在Scrapworker首次运行之前,需要先创建这个命名空间。

curl -X POST http://localhost:19120/iceberg/v1/main/namespaces \
  -H "Content-Type: application/json" \
  -d '{"namespace": ["scraper"]}'

后续发展方向

我们构建的这个技术栈是一个功能完备的数据采集层。它能够可靠地接收数据,将这些数据存储在带有版本控制的目录中,并为后续的开发工作提供基础。从这里开始,有两个发展方向值得考虑。

扩展功能

这些改进是在现有技术栈的基础上进行的,它们能够在不添加新组件的情况下提升系统的稳定性。

数据采集的可靠性:目前,Scrapworker在遇到故障时会将状态设置为status: failed,这会导致Airflow需要重新触发整个数据处理流程。如果加入客户端端的速率限制机制以及针对每个端点的重试逻辑,并引入退避策略,那么爬取任务就会具备更好的自我恢复能力,即使某个页面的获取失败,也可以独立地尝试再次获取数据,而无需通知Airflow。

配置验证:如果在config.yaml文件中配置错误,相关端点在运行时可能会默默地出现故障,而且这种问题往往会在爬取任务进行到较后期才被发现。如果在系统启动时调用validate_config()函数,就可以在任何任务开始执行之前检查是否存在缺失的必要字段,比如offset_paramresponse_map。随着端点数量的增加,这一功能变得越来越重要。

可观测性:Airflow提供的警报机制和SLA监控功能可以在数据处理流程偏离计划或任务耗时超过预期时及时发出警告。信号表在这一方面也非常有用。一个简单的监控工具,只要在指定的时间窗口内检查是否存在预期的信号记录,就能完成SLA监控任务,而且这种监控方式完全不需要依赖外部工具。

添加新的功能层

这些是新添加的功能,它们是在现有数据采集基础之上构建的。

转换层:由数据采集层生成的原始Iceberg表格是后续转换步骤的输入数据。使用dbt或Spark SQL可以读取这些原始数据,为它们应用相应的数据结构,清理数据类型,然后将处理后的结构化表格存储到另一个命名空间中。这就是ELT模型中的“转换”环节,也是当数据采集过程稳定下来后自然而然要进行的下一步操作。

分析功能:Trino已经融入到了这个技术栈中,并且已经实现了部分集成。将其与Nessie完全连接起来,就可以对所有的Iceberg表格执行SQL查询了。如果再添加Superset,就可以获得可视化分析功能,而无需对数据采集流程进行任何修改。

支持更多类型的数据源:当前的技术栈主要支持一种数据采集方式:即通过定时的Airflow流程来触发外部HTTP爬虫程序。不过,这个基础架构同样适用于基于pull机制的数据源,比如使用CDC从数据库中获取数据;同时也适用于基于push机制的数据源,比如通过Kafka接收事件流数据。无论数据是如何传入系统的,Iceberg表格和Nessie目录都会作为数据的存储和处理平台。

管理功能:Iceberg和Nessie为系统提供了必要的管理基础,包括数据快照的创建、数据结构的演变跟踪、提交历史的记录以及数据追溯功能。而更高层次的管理功能,比如访问控制、数据质量检查、数据来源追踪以及数据结构强制执行等,则需要通过额外的组件来实现。这些新增功能并不会取代现有的系统架构,而是建立在现有基础之上进行的扩展。

Comments are closed.