2017-01-31 18 views
1

私は、Mac上でall-sparkノートブックドッカーの画像を使って、apree toreeとscala(https://github.com/jupyter/docker-stacks/tree/master/all-spark-notebook)を使用しています。私は、スパークのマニュアルの基本的なストリーミングの例をテストしようとしていますScalaスパークストリーミング(Apache Toree経由)

に伴うWICH: 1)ポート9999 2に耳を傾け、sparkstreamingオブジェクトを起動する)ネットキャストプログラム開始:だから私は起動nc -lk 9999

を9999ポートを結合コンテナ:

$ sudo docker run -it --rm -p 9999:9999 -p 8888:8888 -e GRANT_SUDO=yes --user root --pid=host -e TINI_SUBREAPER=true -v $HOME/Informatique/notebooks:/home/jovyan/work:rw jupyter/all-spark-notebook 

しかし、その後、それに接続しようと、私は、「ポート使用済み」エラーました:

$ nc -lk 9999 
nc: Address already in use 

私はまた、容器に自分を入れてみました:

[email protected]:~$ docker ps 
CONTAINER ID  IMAGE      COMMAND     CREATED    STATUS    PORTS           NAMES 
0bd6b70bacfa  jupyter/all-spark-notebook "tini -- start-not..." 23 seconds ago  Up 22 seconds  0.0.0.0:8888->8888/tcp, 0.0.0.0:9999->9999/tcp wonderful_brattain 
[email protected]:~$ docker exec -ti wonderful_brattain /bin/bash 
[email protected]:~/work# nc -lk 9999 
bash: nc: command not found 
[email protected]:~/work# sudo apt-get update 
[email protected]:~/work# sudo apt-get install netcat-traditional 
[email protected]:~/work# nc -lk 9999 
aaaa aaa aaa 
bb bbb bbb 
cc cc cc 

しかし、Scalaのノートブックに表示されるものはありません。

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ 
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") 
val ssc = new StreamingContext(sc, Seconds(1)) 
val lines = ssc.socketTextStream("localhost", 9999) 
val words = lines.flatMap(_.split(" ")) 
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 
// Count each word in each batch 
val pairs = words.map(word => (word, 1)) 
val wordCounts = pairs.reduceByKey(_ + _) 
// Print the first ten elements of each RDD generated in this DStream to the console 
wordCounts.print() 

で:

ssc.start()    // Start the computation 
ssc.awaitTermination() // Wait for the computation to terminate 

与える:

------------------------------------------- 
Time: 1485880101000 ms 
------------------------------------------- 

------------------------------------------- 
Time: 1485880102000 ms 
------------------------------------------- 

------------------------------------------- 
Time: 1485880103000 ms 
------------------------------------------- 

------------------------------------------- 
Time: 1485880104000 ms 
------------------------------------------- 

これらのネットワークの問題に対処するにはどうすればよいですか?

答えて

0

スパークストリーミングコンテキストは、ncサーバーに接続して、そこからデータをストリーミングしようとします。ポート9999、ではなく、のSparkコンテキストでリッスンしているサーバーはncです。

最初にノートブックコンテナを-p 9999:9999で起動しているため、Dockerはホスト上にポート9999を予約しているので、ポートはすでに使用中です。ホスト上でnc -lk 9999を実行しようとすると、競合が発生します。

ノートブックコンテナ内で動作しているカーネルがアクセスできるように、ncサーバを設定する必要があります。これを行う1つの方法は、別のDockerコンテナにncサーバを実行し、両方のコンテナを同じDockerネットワークに接続することです。

まず、二つの容器が通信できるようにするために、あなたのホスト上のドッカーネットワークを作成します。

docker network create testnet 

今、自身の容器にncを実行します。

docker run -it --rm --name nc --network testnet appropriate/nc -lk 9999 

--network testnetオプションはtestnetネットワークへのコンテナを添付する。 --name ncオプションを使用すると、ホスト名ncを使用して、同じネットワーク上の他のコンテナからコンテナにアクセスできるようになります。

ノートブックコンテナを別々に実行します。また、--network testnetを使用する必要があります。

docker run -it --rm --network testnet -p 8888:8888 \ 
-v $HOME/Informatique/notebooks:/home/jovyan/work:rw \ 
jupyter/all-spark-notebook 

最後に、あなたのノートブックのコードでは、スパークコンテキストは、ホスト名ncに接続されていることを確認してください。

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ 
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") 
val ssc = new StreamingContext(sc, Seconds(5)) 
val lines = ssc.socketTextStream("nc", 9999) 
val words = lines.flatMap(_.split(" ")) 
import org.apache.spark.streaming.StreamingContext._ 
// Count each word in each batch 
val pairs = words.map(word => (word, 1)) 
val wordCounts = pairs.reduceByKey(_ + _) 
// Print the first ten elements of each RDD generated in this DStream to the console 
wordCounts.print() 

ssc.start()    // Start the computation 
ssc.awaitTermination() // Wait for the computation to terminate 

あなたはncコンテナ用の端子に入力した場合:

hello, world 

あなたは、あなたのノートにこれを見る必要があります。

------------------------------------------- 
Time: 1485987495000 ms 
------------------------------------------- 
(hello,,1) 
(world,1) 
関連する問題