2017-07-16 6 views
1

Spark Streamin Appを記述します。このアプリケーションでは、乱数を持つストリームを取り出し、数えます。ここで私が書いたスパークアプリです:Spark Streaming Appはポート上で文字列を受け取ることができません

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 

sc = SparkContext("local[2]", "IntegerCount") # 2 threads, app name 
ssc = StreamingContext(sc, 1) # sc, time interval for batch update. 

nums = ssc.socketTextStream("localhost", 8000) # stream data from TCP; source, port 

# create key,value pairs 
tests = nums.map(lambda num: (int(num), 1)) 

# Count each integer in each batch 
intCounts = tests.reduceByKey(lambda x, y: x + y) 

# Print 
intCounts.pprint() 

ssc.start()    # Start the computation 
ssc.awaitTermination() # Wait for the computation to terminate 

は、と私はそのServer.pyでポート8000​​へ乱数を提供しています:

import socket 
from random import randint 

host = 'localhost' 
port = 8000 
address = (host, port) 

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
server_socket.bind(address) 
server_socket.listen(5) 


print "Listening for client . . ." 
conn, address = server_socket.accept() 
print "Connected to client at ", address 
#pick a large output buffer size because i dont necessarily know how big the incoming packet is 
while True: 
    output = str(randint(0, 10)) 
    conn.send(output) 

私はServer.pyと私のスパークアプリケーションを実行し、接続成功する。しかし、私がいることである空の出力を参照してください。私は問題が何であるかを知らない

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
Setting default log level to "WARN". 
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 
------------------------------------------- 
Time: 2017-07-16 22:36:11 
------------------------------------------- 

------------------------------------------- 
Time: 2017-07-16 22:36:12 
------------------------------------------- 

を、何が起こっているかを理解するために私を助けてください?

+0

誰も問題を知らない??? – fcgtyg

答えて

0

解決済み、「\ n」で文字列を送信しました。

import socket 
from random import randint 

host = 'localhost' 
port = 8000 
address = (host, port) 

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
server_socket.bind(address) 
server_socket.listen(5) 


print "Listening for client . . ." 
conn, address = server_socket.accept() 
print "Connected to client at ", address 
#pick a large output buffer size because i dont necessarily know how big the incoming packet is 
while True: 
    output = str(randint(0, 10)) + "\n" ### THAT IS THE FIX. 
    conn.send(output) 
関連する問題