6

私はGoogleクラウドデータフローを使用したBigQueryテーブルにトピックからPubSubのメッセージデータを挿入したいと思います。 すべてがうまくいきますが、BigQueryテーブルでは "߈ "のような読めない文字列を見ることができます。 これは私のパイプラインである:挿入PubSubのメッセージ

p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name")) 
.apply(ParDo.named("Transformation").of(new StringToRowConverter())) 
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table") 
    .withSchema(schema) 
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)) 

と私のシンプルなStringToRowConverter機能は次のとおりです。

class StringToRowConverter extends DoFn<String, TableRow> { 
private static final long serialVersionUID = 0; 

@Override 
public void processElement(ProcessContext c) { 
    for (String word : c.element().split(",")) { 
     if (!word.isEmpty()) { 
      System.out.println(word); 
     c.output(new TableRow().set("data", word)); 
     } 
    } 
} 
} 

そして、これは私がPOSTリクエストを介して送信されたメッセージである:私は何をしないのです

POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish 
{ 
"messages": [ 
    { 
    "attributes":{ 
"key": "tablet, smartphone, desktop", 
"value": "eng" 
    }, 
    "data": "34gf5ert" 
    } 
] 
} 

? ありがとうございました!

+0

[この](https://github.com/bomboradata/pubsub-to-bigqueryが)あなたはBQに直接パブリッシュ/サブスクライブに使用することができるオープンソースであります – PUG

答えて

6

https://cloud.google.com/pubsub/reference/rest/v1/PubsubMessageによると、pubsubメッセージのJSONペイロードはbase64でエンコードされています。データフロー内のPubsubIOは、デフォルトでString UTF8コーダーを使用します。 base64でデコードされ、UTF-8文字列として解釈されたときに "34gf5ert"を指定した文字列の例は、正確に "߈ "となります。私は私のpubsubメッセージを開梱していますどのようにこれは

2

@Override 
public void processElement(ProcessContext c) { 

    String json = c.element(); 

    HashMap<String,String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>(){}.getType()); 
    String unpacked = items.get("JsonKey"); 

はあなたにその便利を願っています。

関連する問題