2017-10-11 7 views
0

BigQueryで1つの列を変更し、手動で他の列をすべて手作業で保存することなく更新されたデータを新しい表に書きたいと思います。私は次のコードでやりたいものを達成することができます - 私は100+の列(または本当にさえ10+列を持っていた場合は単一のBigQuery列を変更して新しい表に書き込む

row = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query))  
new_row = row | beam.Map(lambda x: (x["col1"], x["col2"], preprocess(x["text_col"])) 
output = new_row | beam.Map(lambda (col1, col2, processed_text): {"col1": col1, "col2": col2, "text": processed_text} 

output | beam.io.WriteToBigQuery(path_to_new_table) 

、これは基本的に書くと手ですべての列を維持するために私を必要とし)これは非常に扱いにくく、扱いにくいです。行の上でいくつかの関数(この例ではpreprocess())を実行し、その列だけを更新して他の列を保持する簡単な方法はありますか?

+0

ヒント:いないタプルとしてではなく、辞書としてデータ要素を表現してみてください。次に、.copy()を使用して辞書をコピーし、必要に応じてコピーを修正し、変更された辞書を返すことができます。 – jkff

+0

ああ、これは辞書を受け入れる別個の関数でなければならず、そこで変更が発生する可能性があります。 – reese0106

+0

@jkff私の答えをチェックし、これがあなたが意図したものかどうかを教えてください:) – reese0106

答えて

1

@ jkffのおかげで、私はこれを行う方法を考え出しました。関数はdictを受け取って受け取る必要があります。そして、dictの単一の要素を変更するだけで済みます。以下のような何か:

new_row = row | beam.Map(lambda x: preprocess_text(x, col_to_transform='text_column')` 
preprocess_textは()のようなものになるだろう

def preprocess_text(row, col_to_transform): 
    row_copy = row.copy() 
    line = row_copy[col_to_transform] 
    line = ... # preprocessing transform goes here 
    row_copy[col_to_transform] = line 

    return row_copy 
+0

あなたは元のオブジェクトを変更する代わりにトランスフォームに渡される現在の要素を変更することは安全ではありません。 – jkff

+0

@jkff提案に感謝します。前処理機能内でコピーを実行するのは安全ですか?それとも他の場所に置くようアドバイスしますか? – reese0106

+0

私はそれをするために別の場所を考えることができません。 beam.Map、beam.FlatMap、beam.ParDo、beam.Filterなどに渡される関数は、入力要素を変更すること(つまり、すでに出力した要素を変更すること)からすべての機能を禁止しています。不変としてPCollectionsの。したがって、それを修正したい場合は、前処理機能でコピーを作成する必要があります。 – jkff

関連する問題