dstream

    2

    2答えて

    私はJavaDStreamsで動作するアプリケーションを持っています。 これはコードの一部です。ここでは、単語が表示される頻度を計算します。今 JavaPairDStream<String, Integer> wordCounts = words.mapToPair( new PairFunction<String, String, Integer>() { @Overrid

    0

    1答えて

    SparkStreamingアプリケーション(Python)をデベロップしている間、私はそれがどのようにうまく動作するのかよく分かりません。 jsonファイルストリーム(ディレクトリにポップする)を読んで、それぞれのjsonオブジェクトと参照に対して結合操作を実行してから、それをテキストファイルに書き戻すだけです。 config = configparser.ConfigParser() con

    0

    1答えて

    次の2つは同じですか? val dstream = stream.window(Seconds(60), Seconds(1)) val x = dstream.map(x => ...) と val dstream = stream.window(Seconds(60), Seconds(1)) val x = dstream.transform(rdd => rdd.map(x =>

    0

    1答えて

    Dstream(Spark Streaming)のTransform APIを使用してデータを並べ替えています。 netcatを使ってTCPソケットから読み込み中です。 次のコード行が使用されています。 myDStream.transform(rdd => rdd.sortByKey()) sortByKey関数が見つかりません。誰でもこのステップの問題点を助けてくれますか?

    1

    1答えて

    InputDStreamを使用して(並列化されていない)ストリーミングを実行するために、Apache Sparkの長時間実行されるストリーミングジョブを設定しています。 私が達成しようとしているのは、キューのバッチが(ユーザー定義のタイムアウトに基づいて)時間がかかり過ぎると、バッチをスキップして完全に放棄できるようにしたいということです。実行。 spark APIまたはオンラインでこの問題の解決

    0

    1答えて

    私は、それぞれスキーマS1とS2を持つ2つのソケットストリームS1とS2を受け取りました。 スパークストリーミングを使用して、属性 "a"に関してS1とS2を結合したいと思います。以下は私のコードです: sc = SparkContext("local[3]", "StreamJoin") ssc = StreamingContext(sc, 1) S1 = ssc.sock

    0

    1答えて

    これは、一日と言うために通ってくるしているどのように多くのエラーメッセージ/警告メッセージの数を保つために Pyspark filter operation on Dstream のフォローアップの質問です、時間 - どのように仕事をデザインするのか。 私が試してみました: from __future__ import print_function import sys from pysp

    1

    1答えて

    私は、カフカ(createDstreamを使用)から消費しているストリーミングジョブを持っています。 「ID」 [id1,id2,id3 ..] のその流れIは、各ID [id:t1,id2:t2,id3:t3...] のための「T」を言うIDの配列を受け取り、いくつかの外部コールを行い、いくつかの情報をバック受信ユーティリティまたはAPIを持っています DStreamを保持し、Dstre

    1

    1答えて

    1)MLlib Random Forestを使用しようとしています。 2列 id, predicted_value 1, 0.5 2, 0.4 私の機能セットを持っている必要があり、私の最終的な出力は スコア、データとスコアリング---電車を訓練しているが、私は訓練し、得点するとき、それは次のような特徴として使用することができなかったように私は、idフィールドをドロップそれが今、私はスコア

    0

    1答えて

    sparkストリーミング(pyspark)からelasticserachにデータをインデックス付けする際に問題が発生しています。データはdstreamです。インデックス= CLUSと種類を=データ GET /clus/_mapping/data { "clus": { "mappings": { "data": { "properties": {