python
  • pandas
  • apache-spark
  • pyspark
  • 2017-05-10 7 views 0 likes 
    0

    複数の列の値を更新するが、スパークが急速に変化し、多くの回答が古くなっているように見えるデータセット全体を返すための最良の方法を特定するのに少し苦労しています。特定の列のPySpark更新値

    私は次のようにデータフレームを作成する小規模なクラスタ上でスパーク2.1を実行している:

    df = spark.read.options(header="true",sep = '|').csv(path = 'file:///usr//local//raw_data//somefile.txt') 
    
    print df.columns 
    ['ID','field1','field2','field3','value'] #there are actually many more columns, this is just an example 
    

    私はフィールド1、フィールド2およびフィールド3に以下のマッピング関数を適用されますが、データセット全体を保持する必要が

    def mappingFunction(val,dict): 
        if val in dict: 
         return dict(val) 
        else: 
         return val 
    

    非常に単純化し、私はパンダにそうようにこれを行うことができます:

    df['field1'] = df['field1'].map(mapDict) 
    df['field2'] = df['field2'].map(mapDict) 
    df['field3'] = df['field3'].map(mapDict) 
    

    私はn pyspark、df.rdd.map()の機能がありますが、これはこれに近づく "時代遅れ"のように思えます。さらに、基底のデータセットを既にカラムで分割しているので、 RDDに戻る必要があります。

    また、pyspark.sql.functions.udf(f、returnType = StringType)も参照してください。これは私が使いたいと思うようです。

    私の質問は以下のとおりです。

    誰かがUDFを定義すると、このインスタンスに行くための正しい方法であることを確認してもらえますか?

    もしそうなら、一度に複数の列にUDFを適用するにはどうすればよいですか?私は行を繰り返し処理するので、一度に3つの列すべてにマッピング関数を適用するのが最適なクエリ設計のようですが、他のすべてのコンテキストでそれを行う方法がわかりません。やっている。

    これらの値を更新して、完全なデータセットを返すにはどうすればよいですか?私がやっているすべての集計/操作は、更新された列の値を使用する必要があります。

    洞察力がありがとう!

    答えて

    2

    あなたはおそらく、その後、broadcast変数に辞書を変換引きudfを定義し、ジェネレータ式を使用して、関連するすべての列に適用するオフに最適です:

    はのは、第1のダミーデータセットと辞書を作成してみましょう:

    df = sc.parallelize([ 
        ("a",1,1,2,2), 
        ("b",2,2,3,3), 
        ("c",3,4,3,3)]).toDF(['ID','field1','field2','field3','value']) 
    
    myDict = {1: "y", 2: "x", 3: "z"} 
    

    今、私たちはbroadcast変数に辞書を変換して、ルックアップudf定義:

    broadcastVar = sc.broadcast(myDict) 
    
    def lookup(x): 
    
        if broadcastVar.value.get(x) is None: 
        return x 
        else: 
        return broadcastVar.value.get(x) 
    
    lookup_udf = udf(lookup) 
    

    今何残っているのは、我々は("field"が含まれているすべてのもの)に、当社の機能を適用します列名のlistを生成し、そして私たちのudfとジェネレータ式の内側にそれを入れている:

    from pyspark.sql.functions import col 
    
    cols = [s for s in df.columns if "field" in s] 
    df.select(*(lookup_udf(col(c)).alias(c) if c in cols else c for c in df.columns)).show() 
    +---+------+------+------+-----+ 
    | ID|field1|field2|field3|value| 
    +---+------+------+------+-----+ 
    | a|  y|  y|  x| 2| 
    | b|  x|  x|  z| 3| 
    | c|  z|  4|  z| 3| 
    +---+------+------+------+-----+ 
    
    +0

    ありがとう!非常に役立ちます!最後の質問 - 私はこれらの更新を "永続的"にしたい、それで私は他の集計/計算を実行できる。今すぐ最後の出力はちょうど.show()関数です。最後の行を次のように置き換えますか: df = df.select(*(lookup_udf(col(c)))エイリアス(c)df.columnsの中でcのcの場合はcsを、cの場合はc)collect() – flyingmeatball

    +0

    df1 = df.select(..)のような最後に '.show '、' .show() 'で終わって、データがどのように変換されたかを表示します。最後に' collect() 'を使わないと、データがドライバノードに持ち込まれます。 – mtoto

    +0

    Duh - 私は知っていました1つありがとう! – flyingmeatball

    関連する問題