2016-06-29 3 views
2

私は、日付と整数を含む非常に大きなcsvファイルを持っています。私は各ファイルレコードのEctoレコードを作成する必要があります。問題は、隣接するレコードの日付間の最小時間差に基づいて整数を操作する必要があることです。私はストリームを一点まで処理し、変数にバインドし、次にそれを2つの異なる計算に使用しようとしていました。しかし、2番目の計算では空のストリームが得られます。ストリームにアクセスすると、私が取得したものはすべて削除されるようです。再利用/フォーク/クローン/ dup /何かストリームがある方法はありますか? RXストリームにこのコンセプトがあることはわかっています。私は一本のチェーンでこれを行うために何らかの方法を考えましたが、空になりました。ここでは基本的に私がしようとしていた流れである。エリクシルファイルストリームの再利用/フォーク

def do_something(path) do 
    {:ok, file} = File.open(path) 
    stream = file 
    |> IO.stream(:line) 
    |> Stream.map(&String.split(&1, ",")) 

    dates = stream_to_dates(stream) # stream 
    factor = dates_to_factor(dates) # float 
    values = stream_to_values(stream, factor) # stream 

    Stream.zip(dates, values) 
end 

ので、値は、このように、空である、私は日付を計算することができるよ、その後の要因が、その直後、ストリームと日付の両方が空ストリームでありますジッパーは...空です

答えて

1

あなたは指定されたストリームからペアのストリームを作成するためにStream.transformを使用することができます。そして、

def pairs(stream) do 
    Stream.transform(stream, nil, fn(x, last) -> 
    # The first element is the list of values to return at this point, 
    # the second one is the new accumulator 
    {[{last, x}], x} 
    end) 
    # Drop the first pair of {nil, something} 
    |> Stream.drop(1) 
end 

iex(1)> 1..1000 |> Stream2.pairs |> Enum.take(5) 
[{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}] 

|> Stream.map(&String.split(&1, ","))の後にペアを使用して、隣接するレコードのペアを取得できるようにする必要があります。より大きなチャンクが必要な場合は、その関数を一般化することができます。