Advancing Artificial Intelligence on Apache Spark* with BigDL
Features, Use-cases, and the Future
BigDL has evolved into a vibrant open-source project since Intel introduced it in December of 2016. In this article, we give details on its implementation, describe some real-world use-cases, and provide a glimpse into the new end-to-end analytics plus artificial intelligence (AI) pipelines (the Analytics Zoo platform) being built on top of Apache Spark* and BigDL. [Editor’s note: We ran the article BigDL: Optimized Deep Learning on Apache Spark in The Parallel Universe, Issue 28, describing the first version of this framework. Much has changed since then.]
Demand is growing for organizations to apply deep learning technologies―computer vision, natural language processing, generative adversarial networks―in their big data platforms and analytics pipelines. To support this convergence of deep learning and big data analytics, Intel open-sourced the BigDL distributed deep learning framework back in December of 2016.
BigDL is implemented as an Apache Spark library that lets users write large-scale deep learning applications (including model training and inference) as standard Spark programs that can run directly on existing big data (Hadoop* or Spark) clusters.
BigDL provides comprehensive support for deep learning technologies:
- Neural network operations
In particular, users can run existing models defined in other frameworks (i.e., TensorFlow*, Keras*, Caffe*, and Torch*) directly on Spark in a distributed way. BigDL also provides seamless integration of deep learning technologies into the big data ecosystem. A BigDL program can not only interact directly with Spark components (e.g., DataFrames, Spark Streaming, ML Pipelines), it can also run in a variety of big data frameworks (e.g., Apache Storm*, Apache Flink*, Apache Kafka*).
Writing BigDL Applications
Because BigDL is implemented as a Spark library, users can easily build end-to-end data analytics and deep learning pipelines, as illustrated by the five-step text classification example below.
Step 1: Read text data (articles and associated labels) into a Spark resilient distributed dataset (RDD).
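from pyspark import SparkContext

sc = SparkContext(appName="text_classifier", …)
texts_rdd = sc.textFile("hdfs://...")
# Assumes each record has been parsed into a (text, label) pair; Python 3 lambdas
# cannot unpack tuples, so the record is indexed instead
words_rdd = texts_rdd.map(lambda record: ([w for w in to_words(record[0])], record[1]))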
Step 2: Transform loaded data into an RDD of training samples using RDD transformations.
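from bigdl.dataset import news20

w2v = news20.get_glove_w2v(dim=…)
# Map each word to its GloVe vector, keeping the label alongside
vector_rdd = words_rdd.map(lambda record: ([to_vec(w, w2v) for w in record[0]], record[1]))
# Wrap each (vectors, label) pair in a training sample
sample_rdd = vector_rdd.map(lambda record: to_sample(record[0], record[1]))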
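The helpers to_words and to_vec used in Steps 1 and 2 are not shown in the snippets. As a rough illustration only, assuming a simple regular-expression tokenizer and a plain dictionary of word vectors (both hypothetical stand-ins, not BigDL's own helpers), they might behave like this:

```python
import re
from typing import Dict, List

def to_words(text: str) -> List[str]:
    # Hypothetical tokenizer: lowercase, then keep runs of letters, digits, apostrophes
    return re.findall(r"[a-z0-9']+", text.lower())

def to_vec(word: str, w2v: Dict[str, List[float]], dim: int = 3) -> List[float]:
    # Hypothetical lookup: known words get their embedding, unknown words a zero vector
    return w2v.get(word, [0.0] * dim)

# Toy 3-dimensional table standing in for the GloVe vectors loaded in Step 2
w2v = {"spark": [0.1, 0.2, 0.3], "rocks": [0.5, 0.4, 0.0]}
words = to_words("Spark rocks, BigDL!")
vectors = [to_vec(w, w2v) for w in words]
print(words)    # ['spark', 'rocks', 'bigdl']
print(vectors)  # [[0.1, 0.2, 0.3], [0.5, 0.4, 0.0], [0.0, 0.0, 0.0]]
```

In the Spark program, functions like these run inside the RDD map calls, so each executor tokenizes and embeds its own partition of the articles.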
Step 3: Construct a neural network model.
from bigdl.nn.layer import Sequential, Recurrent, LSTM, Linear, LogSoftMax

model = Sequential() \
    .add(Recurrent().add(LSTM(…))) \
    .add(Linear(…)) \
    .add(LogSoftMax())
Step 4: Train the model (with the specified loss and optimization method).
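from bigdl.nn.criterion import ClassNLLCriterion
from bigdl.optim.optimizer import Optimizer, Adagrad, TrainSummary

loss = ClassNLLCriterion()
optim_method = Adagrad()
optimizer = Optimizer(model=model,
                      training_rdd=sample_rdd,
                      criterion=loss,
                      optim_method=optim_method,
                      …)
# Record training statistics (e.g., loss) during optimization
optimizer.set_train_summary(summary=TrainSummary(…))
trained_model = optimizer.optimize()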
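The model in Step 3 ends with LogSoftMax, and Step 4 pairs it with ClassNLLCriterion; together they compute the usual cross-entropy loss. The NumPy sketch below illustrates that math only and is not BigDL's implementation. It averages over the mini-batch and takes 1-based class labels, following BigDL's Torch-style convention (an assumption worth checking against your BigDL version).

```python
import numpy as np

def log_softmax(logits: np.ndarray) -> np.ndarray:
    # Subtract the row max before exponentiating for numerical stability
    shifted = logits - logits.max(axis=1, keepdims=True)
    return shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))

def class_nll(log_probs: np.ndarray, labels_1based: np.ndarray) -> float:
    # Pick each sample's log-probability for its true class (labels start at 1)
    picked = log_probs[np.arange(len(labels_1based)), labels_1based - 1]
    # Negative log-likelihood, averaged over the mini-batch
    return float(-picked.mean())

logits = np.array([[2.0, 0.0, 0.0],
                   [0.0, 0.0, 0.0]])
labels = np.array([1, 3])
loss = class_nll(log_softmax(logits), labels)
print(round(loss, 4))
```

Keeping the log-softmax inside the model and the negative log-likelihood in the criterion means the same model can be used at prediction time to output (log-)probabilities directly.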
Step 5: Predict results using existing models (pretrained in BigDL, TensorFlow, Keras, Caffe, or Torch).
test_rdd = …
prediction_rdd = trained_model.predict(test_rdd)
BigDL provides highly scalable, distributed training using synchronous mini-batch stochastic gradient descent and data parallelism. It implements a parameter server (PS) architecture (through an all-reduce operation) directly on top of the BlockManager inside Spark. After each task computes its local gradients, BigDL does not send them back to the driver; instead, it aggregates the gradients from all partitions on the same worker locally. The aggregated gradient on each node is then sliced into chunks that are exchanged among all the cluster nodes. Each node is responsible for a specific chunk, which in essence implements a PS architecture for parameter synchronization: each node retrieves and aggregates the gradients for the slice of the model it is responsible for. After the pairwise exchange completes, each node holds its own portion of the aggregated gradients, which it uses to update its own portion of the weights. The exchange then happens again to synchronize the updated weights, so at the end of this procedure every node has a copy of the updated weights. As a result, BigDL can perform highly scalable, distributed training of deep learning models (Figure 1).
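To make the chunked exchange concrete, here is a single-process NumPy simulation of one synchronization step. It models only the data movement described above (slice, reduce each chunk on its owner, update each weight slice, then exchange the weights again); the node count, the summed gradients, and the plain SGD update are illustrative assumptions, and none of this is BigDL's BlockManager-based code.

```python
from typing import List

import numpy as np

def sync_step(weights: np.ndarray, node_grads: List[np.ndarray],
              lr: float = 0.1) -> List[np.ndarray]:
    """Simulate one chunked parameter-synchronization step.

    weights:    1-D array, identical on every node before the step
    node_grads: one locally aggregated gradient per node
    Returns each node's copy of the weights after synchronization.
    """
    n = len(node_grads)
    # Every node slices its gradient into n chunks; chunk i is owned by node i
    sliced = [np.array_split(g, n) for g in node_grads]
    w_chunks = np.array_split(weights, n)
    updated_chunks = []
    for owner in range(n):
        # The owner fetches chunk `owner` from every node and sums it
        agg = sum(sliced[node][owner] for node in range(n))
        # It then updates only its own slice of the weights (plain SGD here)
        updated_chunks.append(w_chunks[owner] - lr * agg)
    # Second exchange: every node fetches all updated weight slices
    full = np.concatenate(updated_chunks)
    return [full.copy() for _ in range(n)]

w = np.zeros(5)
grads = [np.array([1.0, 2.0, 3.0, 4.0, 5.0]), np.array([1.0, 0.0, 1.0, 0.0, 1.0])]
copies = sync_step(w, grads, lr=0.5)
print(copies[0])
```

Because each node reduces and updates only 1/n of the parameters, no single machine (such as the driver) has to receive every gradient, yet every node ends the step with the same weights a centralized update would produce.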
Quantization refers to technologies that store numbers and perform calculations in more compact, lower-precision form. BigDL takes advantage of this type of low-precision computing to quantize existing models for optimized inference. BigDL first loads existing models, then quantizes the parameters of selected layers (e.g., spatial convolution) into 8-bit integer form to produce a quantized model (Figure 2).
During model inference, each quantized layer:
- Quantizes the 32-bit floating point input to 8-bit integer
- Performs the 8-bit calculations (such as GEMM) using the quantized parameters and data
- Dequantizes the results to 32-bit floating point
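Many of these operations can be fused in the implementation, so the quantization and dequantization overhead at inference time is small. Each value is quantized by scaling it against the largest absolute value in the data being quantized and mapping the result onto the signed 8-bit range (shown here in Scala):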
Math.round(1.0 * value / Math.max(Math.abs(max), Math.abs(min)) * Byte.MaxValue).toByte
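The Python sketch below mirrors the Scala formula and adds a matching dequantization step. It is an illustration under our own assumptions (in particular, that dequantization simply inverts the scaling), not BigDL's code. Scala's Math.round rounds halves up, so the sketch uses floor(x + 0.5) rather than Python's round(), which rounds halves to even.

```python
import math
from typing import List, Tuple

BYTE_MAX = 127  # Scala's Byte.MaxValue

def quantize(values: List[float]) -> Tuple[List[int], float]:
    # Scale factor: the larger of |max| and |min| over the values being quantized
    threshold = max(abs(max(values)), abs(min(values)))
    if threshold == 0.0:
        return [0] * len(values), 0.0  # all zeros: nothing to scale
    # Same formula as the Scala line, with Java/Scala-style half-up rounding
    q = [int(math.floor(1.0 * v / threshold * BYTE_MAX + 0.5)) for v in values]
    return q, threshold

def dequantize(q: List[int], threshold: float) -> List[float]:
    # Map the 8-bit integers back to approximate floating-point values
    return [x * threshold / BYTE_MAX for x in q]

q, t = quantize([0.5, -1.0, 0.25, 0.0])
print(q, t)  # [64, -127, 32, 0] 1.0
print(dequantize(q, t))
```

Every quantized value lies in [-127, 127], and the round-trip error per value is at most half a quantization step, that is, threshold / 254.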
Unlike many quantization implementations, BigDL adopts a new local quantization scheme that performs the quantization and dequantization operations in each small, local quantization window (a small sub-block of the parameters or input data, such as a patch or kernel in convolution). As a result, BigDL can use very low-bit integers (i.e., 8-bit) in model quantization with minimal loss in accuracy (less than 0.1%), a 4x reduction in model size, and up to a 2x speedup in inference (Figure 3) [2].
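The benefit of a local window can be seen with a toy example. The sketch below reuses the same max-absolute-value scaling, but the one-dimensional layout, the window size of 4, and the error metric are our own simplifying assumptions (BigDL's windows are sub-blocks such as convolution kernels or patches). A few large values force a coarse scale on the whole tensor, while a separate scale per window keeps the small values precise.

```python
import math
from typing import List

BYTE_MAX = 127  # signed 8-bit maximum, as in Scala's Byte.MaxValue

def roundtrip(values: List[float]) -> List[float]:
    # Quantize with one max-abs scale for all values, then dequantize
    t = max(abs(v) for v in values)
    if t == 0.0:
        return [0.0] * len(values)
    return [math.floor(v / t * BYTE_MAX + 0.5) * t / BYTE_MAX for v in values]

def local_roundtrip(values: List[float], window: int) -> List[float]:
    # Give each window of `window` consecutive values its own scale
    out: List[float] = []
    for start in range(0, len(values), window):
        out.extend(roundtrip(values[start:start + window]))
    return out

def mean_abs_error(a: List[float], b: List[float]) -> float:
    return sum(abs(x - y) for x, y in zip(a, b)) / len(a)

# Small weights in the first window, large ones in the second
data = [0.011, 0.012, 0.013, 0.014, 50.0, -40.0, 30.0, 20.0]
global_q = roundtrip(data)
local_q = local_roundtrip(data, window=4)
print(global_q[:4])                        # [0.0, 0.0, 0.0, 0.0]
print([round(x, 4) for x in local_q[:4]])  # [0.011, 0.012, 0.013, 0.014]
print(mean_abs_error(data, local_q) < mean_abs_error(data, global_q))  # True
```

With a single tensor-wide scale, all four small weights collapse to zero; with per-window scales they survive almost unchanged, at the cost of storing one extra scale per window.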
Besides being a standard Spark program, BigDL also provides support for model training and inference on a local Java* virtual machine (JVM) without Spark. This helps improve efficiency when running BigDL on a single node because there’s no overhead from parameter synchronization or task scheduling. More importantly, it becomes easier to directly integrate BigDL models (for either inference or fine tuning) with various JVM-based big data frameworks (e.g., Apache Storm, Flink, or Kafka).
Use-Cases and Applications
In this section, we describe three typical BigDL use-cases:
- Model inference
- Distributed training
- Transfer learning
Image Feature Extraction
JD.com is one of the largest online retailers in the world. It has built an end-to-end object detection and image feature extraction pipeline on top of BigDL and Spark (Figure 4). The pipeline first reads hundreds of millions of pictures from a distributed database into Spark (as an RDD of pictures) and preprocesses the RDD (including resizing, normalization, and batching) in a distributed fashion using Spark. It then uses BigDL to load an SSD model (pretrained in Caffe) for large-scale, distributed object detection on Spark, which generates the coordinates and scores of the detected objects in each picture. Next, it generates the target images (by keeping the highest-scoring object as the target and cropping the original picture based on the target's coordinates) and further preprocesses the RDD of target images (including resizing and batching). It then uses BigDL to load a DeepBit model (again, pretrained in Caffe) for distributed feature extraction of the target images. Finally, it stores the results (an RDD of extracted object features) in the Hadoop Distributed File System (HDFS).
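Once detections are available, the target-image step is simple array manipulation. As an illustration, assuming each detection is a (score, x1, y1, x2, y2) tuple in pixel coordinates and each picture is a NumPy array in height-width-channel layout (both our assumptions; the actual SSD output format used by JD.com is not described here), keeping the highest-scoring object and cropping could look like this:

```python
from typing import List, Optional, Tuple

import numpy as np

Detection = Tuple[float, int, int, int, int]  # (score, x1, y1, x2, y2)

def crop_top_detection(image: np.ndarray,
                       detections: List[Detection]) -> Optional[np.ndarray]:
    if not detections:
        return None  # nothing detected: no target image for this picture
    # Keep only the highest-scoring object as the target
    _score, x1, y1, x2, y2 = max(detections, key=lambda d: d[0])
    h, w = image.shape[:2]
    # Clamp the box to the image bounds before slicing
    x1, x2 = max(0, x1), min(w, x2)
    y1, y2 = max(0, y1), min(h, y2)
    # In an HWC array, rows are y and columns are x
    return image[y1:y2, x1:x2]

img = np.zeros((100, 200, 3), dtype=np.uint8)
crop = crop_top_detection(img, [(0.4, 0, 0, 50, 50), (0.9, 10, 20, 110, 80)])
print(crop.shape)  # (60, 100, 3)
```

In a Spark pipeline, a function like this would be applied per picture inside an RDD map, so the cropping runs in parallel alongside the distributed detection results.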
The entire data analytics and deep learning pipeline―including data loading, partitioning, preprocessing, model inference, and storing the results―is easy to implement under a unified programming paradigm using Spark and BigDL. The end-to-end pipeline also delivers a speedup of approximately 3.83x compared to running the same solution on a GPU cluster (Figure 5), as reported by JD.com [3].
Precipitation Nowcasting
Cray has integrated BigDL into its Urika-XC* analytics software suite and built an end-to-end precipitation nowcasting (predicting short-term precipitation) workflow that includes data preparation, model training, and inference (Figure 6) [1]. First, the application reads more than a terabyte of raw radar scan data into Spark as an RDD of radar images and then converts it into an RDD of NumPy* ndarrays. It then trains a sequence-to-sequence model that takes the sequence of images leading up to the current time as input and produces a sequence of predicted images, arbitrarily far in the future, as output. Once trained, the model can be used to predict, for example, precipitation patterns for the next hour (Figure 7).
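Training a sequence-to-sequence model on radar data means turning a long time series of frames into (input sequence, target sequence) pairs. A minimal NumPy sketch, assuming the frames are stacked as a (time, height, width) array and using arbitrary input and output lengths (the shapes, lengths, and the function name make_seq2seq_pairs are illustrative, not Cray's settings or code):

```python
from typing import List, Tuple

import numpy as np

def make_seq2seq_pairs(frames: np.ndarray, n_in: int, n_out: int,
                       stride: int = 1) -> List[Tuple[np.ndarray, np.ndarray]]:
    """Slide a window over a (time, H, W) array of radar frames.

    Each pair holds `n_in` frames up to the current time as input and the
    following `n_out` frames as the prediction target.
    """
    pairs = []
    last_start = len(frames) - (n_in + n_out)
    for t in range(0, last_start + 1, stride):
        x = frames[t:t + n_in]
        y = frames[t + n_in:t + n_in + n_out]
        pairs.append((x, y))
    return pairs

frames = np.arange(10 * 4 * 4, dtype=np.float32).reshape(10, 4, 4)
pairs = make_seq2seq_pairs(frames, n_in=5, n_out=3)
print(len(pairs), pairs[0][0].shape, pairs[0][1].shape)  # 3 (5, 4, 4) (3, 4, 4)
```

Lengthening n_out is what lets the model predict further into the future; in a Spark workflow, each pair (or each window start index) would become one element of the RDD of ndarrays.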
Image Similarity-Based Recommendations
MLSListings Inc. is a large real estate multiple listing service (MLS) that has been building an image-similarity-based house recommendation system on Spark and BigDL. The end-to-end workflow is implemented through transfer learning (including feature extraction and fine-tuning) to compute both the semantic and visual similarity of house photos (Figure 8). To compute the semantic similarity of the photos, the system fine-tunes the Inception v1 model pretrained on the Places dataset [4] to build three new classifiers that determine whether the photo shows the house front exterior, the house style, and the number of stories.
It first loads three pretrained Inception v1 models and then appends two new layers (a fully-connected layer followed by a Softmax* layer) to each model to train the new classifiers. After training, these classifiers can be used to tag (or label) each house listing photo. To compute the visual similarity, the system uses the VGG-16 model pretrained on the Places dataset to extract the image feature for each house listing photo. This feature is then combined with the tags generated by the classifiers and stored as a distributed table. At model serving time, the user can select a house listing photo and have the system recommend listings with similar visual characteristics (Figure 9).
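At serving time, the recommendation step reduces to a nearest-neighbor search over the stored features. The sketch below shows one plausible way to combine the two signals, not MLSListings' actual logic: it assumes each listing has a VGG-16 feature vector plus a dictionary of classifier tags, keeps only listings whose tags match the query's (semantic similarity), and ranks the rest by cosine similarity of their features (visual similarity). The listing IDs, tag names, and tiny 3-dimensional vectors are hypothetical.

```python
from typing import Dict, List, Tuple

import numpy as np

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

def recommend(query_id: str,
              features: Dict[str, np.ndarray],
              tags: Dict[str, Dict[str, str]],
              k: int = 3) -> List[Tuple[str, float]]:
    q_feat, q_tags = features[query_id], tags[query_id]
    scored = []
    for lid, feat in features.items():
        if lid == query_id or tags[lid] != q_tags:
            continue  # skip the query itself and semantically different photos
        scored.append((lid, cosine(q_feat, feat)))
    # Highest visual similarity first
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]

features = {
    "A": np.array([1.0, 0.0, 0.0]),
    "B": np.array([0.9, 0.1, 0.0]),
    "C": np.array([0.0, 1.0, 0.0]),
    "D": np.array([1.0, 0.0, 0.1]),
}
tags = {
    "A": {"style": "ranch", "stories": "1"},
    "B": {"style": "ranch", "stories": "1"},
    "C": {"style": "ranch", "stories": "1"},
    "D": {"style": "colonial", "stories": "2"},
}
print([lid for lid, _ in recommend("A", features, tags)])  # ['B', 'C']
```

Listing D is visually close to A but is filtered out because its tags differ, which is the effect of combining semantic tags with visual features. At scale, the brute-force loop would be replaced by a distributed or approximate nearest-neighbor search over the stored table.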
What’s Next for BigDL?
While BigDL is integrated into Spark and extends its capabilities to address the challenges of big data application developers, will a library alone be enough to simplify and accelerate the deployment of ML/DL workloads on production clusters? Based on community feedback and customer implementations, Intel has built Analytics Zoo, an end-to-end analytics plus AI platform that makes it easy to build Spark and BigDL applications by providing high-level pipeline APIs, built-in deep learning models, and reference use-cases.
High-Level Pipeline APIs
Analytics Zoo provides a set of easy-to-use, high-level pipeline APIs on top of Spark and BigDL, including:
- nnframes: Native deep learning support in Spark DataFrames and ML Pipelines
- autograd: Building custom layer/loss using auto-differentiation operation
- Transfer learning: Customizing pretrained models for feature extraction or fine-tuning
Using these high-level pipeline APIs, users can easily build complex deep learning pipelines in just a few lines of code.
Step 1: Initialize NNContext and load images into DataFrames using NNImageReader.
from zoo.common.nncontext import *
from zoo.pipeline.nnframes import *

sc = init_nncontext()
imageDF = NNImageReader.readImages(image_path, sc)
Step 2: Process data using DataFrame transformations.
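from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

getName = udf(lambda row: ...)
# Return a numeric label; the classifier in Step 6 trains on a numeric label column
getLabel = udf(lambda name: ..., DoubleType())
df = imageDF.withColumn("name", getName(col("image"))) \
    .withColumn("label", getLabel(col("name")))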
Step 3: Process images using built-in feature engineering operations.
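from zoo.feature.common import ChainedPreprocessing
from zoo.feature.image import *

# Python has no "->" chaining operator, so the steps are listed in ChainedPreprocessing
transformer = ChainedPreprocessing([
    RowToImageFeature(),
    ImageResize(224, 224),  # match the (3, 224, 224) model input used in Step 5
    ImageChannelNormalize(123.0, 117.0, 104.0),
    ImageMatToTensor(),
    ImageFeatureToTensor()])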
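Roughly speaking, ImageChannelNormalize(123.0, 117.0, 104.0) subtracts a per-channel mean from every pixel, and ImageMatToTensor turns the height-width-channel image into a channel-first tensor that matches the (3, 224, 224) input shape used in Step 5. The NumPy sketch below reproduces only that arithmetic under our own assumptions (RGB channel order, no division by a standard deviation, and a hypothetical helper name); Analytics Zoo's operations work on OpenCV images internally, and their channel-order handling may differ.

```python
import numpy as np

MEANS = np.array([123.0, 117.0, 104.0], dtype=np.float32)  # per-channel means from Step 3

def normalize_and_to_chw(image_hwc: np.ndarray) -> np.ndarray:
    # Subtract the per-channel mean (broadcast over height and width)
    centered = image_hwc.astype(np.float32) - MEANS
    # Reorder (H, W, C) -> (C, H, W), the channel-first layout the model expects
    return np.transpose(centered, (2, 0, 1))

img = np.full((224, 224, 3), 128, dtype=np.uint8)
tensor = normalize_and_to_chw(img)
print(tensor.shape)              # (3, 224, 224)
print(tensor[:, 0, 0].tolist())  # [5.0, 11.0, 24.0]
```

Centering the inputs on the same channel means used when the Caffe model was trained keeps the pretrained weights operating on the input distribution they expect.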
Step 4: Load an existing model (in this case, pretrained in Caffe), remove the last few layers, and freeze the first few layers.
from zoo.pipeline.api.net import *

full_model = Net.load_caffe(def_path, model_path)
# Remove layers after pool5/drop_7x7_s1
model = full_model.new_graph(["pool5/drop_7x7_s1"])
# Freeze layers from input to pool4/3x3_s2 inclusive
model.freeze_up_to(["pool4/3x3_s2"])
Step 5: Add a few new layers using a Keras-style API and a custom Lambda layer.
from zoo.pipeline.api.autograd import *
from zoo.pipeline.api.keras.layers import *
from zoo.pipeline.api.keras.models import *

def add_one_func(x):
    return x + 1.0

input = Input(name="input", shape=(3, 224, 224))
inception = model.to_keras()(input)
flatten = Flatten()(inception)
# "lambda" is a reserved word in Python, so the layer output needs another name
lambda_out = Lambda(function=add_one_func)(flatten)
logits = Dense(2)(lambda_out)
newModel = Model(input, logits)
Step 6: Train the model using Spark ML Pipelines.
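from bigdl.nn.criterion import CrossEntropyCriterion

# Train the new model assembled in Step 5 (not the truncated base model)
cls = NNClassifier(newModel, CrossEntropyCriterion(), transformer) \
    .setLearningRate(0.003).setBatchSize(40) \
    .setMaxEpoch(1).setFeaturesCol("image") \
    .setCachingSample(False)
nnModel = cls.fit(df)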
Built-In Deep Learning Models
Analytics Zoo provides several built-in deep learning models you can use for a variety of problem types such as object detection, image classification, text classification, and recommendation.
- Object detection: Using the Object Detection API (including a set of pretrained detection models such as SSD and Faster R-CNN), you can easily build object detection applications (e.g., localizing and identifying multiple objects in images and videos).
- Image classification: Using the Image Classification API (including a set of pretrained classification models such as VGG, Inception, ResNet, and MobileNet), you can easily build image classification applications.
- Text classification: The Text Classification API provides a set of pre-defined models (e.g., CNN, LSTM) for text classification applications.
- Recommendation: The Recommendation API provides a set of pre-defined models (e.g., Neural Collaborative Filtering, Wide and Deep Learning) to create recommendation engines.
You can easily use the Analytics Zoo Object Detection API for out-of-box inference on a large set of images.
Step 1: Download object detection models in Analytics Zoo from the Detection Model Zoo. A collection of detection models (pretrained on the PASCAL VOC and COCO datasets) is available.
Step 2: Load the image data and object detection model.
from zoo.common.nncontext import init_nncontext
from zoo.models.image.objectdetection import *

sc = init_nncontext()
# Load the detection model downloaded in Step 1
model = ObjectDetector.load_model(model_path)
image_set = ImageSet.read(img_path, sc)
Step 3: Use the Object Detection API for off-the-shelf inference and visualization.
import cv2

output = model.predict_image_set(image_set)
visualizer = Visualizer(model.get_config().label_map(), encoding="jpg")
visualized = visualizer(output).get_image(to_chw=False).collect()
# Write each annotated image to output_path as <id>.jpg
for img_id in range(len(visualized)):
    cv2.imwrite(output_path + "/" + str(img_id) + ".jpg", visualized[img_id])
The Big Deal about BigDL
BigDL is a work in progress, but initial experience and feedback are encouraging. Since its initial release in December of 2016, it’s received over 2,400 stars on GitHub* and enabled many users to build new analytics and deep learning applications on top of Hadoop and/or Spark clusters. The new Analytics Zoo project will make it even easier to build BigDL applications by providing an end-to-end analytics plus AI platform on top of Apache Spark and BigDL.
1. Alex Heye, et al. “Scalable Deep Learning with BigDL on the Urika-XC Software Suite.”
2. Jason (Jinquan) Dai, et al. “Leveraging Low Precision and Quantization for Deep Learning Using the Amazon EC2 C5 Instance and BigDL.”
3. Jason (Jinquan) Dai, et al. “Building Large-Scale Image Feature Extraction with BigDL at JD.com.”
4. B. Zhou, et al. “Places: A 10 Million Image Database for Scene Recognition,” IEEE Transactions on Pattern Analysis and Machine Intelligence (TPAMI), 2017.