fat-squirrel-with-nut-in-mouth

Flink:像松鼠一样快

走过这些天最热门的 IT街道意味着您可能听说过实现流式机器学习,即将 AI 转向流式处理场景,并利用实时功能以及新的人工智能技术。此外,您也会注意到,尽管人们对该主题的兴趣日益浓厚,但缺乏与该主题相关的研究。

如果我们尝试深入一点地研究它,我们就会意识到缺少一个步骤:如今,众所周知的流式处理应用程序仍然无法正确实现模型服务的概念,而行业仍然依赖于lambda体系结构。达到目标。假设一个银行有一个具体经常更新的批量训练的机器学习模型(例如,针对过去的缓冲区溢出攻击尝试应用了优化的梯度下降),并且它希望将模型直接部署到自己的金丝雀。

分布式IDS ( 由流系统支持 ) 以实现有关模型质量的实时响应。从概念上讲,银行应有机会自动将经过训练的模型加载到 IDS 并实时利用它,以便计算对传入事件的预测,实现持久且始终最新的覆盖、在线欺诈检测和保存很多钱

您也可以喜欢:ApacheFlink基本转换示例。

不幸的是,银行被迫使用预定义的布局在基础架构中分发模型,而且(大多数时候)您必须直接部署权重矢量,并通过硬编程数学指令进行计算预测;鉴于繁琐的现实,银行将依靠良好的安全旧并行批处理工作,调查持续事件,因为他们可用磁盘。为了解决这一巨大差距,我们在此介绍Flink-JPMML (repo),一个全新开源的 Scala库,旨在实现流模型服务预测的规模Apache Flink实时引擎。

松鼠一样快

Apache Flink是一个开源分布式流式处理引擎;只要以荒谬的规模进行实时复杂事件处理,它就可提供高可用性和精确一致性。Flink 还提供批处理计算作为流的子情况。Radicalbit的核心使用Flink,尽管如此,它还是惊叹于效率、鲁棒性和可扩展性功能,使自己完美地契合了Kappa架构的核心。 

PMML代表预测标记模型语言,它代表了不同系统中机器学习模型持久性的既定标准。PMML 基于真正高效的xml 语义,它允许定义经过训练的无监督/监督、概率和深度学习模型,以便坚持独立于源的定型模型。这可以由任何系统导入/导出com/jpmml/jpmml-评估器”rel=”nofollow”目标\”\blank”=JPMML评估器库,以便在Flink-jpmml中采用标准。

用户定义的预测(如 Flink API)

首先,为了运行 Flink-JPMML,添加以下依赖项:如果您是sbt-er,

"io.radicalbit" %% "flink-jpmml-scala" % "0.6.3" 

改为为maven用户

<dependencies>
  <dependency>
    <groupId>io.radicalbit</groupId>
    <artifactId>flink-jpmml-scala</artifactId>
    <version>0.6.3</version>
  </dependency>
</dependencies>

可能,您还需要在本地发布库;为此,请按照以下步骤操作:

  1. 在 flink -> sbt 中启动sbt接口。
  2. 跳进flink-jpmml-scala项目目录>项目flink-jpmml-scala.
  3. 在本地存储库 >发布本地发布库。

此时,flink-jpmml 需要提供 scala-core、flink-流和 flink 客户端库。让我们继续吧。无论您的 PMML 模型位于何处,只需提供路径。

val sourcePath = "/path/to/your/pmml/model.xml" 

这将是你唯一需要费心的事情:Flink-JPMML通过实现专用的模型阅读器,自动检查分布式后端对Flink。

import io.radicalbit.flink.pmml.scala.api.reader.ModelReader

val modelReader = ModelReader(sourcePath)

现在,让我们定义一个输入流。

import org.apache.flink.streaming.api.scala._

case class IrisInput(pLength: Double, pWidth: Double, sLength: Double, sWidth: Double, timestamp: Long, color: Int, prediction: Option[String]) {
 def toVector: Vector = DenseVector(pLength, pWidth, sLength, sWidth)
 }

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[IrisInput] = yourIrisSource(env)

来吧。以下点导入

import io.radicalbit.flink.pmml.scala._ 

使用评估方法扩展 Flink 数据流。严格地说,它为您提供了一个工具,让我们实现实时流式预测。

import io.radicalbit.flink.pmml.scala._
import org.apache.flink.ml.math.Vector

