
I am trying to run a simple word-count Spark Streaming program on the Cloudera VM. I am using Scala in REPL mode, not an IDE, and I get "Connection refused" while running the Spark Streaming program.

Here is my code, with which I get the connection refused error:

val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.socketTextStream("localhost",8585,MEMORY_ONLY)
val wordsFlatMap = lines.flatMap(_.split(" "))
val wordsMap = wordsFlatMap.map(w => (w,1))
val wordCount = wordsMap.reduceByKey((a,b) => (a+b))
wordCount.print
ssc.start

I am running the program in REPL mode. The error is as follows:

scala> ssc.start 
17/04/19 03:06:43 INFO scheduler.ReceiverTracker: Starting 1 receivers 
17/04/19 03:06:43 INFO scheduler.ReceiverTracker: ReceiverTracker started 
17/04/19 03:06:43 INFO dstream.ForEachDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.MappedDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@…
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@…
17/04/19 03:06:43 INFO dstream.MappedDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.MappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.MappedDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.MappedDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@…
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@…
17/04/19 03:06:43 INFO dstream.ForEachDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.ForEachDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.ForEachDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@…
17/04/19 03:06:43 INFO util.RecurringTimer: Started timer for JobGenerator at time 1492596404000 
17/04/19 03:06:43 INFO scheduler.JobGenerator: Started JobGenerator at 1492596404000 ms 
17/04/19 03:06:43 INFO scheduler.JobScheduler: Started JobScheduler 
17/04/19 03:06:43 INFO streaming.StreamingContext: StreamingContext started 

scala> 17/04/19 03:06:43 INFO scheduler.ReceiverTracker: Receiver 0 started 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Got job 0 (submitJob at ReceiverTracker.scala:557) with 1 output partitions 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Final stage: ResultStage 0(submitJob at ReceiverTracker.scala:557) 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Parents of final stage: List() 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Missing parents: List() 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:554), which has no missing parents 
17/04/19 03:06:44 INFO scheduler.JobScheduler: Added jobs for time 1492596404000 ms 
17/04/19 03:06:44 INFO scheduler.JobScheduler: Starting job streaming job 1492596404000 ms.0 from job set of time 1492596404000 ms 
17/04/19 03:06:44 INFO spark.SparkContext: Starting job: print at <console>:47 
17/04/19 03:06:44 INFO storage.MemoryStore: ensureFreeSpace(65984) called with curMem=0, maxMem=560497950 
17/04/19 03:06:44 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 64.4 KB, free 534.5 MB) 
17/04/19 03:06:44 INFO storage.MemoryStore: ensureFreeSpace(22354) called with curMem=65984, maxMem=560497950 
17/04/19 03:06:44 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 21.8 KB, free 534.4 MB) 
17/04/19 03:06:44 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:41905 (size: 21.8 KB, free: 534.5 MB) 
17/04/19 03:06:44 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861 
17/04/19 03:06:44 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:554) 
17/04/19 03:06:44 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Registering RDD 3 (map at <console>:42) 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Got job 1 (print at <console>:47) with 1 output partitions 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Final stage: ResultStage 2(print at <console>:47) 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 1) 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Missing parents: List() 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (ShuffledRDD[4] at reduceByKey at <console>:44), which has no missing parents 
17/04/19 03:06:45 INFO storage.MemoryStore: ensureFreeSpace(2400) called with curMem=88338, maxMem=560497950 
17/04/19 03:06:45 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.3 KB, free 534.4 MB) 
17/04/19 03:06:45 INFO storage.MemoryStore: ensureFreeSpace(1429) called with curMem=90738, maxMem=560497950 
17/04/19 03:06:45 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1429.0 B, free 534.4 MB) 
17/04/19 03:06:45 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:41905 (size: 1429.0 B, free: 534.5 MB) 
17/04/19 03:06:45 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (ShuffledRDD[4] at reduceByKey at <console>:44) 
17/04/19 03:06:45 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks 
17/04/19 03:06:45 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2644 bytes) 
17/04/19 03:06:45 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0) 
17/04/19 03:06:45 INFO util.RecurringTimer: Started timer for BlockGenerator at time 1492596405400 
17/04/19 03:06:45 INFO receiver.BlockGenerator: Started BlockGenerator 
17/04/19 03:06:45 INFO receiver.BlockGenerator: Started block pushing thread 
17/04/19 03:06:45 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from 10.0.2.15:50802 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Starting receiver 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be stopped 
17/04/19 03:06:45 INFO dstream.SocketReceiver: Connecting to localhost:8585 
17/04/19 03:06:45 WARN receiver.ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:8585 
java.net.ConnectException: Connection refused 
    at java.net.PlainSocketImpl.socketConnect(Native Method) 
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) 
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) 
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) 
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
    at java.net.Socket.connect(Socket.java:579) 
    at java.net.Socket.connect(Socket.java:528) 
    at java.net.Socket.<init>(Socket.java:425) 
    at java.net.Socket.<init>(Socket.java:208) 
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73) 
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59) 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Error connecting to localhost:8585: java.net.ConnectException: Connection refused 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0 
17/04/19 03:06:45 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:8585 - java.net.ConnectException: Connection refused 
    at java.net.PlainSocketImpl.socketConnect(Native Method) 
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) 
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) 
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) 
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
    at java.net.Socket.connect(Socket.java:579) 
    at java.net.Socket.connect(Socket.java:528) 
    at java.net.Socket.<init>(Socket.java:425) 
    at java.net.Socket.<init>(Socket.java:208) 
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73) 
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59) 

When I change my code as shown below, I get a different error:

var sparkConf = new SparkConf().setAppName("Streaming Example").setMaster("local[2]").set("spark.drive.allowMultipleContexts","true") 
val ssc = new StreamingContext(sparkConf,Seconds(2)) 


17/04/19 03:18:52 INFO spark.SparkContext: Running Spark version 1.5.0-cdh5.5.0 
    17/04/19 03:18:52 INFO spark.SecurityManager: Changing view acls to: cloudera 
    17/04/19 03:18:52 INFO spark.SecurityManager: Changing modify acls to: cloudera 
    17/04/19 03:18:52 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera) 
    17/04/19 03:18:53 INFO slf4j.Slf4jLogger: Slf4jLogger started 
    17/04/19 03:18:53 INFO Remoting: Starting remoting 
    17/04/19 03:18:53 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@…:42235] 
    17/04/19 03:18:53 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@…:42235] 
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'sparkDriver' on port 42235. 
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering MapOutputTracker 
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering BlockManagerMaster 
    17/04/19 03:18:53 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-b87051bc-5b7f-4c4f-975f-a0661b3ec29f 
    17/04/19 03:18:53 INFO storage.MemoryStore: MemoryStore started with capacity 534.5 MB 
    17/04/19 03:18:53 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-a3c5d465-ca27-4aa0-ad43-47088abb7703/httpd-01babb12-0237-4faa-9917-394a768cbcaa 
    17/04/19 03:18:53 INFO spark.HttpServer: Starting HTTP Server 
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT 
    17/04/19 03:18:53 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52313 
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'HTTP file server' on port 52313. 
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering OutputCommitCoordinator 
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT 
    17/04/19 03:18:53 WARN component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use 
    java.net.BindException: Address already in use 
     at sun.nio.ch.Net.bind0(Native Method) 
     at sun.nio.ch.Net.bind(Net.java:444) 
     at sun.nio.ch.Net.bind(Net.java:436) 
     at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214) 
     at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) 
     at org.spark-project.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) 
     at org.spark-project.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) 
     at org.spark-project.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) 
     at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) 
     at org.spark-project.jetty.server.Server.doStart(Server.java:293) 
     at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) 
     at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:236) 
     at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246) 
     at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246) 
     at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1913) 
     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
     at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1904) 
     at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:246) 
     at org.apache.spark.ui.WebUI.bind(WebUI.scala:136) 
     at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474) 
     at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.SparkContext.<init>(SparkContext.scala:474) 
     at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:854) 
     at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36) 
     at $line31.$read$$iwC$$iwC$$iwC.<init>(<console>:38) 
     at $line31.$read$$iwC$$iwC.<init>(<console>:40) 
     at $line31.$read$$iwC.<init>(<console>:42) 
     at $line31.$read.<init>(<console>:44) 
     at $line31.$read$.<init>(<console>:48) 
     at $line31.$read$.<clinit>(<console>) 
     at $line31.$eval$.<init>(<console>:7) 
     at $line31.$eval$.<clinit>(<console>) 
     at $line31.$eval.$print(<console>) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) 
     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
     at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 
     at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 
     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 
     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
     at org.apache.spark.repl.Main$.main(Main.scala:31) 
     at org.apache.spark.repl.Main.main(Main.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
    17/04/19 03:18:53 WARN component.AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@…: java.net.BindException: Address already in use 
    java.net.BindException: Address already in use 
     at sun.nio.ch.Net.bind0(Native Method) 
     at sun.nio.ch.Net.bind(Net.java:444) 
     at sun.nio.ch.Net.bind(Net.java:436) 
     at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214) 
     at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) 
     at org.spark-project.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) 
     at org.spark-project.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) 
     at org.spark-project.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) 
     at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) 
     at org.spark-project.jetty.server.Server.doStart(Server.java:293) 
     at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) 
     at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:236) 
     at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246) 
     at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246) 
     at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1913) 
     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
     at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1904) 
     at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:246) 
     at org.apache.spark.ui.WebUI.bind(WebUI.scala:136) 
     at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474) 
     at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.SparkContext.<init>(SparkContext.scala:474) 
     at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:854) 
     at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36) 
     at $line31.$read$$iwC$$iwC$$iwC.<init>(<console>:38) 
     at $line31.$read$$iwC$$iwC.<init>(<console>:40) 
     at $line31.$read$$iwC.<init>(<console>:42) 
     at $line31.$read.<init>(<console>:44) 
     at $line31.$read$.<init>(<console>:48) 
     at $line31.$read$.<clinit>(<console>) 
     at $line31.$eval$.<init>(<console>:7) 
     at $line31.$eval$.<clinit>(<console>) 
     at $line31.$eval.$print(<console>) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) 
     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
     at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 
     at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 
     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 
     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
     at org.apache.spark.repl.Main$.main(Main.scala:31) 
     at org.apache.spark.repl.Main.main(Main.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null} 
    17/04/19 03:18:53 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT 
    17/04/19 03:18:53 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4041 
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'SparkUI' on port 4041. 
    17/04/19 03:18:53 INFO ui.SparkUI: Started SparkUI at http://localhost:4041 
    17/04/19 03:18:53 WARN metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. 
    17/04/19 03:18:53 INFO storage.BlockManagerMaster: Registered BlockManager 
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at: 
org.apache.spark.SparkContext.<init>(SparkContext.scala:82) 
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017) 

Can someone help me fix these errors?


Do you actually have an open socket listening on port 8585? –

Answer


Approach 1

The error you are seeing is expected, given how socketTextStream() works. Spark creates an instance of SocketInputDStream, which uses java.net.Socket, and java.net.Socket is a client socket, which means it expects a server to already be running at the address and port you specified.

You need a service running on port 8585 of your local machine.

To see what I mean, try the following (you may not need to set master or appName in your environment):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf

object MyStream {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the receiver, one for processing
    val ssc = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("socketstream"), Seconds(10))
    // connect (as a client) to something that is definitely listening
    val mystreamRDD = ssc.socketTextStream("bbc.co.uk", 80)
    mystreamRDD.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

This won't return any content, since the app doesn't speak HTTP to the BBC website, but it won't get a connection refused exception either. To run a local server on Linux, you can use netcat with a simple command such as:
cat data.txt | ncat -l -p 8585 
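
If ncat is not available, a tiny stand-in server written in Scala can play the same role. The sketch below is illustrative only; the port (8585) and the sample lines are assumptions chosen to match the question:

import java.io.PrintWriter
import java.net.ServerSocket

object TestServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(8585)   // listen on the port the stream connects to
    println("Listening on port 8585 ...")
    val socket = server.accept()          // blocks until the SocketReceiver connects
    val out = new PrintWriter(socket.getOutputStream, true)
    // push a few sample lines for the word count to consume
    Seq("hello world", "hello spark").foreach(line => out.println(line))
    out.close(); socket.close(); server.close()
  }
}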

If the BBC example above gave you the same error, then follow approach 2.

Approach 2

However, any number of things could be causing the error (a quick connectivity check is sketched after this list):

  • You are trying to connect to the wrong IP/port.
  • You have not started the server.
  • The server is not listening for connections.
  • The server has too many pending connections waiting to be accepted.
  • A firewall is blocking the connection before it reaches the server.
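
As a quick way to rule out the first three items, you can try opening a plain client socket yourself, outside Spark. This is only a sketch, assuming the same localhost:8585 as in the question:

import java.net.Socket
import scala.util.{Failure, Success, Try}

object PortCheck {
  def main(args: Array[String]): Unit = {
    // succeeds only if something is already listening on the port
    Try(new Socket("localhost", 8585)) match {
      case Success(s) => println("Port 8585 is open"); s.close()
      case Failure(e) => println(s"Cannot connect: ${e.getMessage}")
    }
  }
}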

Hope this helps.


Also, the BindException is a second, separate error, for the SparkUI. The property name looks misspelled: you have spark.drive.allowMultipleContexts, but it should be spark.driver.allowMultipleContexts. – sparker


Thank you! :) It works fine now. – Swa
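
For reference, a corrected version of the configuration from the question would look like the sketch below; the property name spark.driver.allowMultipleContexts matches the exception message above. (Inside spark-shell it is usually better to reuse the existing sc, as in the first snippet, rather than allow multiple contexts.)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// note "driver", not "drive", in the property name
val sparkConf = new SparkConf()
  .setAppName("Streaming Example")
  .setMaster("local[2]")
  .set("spark.driver.allowMultipleContexts", "true")
val ssc = new StreamingContext(sparkConf, Seconds(2))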
