このアプローチの1つのアプローチは、それぞれの個別の値に対して、その値の隣接するすべての時間範囲を見つけてそれらを合体させるというものです。あなたはgroupBy
を値に使って、それぞれの値に対してstart
とend
のリストを作成することができます。次に、カスタム関数を使用してこれらを連続した時間範囲に縮小することができます。
極端な場合、データセットでディスクのみの永続性レベルを使用する場合、唯一の要件は、start_end
の1行をメモリに収めることができることです。これは、ほとんどのクラスタで、このアプローチの上限をgbでstart_end
の値ごとに設定します。
は、ここでの実装例です(要求されるようにするJava APIを使用して - Scalaはかなり少ない冗長になります):
public class JavaSparkTest {
public static void main(String[] args){
SparkSession session = SparkSession.builder()
.appName("test-changes-in-time")
.master("local[*]")
.getOrCreate();
StructField start = createStructField("start", DataTypes.IntegerType, false);
StructField end = createStructField("end", DataTypes.IntegerType, false);
StructField value = createStructField("value", DataTypes.IntegerType, false);
StructType inputSchema = createStructType(asList(start,end,value));
StructType startEndSchema = createStructType(asList(start, end));
session.udf().register("collapse_timespans",(WrappedArray<Row> startEnds) ->
JavaConversions.asJavaCollection(startEnds).stream()
.sorted((a,b)->((Comparable)a.getAs("start")).compareTo(b.getAs("start")))
.collect(new StartEndRowCollapsingCollector()),
DataTypes.createArrayType(startEndSchema)
);
Dataset<Row> input = session.createDataFrame(asList(
RowFactory.create(123, 124, 1),
RowFactory.create(124, 128, 1),
RowFactory.create(128, 300, 2),
RowFactory.create(300, 400, 2),
RowFactory.create(400, 500, 3),
RowFactory.create(500, 600, 3),
RowFactory.create(600, 700, 3)
), inputSchema);
Dataset<Row> startEndByValue = input.selectExpr("(start start, end end) start_end", "value");
Dataset<Row> startEndsByValue = startEndByValue.groupBy("value").agg(collect_list("start_end").as("start_ends"));
Dataset<Row> startEndsCollapsed = startEndsByValue.selectExpr("value", "explode(collapse_timespans(start_ends)) as start_end");
Dataset<Row> startEndsInColumns = startEndsCollapsed.select("value", "start_end.start", "start_end.end");
startEndsInColumns.show();
}
public static class StartEndRowCollapsingCollector implements Collector<Row, List<Row>, List<Row>>{
@Override
public Supplier<List<Row>> supplier() {
return()-> new ArrayList<Row>();
}
@Override
public BiConsumer<List<Row>, Row> accumulator() {
return (rowList, row) -> {
// if there's no rows in the list or the start doesn't match the current end
if(rowList.size()==0 ||
!rowList.get(rowList.size()-1).getAs(1).equals(row.getAs(0))){
rowList.add(row);
} else {
Row lastRow = rowList.remove(rowList.size()-1);
rowList.add(RowFactory.create(lastRow.getAs(0), row.getAs(1)));
}
};
}
@Override
public BinaryOperator<List<Row>> combiner() {
return (a,b)->{ throw new UnsupportedOperationException();};
}
@Override
public Function<List<Row>, List<Row>> finisher() {
return i->i;
}
@Override
public Set<Characteristics> characteristics() {
return Collections.EMPTY_SET;
}
}
}
とプログラムの出力:値はありません
+-----+-----+---+
|value|start|end|
+-----+-----+---+
| 1| 123|128|
| 3| 400|700|
| 2| 128|400|
+-----+-----+---+
お知らせ順番に。これは、sparkがデータセットを分割して値の行を処理し、行の順序付けに重要性を割り当てるように指示していないためです。もちろん、時間や値のソートされた出力が必要な場合は、通常の方法で並べ替えることができます。
たとえば、最後の値を取得するのに遅れをとってから、現在の最後の値と最後の値が異なるかどうかに基づいてデータセットをフィルタリングします。 –
何か別の行が130 200 1の場合、何が出力になりますか –
ウィンドウ関数を使用できますが、スパークは時系列データの処理に最適ではありません。選択肢がありません。 https:// github .com/sryza/spark-timeseries – SanthoshPrasad