1
Spark Streamin Appを記述します。このアプリケーションでは、乱数を持つストリームを取り出し、数えます。ここで私が書いたスパークアプリです:Spark Streaming Appはポート上で文字列を受け取ることができません
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "IntegerCount") # 2 threads, app name
ssc = StreamingContext(sc, 1) # sc, time interval for batch update.
nums = ssc.socketTextStream("localhost", 8000) # stream data from TCP; source, port
# create key,value pairs
tests = nums.map(lambda num: (int(num), 1))
# Count each integer in each batch
intCounts = tests.reduceByKey(lambda x, y: x + y)
# Print
intCounts.pprint()
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
は、と私はそのServer.pyでポート8000へ乱数を提供しています:
import socket
from random import randint
host = 'localhost'
port = 8000
address = (host, port)
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(address)
server_socket.listen(5)
print "Listening for client . . ."
conn, address = server_socket.accept()
print "Connected to client at ", address
#pick a large output buffer size because i dont necessarily know how big the incoming packet is
while True:
output = str(randint(0, 10))
conn.send(output)
私はServer.pyと私のスパークアプリケーションを実行し、接続成功する。しかし、私がいることである空の出力を参照してください。私は問題が何であるかを知らない
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
-------------------------------------------
Time: 2017-07-16 22:36:11
-------------------------------------------
-------------------------------------------
Time: 2017-07-16 22:36:12
-------------------------------------------
を、何が起こっているかを理解するために私を助けてください?
誰も問題を知らない??? – fcgtyg