2017-08-28 5 views
0

長い時間(長時間)正しい結果を検索しました。 CSVファイルを読み込み、非常に低速である(グラフ毎回リロードせずにinput_fnで見積もりを養成するためにキューを使用して:Tensorflow:カスタムエスティメータと "input_fn"関数を使用してCSVファイルにキューを使用

問題...私は何かを明らかに不足していると思うが、私は何を知ることができません)。


は、私は自分の推定を作成するために私にmodel_fn関数を与えるカスタムモデルを作成:その後

tf.estimator.Estimator(model_fn=model_fn, params=model_params) 

を、私は非常に大規模なCSVファイルを読み込む必要がある(中負荷することはできませんメモリ)ので、キューを使用することにしました(ベストソリューションと思われます)。

nb_features = 10 
queue = tf.train.string_input_producer(["test.csv"], 
             shuffle=False) 
reader = tf.TextLineReader() 
key, value = reader.read(queue) 

record_defaults = [[0] for _ in range(nb_features+1)] 
cols = tf.decode_csv(value, record_defaults=record_defaults) 
features = tf.stack(cols[0:len(cols)-1]) # Take all columns without the last 
label = tf.stack(cols[len(cols)-1]) # Take last column 

私はこのコードは大丈夫だと思います。その後


、メインコード:

with tf.Session() as sess: 
    tf.logging.set_verbosity(tf.logging.INFO) 
    sess.run(tf.global_variables_initializer()) 

    coord = tf.train.Coordinator() 
    threads = tf.train.start_queue_runners(coord=coord) 

    # Return a Tensor of 1000 features/labels 
    def get_inputs(): 
     print("input call !") 
     xs = [] 
     ys = [] 
     for i in range(1000): 
      x, y = sess.run([features, label]) 
      xs.append(x) 
      ys.append(y) 
     return tf.constant(np.asarray(xs), dtype=tf.float32), tf.constant(np.asarray(ys)) 

    estimator.train(input_fn=get_inputs, 
        steps=100) 

    coord.request_stop() 
    coord.join(threads) 

あなたが見ることができるように、ここでは醜いものがたくさんある...

私が欲しいもの: I列車機能が各ステップで新しい機能のバッチを使用するようにします。しかし、ここでは、get_inputs関数はトレーニングを開始するときにコールするだけなので、100ステップで1000個の機能を同じバッチで使用します。これを行う簡単な方法はありますか?

は私がestimator.train ステップ= 1が、これは毎回グラフをリロードし、非常に遅くなってループにしてみてください。

私は今、何をすべきかわからないし、それも可能かどうかわからない...私を助けるため

感謝を!

答えて

1

ショートバージョン:CSVファイルをtfrecordsに変換してから​​を使用してください。ロングバージョン:コードを参照してください質問/回答を参照してくださいhere(便宜上下にコピー)。


tf.contrib.data.Dataset APIを確認してください。私はあなたのCSVをTfRecordファイルに変換し、TfRecordDatasetを使用することをお勧めします。ここで完全なチュートリアルがあります。

手順1:csvデータからtfrecordsデータに変換します。以下のコード例。

import tensorflow as tf 


def read_csv(filename): 
    with open(filename, 'r') as f: 
     out = [line.rstrip().split(',') for line in f.readlines()] 
    return out 


csv = read_csv('data.csv') 
with tf.python_io.TFRecordWriter("data.tfrecords") as writer: 
    for row in csv: 
     features, label = row[:-1], row[-1] 
     features = [float(f) for f in features] 
     label = int(label) 
     example = tf.train.Example() 
     example.features.feature[ 
      "features"].float_list.value.extend(features) 
     example.features.feature[ 
      "label"].int64_list.value.append(label) 
     writer.write(example.SerializeToString()) 

これは、ラベルが最後の列の整数で、前の列に浮動小数点の機能があることを前提としています。これは一度だけ実行する必要があります。

手順2:これらのレコードファイルをデコードするデータセットを作成します。tf.estimator.Estimatorで使用する場合

batch_size = 4 
shuffle_size = 10000 
features, labels = input_fn() 
with tf.Session() as sess: 
    f_data, l_data = sess.run([features, labels]) 
print(f_data, l_data) 

を:(推定量とは無関係に)テストする

def parse_function(example_proto): 
    features = { 
     'features': tf.FixedLenFeature((n_features,), tf.float32), 
     'label': tf.FixedLenFeature((), tf.int64) 
    } 
    parsed_features = tf.parse_single_example(example_proto, features) 
    return parsed_features['features'], parsed_features['label'] 


def input_fn(): 
    dataset = tf.contrib.data.TFRecordDataset(['data.tfrecords']) 
    dataset = dataset.map(parse_function) 
    dataset = dataset.shuffle(shuffle_size) 
    dataset = dataset.repeat() # repeat indefinitely 
    dataset = dataset.batch(batch_size) 
    print(dataset.output_shapes) 
    features, label = dataset.make_one_shot_iterator().get_next() 
    return features, label 

estimator.train(input_fn, max_steps=1e7) 
+0

