2017-10-10 16 views
2

ClojureでSpark Structured Streamingの例を書き直そうとしています。次のようにClojureでSpark Structured Streamingの例を書き込むときのエラー

例はScalaで書かれている:

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

(ns flambo-example.streaming-example 
    (:import [org.apache.spark.sql Encoders SparkSession Dataset Row] 
      [org.apache.spark.sql.functions] 
      )) 

(def spark 
    (-> 
    (SparkSession/builder) 
    (.appName "sample") 
    (.master "local[*]") 
    .getOrCreate) 
) 


(def lines 
    (-> spark 
     .readStream 
     (.format "socket") 
     (.option "host" "localhost") 
     (.option "port" 9999) 
     .load  
    ) 
) 

(def words 
    (-> lines 
     (.as (Encoders/STRING))  
     (.flatMap #(clojure.string/split % #" "))  
    )) 

上記のコードは、次の例外を引き起こします。

;; java.lang.IllegalArgumentExceptionによって引き起こされる ;;一致するメソッドが見つかりません:クラス ;のflatMap org.apache.spark.sql.Dataset

どうすればこのエラーを回避できますか?

答えて

1

署名に従わなければなりません。 JavaのDataset APIはDataset.flatMapの2つの実装、scala.Function1

def flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U] 

と前者があなたのためにかなり役に立たないですが、あなたができるはずであるスパーク自身o.a.s.api.java.function.FlatMapFunction

def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] 

をとる秒1をとるものを提供します後者のものを使う。 RDD API flambouses macros to create Spark friendly adaptersの場合は、flambo.api/fnでアクセスできます。これらは直接Datasetsで動作するかどうかはわかりませんが、必要に応じて調整する必要があります。

暗黙的にEncodersに依存できないため、戻り値の型に一致する明示的なエンコーダも用意する必要があります。

(def words 
    (-> lines 
    (.as (Encoders/STRING))  
    (.flatMap f e)  
)) 
fFlatMapFunctionを実装

eEncoderです:

全体的にあなたの周りに何かをする必要があります。 1つの実装例:

(def words 
    (-> lines 
     (.as (Encoders/STRING))  
     (.flatMap 
     (proxy [FlatMapFunction] [] 
      (call [s] (.iterator (clojure.string/split s #" ")))) 
     (Encoders/STRING)))) 

しかし、私はより良いものを見つけることができると思います。

実際には、Datasetと入力しないでください。DataFrameDataset[Row])に焦点を当てます。

関連する問題