Elephas Source Code Analysis
Elephas is an extension of Keras, which allows you to run distributed deep learning models at scale with Spark.
Elephas currently supports a number of applications, including:
- Data-parallel training of deep learning models
- Distributed hyper-parameter optimization
- Distributed training of ensemble models
https://github.com/maxpumperla/elephas/
Distributed Training
The classes the master uses for computation are SparkModel and SparkMLlibModel (spark_model.py).
SparkModel computes on RDDs, while SparkMLlibModel takes LabeledPoints; internally, however, these are first converted into an RDD as well.
Here master_network is the master's neural network; both predict and predict_classes operate on master_network.
The training function is train, which supports three modes: asynchronous, synchronous, and hogwild. The differences are explained as follows:
- Asynchronous updates with read and write locks (mode='asynchronous') This mode implements the algorithm described as downpour in [1], i.e. each worker can send updates whenever they are ready. The master model makes sure that no update gets lost when multiple updates arrive at the "same" time, by locking the master parameters while reading and writing parameters. This idea has been used in Google's DistBelief framework.
- Asynchronous updates without locks (mode='hogwild') Essentially the same procedure as above, but without requiring the locks. This heuristic assumes that we still fare well enough, even if we lose an update here or there. Updating parameters lock-free in a non-distributed setting for SGD goes by the name 'Hogwild!' [2]; its distributed extension is called 'Dogwild!' [3].
- Synchronous updates (mode='synchronous') In this mode each worker sends a new batch of parameter updates at the same time, which are then processed on the master. Accordingly, this algorithm is sometimes called bulk synchronous parallel, or just BSP.
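The mode is selected when constructing SparkModel; for instance, reusing the constructor call from the full example at the end of this post, but with lock-free updates:

spark_model = SparkModel(sc, model, optimizer=adadelta, frequency='epoch', mode='hogwild', num_workers=2)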
The function train calls _train to fit the network parameters. The core idea is to partition the data, train a worker on each partition, and then update the master model's parameters.
The master's network parameters and each worker's sub-model parameters are exchanged over HTTP: each worker fetches parameters from the master via GET and pushes updates via POST.
For this exchange the master starts a Flask-based web server exposing two routes:
@app.route('/parameters', methods=['GET'])
@app.route('/update', methods=['POST'])
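The shape of this server can be sketched as follows. This is a simplified, self-contained illustration rather than the exact code in spark_model.py: a plain threading.Lock stands in for the RWLock used in 'asynchronous' mode, and the pickle serialization and the simple subtract-the-delta update are assumptions for illustration.

import pickle
import threading
from flask import Flask, request

app = Flask(__name__)
weights = []               # master parameters, set from master_network.get_weights()
lock = threading.Lock()    # stand-in for the RWLock used in 'asynchronous' mode

@app.route('/parameters', methods=['GET'])
def handle_get_parameters():
    with lock:  # workers pull the current master weights
        return pickle.dumps(weights, -1)

@app.route('/update', methods=['POST'])
def handle_update_parameters():
    global weights
    deltas = pickle.loads(request.data)
    with lock:  # the master applies worker deltas while holding the lock
        # here the elephas optimizer's get_updates would produce the new weights
        weights = [w - d for w, d in zip(weights, deltas)]
    return 'update done'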
The core of the dispatch is the following two lines: the RDD is repartitioned to num_workers so that each worker trains on exactly one partition, and worker training runs inside mapPartitions.
rdd = rdd.repartition(self.num_workers)
rdd.mapPartitions(worker.train).collect()
The worker classes are SparkWorker and AsynchronousSparkWorker.
Asynchronous workers can push parameter updates at one of two granularities: per epoch or per batch.
Worker-side training is plain Keras; within a worker, parameter updates follow the optimizer specified on the Keras model.
Updates to the master's parameters are governed by the optimizer passed to SparkModel; five methods are currently available (optimizers.py):
1. SGD
2. RMSprop
http://www.cs.toronto.edu/~tijmen/csc321/slides/lecture_slides_lec6.pdf
3. Adagrad
http://www.magicbroom.info/Papers/DuchiHaSi10.pdf
4. Adadelta
http://arxiv.org/abs/1212.5701
5. Adam
http://arxiv.org/abs/1412.6980v8
The optimizer computes the updated parameters new_weights via get_updates.
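The idea behind get_updates can be illustrated with a plain SGD sketch. This shows the update rule only; the exact class and method signatures in optimizers.py differ.

import numpy as np

class SketchSGD(object):
    # Master-side update sketch: new_weights = weights - lr * deltas.
    def __init__(self, lr=1.0):
        self.lr = lr

    def get_updates(self, weights, deltas):
        # each delta is weights_before - weights_after from a worker,
        # so subtracting it applies the worker's progress to the master
        return [w - self.lr * d for w, d in zip(weights, deltas)]

new_weights = SketchSGD().get_updates([np.zeros((2, 2))], [0.1 * np.ones((2, 2))])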
Model Evaluation
The class ElephasEstimator plugs into a pyspark.ml Pipeline for training and model evaluation (ml_model.py). It is invoked as follows:
estimator = ElephasEstimator()
estimator.set_*(...)
pipeline = Pipeline(stages=[estimator])
fitted_pipeline = pipeline.fit(df)
ElephasEstimator delegates training to SparkModel and obtains the model parameters model_weights.
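Evaluation then proceeds through the fitted pipeline with the standard pyspark.ml API:

prediction = fitted_pipeline.transform(df)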
Hyper-parameter Optimization
Hyper-parameter optimization is built on the class HyperParamModel (hyperparam.py), which in turn is based on hyperas.
https://github.com/maxpumperla/hyperas
Hyperas = Keras + Hyperopt: A very simple wrapper for convenient hyperparameter optimization
An example of using hyperas to search over the Dropout parameter looks like this:
from hyperas.distributions import uniform

model = Sequential()
model.add(Dense(512, input_shape=(784,)))
model.add(Activation('relu'))
model.add(Dropout({{uniform(0, 1)}}))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout({{uniform(0, 1)}}))
model.add(Dense(10))
model.add(Activation('softmax'))
Utility Functions
utils/ mainly contains the RDD conversion helpers for the data and the implementation of the read/write lock RWLock.
ml/ and mllib/ mainly implement the adapters for Spark ML and MLlib.
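For instance, the conversion from numpy arrays to an RDD of (features, label) pairs, used as to_simple_rdd in the example at the end of this post, can be sketched as follows (a simplified reading of utils/rdd_utils.py):

def to_simple_rdd(sc, features, labels):
    # pair each sample with its label and distribute the pairs across the cluster
    pairs = [(x, y) for x, y in zip(features, labels)]
    return sc.parallelize(pairs)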
Data Parallelism
For now Elephas is restricted to data-parallel algorithms. The original README explains why:
Note that right now elephas restricts itself to data-parallel algorithms for two reasons. First, Spark simply makes it very easy to distribute data. Second, neither Spark nor Theano make it particularly easy to split up the actual model in parts, thus making model-parallelism practically impossible to realize.
Limitations and Extensions of Elephas
The core function of the Elephas training loop is the following:
if self.frequency == 'epoch':
    for epoch in range(nb_epoch):
        weights_before_training = get_server_weights(self.master_url)
        model.set_weights(weights_before_training)
        self.train_config['nb_epoch'] = 1
        if x_train.shape[0] > batch_size:
            model.fit(x_train, y_train, show_accuracy=True, **self.train_config)
        weights_after_training = model.get_weights()
        deltas = subtract_params(weights_before_training, weights_after_training)
        put_deltas_to_server(deltas, self.master_url)
elif self.frequency == 'batch':
    for epoch in range(nb_epoch):
        if x_train.shape[0] > batch_size:
            for (batch_start, batch_end) in batches:
                weights_before_training = get_server_weights(self.master_url)
                model.set_weights(weights_before_training)
                batch_ids = index_array[batch_start:batch_end]
                X = slice_X(x_train, batch_ids)
                y = slice_X(y_train, batch_ids)
                model.train_on_batch(X, y)
                weights_after_training = model.get_weights()
                deltas = subtract_params(weights_before_training, weights_after_training)
                put_deltas_to_server(deltas, self.master_url)
From this code we can see two main limitations relative to Keras:
- The Graph API is not supported, so the interface is not fully compatible
- Training variable-length RNN/LSTM/GRU models is not supported
Both, however, can be added fairly easily by extending the code.
Also, since distributed training only ever passes model parameters around, the standard Keras API can still be used to save and load the model and its weights, as sketched below.
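For example (a sketch using the plain Keras weight-persistence API; the filename is arbitrary):

# persist the trained master network with the standard Keras API
spark_model.master_network.save_weights('elephas_model.h5', overwrite=True)
# ... later, restore it into a model with the same architecture
model.load_weights('elephas_model.h5')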
Finally, note that Elephas currently only supports Keras up to version 0.3; versions 1.0 and above are not yet supported.
An Elephas Usage Example
To close, here is a small example of using Elephas.
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function
import numpy as np
from keras.models import Sequential
from keras.layers import recurrent
from keras.layers.core import TimeDistributedDense
from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd
from elephas import optimizers as elephas_optimizers
from pyspark import SparkContext, SparkConf
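# Toy sequence data: 2 samples x 3 time steps x 2 one-hot features; targets mirror the inputs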
X = np.array([[[1,0],[0,1],[1,0]], [[0,1],[1,0],[0,1]]])
y = np.array([[[1,0],[0,1],[1,0]], [[0,1],[1,0],[0,1]]])
HIDDEN_SIZE = 2
RNN = recurrent.SimpleRNN
model = Sequential()
model.add(RNN(HIDDEN_SIZE, input_shape=(3,2), return_sequences=True))
model.add(TimeDistributedDense(2, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam')
# Create Spark context
conf = SparkConf().setAppName('elephas_rnn_test').setMaster('local[8]')
sc = SparkContext(conf=conf)
# Build RDD from numpy features and labels
rdd = to_simple_rdd(sc, X, y)
# Initialize SparkModel from Keras model and Spark context
adadelta = elephas_optimizers.Adadelta()
spark_model = SparkModel(sc, model, optimizer=adadelta, frequency='epoch', mode='asynchronous', num_workers=2)
# Train Spark model
spark_model.train(rdd, nb_epoch=20)
# Evaluate Spark model by evaluating the underlying model
score = spark_model.master_network.evaluate(X, y, show_accuracy=True, verbose=2)
print('Test accuracy:', score[1])
print(spark_model.predict(X))
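The script is then submitted to Spark in the usual way, e.g. (the script name here is hypothetical):

spark-submit --driver-memory 1G elephas_rnn_test.py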