val out = events.evaluate(modelReader) { (event, model) =>

// flink pmml model requires to be evaluated against Flink Vectors
val vectorEvent: Vector = event.toVector

// now we can call model: PmmlModel predict method
val prediction = model.predict(vectorEvent)

// Prediction container own the prediction result as a ADT called Score
prediction match {

case Prediction(Score(value)) =>
// return the event with updated prediction
event.copy(kind = Some(computeKind(value)))
case Prediction(EmptyScore) =>
// return just the event
logger.info("It was not possible to predict event {}", event); event
}
out.print()

env.execute("Flink JPMML simple execution.")
}

private def computeKind(value: Double): String = {
    value match {
    case 1.0 => "Iris-setosa"
    case 2.0 => "Iris-versicolor"
    case 3.0 => "Iris-virginica"
    case _ => "other"
    }
}

现在,您可以获取此处提供的示例 PMML 聚类模型,唯一负责将类添加为输出参数;所以让我们简单地添加

<groupid>MiningField name="class" invalidValueTreatment="asIs" usageType="predicted"/</groupid>

到矿场列表Flink-JPMML 将向您发送有关加载状态的日志消息:

19/09/10 14:33:11 INFO package$RichDataStream$$anon$1: Model has been read successfully, model name: k-means 
最后,我们让运算符输出一些随机花。

IrisInput(5.7,1.8,2.5,0.7, 34, 1495635020923, Some(Other))
IrisInput(5.5,3.8,5.2,4.3, 93, 1495635020233, Some(Iris-setosa))
IrisInput(4.3,2.3,2.0,3.1, 122, 1495635020100, Some(Other))
IrisInput(5.1,5.7,4.8,2.1,255, 1495635020583, Some(Iris-versicolor))
IrisInput(4.2,0.8,0.9,2.6, 0, 1495635020921, Some(Iris-virginica))

Flink-JPMML 还带来了一个快捷方式,以便对 Flink 矢量的数据流执行快速预测。此功能如下:

val vectorStream = events.map(_.toVector) 
val predictions: (Prediction, Vector) = vectorStream.evaluate(reader)

如果用户需要在评估之前应用具体的数学预处理,并且只需要预测结果(例如模型质量评估),则这非常有用。

幕后发生了什么?

Flink-jpmml 具有简单且易于使用的 API 结构,它尝试将所有性能作为目标,使 Flink 成为当今最强大的分布式处理引擎之一。

读者

ModelReader 对象旨在从每个 Flink 支持的分布式系统检索 PMML 模型;也就是说,它能够从任何支持的分布式文件系统(例如 HDFS、Alluxio)加载。模型读取器实例将交付给任务管理器,后者仅在运算符具体化时利用前者的 API:这意味着模型被懒洋洋地覆盖。

模型

该库允许 Flink 通过为每个任务管理器使用单例加载器来加载模型,因此它独立于每个 TM 上运行的子任务数进行读取。这种优化允许 Flink 在线程安全性中扩展模型评估,因为即使是真正基础的 PMMM 也可以在几百个 MB 上增长。

评估为 UDF

评估方法实现一个基础的 FlatMap 实现,并且由上述用户定义的函数丰富,该函数由用户作为部分函数提供。以前,这个想法是创建一些a-la-flinkML,即一个由策略模式塑造的核心对象,以便像使用典型的ML库那样计算预测。

但是,在一天结束时,我们执行一个流任务,因此用户具有无界输入事件和模型作为 PmmlModel 的实例。此处 Flink-JPMML 要求用户仅计算预测,但无论如何,UDF 允许应用任何类型的自定义操作,并允许任何可序列化的输出类型。

关闭

我们引入了一个可扩展的轻量级库,称为 Flink-JPMML,利用 Apache Flink 功能作为实时处理引擎,并提供一种全新的方法来为随PMML标准导出的任何机器学习模型提供服务。在下一篇博文中,我们将讨论 Flink-JPMML 如何允许用户管理 NaN 值,我们将介绍库如何处理失败;此外,我们将提供 Flink 矢量选择背后的原因,我们将指出我们期望遵循的步骤,以使此库变得更好。

我们非常乐意欢迎 Flink-JPMML 的新贡献者,只需检查存储库和打开的问题。

相关文章

com/文章/数据流使用apache-flink–apache-ignite”rel=”nofollow”=使用ApacheFlink和ApacheIgnite的数据流。

Comments are closed.