2017-10-24 4 views
1

期間を保持するレコードのリスト(ナノ秒で表されます:2つのLong、開始のための1つ、終了の1つ)と測定値を含む1つのデータセットがあります。値が変更される期間を保持する新しい集約データセットを作成する必要があります。たとえば:Apache Sparkでtimeseriesデータを集計する方法

input dataset: 
    +-----+-----+-----+ 
    |start|end |value| 
    +-----+-----+-----+ 
    |123 |124 |1 | 
    |124 |128 |1 | 
    |128 |300 |2 | 
    |300 |400 |2 | 
    |400 |500 |3 | 

    result dataset: 
    +-----+-----+-----+ 
    |start|end |value| 
    +-----+-----+-----+ 
    |123 |128 |1 | 
    |128 |400 |2 | 
    |400 |500 |3 | 

私は小さなデータセットでこれを行う方法を知っているが、MapReduceのパラダイム、およびApacheのスパークを使用する方法は考えています。

Apache Spark、javaでこれを達成する方法を教えてください。

+0

たとえば、最後の値を取得するのに遅れをとってから、現在の最後の値と最後の値が異なるかどうかに基づいてデータセットをフィルタリングします。 –

+0

何か別の行が130 200 1の場合、何が出力になりますか –

+0

ウィンドウ関数を使用できますが、スパークは時系列データの処理に最適ではありません。選択肢がありません。 https:// github .com/sryza/spark-timeseries – SanthoshPrasad

答えて

1

これは非常に単純なようです。 groupByでminとmaxを見つけたら、データセットを結合します。

// df is original dataset 
Dataset<Row> df_start = df.groupBy("value").min("start").withColumnRenamed("min(start)", "start").withColumnRenamed("value", "value_start"); 
Dataset<Row> df_end = df.groupBy("value").max("end").withColumnRenamed("max(end)", "end").withColumnRenamed("value", "value_end"); 

Dataset<Row> df_combined = df_start.join(df_end, df_start.col("value_start").equalTo(df_end.col("value_end"))).drop("value_end").withColumnRenamed("value_start", "value").orderBy("value"); 

df_combined.show(false); 
+-----+-----+---+ 
|value|start|end| 
+-----+-----+---+ 
|1 |123 |128| 
|2 |128 |400| 
|3 |400 |700| 
+-----+-----+---+ 
0

このアプローチの1つのアプローチは、それぞれの個別の値に対して、その値の隣接するすべての時間範囲を見つけてそれらを合体させるというものです。あなたはgroupByを値に使って、それぞれの値に対してstartendのリストを作成することができます。次に、カスタム関数を使用してこれらを連続した時間範囲に縮小することができます。

極端な場合、データセットでディスクのみの永続性レベルを使用する場合、唯一の要件は、start_endの1行をメモリに収めることができることです。これは、ほとんどのクラスタで、このアプローチの上限をgbでstart_endの値ごとに設定します。

は、ここでの実装例です(要求されるようにするJava APIを使用して - Scalaはかなり少ない冗長になります):

public class JavaSparkTest { 

    public static void main(String[] args){ 
     SparkSession session = SparkSession.builder() 
       .appName("test-changes-in-time") 
       .master("local[*]") 
       .getOrCreate(); 
     StructField start = createStructField("start", DataTypes.IntegerType, false); 
     StructField end = createStructField("end", DataTypes.IntegerType, false); 
     StructField value = createStructField("value", DataTypes.IntegerType, false); 
     StructType inputSchema = createStructType(asList(start,end,value)); 
     StructType startEndSchema = createStructType(asList(start, end)); 
     session.udf().register("collapse_timespans",(WrappedArray<Row> startEnds) -> 
       JavaConversions.asJavaCollection(startEnds).stream() 
        .sorted((a,b)->((Comparable)a.getAs("start")).compareTo(b.getAs("start"))) 
        .collect(new StartEndRowCollapsingCollector()), 
       DataTypes.createArrayType(startEndSchema) 
     ); 
     Dataset<Row> input = session.createDataFrame(asList(
       RowFactory.create(123, 124, 1), 
       RowFactory.create(124, 128, 1), 
       RowFactory.create(128, 300, 2), 
       RowFactory.create(300, 400, 2), 
       RowFactory.create(400, 500, 3), 
       RowFactory.create(500, 600, 3), 
       RowFactory.create(600, 700, 3) 
     ), inputSchema); 
     Dataset<Row> startEndByValue = input.selectExpr("(start start, end end) start_end", "value"); 
     Dataset<Row> startEndsByValue = startEndByValue.groupBy("value").agg(collect_list("start_end").as("start_ends")); 
     Dataset<Row> startEndsCollapsed = startEndsByValue.selectExpr("value", "explode(collapse_timespans(start_ends)) as start_end"); 
     Dataset<Row> startEndsInColumns = startEndsCollapsed.select("value", "start_end.start", "start_end.end"); 
     startEndsInColumns.show(); 
    } 

    public static class StartEndRowCollapsingCollector implements Collector<Row, List<Row>, List<Row>>{ 

     @Override 
     public Supplier<List<Row>> supplier() { 
      return()-> new ArrayList<Row>(); 
     } 

     @Override 
     public BiConsumer<List<Row>, Row> accumulator() { 
      return (rowList, row) -> { 
       // if there's no rows in the list or the start doesn't match the current end 
       if(rowList.size()==0 || 
         !rowList.get(rowList.size()-1).getAs(1).equals(row.getAs(0))){ 
        rowList.add(row); 
       } else { 
        Row lastRow = rowList.remove(rowList.size()-1); 
        rowList.add(RowFactory.create(lastRow.getAs(0), row.getAs(1))); 
       } 
      }; 
     } 

     @Override 
     public BinaryOperator<List<Row>> combiner() { 
      return (a,b)->{ throw new UnsupportedOperationException();}; 
     } 

     @Override 
     public Function<List<Row>, List<Row>> finisher() { 
      return i->i; 
     } 

     @Override 
     public Set<Characteristics> characteristics() { 
      return Collections.EMPTY_SET; 
     } 
    } 
} 

とプログラムの出力:値はありません

+-----+-----+---+ 
|value|start|end| 
+-----+-----+---+ 
| 1| 123|128| 
| 3| 400|700| 
| 2| 128|400| 
+-----+-----+---+ 

お知らせ順番に。これは、sparkがデータセットを分割して値の行を処理し、行の順序付けに重要性を割り当てるように指示していないためです。もちろん、時間や値のソートされた出力が必要な場合は、通常の方法で並べ替えることができます。

関連する問題