java.io.NotSerializableException: io.netty.channel。 (SerializableSerializer.java:41) 〜[storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]でのDefaultChannelHandlerContextは、com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) 〜[kryo-3.0.3.jar :?] at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113) ) 〜[kryo-3.0.3.jar :?](Kryo.java:628) com.esotericsoftware.kryo.Kryo.writeClassAndObject(英語) 〜[kryo-3.0.3.jar :?] at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) 〜[kryo-3.0.3.jar :?] at com。 (Kryo.java:534) 〜[kryo-3.0.3.jar :?] で[ kryo-3.0.3.jar :?] at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) 〜[storm-core-1.0.1.2.5.0.0-1 245.jar:1.0.1.2.5.0.0-1245] at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) 〜[storm-core-1.0.1.2.5.0.0-1245] .jar:1.0.1.2.5.0.0-1245] at org.apache.storm.daemon.worker $ mk_transfer_fn $ transfer_fn__6723.invoke(worker.clj:192) 〜[storm-core-1.0.1.2.5.0。 0-1245.jar:1.0.1.2.5.0.0-1245] at org.apache.storm.daemon.executor $ start_batch_transfer__GT_worker_handler_BANG_ $ fn__6411.invoke(executor.clj:313) 〜[storm-core-1.0.1.2 .5.0.0-1245.jar:1.0.1.2.5.0.0-1245] at org.apache.storm.disruptor $ clojure_handler $ reify__6005.onEvent(disruptor.clj:40) 〜[storm-core-1.0。 1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245] at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue。 Javaの:451) 〜[嵐コア-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245] ... 6もっとによって引き起こさ:java.lang.RuntimeException:java.io.NotSerializableException:java.lang.RuntimeException:によって引き起こさio.netty.channel.DefaultChannelHandlerContext
私は嵐ローカルモードを使用します問題はありませんが、クラスタ上で障害に報告されます。私は嵐自体についてはあまり知識を持っていますが、シリアライズされていない(そのマップに格納されている)ChannelHandlerContextをシリアライズしようようにそれはそう
public class NettySpout extends BaseRichSpout {
private static final long serialVersionUID = 1L;
/**
* colloctor for spout
*/
private SpoutOutputCollector collector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
collector=spoutOutputCollector;
StormServer stormServer=new StormServer();
stormServer.run();
}
@Override
public void nextTuple() {
Values tuple;
try {
while ((tuple = ServerHandler.queue.take()) != null) {
collector.emit(tuple);
}
} catch (Exception e) {
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("value","channl"));
}
public class ServerHandler extends ChannelInboundHandlerAdapter{
private static Logger logger = LogManager.getLogger(ServerHandler.class);
public static LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<Values>();
public static Map<String,ChannelHandlerContext> ctxes;
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
JSONObject message = (JSONObject) msg;
queue.put(new Values(new StreamData(message.toString().getBytes()), new HashMap<>(ctxes)));
}
あなたは正しいです、ChannelHandlerContextシリアル化できません、おそらくこの方法は間違っています、私は他の方法を試してください。あなたの答えをありがとう。 – Tdz