ログファイルの内容をテールするkafkaプロデューサ(書式:csv)を作成しました。カフカコンシューマはJavaDStreamを作成するストリーミングアプリケーションです。 forEachRDDメソッドを使用すると、区切り記号 '、'にファイルの各行を分割し、Rowオブジェクトを作成します。私は7列のスキーマを指定しました。 次に、JavaRDDとスキーマを使用してデータフレームを作成しています。 しかし、ここでの問題は、ログファイルのすべての行に同じ数の列がないことです。 したがって、スキーマを満たさない行を除外する方法や、行の内容に基づいて動的にスキーマを作成する方法はありますか?続き コードの一部です:動的スキーマからのデータフレームのスパークまたはスキーマを満たさない行のフィルタリング
JavaDStream<String> msgDataStream =directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
msgDataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) {
JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
@Override
public Row call(String msg) {
String[] splitMsg=msg.split(",");
Object[] vals = new Object[splitMsg.length];
for(int i=0;i<splitMsg.length;i++)
{
vals[i]=splitMsg[i].replace("\"","").trim();
}
Row row = RowFactory.create(vals);
return row;
}
});
//Create Schema
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("timeIpReq", DataTypes.StringType, true),DataTypes.createStructField("SrcMac", DataTypes.StringType, true),
DataTypes.createStructField("Proto", DataTypes.StringType, true),DataTypes.createStructField("ACK", DataTypes.StringType, true),
DataTypes.createStructField("srcDst", DataTypes.StringType, true),DataTypes.createStructField("NATSrcDst", DataTypes.StringType, true),
DataTypes.createStructField("len", DataTypes.StringType, true)});
//Get Spark 2.0 session
Dataset<Row> msgDataFrame = session.createDataFrame(rowRDD, schema);
Javaを使用する必要がありますか?私はあなたにScalaのソリューションを提供することができます。それは助けになるだろうか? – maasg
はい。私はJavaを使用しています。私はそれをより快適に使用しています。しかし、あなたがScalaでも解決策を提供できるなら、大きな助けになるでしょう!ありがとう! –