Kafka 是当下热门的消息队列中间件,它可以实时地处理海量数据,具备高吞吐、低延时等特性及可靠的消息异步传递机制,可以很好地解决不同系统间数据的交流和传递问题。

Kafka 在马蜂窝也有非常广泛的应用,为很多核心的业务提供支撑。本文将围绕 Kafka 在马蜂窝大数据平台的应用实践,介绍相关业务场景、在 Kafka 应用的不同阶段我们遇到了哪些问题以及如何解决、之后还有哪些计划等。

Part.1

应用场景

从 Kafka 在大数据平台的应用场景来看,主要分为以下三类:

第一类是将 Kafka 作为数据库,提供大数据平台对实时数据的存储服务。从来源和用途两个维度来说,可以将实时数据分为业务端 DB 数据、监控类型日志、基于埋点的客户端日志(H5、WEB、APP、小程序)和服务端日志。

第二类是为数据分析提供数据源,各埋点日志会作为数据源,支持并对接公司离线数据、实时数据仓库及分析系统,包括多维查询、实时 Druid OLAP、日志明细等。

第三类是为业务方提供数据订阅。除了在大数据平台内部的应用之外,我们还使用 Kafka 为推荐搜索、大交通、酒店、内容中心等核心业务提供数据订阅服务,如用户实时特征计算、用户实时画像训练及实时推荐、反作弊、业务监控报警等。
主要应用如下图所示:

Part.2

演进之路

四个阶段

早期大数据平台之所以引入 Kafka 作为业务日志的收集处理系统,主要是考虑到它高吞吐低延迟、多重订阅、数据回溯等特点,可以更好地满足大数据场景的需求。但随着业务量的迅速增加,以及在业务使用和系统维护中遇到的问题,例如注册机制、监控机制等的不完善,导致出现问题无法快速定位,以及一些线上实时任务发生故障后没有快速恢复导致消息积压等, 使 Kafka 集群的稳定性和可用性得受到挑战,经历了几次严重的故障。

解决以上问题对我们来说迫切而棘手。针对大数据平台在使用 Kafka 上存在的一些痛点,我们从集群使用到应用层扩展做了一系列的实践,整体来说包括四个阶段:

第一阶段:版本升级。围绕平台数据生产和消费方面存在的一些瓶颈和问题,我们针对目前的 Kafka 版本进行技术选型,最终确定使用 1.1.1 版本。

第二阶段:资源隔离。为了支持业务的快速发展,我们完善了多集群建设以及集群内 Topic 间的资源隔离。

第三阶段:权限控制和监控告警。

首先在安全方面,早期的 Kafka 集群处于裸跑状态。由于多产品线共用 Kafka,很容易由于误读其他业务的 Topic 导致数据安全问题。因此我们基于 SASL/ SCRAM + ACL 增加了鉴权的功能。

在监控告警方面,Kafka 目前已然成为实时计算中输入数据源的标配,那么其中 Lag 积压情况、吞吐情况就成为实时任务是否健康的重要指标。因此,大数据平台构建了统一的 Kafka 监控告警平台并命名「雷达」,多维度监控 Kafka 集群及使用方情况。

第四阶段:应用扩展。早期 Kafka 在对公司各业务线开放的过程中,由于缺乏统一的使用规范,导致了一些业务方的不正确使用。为解决该痛点,我们构建了实时订阅平台,通过应用服务的形式赋能给业务方,实现数据生产和消费申请、平台的用户授权、使用方监控告警等众多环节流程化自动化,打造从需求方使用到资源全方位管控的整体闭环。

下面围绕几个关键点为大家展开介绍。

核心实践

1. 版本升级之前大数据平台一直使用的是 0.8.3 这一 Kafka 早期版本,而截止到当前,Kafka 官方最新的 Release 版本已经到了 2.3,于是长期使用 0.8 版本过程中渐渐遇到的很多瓶颈和问题,我们是能够通过版本升级来解决的。
举例来说,以下是一些之前使用旧版时常见的问题:

  • 缺少对 Security 的支持:存在数据安全性问题及无法通过认证授权对资源使用细粒度管理
  • broker under replicated:发现 broker 处于 under replicated 状态,但不确定问题的产生原因,难以解决。
  • 新的 feature 无法使用:如事务消息、幂等消息、消息时间戳、消息查询等。
  • 客户端的对 offset 的管理依赖 zookeeper, 对 zookeeper 的使用过重, 增加运维的复杂度
  • 监控指标不完善:如 topic、partition、broker 的数据 size 指标, 同时 kafka manager 等监控工具对低版本 kafka 支持不好

