2017-12-13 1 views
0

Spark 1.6.2コードをJavaのSpark 2.2.0に渡す必要があります。特定のコード片をSpark 1.6.2からSpark 2.2.0に変換するにはどうすればよいですか?

DataFrame eventsRaw = sqlContext.sql("SELECT * FROM my_data"); 
Row[] rddRows = eventsRaw.collect(); 
for (int rowIdx = 0; rowIdx < rddRows.length; ++rowIdx) 
{ 
    Map<String, String> myProperties = new HashMap<>(); 
    myProperties.put("startdate", rddRows[rowIdx].get(1).toString()); 
    JEDIS.hmset("PK:" + rddRows[rowIdx].get(0).toString(), myProperties); // JEDIS is a Redis client for Java 
} 

私の知る限り理解し、Java用スパーク2.2.0にはDataFrameはありません。 Datasetのみ。しかし、DataFrameDatasetに置き換えた場合、の代わりにObject[]の代わりにeventsRaw.collect()という出力が得られます。その後get(1)は赤でマークされており、コードをコンパイルできません。

どうすれば正しく行うことができますか?

答えて

2

DataFrame(スカラ)Dataset<Row>です:

SparkSession spark; 

... 

Dataset<Row> eventsRaw = spark.sql("SELECT * FROM my_data"); 

代わりのcollectあなたではなく(怠惰なシングルトン接続を使用)foreachを使用する必要があります。

eventsRaw.foreach(
    (ForeachFunction<Row>) row -> ... // replace ... with appropriate logic 
); 

またはforeachPartition(各パーティションの接続を初期化) :

eventsRaw.foreachPartition((ForeachPartitionFunction<Row)) rows -> { 
    ... // replace ... with appropriate logic 
}) 
+0

'collect'(コンパイル)で動作します。 'foreach'を使うとどういう意味ですか? – Markus

+0

はい、しかし 'JEDIS'は直列化できません。 – Markus

関連する問題