このブログはdatabricksのWebサイトで見つかりました。 Apache Kafkaの複雑なデータストリームを消費および変換するためにSpark SQLのAPIをどのように活用できるかを示しています。
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
UDFは、デシリアライザの行に使用することができる方法を説明するセクションがあります:
object MyDeserializerWrapper {
val deser = new MyDeserializer
}
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) =>
MyDeserializerWrapper.deser.deserialize(topic, bytes)
)
df.selectExpr("""deserialize("topic1", value) AS message""")
私はJavaを使用して、したがって、どのようにそれができる確認するには、次のサンプルUDFを書くためにしなければならなかったのですJavaで呼び出すこと:
UDF1<byte[], String> mode = new UDF1<byte[], String>() {
@Override
public String call(byte[] bytes) throws Exception {
String s = new String(bytes);
return "_" + s;
}
};
次のように今私は、例を数える構造化されたストリーミング・ワードでこのUDFを使用することができます。
Dataset<String> words = df
//converted the DataFrame to a Dataset of String using .as(Encoders.STRING())
// .selectExpr("CAST(value AS STRING)")
.select(callUDF("mode", col("value")))
.as(Encoders.STRING())
.flatMap(
new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
}
}, Encoders.STRING());
私のための次のステップは、除算デシリアライズ用のUDFを作成することです。私はすぐにそれを投稿します。