同时对一些目标版本的特性进行了选型调研,如:

  • 0.9 版本, 增加了配额和安全性, 其中安全认证和授权是我们最关注的功能
  • 0.10 版本,更细粒度的时间戳. 可以基于偏移量进行快速的数据查找,找到所要的时间戳。这在实时数据处理中基于 Kafka 数据源的数据重播是极其重要的
  • 0.11 版本, 幂等性和 Transactions 的支持及副本数据丢失/数据不一致的解决。
  • 幂等性意味着对于同一个 Partition,面对 Data 的多次发布,Kafka broker 端就可以做到自动去重;
  • 对 Transactions 的支持使一个事务下发布多条信息到多个 Topic Partition 时,我们可以使它以原子性的方式被完成。在我们的下游消费者中,很多都是用 Flink 做一些流处理的工作,因此在数据处理及故障恢复时仅一次语义则显得尤为重要。而 0.11 版本对于事务的支持则可以保证与 Kafka 交互的 Flink 应用实现端到端仅一次语义, 支持 EOS 可以对数据可靠性有绝对要求, 比如交易、风控等场景下的重要支持。
  • Leader Epoch:解决了原先依赖水位表示副本进度可能造成的数据丢失/数据不一致问题。
  • 1.1 版本,运维性的提升。比如当 Controller Shut Down,想要关闭一个 Broker 的时候,之前需要一个很长很复杂的过程在 1.0 版本得到很大的改善。

最终选择 1.1 版本, 则是因为出于 Camus 与 Kafka 版本的兼容性及 1.1 版本已经满足了使用场景中重要新特性的支持的综合考量。这里再简单说一下 Camus 组件,同样是由 Linkedin 开源,在我们的大数据平台中主要作为 Kafka 数据 Dump 到 HDFS 的重要方式。

2. 资源隔离

之前由于业务的复杂性和规模不大,大数据平台对于 Kafka 集群的划分比较简单。于是,一段时间以后导致公司业务数据混杂在一起,某一个业务主题存在的不合理使用都有可能导致某些 Broker 负载过重,影响到其他正常的业务,甚至某些 Broker 的故障会出现影响整个集群,导致全公司业务不可用的风险。
针对以上的问题,在集群改造上做了两方面实践

  • 按功能属性拆分独立的集群
  • 集群内部 Topic 粒度的资源隔离

(1)集群拆分

按照功能维度拆分多个 Kafka 物理集群,进行业务隔离,降低运维复杂度。

以目前最重要的埋点数据使用来说, 目前拆分为三类集群,各类集群的功能定义如下:

  • Log 集群:各端的埋点数据采集后会优先落地到该集群, 所以这个过程不能出现由于 Kafka 问题导致采集中断,这对 Kafka 可用性要求很高。因此该集群不会对外提供订阅,保证消费方可控;同时该集群业务也作为离线采集的源头,数据会通过 Camus 组件按小时时间粒度 dump 到 HDFS 中,这部分数据参与后续的离线计算。
  • 全量订阅集群:该集群 Topic 中的绝大部分数据是从 Log 集群实时同步过来的。上面我们提到了 Log 集群的数据是不对外的,因此全量集群就承担了消费订阅的职责。目前主要是用于平台内部的实时任务中,来对多个业务线的数据分析并提供分析服务。
  • 个性定制集群:之前提到过,我们可以根据业务方需求来拆分、合并数据日志源,同时我们还支持定制化 Topic,该集群只需要提供分流后 Topic 的落地存储。

集群整体架构划分如下图:

(2)资源隔离

Topic 的流量大小是集群内部进行资源隔离的重要依据。例如,我们在业务中埋点日志量较大的两个数据源分别是后端埋点数据源 server-event 和端上的埋点 mobile-event 数据源,我们要避免存储两个数据的主题分区分配到集群中同一个 Broker 上的节点。通过在不同 Topic 进行物理隔离,就可以避免 Broker 上的流量发生倾斜。

3. 权限控制和监控告警

(1)权限控制

开始介绍时我们说过,早期 Kafka 集群没有设置安全验证处于裸跑状态,因此只要知道 Broker 的连接地址即可生产消费,存在严重的数据安全性问题。

一般来说, 使用 SASL 的用户多会选择 Kerberos,但就平台 Kafka 集群的使用场景来说,用户系统并不复杂,使用 Kerberos 就有些大材小用, 同时 Kerberos 相对复杂,存在引发其他问题的风险。另外,在 Encryption 方面, 由于都是运行在内网环境,所以并没有使用 SSL 加密。

最终平台 Kafka 集群使用 SASL 作为鉴权方式, 基于 SASL/ SCRAM + ACL 的轻量级组合方式,实现动态创建用户,保障数据安全。

(2)监控告警

之前在集群的使用中我们经常发现,消费应用的性能无缘无故变差了。分析问题的原因, 通常是滞后 Consumer 读取的数据大概率没有命中 Page- cache,导致 Broker 端机器的内核要首先从磁盘读取数据加载到 Page- cache 中后,才能将结果返还给 Consumer,相当于本来可以服务于写操作的磁盘现在要读取数据了, 影响了使用方读写同时降低的集群的性能。

这时就需要找出滞后 Consumer 的应用进行事前的干预从而减少问题发生,因此监控告警无论对平台还是用户都有着重大的意义。下面介绍一下我们的实践思路。

整体方案:

