1

ログファイルの内容をテールするkafkaプロデューサ(書式:csv)を作成しました。カフカコンシューマはJavaDStreamを作成するストリーミングアプリケーションです。 forEachRDDメソッドを使用すると、区切り記号 '、'にファイルの各行を分割し、Rowオブジェクトを作成します。私は7列のスキーマを指定しました。 次に、JavaRDDとスキーマを使用してデータフレームを作成しています。 しかし、ここでの問題は、ログファイルのすべての行に同じ数の列がないことです。 したがって、スキーマを満たさない行を除外する方法や、行の内容に基づいて動的にスキーマを作成する方法はありますか?続き コードの一部です:動的スキーマからのデータフレームのスパークまたはスキーマを満たさない行のフィルタリング

JavaDStream<String> msgDataStream =directKafkaStream.map(new Function<Tuple2<String, String>, String>() { 
      @Override 
      public String call(Tuple2<String, String> tuple2) { 
       return tuple2._2(); 
      } 
     }); 
msgDataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() { 
      @Override 
      public void call(JavaRDD<String> rdd) { 
       JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() { 
        @Override 
        public Row call(String msg) { 
         String[] splitMsg=msg.split(","); 
         Object[] vals = new Object[splitMsg.length]; 
         for(int i=0;i<splitMsg.length;i++) 
         { 
          vals[i]=splitMsg[i].replace("\"","").trim(); 
         } 

         Row row = RowFactory.create(vals); 
         return row; 
        } 
       }); 

       //Create Schema 
       StructType schema = DataTypes.createStructType(new StructField[] { 
         DataTypes.createStructField("timeIpReq", DataTypes.StringType, true),DataTypes.createStructField("SrcMac", DataTypes.StringType, true), 
         DataTypes.createStructField("Proto", DataTypes.StringType, true),DataTypes.createStructField("ACK", DataTypes.StringType, true), 
         DataTypes.createStructField("srcDst", DataTypes.StringType, true),DataTypes.createStructField("NATSrcDst", DataTypes.StringType, true), 
         DataTypes.createStructField("len", DataTypes.StringType, true)}); 

       //Get Spark 2.0 session 


       Dataset<Row> msgDataFrame = session.createDataFrame(rowRDD, schema); 
+0

Javaを使用する必要がありますか?私はあなたにScalaのソリューションを提供することができます。それは助けになるだろうか? – maasg

+0

はい。私はJavaを使用しています。私はそれをより快適に使用しています。しかし、あなたがScalaでも解決策を提供できるなら、大きな助けになるでしょう!ありがとう! –

答えて

2

予想されるスキーマと一致しない行を削除する簡単な方法は、あなたの目標は、データフレームを構築することである場合、また、OptionタイプでflatMapを使用することで、我々は同じflatMapステップを使用して、データにスキーマを適用します。これは、case classの使用によってScalaで容易になります。

// Create Schema 
case class NetInfo(timeIpReq: String, srcMac: String, proto: String, ack: String, srcDst: String, natSrcDst: String, len: String) 

val netInfoStream = msgDataStream.flatMap{msg => 
    val parts = msg.split(",") 
    if (parts.size == 7) { //filter out messages with unmatching set of fields 
    val Array(time, src, proto, ack, srcDst, natSrcDst, len) = parts // use a extractor to get the different parts in variables 
    Some(NetInfo(time, src, proto, ack, srcDst, natSrcDst, len)) // return a valid record 
    } else { 
    None // We don't have a valid. Return None 
    } 
} 

netInfoStream.foreachRDD{rdd => 
    import sparkSession.implicits._ 
    val df = rdd.toDF() // DataFrame transformation is possible on RDDs with a schema (based on a case class) 
    // do stuff with the dataframe 
} 

について:ログ・ファイル内の

すべての行が同じ数の列を持っていません。それらはすべて同じ種類のデータを表すが、潜在的にいくつかの列が欠落して、適切な戦略(ここで例示したような)不完全なデータをフィルタリングまたはそこ場合に定義されたスキーマ内の任意の値を使用するかになるであろうと仮定

どのフィールドが欠落しているかを知るための決定的な方法です。この要件は、データを生成する上流アプリケーションに提起される必要があります。これは、(例えばfield0,,field2,,,field5)空のカンマ配列とCSVで欠損値を表現するために、行の構成DataFrameを完了するために、これを適用する方法がないと行単位の違いは、意味がありません処理するために、動的なスキーマ

が一般的です異なるスキーマ。

関連する問題