The graph-replicated (distributed) version of the PTB RNN model is slower than the single-GPU version (even with tf 1.0.0)

I modified the PTB model (it can be found in tensorflow/models/tutorials/rnn/ptb). This distributed version (1 ps server, 2 workers) shows no speedup, even when the ps and the workers run on the same machine. Timeline profiling shows a considerable delay between the GPU jobs and the CPU jobs in the distributed version. The code and the timeline graphs are below.
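(For reference, timeline traces like the ones below can be captured with TensorFlow's run-metadata/timeline mechanism; this is only a minimal sketch, assuming a session `session` and a `fetches` dict like the one used in `run_epoch` further down:)

from tensorflow.python.client import timeline

# Trace one session.run() call and dump a Chrome trace that can be opened
# in chrome://tracing to inspect per-op start times and durations.
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()
session.run(fetches, feed_dict, options=run_options, run_metadata=run_metadata)
trace = timeline.Timeline(run_metadata.step_stats)
with open("timeline.json", "w") as f:
    f.write(trace.generate_chrome_trace_format())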

from __future__ import absolute_import 
from __future__ import division 
from __future__ import print_function 

import time 

import numpy as np 
import tensorflow as tf 

import reader 
import tempfile 

flags = tf.flags 
logging = tf.logging 

flags.DEFINE_string(
    "model", "small", 
    "A type of model. Possible options are: small, medium, large.") 
flags.DEFINE_string("data_path", None, 
        "Where the training/test data is stored.") 
flags.DEFINE_string("save_path", None, 
        "Model output directory.") 
flags.DEFINE_bool("use_fp16", False, 
        "Train using 16-bit floats instead of 32bit floats") 

flags.DEFINE_string("ps_hosts","IP1:2222", 
        "Comma-separated list of hostname:port pairs") 
flags.DEFINE_string("worker_hosts", "IP1:2223,IP1:2224", 
        "Comma-separated list of hostname:port pairs") 
flags.DEFINE_string("job_name", None,"job name: worker or ps") 
flags.DEFINE_integer("task_index", None, 
        "Worker task index, should be >= 0. task_index=0 is " 
        "the master worker task the performs the variable " 
        "initialization ") 
flags.DEFINE_integer("num_gpus", 1, 
        "Total number of gpus for each machine." 
        "If you don't use GPU, please set it to '0'") 
flags.DEFINE_integer("replicas_to_aggregate", None, 
        "Number of replicas to aggregate before parameter update" 
        "is applied (For sync_replicas mode only; default: " 
        "num_workers)") 
flags.DEFINE_boolean("sync_replicas", False, 
        "Use the sync_replicas (synchronized replicas) mode, " 
        "wherein the parameter updates from workers are aggregated " 
        "before applied to avoid stale gradients") 
flags.DEFINE_boolean(
    "existing_servers", False, "Whether servers already exists. If True, " 
    "will use the worker hosts via their GRPC URLs (one client process " 
    "per worker host). Otherwise, will create an in-process TensorFlow " 
    "server.") 

FLAGS = flags.FLAGS 


def data_type(): 
    return tf.float16 if FLAGS.use_fp16 else tf.float32 


class PTBInput(object): 
    """The input data.""" 

    def __init__(self, config, data, ix, worker_num, name=None):
        # Shard the raw data so that each worker trains on its own slice.
        data_len = len(data) // worker_num
        data = data[data_len * ix:data_len * (ix + 1)]

        self.batch_size = batch_size = config.batch_size
        self.num_steps = num_steps = config.num_steps
        self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
        self.input_data, self.targets = reader.ptb_producer(
            data, batch_size, num_steps, name=name)


class PTBModel(object): 
    """The PTB model.""" 

    def __init__(self, is_training, config, input_, num_workers=0, global_step=None):
        self._input = input_

        batch_size = input_.batch_size
        num_steps = input_.num_steps
        size = config.hidden_size
        vocab_size = config.vocab_size

        # Slightly better results can be obtained with forget gate biases
        # initialized to 1 but the hyperparameters of the model would need to be
        # different than reported in the paper.
        def lstm_cell():
            # return tf.contrib.rnn.BasicLSTMCell(
            return tf.nn.rnn_cell.BasicLSTMCell(
                size, forget_bias=0.0, state_is_tuple=True)
        attn_cell = lstm_cell
        if is_training and config.keep_prob < 1:
            def attn_cell():
                return tf.contrib.rnn.DropoutWrapper(
                    lstm_cell(), output_keep_prob=config.keep_prob)
        # cell = tf.contrib.rnn.MultiRNNCell(
        cell = tf.nn.rnn_cell.MultiRNNCell(
            [attn_cell() for _ in range(config.num_layers)], state_is_tuple=True)

        self._initial_state = cell.zero_state(batch_size, data_type())

        with tf.device("/cpu:0"):
            embedding = tf.get_variable(
                "embedding", [vocab_size, size], dtype=data_type())
            inputs = tf.nn.embedding_lookup(embedding, input_.input_data)

        if is_training and config.keep_prob < 1:
            inputs = tf.nn.dropout(inputs, config.keep_prob)

        # Simplified version of models/tutorials/rnn/rnn.py's rnn().
        # This builds an unrolled LSTM for tutorial purposes only.
        # In general, use the rnn() or state_saving_rnn() from rnn.py.
        #
        # The alternative version of the code below is:
        #
        # inputs = tf.unstack(inputs, num=num_steps, axis=1)
        # outputs, state = tf.nn.rnn(cell, inputs,
        #                            initial_state=self._initial_state)
        outputs = []
        state = self._initial_state
        with tf.variable_scope("RNN"):
            for time_step in range(num_steps):
                if time_step > 0: tf.get_variable_scope().reuse_variables()
                (cell_output, state) = cell(inputs[:, time_step, :], state)
                outputs.append(cell_output)

        output = tf.reshape(tf.concat(1, outputs), [-1, size])
        softmax_w = tf.get_variable(
            "softmax_w", [size, vocab_size], dtype=data_type())
        softmax_b = tf.get_variable("softmax_b", [vocab_size], dtype=data_type())
        logits = tf.matmul(output, softmax_w) + softmax_b
        # loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example(
        loss = tf.nn.seq2seq.sequence_loss_by_example(
            [logits],
            [tf.reshape(input_.targets, [-1])],
            [tf.ones([batch_size * num_steps], dtype=data_type())])
        self._cost = cost = tf.reduce_sum(loss) / batch_size
        self._final_state = state

        if not is_training:
            return

        self._lr = tf.Variable(0.0, trainable=False)
        tvars = tf.trainable_variables()
        grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars),
                                          config.max_grad_norm)
        self._opt = tf.train.GradientDescentOptimizer(self._lr)

        if FLAGS.sync_replicas:
            if FLAGS.replicas_to_aggregate is None:
                replicas_to_aggregate = num_workers
            else:
                replicas_to_aggregate = FLAGS.replicas_to_aggregate

            self._opt = tf.train.SyncReplicasOptimizer(
                self._opt,
                replicas_to_aggregate=replicas_to_aggregate,
                total_num_replicas=num_workers,
                name="ptb_sync_replicas")

        # train_step = opt.minimize(cross_entropy, global_step=global_step)

        self._train_op = self._opt.apply_gradients(
            zip(grads, tvars),
            global_step)
        # global_step=tf.contrib.framework.get_or_create_global_step())

        self._new_lr = tf.placeholder(
            tf.float32, shape=[], name="new_learning_rate")
        self._lr_update = tf.assign(self._lr, self._new_lr)

    def assign_lr(self, session, lr_value):
        session.run(self._lr_update, feed_dict={self._new_lr: lr_value})

    @property
    def input(self):
        return self._input

    @property
    def initial_state(self):
        return self._initial_state

    @property
    def cost(self):
        return self._cost

    @property
    def final_state(self):
        return self._final_state

    @property
    def lr(self):
        return self._lr

    @property
    def opt(self):
        return self._opt

    @property
    def train_op(self):
        return self._train_op


class SmallConfig(object): 
    """Small config.""" 
    init_scale = 0.1 
    learning_rate = 1.0 
    max_grad_norm = 5 
    num_layers = 2 
    num_steps = 20 
    hidden_size = 200 
    max_epoch = 4 
    max_max_epoch = 13 
    keep_prob = 1.0 
    lr_decay = 0.5 
    batch_size = 20 
    vocab_size = 10000 


class MediumConfig(object): 
    """Medium config.""" 
    init_scale = 0.05 
    learning_rate = 1.0 
    max_grad_norm = 5 
    num_layers = 2 
    num_steps = 35 
    hidden_size = 650 
    max_epoch = 6 
    max_max_epoch = 39 
    keep_prob = 0.5 
    lr_decay = 0.8 
    batch_size = 20 
    vocab_size = 10000 


class LargeConfig(object): 
    """Large config.""" 
    init_scale = 0.04 
    learning_rate = 1.0 
    max_grad_norm = 10 
    num_layers = 2 
    num_steps = 35 
    hidden_size = 1500 
    max_epoch = 14 
    max_max_epoch = 55 
    keep_prob = 0.35 
    lr_decay = 1/1.15 
    batch_size = 20 
    vocab_size = 10000 


class TestConfig(object): 
    """Tiny config, for testing.""" 
    init_scale = 0.1 
    learning_rate = 1.0 
    max_grad_norm = 1 
    num_layers = 1 
    num_steps = 2 
    hidden_size = 2 
    max_epoch = 1 
    max_max_epoch = 1 
    keep_prob = 1.0 
    lr_decay = 0.5 
    batch_size = 20 
    vocab_size = 10000 


def run_epoch(session, model, global_step, eval_op=None, verbose=False): 
    """Runs the model on the given data.""" 
    start_time = time.time() 
    costs = 0.0 
    iters = 0 
    state = session.run(model.initial_state) 

    fetches = { 
     "cost": model.cost, 
     "final_state": model.final_state, 
     "global_step": global_step, 
    } 
    if eval_op is not None: 
    fetches["eval_op"] = eval_op 

    for step in range(model.input.epoch_size): 
    feed_dict = {} 
    for i, (c, h) in enumerate(model.initial_state): 
     feed_dict[c] = state[i].c 
     feed_dict[h] = state[i].h 

    vals = session.run(fetches, feed_dict) 
    cost = vals["cost"] 
    state = vals["final_state"] 

    costs += cost 
    iters += model.input.num_steps 

    if verbose and step % (model.input.epoch_size // 10) == 10: 
     print("%.3f perplexity: %.3f speed: %.0f wps" % 
      (step * 1.0/model.input.epoch_size, np.exp(costs/iters), 
      iters * model.input.batch_size/(time.time() - start_time))) 
    print("esize is %.3f, one epoch time: %.0f s" % (step,(time.time() - start_time))) 
    return np.exp(costs/iters) 


def get_config(): 
    if FLAGS.model == "small": 
    return SmallConfig() 
    elif FLAGS.model == "medium": 
    return MediumConfig() 
    elif FLAGS.model == "large": 
    return LargeConfig() 
    elif FLAGS.model == "test": 
    return TestConfig() 
    else: 
    raise ValueError("Invalid model: %s", FLAGS.model) 


def main(_): 
    if not FLAGS.data_path:
        raise ValueError("Must set --data_path to PTB data directory")

    if FLAGS.job_name is None or FLAGS.job_name == "":
        raise ValueError("Must specify an explicit `job_name`")
    if FLAGS.task_index is None or FLAGS.task_index == "":
        raise ValueError("Must specify an explicit `task_index`")

    print("job name = %s" % FLAGS.job_name)
    print("task index = %d" % FLAGS.task_index)

    # Construct the cluster and start the server
    ps_spec = FLAGS.ps_hosts.split(",")
    worker_spec = FLAGS.worker_hosts.split(",")

    # Get the number of workers.
    num_workers = len(worker_spec)

    cluster = tf.train.ClusterSpec({
        "ps": ps_spec,
        "worker": worker_spec})

    if not FLAGS.existing_servers:
        # Not using existing servers. Create an in-process server.
        server = tf.train.Server(
            cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
        if FLAGS.job_name == "ps":
            server.join()

    is_chief = (FLAGS.task_index == 0)
    if FLAGS.num_gpus > 0:
        # if FLAGS.num_gpus < num_workers:
        #   raise ValueError("number of gpus is less than number of workers")
        # Avoid gpu allocation conflict: now allocate task_num -> #gpu
        # for each worker in the corresponding machine
        gpu = 0  # (FLAGS.task_index % FLAGS.num_gpus)
        worker_device = "/job:worker/task:%d/gpu:%d" % (FLAGS.task_index, gpu)
    elif FLAGS.num_gpus == 0:
        # Just allocate the CPU to worker server
        cpu = 0
        worker_device = "/job:worker/task:%d/cpu:%d" % (FLAGS.task_index, cpu)

    # The device setter will automatically place Variables ops on separate
    # parameter servers (ps). The non-Variable ops will be placed on the workers.
    # The ps use CPU and workers use corresponding GPU
    raw_data = reader.ptb_raw_data(FLAGS.data_path)
    train_data, valid_data, test_data, _ = raw_data

    config = get_config()
    eval_config = get_config()
    eval_config.batch_size = 1
    eval_config.num_steps = 1

    # with tf.Graph().as_default():
    with tf.device(
        tf.train.replica_device_setter(
            worker_device=worker_device,
            ps_device="/job:ps/cpu:0",
            cluster=cluster)):
        '''raw_data = reader.ptb_raw_data(FLAGS.data_path)
        train_data, valid_data, test_data, _ = raw_data

        config = get_config()
        eval_config = get_config()
        eval_config.batch_size = 1
        eval_config.num_steps = 1'''

        # with tf.Graph().as_default():
        global_step = tf.Variable(0, name="global_step", trainable=False)
        initializer = tf.random_uniform_initializer(-config.init_scale,
                                                    config.init_scale)

        with tf.name_scope("Train"):
            train_input = PTBInput(config=config, data=train_data,
                                   ix=FLAGS.task_index, worker_num=num_workers,
                                   name="TrainInput")
            with tf.variable_scope("Model", reuse=None, initializer=initializer):
                m = PTBModel(is_training=True, config=config, input_=train_input,
                             num_workers=num_workers, global_step=global_step)
            tf.scalar_summary("Training Loss", m.cost)
            tf.scalar_summary("Learning Rate", m.lr)

        if FLAGS.sync_replicas:
            local_init_op = m.opt.local_step_init_op
            if is_chief:
                local_init_op = m.opt.chief_init_op

            ready_for_local_init_op = m.opt.ready_for_local_init_op

            # Initial token and chief queue runners required by the sync_replicas mode
            chief_queue_runner = m.opt.get_chief_queue_runner()
            sync_init_op = m.opt.get_init_tokens_op()

        # init_op = tf.global_variables_initializer()
        init_op = tf.initialize_all_variables()
        train_dir = tempfile.mkdtemp()

        with tf.name_scope("Valid"):
            valid_input = PTBInput(config=config, data=valid_data,
                                   ix=FLAGS.task_index, worker_num=num_workers,
                                   name="ValidInput")
            with tf.variable_scope("Model", reuse=True, initializer=initializer):
                mvalid = PTBModel(is_training=False, config=config,
                                  input_=valid_input, num_workers=num_workers,
                                  global_step=global_step)
            tf.scalar_summary("Validation Loss", mvalid.cost)

        with tf.name_scope("Test"):
            test_input = PTBInput(config=eval_config, data=test_data,
                                  ix=0, worker_num=1, name="TestInput")
            with tf.variable_scope("Model", reuse=True, initializer=initializer):
                mtest = PTBModel(is_training=False, config=eval_config,
                                 input_=test_input, num_workers=num_workers,
                                 global_step=global_step)

        if FLAGS.sync_replicas:
            sv = tf.train.Supervisor(
                is_chief=is_chief,
                logdir=train_dir,
                init_op=init_op,
                local_init_op=local_init_op,
                ready_for_local_init_op=ready_for_local_init_op,
                recovery_wait_secs=1,
                global_step=global_step)
        else:
            sv = tf.train.Supervisor(
                is_chief=is_chief,
                logdir=train_dir,
                init_op=init_op,
                recovery_wait_secs=1,
                global_step=global_step)

        sess_config = tf.ConfigProto(
            allow_soft_placement=True,
            log_device_placement=False,
            device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index])

        # The chief worker (task_index==0) session will prepare the session,
        # while the remaining workers will wait for the preparation to complete.
        if is_chief:
            print("Worker %d: Initializing session..." % FLAGS.task_index)
        else:
            print("Worker %d: Waiting for session to be initialized..." %
                  FLAGS.task_index)

        if FLAGS.existing_servers:
            server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
            print("Using existing server at: %s" % server_grpc_url)

            session = sv.prepare_or_wait_for_session(server_grpc_url,
                                                     config=sess_config)
        else:
            session = sv.prepare_or_wait_for_session(server.target,
                                                     config=sess_config)

        print("Worker %d: Session initialization complete." % FLAGS.task_index)

        if FLAGS.sync_replicas and is_chief:
            # Chief worker will start the chief queue runner and call the init op.
            session.run(sync_init_op)
            sv.start_queue_runners(session, [chief_queue_runner])

        # sv = tf.train.Supervisor(logdir=FLAGS.save_path)
        # with sv.managed_session() as session:
        for i in range(config.max_max_epoch):
            lr_decay = config.lr_decay ** max(i + 1 - config.max_epoch, 0.0)
            m.assign_lr(session, config.learning_rate * lr_decay)

            print("Epoch: %d Learning rate: %.3f" % (i + 1, session.run(m.lr)))
            train_perplexity = run_epoch(session, m, global_step,
                                         eval_op=m.train_op, verbose=True)
            print("Epoch: %d Train Perplexity: %.3f" % (i + 1, train_perplexity))
            valid_perplexity = run_epoch(session, mvalid, global_step)
            print("Epoch: %d Valid Perplexity: %.3f" % (i + 1, valid_perplexity))

        test_perplexity = run_epoch(session, mtest, global_step)
        print("Test Perplexity: %.3f" % test_perplexity)

        if FLAGS.save_path:
            print("Saving model to %s." % FLAGS.save_path)
            sv.saver.save(session, FLAGS.save_path, global_step=sv.global_step)


if __name__ == "__main__": 
    tf.app.run() 

The timeline of the original single-GPU version (one iteration) looks like this:

[timeline screenshot: single-GPU version, one iteration]

The timeline for worker 0 of the distributed version (1 ps server, 2 workers) looks like this (one iteration); worker 1 has a similar timeline.

[timeline screenshot: distributed version, worker 0, one iteration]

The machine has two Tesla M40 GPUs. The single-GPU version runs at about 11000 wps (GPU utilization around 60%), while the distributed (between-graph) version gets about 6000 wps per worker (GPU utilization around 30%), so the speedup with 2 workers (2 GPUs) is only about 1.09x. At the same time, I wrote a multi-GPU version of the PTB model (not using the between-graph or in-graph distributed framework) that gets a 1.6x speedup on the same machine. So what is causing the performance drop in the distributed version?

Run commands:

ps: CUDA_VISIBLE_DEVICES="" python ptb_word_lm_dist.py --data_path=/data/simple-examples/data/ --model small --job_name=ps --task_index=0

worker 0: CUDA_VISIBLE_DEVICES=0 python ptb_word_lm_dist.py --data_path=/data/simple-examples/data/ --model small --job_name=worker --task_index=0

worker 1: CUDA_VISIBLE_DEVICES=1 python ptb_word_lm_dist.py --data_path=/data/simple-examples/data/ --model small --job_name=worker --task_index=1

(Tested with TensorFlow 0.12; a version modified for tf 1.0.0 shows the same poor performance.)

Answers


What happens when you run this with single-process TensorFlow, i.e. a single process with CUDA_VISIBLE_DEVICES=0,1? It makes sense to try that first, to rule out problems unrelated to distributed TensorFlow.

There was a similar situation running the model at https://github.com/rafaljozefowicz/lm with distributed TensorFlow, where one worker with 8 GPUs was roughly 8x faster than 8 distributed TensorFlow workers.

Part of the problem is tracked in https://github.com/tensorflow/tensorflow/issues/6116. Digging into it, a big factor turned out to be that sending large tensors over gRPC is extremely inefficient. The fix has gone into master but is not yet included in TF 1.0, so you would need to try a recent nightly build.
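(A rough way to see how much this matters here, independent of the model, is to time how long pulling one large tensor from the ps over gRPC takes. This is only a sketch under the cluster spec from the question; `sess` is assumed to be a session created against worker 0's server target, and the tensor is roughly the size of the softmax weights of the "small" config:)

import time
import tensorflow as tf

# An ~8 MB float32 tensor living on the ps (hidden_size x vocab_size for
# the "small" config is 200 x 10000).
with tf.device("/job:ps/task:0/cpu:0"):
    big = tf.Variable(tf.zeros([200, 10000]))

with tf.device("/job:worker/task:0/gpu:0"):
    copied = tf.identity(big)  # forces a ps -> worker transfer over gRPC

sess.run(tf.initialize_all_variables())
start = time.time()
for _ in range(10):
    sess.run(copied.op)  # run the op only, so nothing is copied back to the client
print("avg ps -> worker transfer time: %.4f s" % ((time.time() - start) / 10))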


Many thanks! It does not seem to work with a single process that sees GPUs 0,1 either. I also read issue 6116 and built the grpc-fix branch from source (https://github.com/llhe/tensorflow/tree/grpc-fix). With it the distributed version is indeed about 2x faster than the original single-GPU version, but the absolute performance of both the distributed and single-GPU builds is much lower than with the official pip wheel (distributed: 1450 wps per worker with 2 workers; single-GPU: 1613 wps). – Paul


I also built Yahoo's RDMA-accelerated version, and it has the same problem (a 2x speedup, but with a severe drop in absolute performance in both versions). – Paul


Do you know where the bottleneck is? If you add tf.Print nodes at various places in the model, log statements with microsecond timestamps are printed to standard error. There are various models that scale almost linearly across machines (e.g. GoogLeNet on 3-8 machines), so it could be a bug in the model rather than in TensorFlow. I wanted to check whether you are reading the data on one machine and distributing it to the other machines, but I can't tell from the code you posted. –
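(A minimal sketch of that kind of instrumentation; `logits` is just a stand-in for whichever tensor in the model you want to bracket with log points:)

# Wrapping a tensor with tf.Print makes every evaluation of it emit a log
# line (with the usual timestamped TF log prefix) to stderr. Placing a few
# of these at different points in the graph shows where time is spent
# within a single step.
logits = tf.Print(logits, [tf.shape(logits)], message="reached softmax logits: ")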


Maybe the bottleneck is disk IO, since the single-GPU version's performance is about 11000 wps with GPU utilization of only about 60%.
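(One way to test that hypothesis is to time the input pipeline on its own, without running the model. A rough sketch, assuming the `train_input` and `session` objects built in the code above and that the Supervisor's queue runners are already running; if this alone cannot go well beyond ~11000 wps, the reader/disk is the limit:)

import time

# Pull batches from the reader queue only and measure words per second.
steps = 100
start = time.time()
for _ in range(steps):
    session.run(train_input.input_data)
elapsed = time.time() - start
wps = steps * train_input.batch_size * train_input.num_steps / elapsed
print("input pipeline alone: %.0f wps" % wps)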
