このような数字のリストがあるとしましょう。火花:見つからない数字を見つけるためのプログラム
lst = [1,2,4,5,9,10]
どのように私はそのリストに欠けている番号を見つけるためにスパークプログラムを書きますか?プログラムは3,6,7,8を返します。
アキュムレータで試してみましたが、うまくいきました。
このような数字のリストがあるとしましょう。火花:見つからない数字を見つけるためのプログラム
lst = [1,2,4,5,9,10]
どのように私はそのリストに欠けている番号を見つけるためにスパークプログラムを書きますか?プログラムは3,6,7,8を返します。
アキュムレータで試してみましたが、うまくいきました。
最適なソリューションがないか心配していない場合は、最初にデータをブロードキャストし、すべての要素を含むコレクションを並列化し、ブロードキャストされたデータに基づいてフィルタリングすることです。
lst = [1,2,4,5,9,10]
broadcastVar = sc.broadcast(lst)
all_elems = sc.parallelize([i+1 for i in range(10)])
all_elems.filter(lambda x: x not in broadcastVar.value)
よう
何かあなただけの少量のデータで動作する何かを探しているなら、これは結構です。大量のデータがある場合、この方法は悪いので、使用しないでください。よりよい解決策が必要な場合
、私は基本的にあなたが(parition、数)を出力マップを行うことができますRDDSを使用して、データを分割し、次の
結果を書き込んだり、自分のやりたいことを収集したりすることができます。ノートの一つは、例えば、私は5人の執行を使用していたので、キーが1-2、3-4、5-6、7-8、9-10、キー7-8 wouldn」してきたならばということです任意の要素があります。この一つの選択肢を避けるために(3-4、-1)、(5-6、-1)、(7-8、[(1-2、-1)のようなもので、キーによってグループの前に組合にRDDです-1)、(9-10、-1)]。多くのデータがある場合、これによって追加されるオーバーヘッドは、ジョブ全体に比べて非常に小さいです。
このサンプルコードにはいくつかの問題がありますが、それを概念の証明と見なしてください。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.spark_project.guava.collect.Lists;
import scala.Tuple2;
public class Main {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("spark-missing-nr").master("local[*]").getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
Integer[] lst = new Integer[] { 1, 2, 4, 5, 9, 10 };
JavaRDD<Integer> lstRDD = sc.parallelize(Arrays.asList(lst));
// Partition the data by whether number is smaller/equal or larger than
// 5
JavaPairRDD<String, Integer> groupableRDD = lstRDD.mapToPair(i -> {
String group = i <= 5 ? "1-5" : "6-10";
return new Tuple2<String, Integer>(group, i);
});
// Group by key
JavaPairRDD<String, Iterable<Integer>> groupedRDD = groupableRDD.groupByKey();
// so now we have [(1-5,[1, 2, 4, 5]), (6-10,[9, 10])]
System.out.println(groupedRDD.collect());
// map where you iterate over range specified by key
JavaRDD<List<Integer>> missingValuesLists = groupedRDD.map(t -> {
Integer from = new Integer(t._1().split("-")[0]);
Integer to = new Integer(t._1().split("-")[1]);
List<Integer> valuesList = Lists.newArrayList(t._2());
List<Integer> missingValues = new ArrayList<Integer>();
// iterate over range specified by key
for (int i = from; i < to + 1; i++) {
if (!valuesList.contains(i)) {
missingValues.add(i);
}
}
return missingValues;
});
// outputs [[3], [6, 7, 8]]
System.out.println(missingValuesLists.collect());
sc.close();
}
}
あなたはsubtract
機能を使用して、その後、sc.range
を使用して、フルレンジでRDDを作成してみてくださいすることができます
lst = sc.parallelize([1,2,4,5,9,10])
max_value = lst.max()
full_data = sc.range(1, max_value)
missing_values = full_data.subtract(lst)
あなたは完全なリストのサイズを事前に知っていればあなたはmax()
を呼び出さないようにすることができます。
@Mrinalあなたはこのアプローチを試みましたか? –
遅れて返事をしてくれて申し訳ありません(私は忙しすぎました)。私は試しても機能しますが、何十億という数字を扱っているなら最適なソリューションとはみなされません。とにかく、ソリューションのおかげで、私は減算部分が好きでした:)それは最も簡単なアプローチです。 – Mrinal
は、動作していなかったソリューションとこれまで試みてきたソリューションを共有できます。 –
Sparkは、各行を計算中に他の行を調べることに依存する場合に最適なオプションではありません。 Sparkは、他のアイテムに依存することなく各アイテムを処理できるので、効率的に並列化できます。 –
@ダニエル、私も知っていたが、私はインタビューでそれを実装するように頼まれた。私は彼らにアキュムレータの解決策を教え、それを受け入れました。その後、私は自宅でそれを実装するために、それがアキュムレータとしてうまくいかなかった試みただけで更新操作が値にアクセスしないように、連想で異なるタスクで値を更新するために使用することができます。 – Mrinal