GCSでライン区切りのJSONファイルをgzipしました。 Dataflowを使用して読み込み、BigQueryに保存します。Google Cloud Dataflow(Apache Beam) - gzipファイルの最初の行を常に解析できません
ただし、ファイルの最初の行でJSONを解析できません。 私はジャクソンを使用して、ログが
Failed to parse JSON, com.fasterxml.jackson.core.JsonParseException:
Unexpected character ('_' (code 95)):
Expected space separating root-level values at
[Source: (String)"40085_telemetry_2015-09-09.log0000664000076600007660000011300712574251007016553
0ustar xxxadminxxxadmin"[truncated 142 chars];
line: 1, column: 7
を言う私は、ファイルの内容を確認するときしかし、上記のような何の文字列がありませんでした。確かに有効なJSON文字列です。 Dataflowが処理を開始すると、上記の文字列が最初の行の先頭に追加されます。
これはなぜ起こりますか? Apache Beam Java SDKバージョン2.1.0を使用します。
マイコードshowns以下のように:
static class ReadFile extends PTransform<PInput, PCollection<String>> {
private static final long serialVersionUID = 1L;
private ValueProvider<String> files;
public ReadFile(ValueProvider<String> env, ValueProvider<String> productName, ValueProvider<String> files,
ValueProvider<String> deadLetterId) {
this.files = files;
}
@Override
public PCollection<String> expand(PInput input) {
Pipeline p = input.getPipeline();
// this pipeline supports both wildcard and comma separated
String inputFile = files.get();
if (inputFile.contains("*")) {
return p.apply("Read from GCS with wildcard prefix",
TextIO.read().from(inputFile).withCompressionType(CompressionType.GZIP));
}
String[] targetFiles = inputFile.split(",");
PCollectionList<String> rowsList = PCollectionList.empty(p);
for (String targetFile : targetFiles) {
PCollection<String> fileLines = p.apply("Read (" + targetFile + ")",
TextIO.read().from(targetFile).withCompressionType(CompressionType.GZIP));
rowsList = rowsList.and(fileLines);
}
PCollection<String> allRows = rowsList.apply("Flatten rows", Flatten.<String>pCollections());
return allRows;
}
}
JavaまたはPython SDKを使用していますか?パイプラインのこの部分を構成するコードを表示できますか?このファイルは "gzip -d"を使って圧縮解除できますか?ファイル拡張子とは何ですか? – jkff
返信のために@jkffに感謝します。私は情報を追加しましたが、その間に "gzip -d"でファイルを解凍すると、最初の行に文字列が追加されていました(MacのArchive Utilityを使用した場合は起こりません)。 –
@jkff「gzip -d」と表示されているものは正しいと思いますが、この文字列は実際には存在しますが、Mac Archive Utilityは何とかそれをちょっと省略します。あなたに迷惑をかけて申し訳ありませんが、私はこれが私たちのデータの単なる問題だと思っています:( –