0

Tensorflowの高レベルAPIを使用して、MNISTの畳み込みニューラルネットワークを分散して訓練したいと考えています。 クラスタ構成を指定して、Estimator(下のコード)に渡そうとしました。tf.learn Estimatorsの分散トレーニングですか?

私はMergeFromに次のエラー パラメータを取得しています()同じクラスのインスタンスでなければなりません:tensorflow.ConfigProtoは

をプロパティを持って予想誰もが、私は構成を指定していますかで間違っているものを知っていますか?

from __future__ import absolute_import 
from __future__ import division 
from __future__ import print_function 


import grpc 
import numpy as np 
import tensorflow as tf 
from tensorflow.contrib import learn 
from tensorflow.contrib.learn.python.learn.estimators import model_fn as model_fn_lib 
from tensorflow.contrib.learn.python.learn.estimators import run_config as run_config_lib 
from tensorflow.python import debug as tf_debug 
tf.logging.set_verbosity(tf.logging.ERROR) 
import json 
import os 
import shutil 

### Data - Mnist 

mnist=learn.datasets.load_dataset('mnist') 
train_data=mnist.train.images 
train_labels=np.asarray(mnist.train.labels, dtype=np.int32) 
eval_data=mnist.test.images 
eval_labels=np.asarray(mnist.test.labels, dtype=np.int32) 

BATCH_SIZE=100 
NUM_EPOCHS=10 
train_input_fn = learn.io.numpy_input_fn({'x': train_data}, train_labels, shuffle=True, batch_size=BATCH_SIZE, 
             num_epochs=NUM_EPOCHS) 
batch_size = 100 
num_epochs = 1 
eval_input_fn = learn.io.numpy_input_fn({'x': eval_data}, eval_labels, shuffle=False, batch_size=batch_size, num_epochs=num_epochs) 

### Cluster 

my_cluster = {'ps': ['/cpu:0'], 
       'worker': ['/gpu:0']} 
os.environ['TF_CONFIG'] = json.dumps(
      {'cluster': my_cluster, 
      'task': {'type': 'worker', 'index': 1}}) 

my_configs=learn.RunConfig() 

server = tf.train.Server(server_or_cluster_def=my_configs.cluster_spec, job_name='worker') 

### Model 

def cnn_model_fn(features, labels, mode): 

    input_layer=tf.reshape(features['x'],shape=[-1,28,28,1]) 

    #conv1 
    conv1=tf.layers.conv2d(inputs=input_layer, 
          filters=32, 
          kernel_size=[5, 5], 
          padding='same', 
          activation=tf.nn.relu) 
    pool1=tf.layers.max_pooling2d(inputs=conv1, pool_size=[2,2], strides=2) 

    #conv2 
    conv2=tf.layers.conv2d(inputs=pool1, 
          filters=64, 
          kernel_size=[5,5], 
          padding='same', 
          activation=tf.nn.relu) 
    pool2=tf.layers.max_pooling2d(inputs=conv2, pool_size=[2,2], strides=2) 

    #fully connected layers 
    pool2_flat=tf.reshape(pool2, [-1, 7*7*64]) 
    dense1=tf.layers.dense(pool2_flat, 1024, activation=tf.nn.relu) 
    dropout = tf.layers.dropout(inputs=dense1, rate=0.4, training=mode == learn.ModeKeys.TRAIN) 

    #fc2 
    logits=tf.layers.dense(dropout, 10, activation=tf.nn.relu) 
    loss = None 
    train_op = None 

    #loss 
    if mode != learn.ModeKeys.INFER: 
     onehot_labels=tf.one_hot(indices=tf.cast(labels, tf.int32),depth=10) 
     loss=tf.losses.softmax_cross_entropy(onehot_labels=onehot_labels, logits=logits) 

    #optimizer 
    if mode == learn.ModeKeys.TRAIN: 
     with tf.device("/job:worker/task:1"): 
      train_op = tf.contrib.layers.optimize_loss(
      loss=loss, 
      global_step=tf.contrib.framework.get_global_step(), 
      learning_rate=0.0001, 
      optimizer="Adam") 

    #predictions 
    predictions={ 
      'classes': tf.argmax(logits, axis=1) , 
      'predictions': tf.nn.softmax(logits,name="softmax_tensor")   
     } 
    return model_fn_lib.ModelFnOps(mode=mode, predictions=predictions, loss=loss, train_op=train_op) 

classifier=learn.Estimator(model_fn=cnn_model_fn, model_dir="/tmp/mnist_distributed", config=my_configs) 

### logging 

tensors_to_log = {"probabilities": "softmax_tensor"} 
logging_hook = tf.train.LoggingTensorHook(tensors=tensors_to_log, every_n_iter=50) 

### Metrics 

metrics = { 
    "accuracy": 
     learn.MetricSpec(
      metric_fn=tf.metrics.accuracy, prediction_key="classes"), 
} 

### Distributing training 

distributed_experiment=learn.Experiment(estimator=classifier, 
       train_input_fn=train_input_fn, 
       eval_input_fn=eval_input_fn, 
       eval_metrics=metrics, 
       #train_monitors=my_monitors, 
       train_steps=200, 
       ) 

distributed_experiment.train_and_evaluate() 

答えて

0

my_configは、RunConfigの代わりにRunConfigのインスタンスである必要があります。 RunConfig initを実行すると、環境変数TF_CONFIGからps、workers、task configをロードします。 https://www.tensorflow.org/api_docs/python/tf/contrib/learn/RunConfig

+0

クラスのインスタンスが修正されました。ありがとう、私は今サーバを起動することができますが、上記のコードは実行時にまだ停止しています distributed_experiment.train() 他に何かが見当たりませんか? – smh

+0

@smhログを提供できますか? 1 psと1人の労働者はいませんか? – BoscoTsang

+0

psとして別のサーバーを起動する必要はありますか? 私はクラスターを誤って定義しています。あるいは、トレーニングがcnn_model_fnで配布される方法については、修正方法を理解していません。 これは、** distributed_experiment.train()**が停止する前のログです。 'DEBUG:tensorflow:機能情報を{'x'に設定する:TensorSignature(dtype = tf.float32、shape = TensorShape([Dimension(None )、Dimension(784)])、is_sparse = False)}。 DEBUG:tensorflow:ラベル情報をTensorSignatureに設定する(dtype = tf.int32、shape = TensorShape([Dimension(None)])、is_sparse = False) INFO:テンソルフロー:CheckpointSaverHookを作成する。 – smh

1

uはTFに分散推定を実行したい場合は、インスタンスがあります:ここ

from tensorflow.contrib.learn.python.learn import learn_runner 
from tensorflow.contrib.learn.python.learn.estimators import run_config 

... 

learn_runner.run(
    experiment_fn=create_experiment_fn(config), 
    output_dir=output_dir) 

そして「experiment_fnは、」あなたのコードでは単に「distributed_experiment」です。あなたの実験には 'output_dir'もあるはずです

関連する問題