2017-07-09 13 views
1

私はスパークストリーミングを始めました。特定のURLから情報を取得するためにURLをオンラインでストリーミングしたい場合は、URLをストリームするためにJavaCustomReceiverを使用しました。スパークストリーミングCustomReceiver不明なホスト例外

これは私が使用しているコード(source

public class JavaCustomReceiver extends Receiver<String> { 

    private static final Pattern SPACE = Pattern.compile(" "); 

    public static void main(String[] args) throws Exception { 

     SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver"); 
     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000)); 

     JavaReceiverInputDStream<String> lines = ssc.receiverStream(
      new JavaCustomReceiver("http://stream.meetup.com/2/rsvps", 80)); 

     JavaDStream<String> words = lines.flatMap(new 

       FlatMapFunction<String, String>() { 

       @Override 
       public Iterator<String> call(String x) { 
        return Arrays.asList(SPACE.split(x)).iterator(); 
       } 
       }); 

     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
       new PairFunction<String, String, Integer>() { 

       @Override 
       public Tuple2<String, Integer> call(String s) { 
         return new Tuple2<>(s, 1); 
       } 
       }).reduceByKey(new Function2<Integer, Integer, Integer>() { 
       @Override 
       public Integer call(Integer i1, Integer i2) { 
        return i1 + i2; 
       } 
      }); 

    wordCounts.print(); 
    ssc.start(); 
    ssc.awaitTermination(); 
} 

String host = null; 
int port = -1; 

public JavaCustomReceiver(String host_, int port_) { 
    super(StorageLevel.MEMORY_AND_DISK_2()); 
    host = host_; 
    port = port_; 
} 

public void onStart() { 

    new Thread() { 
     @Override 
     public void run() { 
      receive(); 
     } 
    }.start(); 
} 

public void onStop() { 

} 


private void receive() { 
    try { 
     Socket socket = null; 
     BufferedReader reader = null; 
     String userInput = null; 
     try { 
      // connect to the server 
      socket = new Socket(host, port); 
      reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); 
      // Until stopped or connection broken continue reading 
      while (!isStopped() && (userInput = reader.readLine()) != null) { 
       System.out.println("Received data '" + userInput + "'"); 
       store(userInput); 
      } 
     } finally { 
      Closeables.close(reader, /* swallowIOException = */ true); 
      Closeables.close(socket, /* swallowIOException = */ true); 
     } 

     restart("Trying to connect again"); 
    } catch (ConnectException ce) { 
     // restart if could not connect to server 
     restart("Could not connect", ce); 
    } catch (Throwable t) { 
     restart("Error receiving data", t); 
    } 
} 
    } 

あるしかし、私はこの問題を解決するにはどうすればよいjava.net.UnknownHostExceptionを発行

を取得しておきますか?私が使用しているコードに何が間違っていますか?

+2

参照されたカスタムレシーバーのコードを読んだ後、それは 'host:port'に接続するTCPレシーバーであることは明らかです。URLをとることができるHTTPレシーバーではありません。 HTTPエンドポイントから読み取るようにコードを変更する必要があります。 – maasg

+0

@maasgあなたは正しいです。ソケットの代わりにURLとopenStream()に変更したところ、データが得られました!! –

答えて

1

参照されたカスタム受信者のコードを読んだ後、それはhost:portに接続し、URLを取ることができるHTTP受信者ではないTCP受信者であることは明らかです。 HTTPエンドポイントから読み取るようにコードを変更する必要があります。

関連する問題