2016-04-10 10 views
0

私はスパークの世界で非常に新しく、6ヶ月前にjavaをプログラミングし始めました。だから難しいです:JavaDstreamオブジェクトを文字列にマップする方法は?スパークストリーミングとモデル予測JAVA

JavaDStream<String> words = lines.flatMap(
      new FlatMapFunction<String, String>() { 
       public Iterable<String> call(String x) { 
        return Arrays.asList(x.split(",")); 
       } 
      }); 
    words.print(); String vec = words; 

このJavaDStreamを文字列に変換したいと思います。後で私は私のモデルへの入力としてそれを使用することができるでしょう!あなたが探していることは文字列の中にDSTREAMオブジェクトをマッピングする方法が、各ストリーミングでDSTREAMに含まれる値を獲得する方法ではありません

 Double predictionDone = sameModel.predict(Vectors.dense(vec)); 
     System.out.println(predictionDone.toString()); 
+0

Spark Streamingのドキュメントを深く読んで、それを適用するために必要な基礎を得ることをお勧めします:http://spark.apache.org/docs/latest/streaming-programming-guide.html – maasg

+0

私はやっていますそれはthx、私は2つの部分があるが、私はそれらを接続することはできません...私はストリームを開くことができますまた、モデルを読み込んで予測を行うことができます –

+0

私はあなたの方法であなたを助けるために幅広い答えを追加しましたが、多くの材料にJavaで始める場合は、代わりにScalaにジャンプすることをお勧めします。機能面をよりよく理解するのに役立ちます。 – maasg

答えて

0

私は...私は達成するためにforeachRDDメソッドを使用する必要がありますbeleave間隔。 DStreamはRDDの時間バインドされたコレクションです。高度な操作を適用することでDStreamを操作できます。スパーク内では、これらの操作は、各時間間隔で構築されたRDDに、その時点で利用可能なデータが適用されます。

「DStreamからStringへ移動する方法」を考える代わりに、正しいパスは「DStreamの要素にアクセスして、それらにスコア関数を適用する方法」です。太い線で

、あなたはいくつかの手順が必要になります。

- まず、supported DStream implementations(または独自のロール)のいずれかを使用して、あなたのストリームを構築:

JavaDStream<String> textDStream = ... 

-apply transformationsデータを取得します形であなたはそれを必要とする:あなたが正しい形でデータを持っている-once

JavaDStream<String> wordsDStream = textDStream.flatMap(...).filter(...) 

は、あなたが順番にDSTREAMへoutput operationを適用する必要がありますデータで実際に何かをするforeachRDDは最も一般的な出力演算子で、基礎となるRDDにactionsを適用できます。粗株における

wordsDStream.foreachRDD{rdd => // here we get access to the RDD 
    rdd.foreach{word => // here we get access to the content of the RDD, 
          // which is the 'words' in the DStream 
     val score = model.score(word) 
     // do something with 'score' like write it to a db or file 
    } 
} 

(これはスカラ擬似コードである。Javaでは構造は同じである。のみコードがより冗長である)

これは追従する構造です。 MLモデルを組み合わせることで、ある程度の複雑さが増し、困難になる可能性があります。

0
words.foreachRDD(new Function<JavaRDD<String>, Void>() { 
     public Void call(JavaRDD<String> rdd) throws Exception { 
      if(rdd!=null) 
      { 
       List<String> result = rdd.collect(); 
       double[] d = new double[69]; // model input expected lenght 
       int i=0; 


       for (String temp : result) { 
        double aDouble = Double.parseDouble(temp); 
        d[i]=aDouble; i++; 
        list_transactions.add(Vectors.dense(d)); //global variable 

       } 
      } 
      return null; 

ここに解決策があります。それは役に立ちます

@maasg、thx助けて! 「DStreamの要素にアクセスしてスコア関数を適用する方法」これが私が探していたものの鍵でした!

関連する問題