Spark Streaming receiver Out of Memory (OOM)
2016-07-14

I have a problem where the receiver restarts over and over again.
I'm using Spark 1.6.1. I use Spark Streaming to receive data from a stream, and a map to deserialize the pb (protobuf) data.

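My test covers two cases:

  1. Just receive the data and print it directly: the app is stable.
  2. Receive and deserialize: this produces the problem. It does not occur at regular intervals. The input is about 500 MB/min, and I set the executor memory to 8 GB. It looks as if memory is being allocated excessively, but I don't know why.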
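To relate the 500 MB/min figure to what each receiver has to buffer, here is a rough back-of-the-envelope sketch. It assumes the input is spread evenly over the receivers (the code creates args(3) of them) and uses Spark's default `spark.streaming.blockInterval` of 200 ms; the object and parameter names are hypothetical. The result is raw input bytes per block, not the heap footprint of the Java objects built from them.

```scala
// Hypothetical helper: rough raw size of one receiver block, assuming an even
// split of the input across receivers and a fixed block interval.
object BlockSizeEstimate {
  def bytesPerBlock(bytesPerMinute: Double, receivers: Int, blockIntervalMs: Long): Double = {
    require(receivers > 0, "need at least one receiver")
    val bytesPerMs = bytesPerMinute / 60000.0 // 1 minute = 60,000 ms
    bytesPerMs * blockIntervalMs / receivers
  }

  def main(args: Array[String]): Unit = {
    val rate = 500.0 * 1024 * 1024 // 500 MB/min, taking 1 MB = 1 MiB
    for (n <- Seq(1, 4)) {
      val mib = bytesPerBlock(rate, n, 200L) / (1024 * 1024)
      println(f"$n receiver(s): about $mib%.2f MiB per 200 ms block")
    }
  }
}
```

For a single receiver this comes to roughly 1.67 MiB of raw input per block; whatever is not processed in time accumulates on top of that, batch after batch.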

My code:

val conf = new SparkConf().setAppName(args(8)) 
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
conf.set("spark.streaming.stopGracefullyOnShutdown", "true") 
conf.set("spark.streaming.backpressure.enabled","true") 
conf.set("spark.speculation","true") 
val sc = new SparkContext(conf) 
val ssc = new StreamingContext(sc, Seconds(args(7).toInt)) 
val bigPipeStreams = (1 to args(3).toInt).map{ 
    i => ssc.networkStream(
    new MyBigpipeLogagentReceiver(args(0),args(1),args(2),i,args(4),args(5),args(6).toInt) 
) 
} 
val lines = ssc.union(bigPipeStreams) 
def deserializePbData(value: String) : String = { 

if (null == value || value.isEmpty) { 
    return "" 
} 
var cuid = "" 
var os = "" 
var channel = "" 
var sv = "" 
var resid = "" 
var appid = "" 
var prod = "" 
try { //if exception,useless data,just drop it 
    val timeStrIndex = value.indexOf(",\"time_str\"") 
    var strAfterTruncation = "" 
    if (-1 != timeStrIndex) { 
     strAfterTruncation = value.substring(0,timeStrIndex) + "}" 
    } else { 
     strAfterTruncation = value 
    } 
    val jsonData = JSONObject.fromObject(strAfterTruncation) 
    //val jsonData = value.getAsJsonArray() 
    val binBody = jsonData.getString("bin_body") 
    val pbData = binBody.substring(1,binBody.length()-1).split(",").foldLeft(ArrayBuffer.empty[Byte])((b,a) => b +java.lang.Byte.parseByte(a)).drop(8).toArray 
    Lighttpd.lighttpd_log.parseFrom(pbData).getRequest().getUrl().getUrlFields().getAutokvList().asScala.foreach(a => 
     a.getKey() match { 
      case "cuid" => cuid += a.getValue() 
      case "os" => os += a.getValue() 
      case "channel" => channel += a.getValue() 
      case "sv" => sv += a.getValue() 
      case "resid" => resid += a.getValue() 
      case "appid" => appid += a.getValue() 
      case "prod" => prod += a.getValue() 
      case _ => null 
     } 
    ) 
    val decodeCuid = URLDecoder.decode(cuid, "UTF-8") 
    os = os.toLowerCase() 
    if (os.matches("android(.*)")) { 
     os = "android" 
    } else if (os.matches("iphone(.*)")) { 
     os = "iphone" 
    } else if (os.matches("ipad(.*)")) { 
     os = "ipad" 
    } else if (os.matches("s60(.*)")) { 
     os = "symbian" 
    } else if (os.matches("wp7(.*)")) { 
     os = "wp7" 
    } else if (os.matches("wp(.*)")) { 
     os = "wp" 
    } else if (os.matches("tizen(.*)")) { 
     os = "tizen" 

    val ifHasLogid = Lighttpd.lighttpd_log.parseFrom(pbData).hasLogid() 
    val time = Lighttpd.lighttpd_log.parseFrom(pbData).getTime() 
    if (ifHasLogid) { 
     val logid = Lighttpd.lighttpd_log.parseFrom(pbData).getLogid() 
     if (logid.isEmpty || logid.toString().equals("-") || !resid.toString().equals("01") || channel.isEmpty |!appid.isEmpty || !prod.isEmpty) { 
      "" 
     } else { 
      decodeCuid + "\001" + os + "\001" + channel + "\001" + sv + "\001" + "1" + "\001" + "1" + "\001" + time + "\n" 
     } 
    } else { 
     "" 
    } 
} catch { 
    case _:Throwable => "" 
} 
} 
lines.map(parseData).print() 
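One allocation-heavy spot worth checking in the deserialize step is the line that builds pbData. Assuming Scala 2.10/2.11 (the versions Spark 1.6.1 is built against), `b + x` on an `ArrayBuffer` does not append in place: it clones the buffer and appends to the copy, so the foldLeft allocates and discards a new buffer for every element, and total allocation grows quadratically with the body length. This is not necessarily the cause of the logged OOM, which occurs while a receiver block is being serialized. A minimal linear-time sketch, with a hypothetical helper name and the same 8-byte skip as the original drop(8):

```scala
// Hypothetical helper: turns a bin_body string such as "[0,0,0,0,0,0,0,0,8,-106]"
// into bytes, skipping the first `skip` bytes as the original drop(8) does.
object BinBodyParsing {
  def parseBinBody(binBody: String, skip: Int = 8): Array[Byte] = {
    val inner = binBody.substring(1, binBody.length - 1) // strip the surrounding [ ]
    if (inner.trim.isEmpty) Array.emptyByteArray
    // One array from split, one from map, one from drop: no per-element copies.
    else inner.split(",").map(s => java.lang.Byte.parseByte(s.trim)).drop(skip)
  }
}
```

Inside deserializePbData the pbData line would then read `val pbData = BinBodyParsing.parseBinBody(binBody)`; malformed input still throws, which the existing try/catch turns into "".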

Error text:

016-07-12T12:00:01.546+0800: 5096.643: [GC (Allocation Failure) 
Desired survivor size 442499072 bytes, new threshold 1 (max 15) 
[PSYoungGen: 0K->0K(2356736K)] 5059009K->5059009K(7949312K), 0.0103342 secs] [Times: user=0.21 sys=0.00, real=0.01 secs] 
2016-07-12T12:00:01.556+0800: 5096.654: [Full GC (Allocation Failure) [PSYoungGen: 0K->0K(2356736K)] [ParOldGen: 5059009K->5057376K(5592576K)] 5059009K->5057376K(7949312K), [Metaspace: 44836K->44490K(1089536K)], 0.8769617 secs] [Times: user=17.88 sys=0.04, real=0.88 secs] 
2016-07-12T12:00:02.434+0800: 5097.531: Total time for which application threads were stopped: 1.2951974 seconds, Stopping threads took: 0.0000662 seconds 
java.lang.OutOfMemoryError: Java heap space 
Dumping heap to java_pid24310.hprof ... 
2016-07-12T12:00:30.960+0800: 5126.057: Total time for which application threads were stopped: 28.5260812 seconds, Stopping threads  took: 0.0000995 seconds 
Heap dump file created [5211252802 bytes in 28.526 secs] 
# 
# java.lang.OutOfMemoryError: Java heap space 
# -XX:OnOutOfMemoryError="kill %p" 
# Executing /bin/sh -c "kill 24310"... 
2016-07-12T12:00:31.589+0800: 5126.686: Total time for which application threads were stopped: 0.6289627 seconds, Stopping threads took: 0.0001258 seconds 
2016-07-12T12:00:31.595+0800: 5126.692: Total time for which application threads were stopped: 0.0004822 seconds, Stopping threads took: 0.0001493 seconds 
2016-07-12 12:00:31.597 [Thread-5] ERROR [Logging.scala:95] - Uncaught exception in thread Thread[Thread-5,5,main] 
java.lang.OutOfMemoryError: Java heap space 
    at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_51] 
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) ~[na:1.8.0_51] 
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.8.0_51] 
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[na:1.8.0_51] 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[na:1.8.0_51] 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) ~[na:1.8.0_51] 
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at com.esotericsoftware.kryo.io.Output.require(Output.java:135) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:420) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at com.esotericsoftware.kryo.io.Output.writeString(Output.java:326) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1196) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1202) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:858) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:645) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:77) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:157) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:128) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$3.onPushBlock(ReceiverSupervisorImpl.scala:109) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:296) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks( BlockGenerator.scala:268) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
    at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:109) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
2016-07-12 12:00:31.600 [SIGTERM handler] ERROR [SignalLogger.scala:57] - RECEIVED SIGNAL 15: SIGTERM 
2016-07-12T12:00:31.611+0800: 5126.708: Total time for which application threads were stopped: 0.0005602 seconds, Stopping threads took: 0.0001765 seconds 
2016-07-12T12:00:31.617+0800: 5126.714: Total time for which application threads were stopped: 0.0004800 seconds, Stopping threads took: 0.0001412 seconds 
2016-07-12 12:00:32.483 [Bigpipe Receiver-SendThread(cq01-bigpipe-proxy01.cq01.baidu.com:2181)] WARN [ClientCnxnSocket.java:139] -  Connected to an old server; r-o mode will be unavailable 
2016-07-12T12:00:32.507+0800: 5127.604: Total time for which application threads were stopped: 0.0004604 seconds, Stopping threads took: 0.0001198 seconds 
2016-07-12T12:00:32.509+0800: 5127.606: Total time for which application threads were stopped: 0.0002919 seconds, Stopping threads took: 0.0001800 seconds 
2016-07-12T12:00:32.509+0800: 5127.607: Total time for which application threads were stopped: 0.0002692 seconds, Stopping threads took: 0.0001612 seconds 
2016-07-12 12:00:32.549 [Bigpipe Receiver-SendThread(tc-bigpipe-proxy03.tc.baidu.com:2181)] WARN [ClientCnxnSocket.java:139] -  Connected to an old server; r-o mode will be unavailable 
2016-07-12T12:00:34.220+0800: 5129.317: [GC (Allocation Failure) 
Desired survivor size 424148992 bytes, new threshold 2 (max 15) 
[PSYoungGen: 1931776K->188775K(2363904K)] 6989152K->5246152K(7956480K), 0.2569385 secs] [Times: user=0.00 sys=5.19, real=0.26 secs] 
2016-07-12T12:00:34.477+0800: 5129.575: Total time for which application threads were stopped: 0.2575019 seconds, Stopping threads took: 0.0000384 seconds 
2016-07-12T12:00:35.478+0800: 5130.575: Total time for which application threads were stopped: 0.0002786 seconds, Stopping threads took: 0.0000424 seconds 
2016-07-12T12:00:37.600+0800: 5132.697: [GC (Allocation Failure) 
Desired survivor size 482344960 bytes, new threshold 3 (max 15) 
[PSYoungGen: 2120551K->387013K(2268160K)] 7177928K->5444389K(7860736K), 0.5153031 secs] [Times: user=0.00 sys=9.89, real=0.52 secs] 
2016-07-12T12:00:38.116+0800: 5133.213: Total time for which application threads were stopped: 0.5157529 seconds, Stopping threads took: 0.0000427 seconds 
2016-07-12T12:00:40.116+0800: 5135.213: Total time for which application threads were stopped: 0.0003171 seconds, Stopping threads took: 0.0001000 seconds 
2016-07-12T12:00:40.419+0800: 5135.516: [GC (Allocation Failure) 
Desired survivor size 599785472 bytes, new threshold 2 (max 15) 
[PSYoungGen: 2240965K->471033K(2324992K)] 7298341K->5633517K(7917568K), 0.3621433 secs] [Times: user=0.12 sys=7.11, real=0.36 secs] 
2016-07-12T12:00:40.781+0800: 5135.878: Total time for which application threads were stopped: 0.3626080 seconds, Stopping threads took: 0.0000429 seconds 
2016-07-12T12:00:41.781+0800: 5136.879: Total time for which application threads were stopped: 0.0003301 seconds, Stopping threads took: 0.0000947 seconds 
2016-07-12T12:00:43.108+0800: 5138.205: [GC (Allocation Failure) 
Desired survivor size 620756992 bytes, new threshold 3 (max 15) 
[PSYoungGen: 2324985K->378481K(2054656K)] 7487469K->5831048K(7647232K), 0.2593685 secs] [Times: user=0.66 sys=4.96, real=0.26 secs] 
2016-07-12T12:00:43.368+0800: 5138.465: [Full GC (Ergonomics) [PSYoungGen: 378481K->0K(2054656K)] [ParOldGen: 5452566K->4713601K(5592576K)] 5831048K->4713601K(7647232K), [Metaspace: 44635K->44635K(1089536K)], 4.3137405 secs] [Times: user=9.78 sys=74.53, real=4.31 secs] 
2016-07-12T12:00:47.682+0800: 5142.779: Total time for which application threads were stopped: 4.5736603 seconds, Stopping threads took: 0.0000449 seconds 
2016-07-12T12:00:47.682+0800: 5142.779: Total time for which application threads were stopped: 0.0002430 seconds, Stopping threads took: 0.0000856 seconds 
2016-07-12T12:00:49.954+0800: 5145.052: [GC (Allocation Failure) 
Desired survivor size 597688320 bytes, new threshold 4 (max 15) 
[PSYoungGen: 1583616K->161266K(2189824K)] 6297217K->4874867K(7782400K), 0.0388138 secs] [Times: user=0.00 sys=0.84, real=0.04 secs] 
2016-07-12T12:00:49.993+0800: 5145.091: Total time for which application threads were stopped: 0.0392926 seconds, Stopping threads took: 0.0000449 seconds 
2016-07-12T12:00:51.903+0800: 5147.000: [GC (Allocation Failure) 
Desired survivor size 596115456 bytes, new threshold 5 (max 15) 
[PSYoungGen: 1744882K->324587K(2213888K)] 6458483K->5038189K(7806464K), 0.0334029 secs] [Times: user=0.69 sys=0.03, real=0.04 secs] 
2016-07-12T12:00:51.936+0800: 5147.034: Total time for which application threads were stopped: 0.0338707 seconds, Stopping threads took: 0.0000404 seconds 
2016-07-12T12:00:53.942+0800: 5149.039: [GC (Allocation Failure) 
Desired survivor size 654835712 bytes, new threshold 6 (max 15) 
[PSYoungGen: 1954795K->490438K(2120704K)] 6668397K->5204039K(7713280K), 0.0441762 secs] [Times: user=0.95 sys=0.02, real=0.05 secs] 
2016-07-12T12:00:53.986+0800: 5149.083: Total time for which application threads were stopped: 0.0446174 seconds, Stopping threads took: 0.0000456 seconds 
2016-07-12T12:00:56.102+0800: 5151.199: [GC (Allocation Failure) 
Desired survivor size 763887616 bytes, new threshold 5 (max 15) 
[PSYoungGen: 2120646K->639467K(1943552K)] 6834247K->5370280K(7536128K), 0.1124828 secs] [Times: user=1.07 sys=1.30, real=0.11 secs] 
2016-07-12T12:00:56.214+0800: 5151.312: Total time for which application threads were stopped: 0.1129348 seconds, Stopping threads took: 0.0000396 seconds 
2016-07-12T12:00:57.784+0800: 5152.881: [GC (Allocation Failure) 
Desired survivor size 895483904 bytes, new threshold 4 (max 15) 
[PSYoungGen: 1943531K->745977K(2050048K)] 6674344K->5504073K(7642624K), 0.0971717 secs] [Times: user=1.20 sys=0.67, real=0.10 secs] 
2016-07-12T12:00:57.881+0800: 5152.979: Total time for which application threads were stopped: 0.0977363 seconds, Stopping threads took: 0.0000941 seconds 
2016-07-12T12:00:59.406+0800: 5154.504: [GC (Allocation Failure) 
Desired survivor size 935329792 bytes, new threshold 5 (max 15) 
[PSYoungGen: 2050041K->599188K(1715200K)] 6808137K->5647517K(7307776K), 0.3651465 secs] [Times: user=0.98 sys=5.88, real=0.37 secs] 
2016-07-12T12:00:59.772+0800: 5154.869: Total time for which application threads were stopped: 0.3656089 seconds, Stopping threads took: 0.0000479 seconds 
2016-07-12T12:01:00.968+0800: 5156.066: [GC (Allocation Failure) 
Desired survivor size 954204160 bytes, new threshold 4 (max 15) 
[PSYoungGen: 1568404K->697830K(1667072K)] 6616733K->5746159K(7259648K), 0.0978955 secs] [Times: user=1.91 sys=0.04, real=0.09 secs] 
2016-07-12T12:01:01.066+0800: 5156.164: Total time for which application threads were stopped: 0.0983759 seconds, Stopping threads took: 0.0000482 seconds 
2016-07-12T12:01:02.189+0800: 5157.287: [GC (Allocation Failure) 
Desired survivor size 954204160 bytes, new threshold 3 (max 15) 
[PSYoungGen: 1667046K->465454K(1864192K)] 6715375K->5855655K(7456768K), 0.1261993 secs] [Times: user=2.41 sys=0.29, real=0.12 secs] 
2016-07-12T12:01:02.316+0800: 5157.413: [Full GC (Ergonomics) [PSYoungGen: 465454K->65236K(1864192K)] [ParOldGen: 5390200K->5592328K(5592576K)] 5855655K->5657564K(7456768K), [Metaspace: 44635K->44635K(1089536K)], 3.2729437 secs] [Times: user=12.34 sys=57.11, real=3.28 secs] 
2016-07-12T12:01:05.589+0800: 5160.686: Total time for which application threads were stopped: 3.3998619 seconds, Stopping threads took: 0.0000521 seconds 
2016-07-12T12:01:05.589+0800: 5160.686: Total time for which application threads were stopped: 0.0002330 seconds, Stopping threads took: 0.0000949 seconds 
2016-07-12T12:01:05.688+0800: 5160.785: Total time for which application threads were stopped: 0.0002935 seconds, Stopping threads took: 0.0000514 seconds 
Heap 
PSYoungGen  total 1864192K, used 146620K [0x0000000715580000, 0x00000007c0000000, 0x00000007c0000000) 
    eden space 932352K, 8% used [0x0000000715580000,0x000000071a4fa138,0x000000074e400000) 
    from space 931840K, 7% used [0x0000000787200000,0x000000078b1b5290,0x00000007c0000000) 
    to space 931840K, 0% used [0x000000074e400000,0x000000074e400000,0x0000000787200000) 
ParOldGen  total 5592576K, used 5592328K [0x00000005c0000000, 0x0000000715580000, 0x0000000715580000) 
    object space 5592576K, 99% used [0x00000005c0000000,0x00000007155420a8,0x0000000715580000) 
Metaspace  used 44654K, capacity 44990K, committed 45864K, reserved 1089536K 
    class space used 6212K, capacity 6324K, committed 6440K, reserved 1048576K 

New error: I think this upload error is what causes the OOM problem. I'd like to know how to fix this upload error.

2016-07-15 11:41:47.307 [shuffle-client-0] ERROR [TransportChannelHandler.java:128] - Connection to  nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong. 
2016-07-15 11:41:47.309 [shuffle-client-0] ERROR [TransportResponseHandler.java:122] - Still have 1 requests outstanding when connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 is closed 
2016-07-15 11:41:47.314 [shuffle-client-0] ERROR [Logging.scala:95] - Error while uploading block input-0-1468553896200 
java.io.IOException: Connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 closed 
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51] 
2016-07-15T11:41:47.316+0800: 2176.487: Total time for which application threads were stopped: 0.0002632 seconds, Stopping threads took: 0.0000521 seconds 
2016-07-15 11:41:47.316 [Thread-5] WARN [Logging.scala:91] - Failed to replicate input-0-1468553896200 to BlockManagerId(2, nmg01-taihang-d10207.nmg01.baidu.com, 30456), failure #0 
java.io.IOException: Connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 closed 
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT] 
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_51] 
2016-07-15T11:41:48.316+0800: 2177.487: Total time for which application threads were stopped: 0.0003391 seconds, Stopping threads took: 0.0000979 seconds 
2016-07-15T11:41:51.312+0800: 2180.483: [GC (Allocation Failure) --[PSYoungGen: 2894863K->2894863K(3007488K)] 8299519K->9550273K(9998336K), 0.7462118 secs] [Times: user=9.78 sys=0.02, real=0.74 secs] 
2016-07-15T11:41:52.059+0800: 2181.230: [Full GC (Ergonomics) [PSYoungGen: 2894863K->0K(3007488K)] [ParOldGen: 6655410K->6895736K(6990848K)] 9550273K->6895736K(9998336K), [Metaspace: 44409K->44409K(1087488K)], 0.4061892 secs] [Times: user=7.50 sys=0.01, real=0.41 secs] 
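@XBin, please do not include *only* screenshots for error information. Text in screenshots can't be searched, and it is usually much harder to read. To include an error, include the actual text of the error: copy it as text and format it in a code block. If a screenshot of the error gives people reading the question additional information beyond the plain text, include it *in addition to* copying the error text into the question. – Makyen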

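Without an [mcve], this question may be off-topic: questions seeking debugging help ("**why isn't this code working?**") must include the desired behavior, a specific problem or error *and* the shortest code necessary to reproduce it **in the question itself**. Questions without a clear problem statement are not useful to other readers. See: [mcve], [What topics can I ask about here?](http://stackoverflow.com/help/on-topic), and [How do I ask a good question?](http://stackoverflow.com/help/how-to-ask). That means enough code to *fully* reproduce the problem. – Makyen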

Thanks, I have attached the code and the log file. –

Answer

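It looks like there is a structural error in your code. While looking at the code you posted (and re-indenting it to reflect its structure), I found that your last else if statement:

} else if (os.matches("tizen(.*)")) { 
    os = "tizen" 

opens a block but does not close it where it should. Instead, the block is actually closed by:

} catch { 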

The complete code, as I believe it was intended (and re-indented), is:

import java.net.URLDecoder
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import net.sf.json.JSONObject // assumed: json-lib, which provides JSONObject.fromObject
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
// MyBigpipeLogagentReceiver and Lighttpd (protobuf-generated) are project classes not shown here.

val conf = new SparkConf().setAppName(args(8))
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.backpressure.enabled", "true")
conf.set("spark.speculation", "true")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(args(7).toInt))
val bigPipeStreams = (1 to args(3).toInt).map { i =>
  ssc.networkStream(
    new MyBigpipeLogagentReceiver(args(0), args(1), args(2), i, args(4), args(5), args(6).toInt)
  )
}
val lines = ssc.union(bigPipeStreams)

def deserializePbData(value: String): String = {
  if (null == value || value.isEmpty) {
    return ""
  }
  var cuid = ""
  var os = ""
  var channel = ""
  var sv = ""
  var resid = ""
  var appid = ""
  var prod = ""
  try { // if exception, useless data, just drop it
    val timeStrIndex = value.indexOf(",\"time_str\"")
    var strAfterTruncation = ""
    if (-1 != timeStrIndex) {
      strAfterTruncation = value.substring(0, timeStrIndex) + "}"
    } else {
      strAfterTruncation = value
    }
    val jsonData = JSONObject.fromObject(strAfterTruncation)
    //val jsonData = value.getAsJsonArray()
    val binBody = jsonData.getString("bin_body")
    val pbData = binBody.substring(1, binBody.length() - 1).split(",").foldLeft(ArrayBuffer.empty[Byte])((b, a) => b + java.lang.Byte.parseByte(a)).drop(8).toArray
    Lighttpd.lighttpd_log.parseFrom(pbData).getRequest().getUrl().getUrlFields().getAutokvList().asScala.foreach(a =>
      a.getKey() match {
        case "cuid" => cuid += a.getValue()
        case "os" => os += a.getValue()
        case "channel" => channel += a.getValue()
        case "sv" => sv += a.getValue()
        case "resid" => resid += a.getValue()
        case "appid" => appid += a.getValue()
        case "prod" => prod += a.getValue()
        case _ => null
      }
    )
    val decodeCuid = URLDecoder.decode(cuid, "UTF-8")
    os = os.toLowerCase()
    if (os.matches("android(.*)")) {
      os = "android"
    } else if (os.matches("iphone(.*)")) {
      os = "iphone"
    } else if (os.matches("ipad(.*)")) {
      os = "ipad"
    } else if (os.matches("s60(.*)")) {
      os = "symbian"
    } else if (os.matches("wp7(.*)")) {
      os = "wp7"
    } else if (os.matches("wp(.*)")) {
      os = "wp"
    } else if (os.matches("tizen(.*)")) {
      os = "tizen"
    }

    val ifHasLogid = Lighttpd.lighttpd_log.parseFrom(pbData).hasLogid()
    val time = Lighttpd.lighttpd_log.parseFrom(pbData).getTime()
    if (ifHasLogid) {
      val logid = Lighttpd.lighttpd_log.parseFrom(pbData).getLogid()
      if (logid.isEmpty || logid.toString().equals("-") || !resid.toString().equals("01") || channel.isEmpty || !appid.isEmpty || !prod.isEmpty) {
        ""
      } else {
        decodeCuid + "\001" + os + "\001" + channel + "\001" + sv + "\001" + "1" + "\001" + "1" + "\001" + time + "\n"
      }
    } else {
      ""
    }
  } catch {
    case _: Throwable => ""
  }
}
lines.map(deserializePbData).print() // the function defined above is named deserializePbData, not parseData

I have not checked your code for functionality. This is just a syntax/structure problem that stood out when taking a very quick look at the code you posted.
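Since the unclosed block sat at the end of a long else if chain, one way to make that part harder to get wrong is to express the OS normalization as an ordered list of rules, which leaves no braces to mismatch. A minimal sketch with a hypothetical OsNormalizer object. It assumes `os.matches("android(.*)")` is meant as a prefix test: String.matches anchors the pattern to the whole string, so `android(.*)` means "starts with android", except that `.` does not match line terminators. It keeps "wp7" ahead of "wp", as the original order does, and leaves unmatched values lowercased but otherwise unchanged.

```scala
// Hypothetical rewrite of the os if/else-if chain as ordered prefix rules.
object OsNormalizer {
  // Order matters: "wp7" must be checked before the more general "wp".
  private val rules: Seq[(String, String)] = Seq(
    "android" -> "android",
    "iphone"  -> "iphone",
    "ipad"    -> "ipad",
    "s60"     -> "symbian",
    "wp7"     -> "wp7",
    "wp"      -> "wp",
    "tizen"   -> "tizen"
  )

  def normalize(rawOs: String): String = {
    val os = rawOs.toLowerCase
    // First matching prefix wins; otherwise keep the lowercased value as-is.
    rules.collectFirst { case (prefix, name) if os.startsWith(prefix) => name }.getOrElse(os)
  }
}
```

In deserializePbData, the whole chain would then collapse to `os = OsNormalizer.normalize(os)`.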

Sorry, and please help me with this question; it's my first question on Stack Overflow. –

The unclosed block is a copy error; my actual code does close it. –
