2016-08-21 2 views
3

私はNumPy配列を持っていて、Queueを使ってTensorFlowのコードでそれを読みたいと思っています。私はキュー全体のデータをシャッフルし、指定された数のエポックを返すようにし、その後にエラーをスローします。例のサイズや例の数をハードコードする必要がないのが最善でしょう。 私はshuffle batchがその目的を果たすことを意図していると思います。次のように私はそれを使用して試してみました:テンソルフローにnumpy配列を渡す

data = tf.constant(train_np) # train_np is my numpy array of shape (num_examples, example_size) 
batch = tf.train.shuffle_batch([data], batch_size=5, capacity=52200, min_after_dequeue=10, num_threads=1, seed=None, enqueue_many=True) 

sess.run(tf.initialize_all_variables()) 
tf.train.start_queue_runners(sess=sess) 
batch.eval() 

そのアプローチの問題点は、それが継続的にすべてのデータを読み込み、私はエポックのいくつかの番号の後に終了することを指定することができないということです。私はRandomShuffleQueueを使用してデータを数回挿入することができることを知っていますが、 a)私はエポック*メモリのデータを浪費したくありませんし、b)キューがエポック間をシャッフルできるようにします。

独自のキューを作成しなくても、Tensorflowのエポックでシャッフルされたデータを読み込む良い方法はありますか?

答えて

5

別のキューを作成し、その上にデータをキューに入れてnum_epoch回、閉じてから、batchに接続することができます。メモリを節約するために、このキューを小さくしてアイテムをそのキューに並行してエンキューすることができます。エポックの間に少しのミキシングがあります。混合を完全に防ぐために、num_epochs=1で以下のコードをとり、それをnum_epochs回と呼ぶことができます。キューがそれに全体numpyのデータセットをロードするのに十分な大きさでないとき

tf.reset_default_graph() 
data = np.array([1, 2, 3, 4]) 
num_epochs = 5 
queue1_input = tf.placeholder(tf.int32) 
queue1 = tf.FIFOQueue(capacity=10, dtypes=[tf.int32], shapes=[()]) 

def create_session(): 
    config = tf.ConfigProto() 
    config.operation_timeout_in_ms=20000 
    return tf.InteractiveSession(config=config) 

enqueue_op = queue1.enqueue_many(queue1_input) 
close_op = queue1.close() 
dequeue_op = queue1.dequeue() 
batch = tf.train.shuffle_batch([dequeue_op], batch_size=4, capacity=5, min_after_dequeue=4) 

sess = create_session() 

def fill_queue(): 
    for i in range(num_epochs): 
     sess.run(enqueue_op, feed_dict={queue1_input: data}) 
    sess.run(close_op) 

fill_thread = threading.Thread(target=fill_queue, args=()) 
fill_thread.start() 

# read the data from queue shuffled 
tf.train.start_queue_runners() 
try: 
    while True: 
     print batch.eval() 
except tf.errors.OutOfRangeError: 
    print "Done" 

ところで、上記enqueue_manyパターンがハングします。以下のようにデータをチャンクにロードすることで、より小さなキューを持つ柔軟性を得ることができます。

tf.reset_default_graph() 
data = np.array([1, 2, 3, 4]) 
queue1_capacity = 2 
num_epochs = 2 
queue1_input = tf.placeholder(tf.int32) 
queue1 = tf.FIFOQueue(capacity=queue1_capacity, dtypes=[tf.int32], shapes=[()]) 

enqueue_op = queue1.enqueue_many(queue1_input) 
close_op = queue1.close() 
dequeue_op = queue1.dequeue() 

def dequeue(): 
    try: 
     while True: 
      print sess.run(dequeue_op) 
    except: 
     return 

def enqueue(): 
    for i in range(num_epochs): 
     start_pos = 0 
     while start_pos < len(data): 
      end_pos = start_pos+queue1_capacity 
      data_chunk = data[start_pos: end_pos] 
      sess.run(enqueue_op, feed_dict={queue1_input: data_chunk}) 
      start_pos += queue1_capacity 
    sess.run(close_op) 

sess = create_session() 

enqueue_thread = threading.Thread(target=enqueue, args=()) 
enqueue_thread.start() 

dequeue_thread = threading.Thread(target=dequeue, args=()) 
dequeue_thread.start() 
+0

どのように動作するか(ドキュメントを指している可能性があります)についてもう少し詳しく説明できますか? 特に:別のスレッドを作成し、そこにすべての要素num_epochsをエンキューすると、あまりにも多くのメモリが使用されないのはなぜですか? そして、なぜFIFOキューに複数回データをエンキューすると、要素が混在するのでしょうか? 1つのエポックが終わり、もう1つのエポックが始まるときだけでしょうか? また、 'num_epochs'回のコード全体を呼び出すことは、「データを読む良い方法」とはまったく異なります。P – sygi

+1

キューの容量が10であれば、一度に10個の例のスペースしか必要とせず、非同期以前の例が使用されているので、キューのロードにより多くの例が追加されます。 (shuffle_batch)によって作成された2番目のキューは、エポックがどこで終了するかを知らないため、エポックからの最後のバッチには次のエポックからのエントリも含まれている可能性があります。 –

+0

キューのドキュメントは[こちら](https://www.tensorflow.org/versions/r0.10/how_tos/threading_and_queues/index.html)、[こちら](http://yaroslavvb.blogspot.com/ 2016/05/queues-in-tensorflow.html)は、私が彼らに与えた話からのスライドです。 –

関連する問題