只需10分钟即可在Deep Java Library中使用Spark进行深度学习 – Qing Lan

Apache Spark是一种广泛使用的数据处理技术,并且被机器学习用户大量使用。Spark可用于对产品进行分类,预测需求并个性化建议。尽管Spark支持多种编程语言,但首选的Spark SDK是为Scala实现的,大多数深度学习框架都没有很好地支持它。大多数机器学习框架都倾向于将Python与SDK结合使用,从而使Spark开发人员的选择不尽人意:将其代码移植到Python或实现自定义Scala包装器。这些选项影响开发人员的工作速度,并以易碎的代码威胁生产环境。在此博客中,我们演示了用户如何使用 Deep Java库 直接从Scala执行深度学习工作负载(DJL)。DJL是一个框架无关的库,旨在直接在使用Java开发的Spark作业中提供深度学习。在下面的教程中,尽管也支持PyTorch和TensorFlow,我们将逐步介绍使用MXNet进行图像分类的方案。

有关本文完整代码,请参见 DJL Spark图像分类示例

示例:使用DJL和Spark进行图像分类

在本教程中,我们使用 resnet50 (一种预先训练的模型)来运行推理。对于本教程,我们将使用具有三个工作程序节点的单个群集进行分类。

我们的示例将在流程中创建多个执行器,并为每个执行器分配任务。每个执行器包含一个或多个在不同线程中执行任务的核心。这为每个工作节点提供了均衡的工作负载以进行大数据处理。

步骤1.创建Spark项目

我们使用流行的开源工具 sbt 在Scala中构建此Spark项目。您可以在 此处 找到有关如何开始使用sbt的更多资源。我们使用以下代码块在sbt中定义我们的项目:

name := "sparkExample"
version := "0.1"
scalaVersion := "2.11.12"
scalacOptions += "-target:jvm-1.8"
resolvers += Resolver.mavenLocal
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "ai.djl" % "api" % "0.5.0"
libraryDependencies += "ai.djl" % "repository" % "0.5.0"
// Using MXNet Engine
libraryDependencies += "ai.djl.mxnet" % "mxnet-model-zoo" % "0.5.0"
libraryDependencies += "ai.djl.mxnet" % "mxnet-native-auto" % "1.6.0"

本教程使用MXNet作为其基础引擎。可轻而易举地切换到另一个框架,如下例所示:

// Using PyTorch Engine
libraryDependencies += "ai.djl.pytorch" % "pytorch-model-zoo" % "0.5.0"
libraryDependencies += "ai.djl.pytorch" % "pytorch-native-auto" % "1.5.0"

步骤2:配置Spark

在本教程中,我们在本地计算机上运行此示例。Spark应用程序将使用以下配置:

// Spark configuration
val conf = new SparkConf()
  .setAppName("Simple Image Classification")
  .setMaster("local
  • ") .setExecutorEnv("MXNET_ENGINE_TYPE", "NaiveEngine") val sc = new SparkContext(conf)
  • MXNet中的多线程推理需要NaiveEngine参数。如果使用PyTorch或TensorFlow,则可以删除以下行:

    .setExecutorEnv("MXNET_ENGINE_TYPE", "NaiveEngine")

    步骤3:识别输入数据

    在本教程中,输入数据表示为包含要分类图像的文件夹。Spark将加载这些二进制文件并将其分区到不同的分区。每个分区由一个执行程序执行。以下语句将在每个分区上均匀分布文件夹中的所有图像。

    val partitions = sc.binaryFiles("images/*")

    步骤4:定义Spark作业

    接下来,我们使用上一步中创建的分区为此作业创建执行图。在Spark中,每个执行程序都以多线程方式执行任务。结果,我们需要在执行推理之前将每个模型加载到执行器中。我们使用以下代码进行设置:

    // Start assign work for each worker node
    val result = partitions.mapPartitions( partition => {
       // before classification
        val criteria = Criteria.builder
            .optApplication(Application.CV.IMAGE_CLASSIFICATION)
            .setTypes(classOf[BufferedImage], classOf[Classifications])
            .optFilter("dataset", "imagenet")
            .optFilter("layers", "50")
            .optProgress(new ProgressBar)
            .build
       val model = ModelZoo.loadModel(criteria)
        val predictor = model.newPredictor()
       // classification
       partition.map(streamData => {
            val img = ImageIO.read(streamData._2.open())
            predictor.predict(img).toString
        })
    })

    需要为每个分区指定 ModelZoo 的条件,以找到相应的模型并创建预测变量。在分类过程中,我们从 RDD 加载图像并为其创建推理。该模型使用 ImageNet数据集 进行了训练,并存储在DJL ModelZoo中。

    步骤5:定义输出位置

    完成映射过程后,主节点将收集,汇总结果并将其保存在文件系统中。

    result.collect().foreach(print)
    result.saveAsTextFile("output")

    运行此代码将产生前面列出的输出类。输出文件将保存到output不同分区的文件夹中。有关本教程的完整代码,请参见 Scala示例 。控制台的预期输出:

    [
        class: "n02085936 Maltese dog, Maltese terrier, Maltese", probability: 0.81445
        class: "n02096437 Dandie Dinmont, Dandie Dinmont terrier", probability: 0.08678
        class: "n02098286 West Highland white terrier", probability: 0.03561
        class: "n02113624 toy poodle", probability: 0.01261
        class: "n02113712 miniature poodle", probability: 0.01200
    ][
        class: "n02123045 tabby, tabby cat", probability: 0.52391
        class: "n02123394 Persian cat", probability: 0.24143
        class: "n02123159 tiger cat", probability: 0.05892
        class: "n02124075 Egyptian cat", probability: 0.04563
        class: "n03942813 ping-pong ball", probability: 0.01164
    ][
        class: "n03770679 minivan", probability: 0.95839
        class: "n02814533 beach wagon, station wagon, wagon, estate car, beach waggon, station waggon, waggon", probability: 0.01674
        class: "n03769881 minibus", probability: 0.00610
        class: "n03594945 jeep, landrover", probability: 0.00448
        class: "n03977966 police van, police wagon, paddy wagon, patrol wagon, wagon, black Maria", probability: 0.00278
    ]

    在您的生产系统中

    在这种方法中,我们使用RDD为演示目的执行图像作业。随着 DataFrame使用率趋势并节省高速缓存 ,生产用户应考虑为这些图像创建一个架构并将其以DataFrame格式存储。从Spark 3.0开始,Spark提供了一个 二进制文件读取器选项 ,使图像转换为DataFrame更加方便。

    案例分析

    亚马逊零售系统(ARS)使用DJL对通过Spark路由的大量数据流进行数百万个预测。这些预测确定了客户倾向于使用数千种客户属性跨多个类别采取行动,然后向客户呈现相关广告和横幅的倾向。ARS使用成千上万的功能和数亿的客户-超过10万亿个数据点。他们需要一个可以有效扩展的解决方案。为了解决这个关键问题,他们最初为他们的工作创建了一个Scala包装器,但是包装器存在内存问题并且执行缓慢。在采用DJL之后,他们的解决方案可与Spark完美配合,并且总推理时间从数天缩短至数小时。