spark-streaming

    -1

    1答えて

    私はストリーミングを開始するのが新しく、マップの仕組みを理解できません。私は私が書いたので、何のコンストラクタから、それを通過した後、ストリームからいくつかのポイントをエンキューしたいです: val data = inp.flatMap(_.split(",")) val points = data.map(_.toDouble) val queue: Queue[Point] = new Q

    0

    1答えて

    プロデューサから送信されたデータがコンシューマに届かない理由はわかりません。 私はcloudera仮想マシンに取り組んでいます。 私は、プロデューサーがKafkaを使用し、消費者がスパークストリーミングを使用する単純なプロデューサーコンシューマーを作成しようとしています。 Scalaではプロデューサーコード:Scalaでは import java.util.Properties import o

    0

    1答えて

    Dstream(Spark Streaming)のTransform APIを使用してデータを並べ替えています。 netcatを使ってTCPソケットから読み込み中です。 次のコード行が使用されています。 myDStream.transform(rdd => rdd.sortByKey()) sortByKey関数が見つかりません。誰でもこのステップの問題点を助けてくれますか?

    0

    2答えて

    KafkaとSparkを介してAvroメッセージのストリームを処理している間に、処理済みのデータをElasticSearchインデックスに文書として保存しています。 は、ここでは、コード(簡体字)です:予想通り directKafkaStream.foreachRDD(rdd ->{ rdd.foreach(avroRecord -> { byte[] encodedA

    0

    1答えて

    スパークダイレクトストリーミングを使用するとき、私はズキーパーカーに私のオフセットを保存しようとしています。私はそれがAPIで利用できません見るように、我々はJavaPairInputDstreamを取得するために使用できる方法の回避策があります私はJavaInputDstream APIは、特定のオフセットから起動するオプションを持っている見るが、私はJavaPairInputDstreamのた

    10

    1答えて

    sparkストリーミングジョブからtsdbデータベースを開くためにメトリックを送信するにはどうすればよいですか?私はGrafanaのデータソースとしてopen tsdbを使用しようとしています。私が始めることができるいくつかの参考文献で私を助けてください。 似たような仕事をするオープンtsdbレポーターがここにあります。スパークストリーミングジョブのメトリックを統合するにはどうすればよいですか?そ

    0

    1答えて

    私は助けが必要です。私は、Pythonコードを使用してWebページからファイルをダウンロードし、ローカルファイルシステムに配置してから、putコマンドを使用してHDFSに転送してから操作を実行します。 しかし、ファイルサイズが非常に大きく、ローカルファイルシステムへのダウンロードが適切な手順でない場合があります。ですから、ファイルをローカルファイルシステムを使用せずにHDFSに直接ダウンロードした

    1

    1答えて

    (textFileStreamを使用して)ディレクトリからファイルを取得するSparkストリーミングジョブを実行しています。 ジョブがダウンしてもファイルはまだディレクトリに追加されている場合があります。 ジョブが再び起動すると、それらのファイルは(ジョブが実行されている間に新しいものではなく、変更されていないので)ピックアップされていませんが、処理されたいものです。 1)解決策はありますか?どの