2016-03-27 4 views
0

SparkとHBaseの新機能ですが、2つをリンクする必要がありますが、ライブラリのspark-hbase-connectorを試しましたが、spark-submitでエラーがなくても動作しません表示されます。私はここや他の場所で類似の問題やチュートリアルを検索しましたが、見つけられませんでしたので、SparkストリーミングからHBaseに書き込む方法やチュートリアルや本をお勧めします。 は、事前にHBaseにスパークストリーミングをリンクする

+0

です。 SOに関する質問をどのように行うのか、そしてどのような質問がここで答えられるのかをお読みください。 – eliasah

答えて

1

ありがとう何最後に働いていたことだった。ここで

val hconf = HBaseConfiguration.create() 
val hTable = new HTable(hconf, "mytab") 
val thePut = new Put(Bytes.toBytes(row)) 
thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes("c1"), Bytes.toBytes(value) 
hTable.put(thePut) 
0

がスパークストリーミングやカフカ経由でのHBaseにデータを保存するためにスプライスマシン(オープンソース)を使用して、いくつかのサンプルコードです...

https://github.com/splicemachine/splice-community-sample-code/tree/master/tutorial-kafka-spark-streaming

私たちはこれもまた闘って、それが少し難しいかもしれないことを知っています。ここで

+0

そのリンクがなくなる可能性があるので、回答に関連する部分を回答に含めてください。 – Beryllium

0

は、関連するコードは、この質問は、多くのレベルでオフトピックです...

 LOG.info("************ SparkStreamingKafka.processKafka start"); 

    // Create the spark application and set the name to MQTT 
    SparkConf sparkConf = new SparkConf().setAppName("KAFKA"); 

    // Create the spark streaming context with a 'numSeconds' second batch size 
    jssc = new JavaStreamingContext(sparkConf, Durations.seconds(numSeconds)); 
    jssc.checkpoint(checkpointDirectory); 

    LOG.info("zookeeper:" + zookeeper); 
    LOG.info("group:" + group); 
    LOG.info("numThreads:" + numThreads); 
    LOG.info("numSeconds:" + numSeconds); 


    Map<String, Integer> topicMap = new HashMap<>(); 
    for (String topic: topics) { 
     LOG.info("topic:" + topic); 
     topicMap.put(topic, numThreads); 
    } 

    LOG.info("************ SparkStreamingKafka.processKafka about to read the MQTTUtils.createStream"); 
    //2. KafkaUtils to collect Kafka messages 
    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper, group, topicMap); 

    //Convert each tuple into a single string. We want the second tuple 
    JavaDStream<String> lines = messages.map(new TupleFunction()); 

    LOG.info("************ SparkStreamingKafka.processKafka about to do foreachRDD"); 
    //process the messages on the queue and save them to the database 
    lines.foreachRDD(new SaveRDDWithVTI()); 


    LOG.info("************ SparkStreamingKafka.processKafka prior to context.strt"); 
    // Start the context 
    jssc.start(); 
    jssc.awaitTermination(); 
関連する問題