2017-04-19 12 views
1
package org.apache.spark.examples.kafkaToflink; 

import java.io.ByteArrayOutputStream; 
import java.io.IOException; 
import java.io.OutputStream; 
import java.io.PrintStream; 
import java.nio.charset.StandardCharsets; 
import java.util.Properties; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; 

import com.microsoft.azure.datalake.store.ADLException; 
import com.microsoft.azure.datalake.store.ADLFileOutputStream; 
import com.microsoft.azure.datalake.store.ADLStoreClient; 
import com.microsoft.azure.datalake.store.IfExists; 
import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider; 
import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider; 

import scala.util.parsing.combinator.testing.Str; 

public class App { 

    public static void main(String[] args) throws Exception { 
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "192.168.1.72:9092"); 
     properties.setProperty("group.id", "test"); 
     DataStream<String> stream = env.addSource(
       new FlinkKafkaConsumer010<String>("tenant", new SimpleStringSchema(), properties), "Kafka_Source"); 

     stream.addSink(new ADLSink()).name("Custom_Sink").setParallelism(128); 
     env.execute("App"); 
    } 
} 

class ADLSink<String> extends RichSinkFunction<String> { 

    private java.lang.String clientId = "***********"; 
    private java.lang.String authTokenEndpoint = "***************"; 
    private java.lang.String clientKey = "*****************"; 
    private java.lang.String accountFQDN = "****************"; 
    private java.lang.String filename = "/Bitfinex/ETHBTC/ORDERBOOK/ORDERBOOK.json"; 

    @Override 
    public void invoke(String value) { 

     AccessTokenProvider provider = new ClientCredsTokenProvider(authTokenEndpoint, clientId, clientKey); 
     ADLStoreClient client = ADLStoreClient.createClient(accountFQDN, provider); 
     try { 
      client.setPermission(filename, "744"); 
      ADLFileOutputStream stream = client.getAppendStream(filename); 

      System.out.println(value); 
      stream.write(value.toString().getBytes()); 

      stream.close(); 

     } catch (ADLException e) { 

      System.out.println(e.requestId); 
     } catch (Exception e) { 

      System.out.println(e.getMessage()); 
      System.out.println(e.getCause()); 
     } 

    } 

} 

私は、whileループを使用してAzureデータレイクストアにあるファイルを継続的に追加しようとしていますが、この場合、操作APPENDはHTTP500で失敗しました。私はJavaを使用しています操作APPENDがHTTP500で失敗しましたか?

+0

質問をいただきありがとうございます。 HTTP 500は「サーバー」エラーです。私はADLSチームに調査し、あなたに手を差し伸べるように頼んでいます。 –

+0

あなたは、(a)appendまたはconcurrentappendを使用しているかどうかに関する情報を提供できますか?(b)これが単一のスレッドまたは複数のスレッドで起こっているかどうか。 –

+0

@AmitKulkarni私は追加を使用していますが、これはシングルスレッドで発生しています –

答えて

1

Anubhav、Azureデータレイクストリームはシングルライターストリームです。つまり、これらのスレッド間で何らかの形式の同期を行わない限り、複数のスレッドから同じストリームに書き込むことはできません。これは、書き込みごとに書き込むオフセットを指定し、複数のスレッドでオフセットが一貫していないためです。

あなたはあなたのケースでは、複数のスレッド(あなたのコードで.setParallelism(128)コール)から

を書いているように見える、次の2つの選択肢があります。各スレッドで別のファイルに

  1. 書き込み。私はあなたのユースケースを知らないのですが、私たちは、さまざまなスレッドの自然な使用である多くのケースで、異なるファイルに書き込むことがわかっています。
  2. すべてのスレッドに同じファイルへの書き込みをさせることが重要な場合は、すべてのインスタンスが同じADLFileOutputStreamへの参照を持つように、シンクを少しリファクタリングする必要があります。 write()close()は同期しています。

さて、ここでもう一つの問題がある - があったと言いますが(ADLFileOutputStreamの取得・リース以来、リースが競合していることを示す)HTPPのに4xxエラーされている必要がありましたエラーではなく、HTTP 500は、サーバー側の問題。その問題を解決するには、アカウント名とアクセス時間を知る必要があります。その情報はStackOverflowで共有するのは安全ではありませんので、そのためのサポートチケットを開き、このSOの質問を参照してください。問題は最終的に私に送られます。

関連する問題