Apache Flinkを使用してTwitter Streaming APIでメッセージを取得しようとしています。Apache Flink - データを取得できません。
しかし、私のコードは出力ファイルに何も書き込んでいません。私は特定の単語の入力データを数えようとしています。
pleseは私の例を確認してください。
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.twitter._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import com.twitter.hbc.core.endpoint.{Location, StatusesFilterEndpoint, StreamingEndpoint}
import org.apache.flink.streaming.api.windowing.time.Time
import scala.collection.JavaConverters._
//////////////////////////////////////////////////////
// Create an Endpoint to Track our terms
class myFilterEndpoint extends TwitterSource.EndpointInitializer with Serializable {
@Override
def createEndpoint(): StreamingEndpoint = {
//val chicago = new Location(new Location.Coordinate(-86.0, 41.0), new Location.Coordinate(-87.0, 42.0))
val endpoint = new StatusesFilterEndpoint()
//endpoint.locations(List(chicago).asJava)
endpoint.trackTerms(List("odebrecht", "lava", "jato").asJava)
endpoint
}
}
object Connection {
def main(args: Array[String]): Unit = {
val props = new Properties()
val params: ParameterTool = ParameterTool.fromArgs(args)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setGlobalJobParameters(params)
env.setParallelism(params.getInt("parallelism", 1))
props.setProperty(TwitterSource.CONSUMER_KEY, params.get("consumer-key"))
props.setProperty(TwitterSource.CONSUMER_SECRET, params.get("consumer-key"))
props.setProperty(TwitterSource.TOKEN, params.get("token"))
props.setProperty(TwitterSource.TOKEN_SECRET, params.get("token-secret"))
val source = new TwitterSource(props)
val epInit = new myFilterEndpoint()
source.setCustomEndpointInitializer(epInit)
val streamSource = env.addSource(source)
streamSource.map(s => (0, 1))
.keyBy(0)
.timeWindow(Time.minutes(2), Time.seconds(30))
.sum(1)
.map(t => t._2)
.writeAsText(params.get("output"))
env.execute("Twitter Count")
}
}
ポイントである、私はエラーメッセージを持っていないと私は私のダッシュボードで確認することができます。私のソースはTriggerWindowにデータを送信しています。しかし、それはデータを受け取っていません:
私は一度に2つの質問があります。
最初に:私のソースは何も受信されていない場合、私のTriggerWindowにバイトを送信しているのはなぜですか?
Seccond:私のコードに間違っていますが、私はTwitterからデータを受け取ることができませんか?
最初の結果は、2分後(つまり、ウィンドウの長さ)に書き込まれます。あなたはそれを待ったことがありますか? TriggerWindowはデータを受信しましたが、43秒後にはファイルに何も書き込まれません。あなたのコードはすべて上手く見えます。 –
こんにちは@DawidWysakowicz、はい私はそれを待っています。私はこのコードを2時間実行していました。私は質問のためにプリントを取った。しかし、Flinkからの出力はありません:( –