私はCSVファイルを持っていますが、あらかじめ列名がわかりません。 Google Dataflowでいくつかの変換を行った後、JSONでデータを出力する必要があります。データフローでCSVヘッダーを読み取る
ヘッダー行を取得し、すべての行にラベルを浸透させるにはどうすればよいですか?例えば
:
a,b,c
1,2,3
4,5,6
は...(約)は以下のようになる。
{a:1, b:2, c:3}
{a:4, b:5, c:6}
私はCSVファイルを持っていますが、あらかじめ列名がわかりません。 Google Dataflowでいくつかの変換を行った後、JSONでデータを出力する必要があります。データフローでCSVヘッダーを読み取る
ヘッダー行を取得し、すべての行にラベルを浸透させるにはどうすればよいですか?例えば
:
a,b,c
1,2,3
4,5,6
は...(約)は以下のようになる。
{a:1, b:2, c:3}
{a:4, b:5, c:6}
あなたが最初の行とストアヘッダデータを読み込みます(TextIO.TextSourceに類似)カスタムFileBasedSourceを実装する必要があります
@Override
protected void startReading(final ReadableByteChannel channel)
throws IOException {
lineReader = new LineReader(channel);
if (lineReader.readNextLine()) {
final String headerLine = lineReader.getCurrent().trim();
header = headerLine.split(",");
readingStarted = true;
}
}
および後者では、現在の行データへのT:私は速い(完全な)ソリューションを実装しました
@Override
protected boolean readNextRecord() throws IOException {
if (!lineReader.readNextLine()) {
return false;
}
final String line = lineReader.getCurrent();
final String[] data = line.split(",");
// assumes all lines are valid
final StringBuilder record = new StringBuilder();
for (int i = 0; i < header.length; i++) {
record.append(header[i]).append(":").append(data[i]).append(", ");
}
currentRecord = record.toString();
return true;
}
、github上で利用できます。私はまた、実証するデータフローユニットテストを追加した読書:
@Test
public void test_reading() throws Exception {
final File file =
new File(getClass().getResource("/sample.csv").toURI());
assertThat(file.exists()).isTrue();
final Pipeline pipeline = TestPipeline.create();
final PCollection<String> output =
pipeline.apply(Read.from(CsvWithHeaderFileSource.from(file.getAbsolutePath())));
DataflowAssert
.that(output)
.containsInAnyOrder("a:1, b:2, c:3, ", "a:4, b:5, c:6, ");
pipeline.run();
}
どこsample.csv
内容以下があります。
a,b,c
1,2,3
4,5,6
それはまだ新しいapacheビームバージョンと互換性がありますか? – vdolez
は、あなたは、JavaやPythonでそれが必要なのでしょうか? – vdolez