こんにちは、お返事ありがとうございます。私はすでにあなたの投稿を見ましたが、このソリューションはCSVをTFRecordに変換する必要があります。キュー&バッチ機能を使用するだけでこれを回避できないのだろうかと思います。 – Kayoku

+0

あなたは...できますが、データセット全体がフードの下でやっていることは間違いありません。 csvから直接読み込み、列車のデータを解析することはできますが、速度は遅くなります。私がtfrecordsについて理解している唯一の欠点は、余分なスペースを占めることです。それは本当に心配ですか?イメージをロードしている場合は、パスをtfrecordsに保存してから、通常の方法でファイルから読み込むことができます。 – DomJack

+0

いいえ、TFrecordで複製することは本当に重要ではありません。それは私の個人的な知識のためだけです(そして、私はこの質問に多くの時間を費やすので、hahaです)。あなたは変換せずにCSVからそれを行うことをお考えですか?そして、あなたのソリューションでは、すべてのデータセットがメモリにロードされませんか?ちょうど現在のバッチ、右か?私は明日これを試してみます – Kayoku

0

ので、多くの可能性をテストした後、私が見つけます純粋なCSVのソリューションですが、いくつかの条件で動作するので、私は再びあなたの助けが必要です!

のは、コードに行こう:

filename = "test.csv" 

queue = tf.train.string_input_producer([filename], 
            num_epochs=1, 
            shuffle=False) 
reader = tf.TextLineReader() 
_, csv_row = reader.read(queue) 
record_defaults = [[0] for _ in range(341)] 
cols = tf.decode_csv(csv_row, record_defaults=record_defaults) 
features = tf.stack(cols[0:340]) 
label = tf.stack(cols[340]) 

# WORKS ------ 
min_after_dequeue = 1000 
capacity = min_after_dequeue + 3 * 4 
example_batch, label_batch = tf.train.batch(
    [features, label], batch_size=4, capacity=capacity) 
# ------------ 

# DOESN'T WORK 
#def input_fn(): 
# min_after_dequeue = 1000 
# capacity = min_after_dequeue + 3 * 4 
# example_batch, label_batch = tf.train.batch(
#  [features, label], batch_size=4, capacity=capacity) 
# return example_batch, label_batch 
# ------------ 

with tf.Session() as sess: 
    tf.global_variables_initializer().run() 
    tf.local_variables_initializer().run() 

    # start populating filename queue 
    coord = tf.train.Coordinator() 
    threads = tf.train.start_queue_runners(coord=coord, sess=sess) 

    cpt = 0 
    while True: 
    try: 
     cpt += 1 
     print(cpt, end=' ') 
     #_, l = input_fn() 
     #print(l.eval()) 
     print(label_batch.eval()) 
    except tf.errors.OutOfRangeError: 
     break; 

    coord.request_stop() 
    coord.join(threads) 

(私は341個の列が含まれているCSVファイル、340個の機能& 1つのラベルを使用)


あなたが見ることができるように、コードはかなりあります醜いですが、CSVを直接読むことができます。残念ながら、Estimatorでこれを使用するには "input_fn"関数が必要なので、バッチ作成を "input_fn"に入れようとしています。しかし、このコードを実行するときTensorflowこの行しようとすると、すべてがフリーズ(とあなたの美しいシェルをブロック):だから

print(l.eval()) 

を、誰もが、すべてが停止する理由についての考えを持っている場合は、私を助けてください!

ありがとうございました。

+0

'tf.estimator.Estimator'が独自のグラフを作成すると思いますので、テンソル/オペレーションの作成を' input_fn'の中に入れてください。それは、ちょうど書いた擬似バージョンで動作しましたが、失敗した場合はポストバックします。 – DomJack

+0

あなたは何を意味するのか分かりません。 input_fn funcの "with tf.Session()..."コードの前にすべてが必要ですか? – Kayoku

+0

かなりかわいいです。'graph'に操作を追加するもの(あなたが望むならば、Pythonのプリミティブの割り当てを外に置くことができますが、入力パイプラインにのみ適用すれば、より良い結果が得られます)。 – DomJack

0

あなたが呼び出されていないtf.train.start_queue_runnersを懸念している場合は、以下を試してみてください。

class ThreadStartHook(tf.train.SessionRunHook): 
    def after_create_session(self, session, coord): 
     self.coord = coord 
     self.threads = tf.train.start_queue_runners(coord=coord, sess=session) 

    def end(self, session): 
     self.coord.request_stop() 
     self.coord.join(self.threads) 


estimator.train(input_fn, [ThreadStartHook()]) 

私が始めたときと同様の考えを持っていたが、それは必要ありませんがわかりました。

+0

よろしく!私はこのコードが問題を解決すると思います。しかし、別のものが表示されます...(そうでなければ、それは楽しいではありません)。 "合格Tensor(" sparse_softmax_cross_entropy_loss/value:0 "、shape =()、dtype = float32)は、現在のグラフと等しいグラフ属性を持つ必要があります。今、このバグについて調査します。 – Kayoku

+0

私はよく理解しています...実際には、tf.train.batchの後に "start_queue_runners"を呼び出す必要があります。 (テスト済みで承認済み) – Kayoku

関連する問題