大數(shù)據(jù)時(shí)代,基于單機(jī)的建模很難滿足企業(yè)不斷增長(zhǎng)的數(shù)據(jù)量級(jí)的需求,開發(fā)者需要使用分布式的開發(fā)方式,在集群上進(jìn)行建模。而單機(jī)和分布式的開發(fā)代碼有一定的區(qū)別,本文就將為開發(fā)者們介紹,基于TensorFlow進(jìn)行分布式開發(fā)的兩種方式,幫助開發(fā)者在實(shí)踐的過程中,更好地選擇模塊的開發(fā)方向。
創(chuàng)新互聯(lián)公司是一家專注于成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站與策劃設(shè)計(jì),硯山網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)公司做網(wǎng)站,專注于網(wǎng)站建設(shè)10余年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:硯山等地區(qū)。硯山做網(wǎng)站價(jià)格咨詢:18980820575
分布式開發(fā)會(huì)涉及到更新梯度的方式,有同步和異步的兩個(gè)方案,同步更新的方式在模型的表現(xiàn)上能更快地進(jìn)行收斂,而異步更新時(shí),迭代的速度則會(huì)更加快。兩種更新方式的圖示如下:
同步更新流程
(圖片來(lái)源:TensorFlow:Large-Scale Machine Learning on Heterogeneous Distributed Systems)
異步更新流程
(圖片來(lái)源:TensorFlow:Large-Scale Machine Learning on Heterogeneous Distributed Systems)
TensorFlow是基于ps、work 兩種服務(wù)器進(jìn)行分布式的開發(fā)。ps服務(wù)器可以只用于參數(shù)的匯總更新,讓各個(gè)work進(jìn)行梯度的計(jì)算。
首先指定ps 服務(wù)器啟動(dòng)參數(shù) –job_name=ps:
python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=ps --task_index=0
接著指定work服務(wù)器參數(shù)(啟動(dòng)兩個(gè)work 節(jié)點(diǎn)) –job_name=work2:
python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=0
python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=1
之后,上述指定的參數(shù) worker_hosts ps_hosts job_name task_index 都需要在py文件中接受使用:
tf.app.flags.DEFINE_string("worker_hosts", "默認(rèn)值", "描述說(shuō)明")
接收參數(shù)后,需要分別注冊(cè)ps、work,使他們各司其職:
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
server = tf.train.Server(cluster,job_name=FLAGS.job_name,task_index=FLAGS.task_index)
issync = FLAGS.issync
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
繼而更新梯度。
(1)同步更新梯度:
rep_op = tf.train.SyncReplicasOptimizer(optimizer,
replicas_to_aggregate=len(worker_hosts),
replica_id=FLAGS.task_index,
total_num_replicas=len(worker_hosts),
use_locking=True)
train_op = rep_op.apply_gradients(grads_and_vars,global_step=global_step)
init_token_op = rep_op.get_init_tokens_op()
chief_queue_runner = rep_op.get_chief_queue_runner()
(2)異步更新梯度:
train_op = optimizer.apply_gradients(grads_and_vars,global_step=global_step)
最后,使用tf.train.Supervisor 進(jìn)行真的迭代
另外,開發(fā)者還要注意,如果是同步更新梯度,則還需要加入如下代碼:
sv.start_queue_runners(sess, [chief_queue_runner])
sess.run(init_token_op)
需要注意的是,上述異步的方式需要自行指定集群IP和端口,不過,開發(fā)者們也可以借助TensorFlowOnSpark,使用Yarn進(jìn)行管理。
作為個(gè)推面向開發(fā)者服務(wù)的移動(dòng)APP數(shù)據(jù)統(tǒng)計(jì)分析產(chǎn)品,個(gè)數(shù)所具有的用戶行為預(yù)測(cè)功能模塊,便是基于TensorFlowOnSpark這種分布式來(lái)實(shí)現(xiàn)的。基于TensorFlowOnSpark的分布式開發(fā)使其可以在屏蔽了端口和機(jī)器IP的情況下,也能夠做到較好的資源申請(qǐng)和分配。而在多個(gè)千萬(wàn)級(jí)應(yīng)用同時(shí)建模的情況下,集群也有良好的表現(xiàn),在sparkUI中也能看到相對(duì)應(yīng)的資源和進(jìn)程的情況。最關(guān)鍵的是,TensorFlowOnSpark可以在單機(jī)過度到分布式的情況下,使代碼方便修改,且容易部署。
首先,需要使用spark-submit來(lái)提交任務(wù),同時(shí)指定spark需要運(yùn)行的參數(shù)(–num-executors 6等)、模型代碼、模型超參等,同樣需要接受外部參數(shù):
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--tracks", help="數(shù)據(jù)集路徑")
args = parser.parse_args()
之后,準(zhǔn)備好參數(shù)和訓(xùn)練數(shù)據(jù)(DataFrame),調(diào)用模型的API進(jìn)行啟動(dòng)。
其中,soft_dist.map_fun是要調(diào)起的方法,后面均是模型訓(xùn)練的參數(shù)。
estimator = TFEstimator(soft_dist.map_fun, args) \
.setInputMapping({'tracks': 'tracks', 'label': 'label'}) \
.setModelDir(args.model) \
.setExportDir(args.serving) \
.setClusterSize(args.cluster_size) \
.setNumPS(num_ps) \
.setEpochs(args.epochs) \
.setBatchSize(args.batch_size) \
.setSteps(args.max_steps)
model = estimator.fit(df)
接下來(lái)是soft_dist定義一個(gè) map_fun(args, ctx)的方法:
def map_fun(args, ctx):
...
worker_num = ctx.worker_num # worker數(shù)量
job_name = ctx.job_name # job名
task_index = ctx.task_index # 任務(wù)索引
if job_name == "ps": # ps節(jié)點(diǎn)(主節(jié)點(diǎn))
time.sleep((worker_num + 1) * 5)
cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)
num_workers = len(cluster.as_dict()['worker'])
if job_name == "ps":
server.join()
elif job_name == "worker":
with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
之后,可以使用tf.train.MonitoredTrainingSession高級(jí)API,進(jìn)行模型訓(xùn)練和預(yù)測(cè)。
基于TensorFlow的分布式開發(fā)大致就是本文中介紹的兩種情況,第二種方式可以用于實(shí)際的生產(chǎn)環(huán)境,穩(wěn)定性會(huì)更高。
在運(yùn)行結(jié)束的時(shí)候,開發(fā)者們也可通過設(shè)置郵件的通知,及時(shí)地了解到模型運(yùn)行的情況。
同時(shí),如果開發(fā)者使用SessionRunHook來(lái)保存最后輸出的模型,也需要了解到,框架代碼中的一個(gè)BUG,即它只能在規(guī)定的時(shí)間內(nèi)保存,超出規(guī)定時(shí)間,即使運(yùn)行沒有結(jié)束,程序也會(huì)被強(qiáng)制結(jié)束。如果開發(fā)者使用的版本是未修復(fù)BUG的版本,則要自行處理,放寬運(yùn)行時(shí)間。