2017-07-03 7 views
1

mqttブローカからgpsデータを取得し、hadoopクラスタにロードするプログラムで作業しています。 hdfsにデータを書き込もうとすると、IOExceptionが発生します。以下は、完全なスタックトレースです:ファイルをHdfsに書き込む際のIOException

java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status; Host Details : local host is: "quickstart.cloudera/192.168.25.170"; destination host is: "quickstart.cloudera":8020; 
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:765) 
    at org.apache.hadoop.ipc.Client.call(Client.java:1165) 
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:184) 
    at com.sun.proxy.$Proxy7.create(Unknown Source) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:165) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:84) 
    at com.sun.proxy.$Proxy7.create(Unknown Source) 
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:187) 
    at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1250) 
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1269) 
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1063) 
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1021) 
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:232) 
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:75) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:806) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:686) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:675) 
    at com.mqttHadoopLoader.hadoop.MqttLoader.HdfsWriter.writeToHdfs(HdfsWriter.java:19) 
    at com.mqttHadoopLoader.hadoop.MqttLoader.MqttDataLoader.messageArrived(MqttDataLoader.java:43) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:354) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:162) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status 
    at com.google.protobuf.UninitializedMessageException.asInvalidProtocolBufferException(UninitializedMessageException.java:81) 
    at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto$Builder.buildParsed(RpcPayloadHeaderProtos.java:1094) 
    at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto$Builder.access$1300(RpcPayloadHeaderProtos.java:1028) 
    at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcPayloadHeaderProtos.java:986) 
    at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:850) 
    at org.apache.hadoop.ipc.Client$Connection.run(Client.java:781) 

私はOutputStreamを作成しようが、私のEclipseのデバッガが正しく動作していないので、それは言うことは難しいときにエラーが発生しているように思え、それはに接続できないことを言います( VMと私はスタックオーバーフローでここに投稿された多数の修正を試しました)。ここに私のHdfsWriterのための私のコードは次のとおりです。

String destFile = "hdfs://127.0.0.0.1:8020/gpsData/output/gps_data.txt"; 
//Note *this is just a placeholder IP address for the purpose of this post. I do have the fully correct IP address for the program. 

    public void writeToHdfs(String gpsInfo) { 
     try { 
      Configuration conf = new Configuration(); 
      System.out.println("Connecting to -- " + conf.get("fs.defaultFS")); 

      FileSystem fs = FileSystem.get(URI.create(destFile), conf); 
      System.out.println(fs.getUri()); 

      // Error seems to occur here 
      OutputStream outStream = fs.create(new Path(destFile)); 

      byte[] messageByt = gpsInfo.getBytes(); 
      outStream.write(messageByt); 
      outStream.close(); 

      System.out.println(destFile + " copied to HDFS"); 

     } catch (FileNotFoundException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

これはHdfsWriterを求めてMQTT方法である:

public void messageArrived(String topic, MqttMessage message) 
     throws Exception { 
      System.out.println(message); 
      HdfsWriter hdfsWriter = new HdfsWriter(); 
      hdfsWriter.writeToHdfs(message.toString()); 
    } 

私はまだ任意およびすべてのヘルプは素晴らしいことだそうHadoopのことは本当に新しいです。私は私のデバッグ作業を持っていると決定的に私はFileSystemのメソッドを呼び出そうとするたびにエラーが発生したことを伝えることができ

UPDATE 。たとえば、エラーはfs.exists(pt)fs.setReplication()によってトリガされます。

+0

慎重に、エラーメッセージを読む必要があります。それはあなたに何が間違っているかを伝えます:* InvalidProtocolBufferException:必須フィールドがないメッセージ:callId、status; *。したがって、無効なデータをサーバーに送信しています。問題はあなたが送るものであり、あなたがそれを送る方法ではありません。 –

+0

@JB Nizetはい**私は** **エラーメッセージを読んでいます。複数回。何度も何度も繰り返します。私は、エラーメッセージが "メッセージ"について何を話しているのか知りません。私のコードを見れば、 "Message"という名前のものを直接呼び出すものは何もないことが分かります。私は自分のデバッグを動作させており、呼び出すFileSystemメッセージで起こっていることを伝えることができます(つまり、 'fs.exists(pt)'または 'fs.setReplication(pt、(short)1)'でも起こります。 )。私はこのエラーがどこで直接発生しているのかわかりません。 – ebbBliss

答えて

0

私はgoogle protobufライブラリを使用してhdfsを信じています。あなたのクライアントコードはprotobufのバージョンが間違っている(互換性がない)ようです。この方向に掘り下げてみてください。

0

HDFSクライアントとNameNodeの間のプロトコルは、Googleプロトコルバッファを使用してメッセージをシリアル化します。エラーは、クライアントから送信されたメッセージに予想されるフィールドがすべて含まれていないため、サーバーと互換性がないことを示します。

これは、NameNodeのバージョンより古いバージョンのHDFSクライアントを実行している可能性が高いことを示しています。たとえば、callIdフィールドは、Apache JIRAの問題HADOOP-9762によって追跡され、Apache Hadoop 2.1.0-betaで出荷された機能で実装されました。そのバージョンより前のクライアントはメッセージにcallIdを含めなかったので、2.1.0-beta以降を実行しているNameNodeとは互換性がありません。

クライアントアプリケーションがHadoopクラスターのバージョンと一致するHadoopクライアントライブラリーを使用していることを確認することをお勧めします。スタックトレースから、Clouderaディストリビューションを使用しているようです。もしそうなら、ClouderaがMavenリポジトリに提供するマッチするクライアントライブラリ依存バージョンを使うことで、最も成功する可能性が高くなります。詳細は、Using the CDH 5 Maven Repositoryを参照してください。

関連する問題