整体方案主要是基于开源组件 Kafka JMX Metrics+OpenFalcon+Grafana:

  • Kafka JMX Metrics:Kafka broker 的内部指标都以 JMX Metrics 的形式暴露给外部。1.1.1 版本 提供了丰富的监控指标,满足监控需要
  • OpenFalcon:小米开源的一款企业级、高可用、可扩展的开源监控系统
  • Grafana:Metrics 可视化系统,大家比较熟悉,可对接多种 Metrics 数据源。

关于监控:

  • Falcon-agent:部署到每台 Broker 上, 解析 Kafka JMX 指标上报数据
  • Grafana:用来可视化 Falcon Kafka Metrics 数据,对 Cluster、Broker、Topic、Consumer 4 个角色制作监控大盘。
  • Eagle:获取消费组 Active 状态、消费组 Lag 积压情况,同时提供 API,为监控告警系统「雷达」提供监控数据。

关于告警:

雷达系统: 自研监控系统,通过 Falcon 及 Eagle 获取 Kafka 指标,结合设定阈值进行告警。以消费方式举例,Lag 是衡量消费情况是否正常的一个重要指标,如果 Lag 一直增加,必须要对它进行处理。

发生问题的时候,不仅 Consumer 管理员要知道,它的用户也要知道,所以报警系统也需要通知到用户。具体方式是通过企业微信告警机器人自动提醒对应消费组的负责人或使用者及 Kafka 集群的管理者。

监控示例:

4. 应用扩展

(1)实时数据订阅平台 

实时数据订阅平台是一个提供 Kafka 使用全流程管理的系统应用,以工单审批的方式将数据生产和消费申请、平台用户授权、使用方监控告警等众多环节流程化自动化, 并提供统一管控。

核心思想是基于 Kafka 数据源的身份认证和权限控制,增加数据安全性的同时对 Kafka 下游应用进行管理。

(2)标准化的申请流程

无论生产者还是消费者的需求,使用方首先会以工单的方式提出订阅申请。申请信息包括业务线、Topic、订阅方式等信息;工单最终会流转到平台等待审批;如果审批通过,使用方会分配到授权账号及 Broker 地址。至此,使用方就可以进行正常的生产消费了。

(3)监控告警

对于平台来说,权限与资源是绑定的,资源可以是用于生产的 Topic 或消费使用的 GroupTopic。一旦权限分配后,对于该部分资源的使用就会自动在我们的雷达监控系统进行注册,用于资源整个生命的周期的监控。 

(4)数据重播

出于对数据完整性和准确性的考量,目前 Lamda 架构已经是大数据的一种常用架构方式。但从另一方面来说, Lamda 架构也存在资源的过多使用和开发难度高等问题。

实时订阅平台可以为消费组提供任意位点的重置,支持对实时数据按时间、位点等多种方式的数据重播, 并提供对 Kappa 架构场景的支持,来解决以上痛点。

(5)主题管理

为什么提供主题管理?举一些很简单的例子,比如当我们想让一个用户在集群上创建他自己的 Kafka  Topic,这时显然是不希望让他直接到一个节点上操作的。因此刚才所讲的服务,不管是对用户来讲,还是管理员来讲,我们都需要有一个界面操作它,因为不可能所有人都通过 SSH 去连服务器。

因此需要一个提供管理功能的服务,创建统一的入口并引入主题管理的服务,包括主题的创建、资源隔离指定、主题元数据管理等。

(6)数据分流

在之前的架构中, 使用方消费 Kafka 数据的粒度都是每个 Kafka Topic 保存 LogSource 的全量数据,但在使用中很多消费方只需要消费各 LogSource 的部分数据,可能也就是某一个应用下几个埋点事件的数据。如果需要下游应用自己写过滤规则,肯定存在资源的浪费及使用便捷性的问题;另外还有一部分场景是需要多个数据源 Merge 在一起来使用的。

基于上面的两种情况, 我人实现了按业务方需求拆分、合并并定制化 Topic 支持跨数据源的数据合并及 appcode 和 event code 的任意组个条件的过滤规则。

Part.3

后续计划

  1. 解决数据重复问题。为了解决目前平台实时流处理中因故障恢复等因素导致数据重复的问题,我们正在尝试用 Kafka 的事务机制结合 Flink 的两段提交协议实现端到端的仅一次语义。目前已经在平台上小范围试用, 如果通过测试,将会在生产环境下推广。
  2. Consumer 限流。在一写多读场景中, 如果某一个 Consumer 操作大量读磁盘, 会影响 Produce 级其他消费者操作的延迟。l因此,通过 Kafka Quota 机制对 Consume 限流及支持动态调整阈值也是我们后续的方向
  3. 场景扩展。基于 Kafka 扩展 SDK、HTTP 等多种消息订阅及生产方式,满足不同语言环境及场景的使用需求。

以上就是关于 Kafka 在马蜂窝大数据平台应用实践的分享,如果大家有什么建议或者问题,欢迎留言。

本文作者:毕博,马蜂窝大数据平台研发工程师。责任编辑:于雪

Comments are closed.