2017-11-03 4 views
1

文字列が含まれているリストが文字列を置き換え[d、e]、a) これらの行を([a、b、c]、[d、e])および([d、e]、[a、b、c])に変換したい私は構造を有するデータフレームを有する

データフレームの列名は "src"と "dst"です。

どうすればこの問題に近づくことができますか?

私が試した:

val result = df.map(f => { 
    if(df.exists(x => x._1.contains(f._2))) { 
    (f._1, df.filter(x => x._1.contains(f._2)).head._1) 
    } else { 
    (f._1, List(f._2)) 
    } 
}).toDF("src", "dst") 

はしかし、この解決策は私に次のエラーを与える:

java.lang.IllegalStateException: unread block data at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2740) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

は、より効率的な方法があるに違いありませんか?

+0

お試しいただいたコードを投稿することはできますか? – Sid

+0

投稿者 –

+0

あなたのデータフレームに3つ以上の行がある場合はどうしたらいいですか? –

答えて

0

私の知る限りは、上記のあなたの質問やコメントから理解しているとして、以下はあなたの解決策になることができ

+---------+---+ 
|src  |dst| 
+---------+---+ 
|[a, b, c]|d | 
|[d, e] |a | 
+---------+---+ 

として入力dataframeを考えるとあなたは

import org.apache.spark.sql.functions._ 
val joinExpr = udf((col1: mutable.WrappedArray[String], col2: String) => col1.contains(col2)) 

df.as("t1").join(df.as("t2"), joinExpr($"t1.src", $"t2.dst")).select($"t1.src".as("src"), $"t2.src".as("dst")).show(false) 
として joinudf機能を使用することができます

最終出力を得るには

+---------+---------+ 
|src  |dst  | 
+---------+---------+ 
|[a, b, c]|[d, e] | 
|[d, e] |[a, b, c]| 
+---------+---------+ 

希望する答えは