spark-streaming

    0

    1答えて

    スパークストリーミングでカフカストリームから数百万のメッセージを取得しています。 15種類のメッセージがあります。メッセージは1つのトピックから来ています。私はその内容によってのみメッセージを区別することができます。だから私はrdd.containsメソッドを使用して、異なるタイプのrddを取得しています。 サンプルメッセージ { "A": "FOO"、 "B": "バー"、 "タイプ": "第一

    0

    1答えて

    私はコンシューマコードでKafka.Butからデータを受け取るためのSparkコンシューマAPIを作成しようとしています。これらの2つのクラスに対してjar /依存関係を追加することはできません: import org.apache .spark.streaming.scheduler.ReceiverLauncher; import org.apache.spark.streaming.Sche

    0

    1答えて

    私は、毎分60Kイベントを取り込んで処理するKafka - Spark Streamingアプリケーションを持っています。私は可視化レイヤーによってアクセスされる私の変換されたデータフレームを格納するデータベースが必要です。 Spark StreamingでRedshiftを使用することはできますか?またはCassandraを使用する必要がありますか?私は、30秒のスパークウィンドウごとにデータフ

    0

    1答えて

    csvファイルに75のeコマース顧客アカウントデータのデータがあります。 また、別のファイルにトランザクションレコードがあります。ここで、口座番号は主キーです。すべてのアカウントの平均トランザクション数は500です。 今、私はこのデータを処理し、プロモーションオファーを提供することについて何らかの決定をしたいと思います。データ量が膨大なので、私はSparkSQLに行くことにしました。 しかし、この

    1

    1答えて

    私はカフカからスパークダイレクト・ストリームを作成しようとしていますが、directStreamオブジェクトを作成しながら、私のようにエラーを取得しています: kafkaUtilsは、HashMapのの(1には適用されませんタイプ内のメソッドcreateDirectStream私が渡しているパラメータ)。 JavaPairInputDStream directKafkaStream = Kafka

    0

    2答えて

    私はsparkを使用してTerradataテーブルからデータを読み込み、Upsertオラクルテーブルに入れたいと思っています。私はデータを挿入することができますが、UPSERTを実行することはできますか? これは、Oracleにデータを挿入する方法です。 dataframe.write.mode(SaveMode.Append).jdbc(URL、表6、小道具)

    -1

    1答えて

    私はkafkaのApacheログを読んでから、Spark Streamingにさらに処理したいと思います。私はkafkaを初めて使用しています。私が理解している限り、ログファイルを読み込むプロデューサクラスを作成する必要があります。

    1

    1答えて

    私は、カフカからのいくつかのメッセージを監視するスパークストリーミングアプリケーションを持っています。特定のメッセージについては、再接続するまで何度かpingを続行するために長いループに移動する必要があるかもしれません。 一部のエグゼキュータがこのループに行くと、ストリームの処理が停止することがわかります。 正しいですか? ストリームの処理を中断せずにこのループを作成するにはどうすればよいですか?

    -1

    2答えて

    コードの末尾にscalaを使用してsparkを実行する必要があります。私は自分のコードでcountとgroupbyの関数を使いました。私は言及する必要があります、私のコードは、コードの最後の行なしで完全に動作します。 (import sys.process._ /////////////////////////linux commands val procresult="./Users/sae