BigDL: Optimized Deep Learning on Apache Spark*

Making Deep Learning More Accessible with an Open Source, Distributed Deep Learning Framework

Artificial intelligence (AI) plays a central role in today’s smart and connected world—and is continuously driving the need for scalable, distributed big data analytics with deep learning capabilities. There is also an increasing demand to conduct deep learning in the same cluster along with existing data processing pipelines to support feature engineering and traditional machine learning. To address the need for a unified platform for big data analytics and deep learning, Intel recently released BigDL, an open source distributed deep learning framework for Apache Spark*. In this article, we’ll discuss BigDL features and how to get started building models using BigDL.

BigDL is implemented as a library on top of Spark (Figure 1), allowing easy scale-out computing. With BigDL, users can write their deep learning applications as standard Spark programs, which can directly run on top of existing Spark or Hadoop* clusters.

Figure 1 – BigDL implementation

Overview of BigDL

BigDL brings native support for deep learning functionalities to big data and Spark platforms by providing:

  • Rich, deep learning support. Modeled after Torch, BigDL provides comprehensive support for deep learning, including numeric computing (e.g., Tensor) and high-level neural networks. In addition, users can load pretrained Caffe* or Torch models into Spark programs using BigDL (see the sketch following this list).
  • Extremely high performance. To achieve high performance, BigDL uses Intel® Math Kernel Library (Intel® MKL) and multithreaded programming in each Spark task. Consequently, it is orders of magnitude faster than out-of-the-box open source Caffe, Torch, or TensorFlow* on a single-node Intel® Xeon® processor (i.e., comparable with a mainstream GPU).
  • Efficient scale-out. BigDL can efficiently scale out to perform data analytics at big data scale by leveraging Apache Spark, as well as efficient implementations of synchronous SGD and all-reduce communications on Spark.
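
For example, loading a pretrained model takes only a couple of lines. The following is a minimal sketch, assuming the loader entry points Module.loadTorch and Module.loadCaffe provided by BigDL (exact signatures may vary across BigDL releases) and hypothetical file paths:

import com.intel.analytics.bigdl.nn.Module
import com.intel.analytics.bigdl.numeric.NumericFloat

// Load a model previously saved from Torch (the path is a placeholder).
val torchModel = Module.loadTorch[Float]("/tmp/resnet.t7")

// Loading a pretrained Caffe model works similarly via Module.loadCaffe,
// which takes a matching BigDL model definition plus the .prototxt and
// .caffemodel paths (see the BigDL API docs of your release for details).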

Native integration with Spark is a key advantage for BigDL. Since it is built on top of Spark, it is easy to distribute model training, the computationally intensive part of deep learning. Rather than requiring the user to explicitly distribute the computation, BigDL automatically spreads the work across the Spark cluster.

BigDL supports Hadoop and Spark as unified data analytics platforms (for data storage, data processing and mining, feature engineering, classical machine learning, and deep learning) and makes deep learning more accessible to big data users and data scientists.

Typical BigDL use cases include:

  • Analyzing large amounts of data using deep learning technologies on the same big data (Hadoop and/or Spark) cluster where the data are stored (e.g., in HDFS*, HBase*, or Hive*), eliminating a large volume of unnecessary data transfer between separate systems.
  • Adding deep learning functionalities (training, fine-tuning, or prediction) to big data (Spark) programs and/or workflows to reduce system complexity and end-to-end learning latency.
  • Leveraging existing Hadoop and/or Spark clusters to run deep learning applications, which can then be dynamically shared with other workloads (e.g., ETL, data warehousing, feature engineering, classical machine learning, graph analytics).

Getting Started with BigDL

BigDL provides comprehensive support for deep learning, including numeric computing (e.g., Tensor), high-level neural networks, and distributed stochastic optimization (e.g., synchronous minibatch SGD with all-reduce communication on Spark). Table 1 summarizes the abstractions and APIs provided by BigDL.

Name       Description
Tensor     Multidimensional array of numeric types (e.g., Int, Float, Double)
Module     Individual layers of the neural network (e.g., ReLU, Linear, SpatialConvolution, Sequential)
Criterion  Given an input and a target, computes the gradient according to a given loss function
Sample     A record consisting of a feature and a label, each of which is a tensor
DataSet    Training, validation, and test data; one may use Transformer to perform a series of data transformations (with the -> operator) on a DataSet
Engine     Runtime environment for the training (e.g., number of nodes, number of cores, Spark versus local mode, multithreading)
Optimizer  Stochastic optimization for local or distributed training (using various OptimMethods such as SGD and Adagrad)

Table 1. BigDL abstractions and APIs
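
To make these abstractions concrete, here is a minimal sketch that wires a Module and a Criterion together on randomly initialized Tensors; the layer sizes and values are arbitrary and chosen purely for illustration:

import com.intel.analytics.bigdl.nn.{Linear, MSECriterion}
import com.intel.analytics.bigdl.tensor.Tensor

// Module: a single fully connected layer (sizes are arbitrary).
val layer = Linear[Float](4, 2)

// Tensors: a random input and a target filled with ones.
val input  = Tensor[Float](4).rand()
val target = Tensor[Float](2).fill(1.0f)

// Criterion: computes the loss and its gradient for a given (output, target) pair.
val criterion = MSECriterion[Float]()
val output  = layer.forward(input)              // forward pass through the Module
val loss    = criterion.forward(output, target)
val gradOut = criterion.backward(output, target)
layer.backward(input, gradOut)                  // back-propagate through the Module
println(s"loss = $loss")

In a distributed setting, the same Module and Criterion are handed to an Optimizer together with an RDD of Sample records, as shown in the text classifier example later in this article.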

A BigDL program can run either as a local Scala/Java* program or as a Spark program. [Editor’s Note: Python* support will be available shortly and may even be available by the time this article is published.] To quickly experiment with BigDL code as a local Scala/Java program using the interactive Scala shell (REPL), one can first type:

$ source PATH_To_BigDL/scripts/bigdl.sh
$ $SPARK_HOME/bin/spark-shell --jars bigdl-0.1.0-SNAPSHOT-jar-with-dependencies.jar


The Scala shell will show something like:

Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /___/ .__/\_,_/_/ /_/\_\  version 1.6.0
    /_/
   
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Spark context available as sc.
scala>

One can then experiment with the Tensor APIs in BigDL as follows:

scala> import com.intel.analytics.bigdl.tensor.Tensor
import com.intel.analytics.bigdl.tensor.Tensor

scala> Tensor[Double](2,2).fill(1.0)
res9: com.intel.analytics.bigdl.tensor.Tensor[Double] =
1.0 1.0
1.0 1.0
[com.intel.analytics.bigdl.tensor.DenseTensor of size 2x2]
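
Beyond construction and fill, Tensor exposes Torch-style numeric operations. A few illustrative calls are shown below (outputs omitted since the values are random; the exact operator set may vary slightly across BigDL versions):

scala> val a = Tensor[Double](2, 2).rand()  // fill with uniform random values
scala> val b = Tensor[Double](2, 2).fill(2.0)
scala> a.add(b)                             // element-wise, in-place addition
scala> a.max()                              // largest element in the tensor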


One can also experiment with the Module APIs in BigDL as follows:

scala> import com.intel.analytics.bigdl.numeric.NumericFloat // import global float tensor numeric type
import com.intel.analytics.bigdl.numeric.NumericFloat

scala> import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.nn._

scala> val f = Linear(3,4) // create the module
f: com.intel.analytics.bigdl.nn.Linear[Float] = nn.Linear(3 -> 4)

// let's see what f's parameters were initialized to ('nn' always inits to something reasonable)
scala> f.weight
res5: com.intel.analytics.bigdl.tensor.Tensor[Float] =
-0.008662592 0.543819 -0.028795477
-0.30469555 -0.3909278 -0.10871882
0.114964925 0.1411745 0.35646403
-0.16590376 -0.19962183 -0.18782845
[com.intel.analytics.bigdl.tensor.DenseTensor of size 4x3]
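
Having inspected the weights, one can also push data through the module. The sketch below uses random input values purely for illustration:

scala> val input = Tensor[Float](2, 3).rand()   // a mini-batch of 2 samples with 3 features each
scala> val output = f.forward(input)            // forward pass; output has size 2x4
scala> val gradOutput = Tensor[Float](2, 4).rand()
scala> f.backward(input, gradOutput)            // backward pass; gradInput has size 2x3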


Building a Simple Text Classifier with BigDL

Going beyond APIs, let’s see how to build a text classifier using a simple convolutional neural network (CNN) model.

A BigDL program starts with import com.intel.analytics.bigdl._ and then initializes the engine (including the number of executor nodes, the number of physical cores on each executor, and whether it runs on Spark or as a local Java program):

  val sc = new SparkContext(
    Engine.init(param.nodeNum, param.coreNum, true).get
      .setAppName("Text classification")
      .set("spark.akka.frameSize", 64.toString)
      .set("spark.task.maxFailures", "1"))


After that, the example loads the input data, broadcasts the pretrained word embeddings, and vectorizes the text using RDD transformations (vectorizedRdd):

   // For a large dataset, you might want to load such an RDD[(String, Float)] from HDFS
   val dataRdd = sc.parallelize(loadRawData(), param.partitionNum)
   val (word2Meta, word2Vec) = analyzeTexts(dataRdd)
   val word2MetaBC = sc.broadcast(word2Meta)
   val word2VecBC = sc.broadcast(word2Vec)
   val vectorizedRdd = dataRdd
     .map {case (text, label) => (toTokens(text, word2MetaBC.value), label)}
     .map {case (tokens, label) => (shaping(tokens, sequenceLen), label)}
     .map {case (tokens, label) => (vectorization(
      tokens, embeddingDim, word2VecBC.value), label)}


It then converts the processed data (vectorizedRdd) to an RDD of Sample and randomly splits the sample RDD (sampleRDD) into training data (trainingRDD) and validation data (valRDD):

  val sampleRDD = vectorizedRdd.map {case (input: Array[Array[Float]],
                       label: Float) =>
    Sample(
     featureTensor = Tensor(input.flatten, Array(sequenceLen, embeddingDim))
       .transpose(1, 2).contiguous(),
     labelTensor = Tensor(Array(label), Array(1)))
  }
  
  val Array(trainingRDD, valRDD) = sampleRDD.randomSplit(
   Array(trainingSplit, 1 - trainingSplit))



After that, the example builds the CNN model by calling buildModel:

   def buildModel(classNum: Int): Sequential[Float] = {
    val model = Sequential[Float]()
    model.add(Reshape(Array(param.embeddingDim, 1, param.maxSequenceLength)))
    model.add(SpatialConvolution(param.embeddingDim, 128, 5, 1))
    model.add(ReLU())
    model.add(SpatialMaxPooling(5, 1, 5, 1))
    model.add(SpatialConvolution(128, 128, 5, 1))
    model.add(ReLU())
    model.add(SpatialMaxPooling(5, 1, 5, 1))
    model.add(SpatialConvolution(128, 128, 5, 1))
    model.add(ReLU())
    model.add(SpatialMaxPooling(35, 1, 35, 1))
    model.add(Reshape(Array(128)))
    model.add(Linear(128, 100))
    model.add(Linear(100, classNum))
    model.add(LogSoftMax())
    model
  }
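
As a quick sanity check, the model can be instantiated and its trainable parameters counted; getParameters() returns the flattened weight and gradient tensors. The class count of 20 below is an arbitrary value chosen for illustration, and the snippet assumes it runs in the same scope as the example (since buildModel references param):

val model = buildModel(classNum = 20)             // 20 is an arbitrary class count for illustration
val (weights, gradients) = model.getParameters()  // flattened parameter and gradient tensors
println(s"Trainable parameters: ${weights.nElement()}")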


It then creates the Optimizer, passes the RDD of training data (trainingRDD) to the Optimizer (with specific batch size), and finally trains the model (using Adagrad as the optimization method, and setting relevant hyperparameters in state):

   val optimizer = Optimizer(
    model = buildModel(classNum),
    sampleRDD = trainingRDD,
    criterion = new ClassNLLCriterion[Float](),
    batchSize = param.batchSize
   )
   val state = T("learningRate" -> 0.01, "learningRateDecay" -> 0.0002)
   optimizer
    .setState(state)
    .setOptimMethod(new Adagrad())
    .setValidation(Trigger.everyEpoch, valRDD, Array(new Top1Accuracy[Float]),
            param.batchSize)
    .setEndWhen(Trigger.maxEpoch(2))
    .optimize()
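
optimize() returns the trained model, which can then be used for inference. Below is a minimal sketch, assuming the predict and predictClass helpers available on BigDL modules (check the API of your BigDL release):

val trainedModel = optimizer.optimize()          // returns the trained module

// Distributed inference on the validation RDD; each prediction holds the
// per-class log-probabilities produced by the final LogSoftMax layer.
val predictions = trainedModel.predict(valRDD)

// Or obtain the predicted class index directly.
val predictedClasses = trainedModel.predictClass(valRDD)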



Building an End-to-End Application with BigDL

With BigDL, users can build end-to-end AI applications using a single analytics pipeline based on Spark, including data management, feature management, feature transformations, model training and prediction, and results evaluation. We have worked with customers in different domains and developed end-to-end solutions using BigDL for fraud detection and defect detection, to name a couple. Figure 2 illustrates an end-to-end image recognition and object detection pipeline built using BigDL on Spark, which collects and processes large volumes of images from manufacturing pipelines and automatically detects product defects from these images (using convolutional neural network models on BigDL).

Figure 2 – End-to-end image recognition and object detection pipeline

Making Deep Learning Accessible

In this article, we discussed BigDL, an open source distributed deep learning framework for Apache Spark. BigDL makes deep learning more accessible to big data users and data scientists by allowing users to write their deep learning applications as standard Spark programs, and to run these deep learning applications directly on top of existing Spark or Hadoop clusters. As a result, it makes Hadoop/Spark a unified platform for data storage, data processing and mining, feature engineering, traditional machine learning, and deep learning workloads, which can provide better economy of scale, higher resource utilization, ease of use/development, and better TCO.

Software and workloads used in performance tests may have been optimized for performance only on Intel microprocessors. Performance tests, such as SYSmark and MobileMark, are measured using specific computer systems, components, software, operations and functions. Any change to any of those factors may cause the results to vary. You should consult other information and performance tests to assist you in fully evaluating your contemplated purchases, including the performance of that product when combined with other products. For more complete information visit http://www.intel.com/performance.

Intel’s compilers may or may not optimize to the same degree for non-Intel microprocessors for optimizations that are not unique to Intel microprocessors. These optimizations include SSE2, SSE3, and SSSE3 instruction sets and other optimizations. Intel does not guarantee the availability, functionality, or effectiveness of any optimization on microprocessors not manufactured by Intel. Microprocessor-dependent optimizations in this product are intended for use with Intel microprocessors. Certain optimizations not specific to Intel microarchitecture are reserved for Intel microprocessors. Please refer to the applicable product User and Reference Guides for more information regarding the specific instruction sets covered by this notice.

For more complete information about compiler optimizations, see our Optimization Notice.