2017-08-23 22 views
0

こんにちは私はApache Sparkを初めて使いました。私は学習する途中です。 私はkafkaのトピックからjsonデータ用のスパークストリーミングを作成しました。以下は、jsonデータが連続してストリームされていることです。 しかし、今私はこのjsonデータでどうやって遊んでいいのか分かりません。 DataSetとDataFrameを使用してJsonデータを処理しましたが、いくつかのエラーが発生しました。 ストリーミングからストリーミングされたデータをどうやって遊ぶことができるのか、いくつかの例で私を助けてください。Apache SparkからのJsonデータの処理方法Javaでのストリーミング

注:私はApache spark 1.6.3バージョンを使用しています。

(null{"time":"2017/08/21 18:25:11","model":"20E84fb","speed":"20E84fb","cellId":"0605d822E84fb","course":"146.37E84fb","header":"ST600ALTE84fb","deviceId":206675884,"distance":"166E84fb","longitude":"-099.168493E84fb","latitude":"19.428616E84fb","payload":"ST600ALT+number+;206675884;20;376;20161005;16:26:59;0605d822;334;20;2ee5;63;+19.428616;-099.168493;000.213;146.37;6;1;166;12.21;000000;34;000887;4.4;1;0.00E84fb","date":"2017/08/21 18:25:11E84fb"}) 

コード:

package datapipeline; 

import java.util.Arrays; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.regex.Pattern; 

import org.apache.log4j.Level; 
import org.apache.log4j.Logger; 
import org.apache.spark.SparkConf; 
import org.apache.spark.SparkContext; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.storage.StorageLevel; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.Time; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import org.json.simple.JSONObject; 
import org.json.simple.parser.JSONParser; 
import org.onosproject.net.Device; 

import scala.Tuple2; 

public final class SparkConsumer { 
    //private static SparkContext sc = new SparkContext(); 
    private static final Pattern SPACE = Pattern.compile(" "); 


    private static void setLogLevels() { 
     boolean log4jInitialized = Logger.getRootLogger().getAllAppenders().hasMoreElements(); 
     if (!log4jInitialized) { 
      // We first log something to initialize Spark's default logging, then we override the 
      // logging level. 
      Logger.getLogger(SparkConsumer.class).info("Setting log level to [WARN] for streaming example." + 
        " To override add a custom log4j.properties to the classpath."); 
      Logger.getRootLogger().setLevel(Level.WARN); 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     String jars[]={"C:\\DeviceStreaming-1.0.0.jar"}; 


     setLogLevels(); 


     SparkConf sparkConf = new SparkConf().setAppName("CustomerKafkaConsumerThread") 
       .set("spark.local.ip","localhost:9092") 
       .setMaster("local[*]").setJars(jars); 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(3000)); 
     JavaSparkContext ctx = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(sparkConf)); 

     SQLContext sqlContext = new SQLContext(ctx); 

     Map<String, Integer> topicMap = new HashMap<>(); 

     topicMap.put("iot", 10); 


     JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc,"localhost:2181","SparkConsumer", topicMap,StorageLevel.MEMORY_ONLY()); 
     messages.print(); 


     JavaDStream<String> json = messages.map(
       new Function<Tuple2<String, String>, String>() { 
        public String call(Tuple2<String, String> message) { 

         return message._2(); 
        } 
       } 
      ); 

     json.foreachRDD(rdd -> { 

      //DataFrame df = sqlContext.read().json(rdd); 
      DataFrame df=sqlContext.createDataFrame(rdd, Device.class); 
      df.registerTempTable("rdd"); 
      df.filter("cellId"); 
      /*DataFrame deviceFrame= sqlContext.sql("SELECT cellID FROM rdd where cellId=206675884"); 
      df.show(); 
      df.printSchema(); 

      List<String> list= deviceFrame.javaRDD().map(row -> row.getString(0)).collect();*/ 

     }); 

     jssc.start(); 
     jssc.awaitTermination(); 
    } 
} 

答えて

0

あなたはget_json_object機能を使用してJSONからデータを抽出することができます。

指定されたjsonパスに基づいてjson文字列からjsonオブジェクトを抽出し、抽出されたjsonオブジェクトのjson文字列を返します。入力されたjson文字列が無効な場合はnullを返します。

は何か試してみてください:

df.withCoulmn("parsed",..functions.from_json(new Column("raw_json))).printSchema

をこれはあなたが好きな、あなたが操作を行うことができ、それが生成スキーマを与える必要があります。

EDIT: は、おそらくこれは多分あなたはUDFのは、解析されたオブジェクトに対する操作を実行する必要があります、1.6.3のための最適なソリューションではありません

参照:ソリューションのhttps://spark.apache.org/docs/1.6.3/api/java/org/apache/spark/sql/functions.html#get_json_object(org.apache.spark.sql.Column,%20java.lang.String)

+0

感謝。しかし、あなたが言及した手順はスパークの2.1.0バージョンから動作します。私は1.6.3に取り組んでいます – user3837415

+0

申し訳ありませんが、私はバージョンに気付かなかった。私は新しい答えでそれを更新した – aclokay

関連する問題