2017-06-04

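TensorFlow in Spark: a PySpark RDD transformation raises RuntimeError: maximum recursion depth exceeded

I am trying to run TensorFlow with PySpark on a collection of images, but I get this error when I apply a transformation to the RDD. The error comes from inside the function run_inference_on_image, and I get it both when using 'local' as the master and when launching with spark-submit against 'spark://master.spark.tfm:7077'.

This is the error trace:
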
/usr/bin/python2.7 /home/utad/PycharmProjects/TensorFlowMirFlickr/ClasificacionImagenes.py 
Setting default log level to "WARN". 
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 
17/06/04 00:47:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
('Model already downloaded:', '/home/utad/TFM/model/inception-2015-12-05.tgz', posix.stat_result(st_mode=33204, st_ino=709599, st_dev=2049, st_nlink=1, st_uid=1000, st_gid=1000, st_size=88931400, st_atime=1496482158, st_mtime=1496482158, st_ctime=1496482158)) 
('rddImagenes: ', [['http://host.images.tfm:8000/mirflickr/im1.jpg'], ['http://host.images.tfm:8000/mirflickr/im10.jpg'], ['http://host.images.tfm:8000/mirflickr/im100.jpg']]) 
17/06/04 00:47:24 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1) 
org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 163, in main 
    func, profiler, deserializer, serializer = read_command(pickleSer, infile) 
    File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 54, in read_command 
    command = serializer._read_with_length(file) 
    File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length 
    return self.loads(obj) 
    File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 454, in loads 
    return pickle.loads(obj) 
    File "/home/utad/.local/lib/python2.7/site-packages/six.py", line 118, in __getattr__ 
    _module = self._resolve() 
    File "/home/utad/.local/lib/python2.7/site-packages/six.py", line 115, in _resolve 
    return _import_module(self.mod) 
    File "/home/utad/.local/lib/python2.7/site-packages/six.py", line 118, in __getattr__ 
    _module = self._resolve() 
    File "/home/utad/.local/lib/python2.7/site-packages/six.py", line 115, in _resolve 
    return _import_module(self.mod) 
    File "/home/utad/.local/lib/python2.7/site-packages/six.py", line 118, in __getattr__ 
    _module = self._resolve() 
... 
... 
... 
    File "/home/utad/.local/lib/python2.7/site-packages/six.py", line 115, in _resolve 
    return _import_module(self.mod) 
    File "/home/utad/.local/lib/python2.7/site-packages/six.py", line 118, in __getattr__ 
    _module = self._resolve() 
    File "/home/utad/.local/lib/python2.7/site-packages/six.py", line 115, in _resolve 
    return _import_module(self.mod) 
RuntimeError: maximum recursion depth exceeded 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

It also gives me this driver stack trace:

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935) 
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453) 
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:748) 

And here is my code:

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 

# Modules used
from pyspark import SparkContext 
import os.path 
from six.moves import urllib 
import tarfile 
import tensorflow as tf 
import re 
import numpy as np 


# ************************************************************************************** 
# Process configuration
# The model is downloaded from the web and stored in a local directory
MODEL_URL = 'http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz' 
model_dir = '/home/utad/TFM/model' 

# We simulate fetching the images from a web service
IMAGES_INDEX_URL = 'http://host.images.tfm:8000/mirflickr/' 

# Other settings
numero_imagenes_proceso = 3 # Total number of images to process
lote_size = 1 # Number of images per batch
max_etiquetas = 5 # Maximum number of labels per image

# End of process configuration
# ************************************************************************************** 


# Get the model
def get_tensorflow_model(): 
    # Download and extract model tar file 
    filename = MODEL_URL.split('/')[-1] 
    filepath = os.path.join(model_dir, filename) 
    if not os.path.exists(filepath): 
     filepath2, _ = urllib.request.urlretrieve(MODEL_URL, filepath) 
     print("filepath2", filepath2) 
     statinfo = os.stat(filepath) 
     print('Succesfully downloaded', filename, statinfo.st_size, 'bytes.') 
     tarfile.open(filepath, 'r:gz').extractall(model_dir) 
    else: 
     print('Model already downloaded:', filepath, os.stat(filepath)) 


# Taken from classify_image.py in the TensorFlow GitHub repository
class NodeLookup(object): 
    """Converts integer node IDs to human readable labels.""" 

    def __init__(self, label_lookup_path=None, uid_lookup_path=None): 
     if not label_lookup_path: 
      label_lookup_path = os.path.join(
       model_dir, 'imagenet_2012_challenge_label_map_proto.pbtxt') 
     if not uid_lookup_path: 
      uid_lookup_path = os.path.join(
       model_dir, 'imagenet_synset_to_human_label_map.txt') 
     self.node_lookup = self.load(label_lookup_path, uid_lookup_path)

    def load(self, label_lookup_path, uid_lookup_path): 
     """Loads a human readable English name for each softmax node. 

     Args: 
      label_lookup_path: string UID to integer node ID. 
      uid_lookup_path: string UID to human-readable string. 

     Returns: 
      dict from integer node ID to human-readable string. 
     """ 
     if not tf.gfile.Exists(uid_lookup_path): 
      tf.logging.fatal('File does not exist %s', uid_lookup_path) 
     if not tf.gfile.Exists(label_lookup_path): 
      tf.logging.fatal('File does not exist %s', label_lookup_path) 

     # Loads mapping from string UID to human-readable string 
     proto_as_ascii_lines = tf.gfile.GFile(uid_lookup_path).readlines() 
     uid_to_human = {} 
     p = re.compile(r'[n\d]*[ \S,]*') 
     for line in proto_as_ascii_lines: 
      parsed_items = p.findall(line) 
      uid = parsed_items[0] 
      human_string = parsed_items[2] 
      uid_to_human[uid] = human_string 

     # Loads mapping from string UID to integer node ID. 
     node_id_to_uid = {} 
     proto_as_ascii = tf.gfile.GFile(label_lookup_path).readlines() 
     for line in proto_as_ascii: 
      if line.startswith('  target_class:'):
       target_class = int(line.split(': ')[1])
      if line.startswith('  target_class_string:'):
       target_class_string = line.split(': ')[1]
       node_id_to_uid[target_class] = target_class_string[1:-2]

     # Loads the final mapping of integer node ID to human-readable string 
     node_id_to_name = {} 
     for key, val in node_id_to_uid.items(): 
      if val not in uid_to_human: 
       tf.logging.fatal('Failed to locate: %s', val) 
      name = uid_to_human[val] 
      node_id_to_name[key] = name 

     return node_id_to_name 

    def id_to_string(self, node_id): 
     if node_id not in self.node_lookup: 
      return '' 
     return self.node_lookup[node_id] 


def run_inference_on_image(sess, image, lookup):
    """Run inference on an image.

    Args:
     sess: TensorFlow Session
     image: URL of the image to read
     lookup: node lookup obtained previously

    Returns:
     (image URL, scores),
     where scores is a list of (human-readable node names, score) pairs
    """
    image_data = urllib.request.urlopen(image).read() 
    print("Image: ", image_data) 
    # Some useful tensors: 
    # 'softmax:0': A tensor containing the normalized prediction across 
    # 1000 labels. 
    # 'pool_3:0': A tensor containing the next-to-last layer containing 2048 
    # float description of the image. 
    # 'DecodeJpeg/contents:0': A tensor containing a string providing JPEG 
    # encoding of the image. 
    # Runs the softmax tensor by feeding the image_data as input to the graph. 
    softmax_tensor = sess.graph.get_tensor_by_name('softmax:0') 
    try: 
     predictions = sess.run(softmax_tensor, {'DecodeJpeg/contents:0': image_data}) 
     print("predictions: ", predictions) 
    except: 
     # Handle problems with malformed JPEG files 
     return image, None 
    predictions = np.squeeze(predictions) 
    top_k = predictions.argsort()[-max_etiquetas:][::-1] 
    print("top_k predictions: ", top_k) 
    scores = [] 
    for node_id in top_k: 
     if node_id not in lookup: 
      human_string = '' 
     else: 
      human_string = lookup[node_id] 
     score = predictions[node_id] 
     scores.append((human_string, score)) 
    print ("tupla: ", image, scores) 
    return image, scores 


def apply_inference_on_batch(lote, lookup_bc): 
    """Apply inference to a batch of images. 

    We do not explicitly tell TensorFlow to use a GPU. 
    It is able to choose between CPU and GPU based on its guess of which will be faster. 
    """ 
    with tf.Graph().as_default() as g: 
     print("Apply") 
     graph_def = tf.GraphDef() 
     print("Graph_def: ", graph_def) 

     graph_def.ParseFromString(model_data_bc.value) 
     print("Graph_def1: ", graph_def) 
     _ = tf.import_graph_def(graph_def, name='') 
     print("TF: ", tf) 
     with tf.Session() as sess: 
      print("Sesion: ", sess) 
      print("Lote: ", lote) 
      print("Node_lookup: ", lookup_bc) 
      labeled = [run_inference_on_image(sess, image, lookup_bc.value) for image in 
         lote] 
      return [tup for tup in labeled if tup[1] is not None] 


# Helper function to get the name of an image.
def obtener_nombre_imagen(x): 
    return IMAGES_INDEX_URL + x.split('<')[1].split('>')[1] 


# Start the SparkContext
# sc = SparkContext('spark://master.spark.tfm:7077', 'TensorFlow') 
sc = SparkContext('local') 
get_tensorflow_model() 

# Load the model and distribute it
model_path = os.path.join(model_dir, 'classify_image_graph_def.pb') 
with tf.gfile.FastGFile(model_path, 'rb') as f: 
    model_data = f.read() 
model_data_bc = sc.broadcast(model_data) 

# Distribute the node lookup so it can be used on the workers
node_lookup = NodeLookup().node_lookup 
node_lookup_bc = sc.broadcast(node_lookup) 

# Get a list of the images to process and group them into batches
imagenes = urllib.request.urlopen(IMAGES_INDEX_URL).read().split('<li>')[2:numero_imagenes_proceso+2] 
lote_imagenes = [imagenes[i:i + lote_size] for i in range(0, len(imagenes), lote_size)] 

# Parallelize the image batches and process them
rddImagenes = sc.parallelize(lote_imagenes).map(lambda x: map(obtener_nombre_imagen, x)) 
print("rddImagenes: ", rddImagenes.collect()) 
imagenes_etiquetadas = rddImagenes.flatMap(lambda x: apply_inference_on_batch(x, node_lookup_bc)) 
# imagenes_etiquetadas = rddImagenes.flatMap(lambda x: x[0].split("/")) 

l = imagenes_etiquetadas.collect() 

Please advise me on what is going on.

Edit: I have just found that the problem occurs when calling image_data = urllib.request.urlopen(image).read()

Answer

I figured it out, or at least I found a workaround.

I changed:

from six.moves import urllib

to:

import urllib

I don't know why urllib from six doesn't work, but this solves my problem.
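One plausible reading of the traceback, offered as an assumption rather than a confirmed diagnosis: the repeating __getattr__ / _resolve frames appear directly under pickle.loads, which is the pattern you get when a lazy wrapper object is rebuilt by the unpickler. An unpickler allocates the instance without calling __init__ and then probes it for hooks such as __setstate__. If __getattr__ itself needs an instance attribute that has not been restored yet, it calls itself until the recursion limit is hit. The sketch below reproduces that pattern with a hypothetical LazyModule class (not six's actual code) and a hypothetical helper lookup_on_fresh_instance.

```python
class LazyModule(object):
    """Hypothetical stand-in for a lazily imported module wrapper."""

    def __init__(self, mod):
        self.mod = mod  # name of the real module, imported on first use

    def _resolve(self):
        return __import__(self.mod)

    def __getattr__(self, attr):
        # Called only when normal lookup fails. If self.mod is itself
        # missing, self._resolve() lands back here and never terminates.
        module = self._resolve()
        return getattr(module, attr)


def lookup_on_fresh_instance(cls):
    """Mimic an unpickler: allocate without __init__, then probe a hook."""
    obj = cls.__new__(cls)  # __dict__ is empty, so obj.mod does not exist yet
    try:
        getattr(obj, "__setstate__", None)
    except RuntimeError:  # RecursionError is a RuntimeError subclass on Python 3
        return "recursion"
    return "ok"


# A fully initialised wrapper behaves normally.
print(LazyModule("json").dumps([1]))
# A freshly allocated one recurses as soon as a missing attribute is probed.
print(lookup_on_fresh_instance(LazyModule))
```

If this is what happens here, it would also explain why the plain standard-library import avoids the error: an ordinary module referenced by the function does not go through such a wrapper when the function is deserialized on the worker.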

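A related way to apply the workaround is to import the URL opener inside the function that runs on the workers, so that the function does not reference any six.moves object from its module globals. The following is a minimal sketch under that assumption; fetch_bytes and the sample data: URL are hypothetical names introduced only for illustration. Note that on Python 2.7 the plain urllib module has no request submodule, so calls written as urllib.request.urlopen(...) have to become urllib.urlopen(...) or urllib2.urlopen(...) after switching imports.

```python
import base64


def fetch_bytes(url):
    """Return the raw bytes behind `url` (hypothetical helper)."""
    # Import inside the function so the lookup happens on the worker at
    # call time instead of being captured when the function is serialized.
    try:
        from urllib.request import urlopen  # Python 3
    except ImportError:
        from urllib2 import urlopen  # Python 2
    response = urlopen(url)
    try:
        return response.read()
    finally:
        response.close()


# Offline check with a data: URL (supported by urllib.request on Python 3.4+),
# so no image server is needed to try the helper.
sample = ("data:application/octet-stream;base64,"
          + base64.b64encode(b"jpeg-bytes").decode("ascii"))
print(fetch_bytes(sample))
```

In the question's code, run_inference_on_image could call such a helper in place of urllib.request.urlopen(image).read(); whether that alone removes the recursion error on a given cluster would still need to be verified.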