SparkとHBaseの新機能ですが、2つをリンクする必要がありますが、ライブラリのspark-hbase-connectorを試しましたが、spark-submitでエラーがなくても動作しません表示されます。私はここや他の場所で類似の問題やチュートリアルを検索しましたが、見つけられませんでしたので、SparkストリーミングからHBaseに書き込む方法やチュートリアルや本をお勧めします。 は、事前にHBaseにスパークストリーミングをリンクする
0
A
答えて
1
ありがとう何最後に働いていたことだった。ここで
val hconf = HBaseConfiguration.create()
val hTable = new HTable(hconf, "mytab")
val thePut = new Put(Bytes.toBytes(row))
thePut.add(Bytes.toBytes("colfamily"), Bytes.toBytes("c1"), Bytes.toBytes(value)
hTable.put(thePut)
0
がスパークストリーミングやカフカ経由でのHBaseにデータを保存するためにスプライスマシン(オープンソース)を使用して、いくつかのサンプルコードです...
私たちはこれもまた闘って、それが少し難しいかもしれないことを知っています。ここで
+0
そのリンクがなくなる可能性があるので、回答に関連する部分を回答に含めてください。 – Beryllium
0
は、関連するコードは、この質問は、多くのレベルでオフトピックです...
LOG.info("************ SparkStreamingKafka.processKafka start");
// Create the spark application and set the name to MQTT
SparkConf sparkConf = new SparkConf().setAppName("KAFKA");
// Create the spark streaming context with a 'numSeconds' second batch size
jssc = new JavaStreamingContext(sparkConf, Durations.seconds(numSeconds));
jssc.checkpoint(checkpointDirectory);
LOG.info("zookeeper:" + zookeeper);
LOG.info("group:" + group);
LOG.info("numThreads:" + numThreads);
LOG.info("numSeconds:" + numSeconds);
Map<String, Integer> topicMap = new HashMap<>();
for (String topic: topics) {
LOG.info("topic:" + topic);
topicMap.put(topic, numThreads);
}
LOG.info("************ SparkStreamingKafka.processKafka about to read the MQTTUtils.createStream");
//2. KafkaUtils to collect Kafka messages
JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper, group, topicMap);
//Convert each tuple into a single string. We want the second tuple
JavaDStream<String> lines = messages.map(new TupleFunction());
LOG.info("************ SparkStreamingKafka.processKafka about to do foreachRDD");
//process the messages on the queue and save them to the database
lines.foreachRDD(new SaveRDDWithVTI());
LOG.info("************ SparkStreamingKafka.processKafka prior to context.strt");
// Start the context
jssc.start();
jssc.awaitTermination();
関連する問題
- 1. スパークストリーミング:ソースHBase
- 2. スパークストリーミングhbaseエラー
- 3. スパークストリーミングとHbase
- 4. スキーマレスデータによるスパークストリーミング
- 5. スパークストリーミング:キューにポイントをマッピングする
- 6. Javaでスパークストリーミングを停止するには?
- 7. スパークストリーミングJavaPairDStreamをテキストファイル
- 8. スパークストリーミングMYSQL
- 9. スパークストリーミングJavaCustomReceiver
- 10. スパークストリーミングrawSocketStream
- 11. HBaseマップ出力をhdfs&HBASeに出力
- 12. スパークストリーミングのセットアップに関する質問
- 13. hbaseコマンドを実行するシェルスクリプト - hbaseテーブルを作成する
- 14. 適切なhbase-site.xmlを探すHBaseクライアントのhbase-default.xml設定例
- 15. スパークストリーミング+ Accumulo - SerializeをBatchWriterImpl
- 16. updatestatebykey - Pyspark - スパークストリーミング
- 17. スパークストリーミングがDstream
- 18. スパークストリーミングのバッチルックアップデータ
- 19. スパークストリーミング - java.lang.NoSuchMethodErrorエラー
- 20. スパークストリーミング:Print JavaInputDStream
- 21. Fitbitデータ用スパークストリーミング
- 22. スパークストリーミングkafka concurrentModificationException
- 23. スパークストリーミング保留バッチ
- 24. カフカ+スパークストリーミング:kafka.common.OffsetOutOfRangeException
- 25. スパークストリーミング:java.lang.NoClassDefFoundError:kafka/api/TopicMetadataRequest
- 26. スパークストリーミングHiveContext NullPointerException
- 27. スパークストリーミングKafka java.lang.ClassNotFoundException:org.apache.kafka.common.serialization.StringDeserializer
- 28. スパークストリーミングxmlファイル
- 29. スパークストリーミング - 動的フィルタ
- 30. スパークストリーミング再生
です。 SOに関する質問をどのように行うのか、そしてどのような質問がここで答えられるのかをお読みください。 – eliasah