4

私はkafkaトピックでスパークストリーミングを使用しています。トピックは5つのパーティションで作成されます。私のすべてのメッセージはtablenameをキーとしてカフカのトピックに公開されています。 これを仮定すると、そのテーブルのすべてのメッセージは同じパーティションに移動する必要があります。 しかし、私は火花のログに同じテーブルのメッセージは、エグゼキュータのノード1に行くと時々executorのノード2に行くことに注意してください。KafkaトピックパーティションとSparkエグゼキュータマッピング

私は、次のコマンドを使用して、糸クラスタモードでコードを実行しています:

spark-submit --name DataProcessor --master yarn-cluster --files /opt/ETL_JAR/executor-log4j-spark.xml,/opt/ETL_JAR/driver-log4j-spark.xml,/opt/ETL_JAR/application.properties --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=driver-log4j-spark.xml" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=executor-log4j-spark.xml" --class com.test.DataProcessor /opt/ETL_JAR/etl-all-1.0.jar 

と、この提出は1つのドライバは、ノード1とノード2のノード-1および2執行に言うことができます作成​​されます。

私は、ノード1とノード2のエグゼキュータが同じパーティションを読み込まないようにします。しかし、これは起こっている

消費者グループを指定するために以下の設定を試みましたが、違いはありません。

kafkaParams.put("group.id", "app1"); 

これは、我々はcreateDirectStreamメソッドを使用して、ストリームを作成する方法を *未飼育係を介して行われます。

HashMap<String, String> kafkaParams = new HashMap<String, String>(); 
    kafkaParams.put("metadata.broker.list", brokers); 
    kafkaParams.put("auto.offset.reset", "largest"); 
    kafkaParams.put("group.id", "app1"); 

     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
       jssc, 
       String.class, 
       String.class, 
       StringDecoder.class, 
       StringDecoder.class, 
       kafkaParams, 
       topicsSet 
     ); 

完全なコード:

それはカフカのパーティションに送信されたメッセージは、同じスパークパーティションに着陸する予定正しい前提だ DirectStreamのアプローチを使用して
import java.io.Serializable; 
import java.util.Arrays; 
import java.util.HashMap; 
import java.util.HashSet; 
import java.util.Iterator; 

import org.apache.commons.lang3.StringUtils; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.VoidFunction; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import kafka.serializer.StringDecoder; 
import scala.Tuple2; 

public class DataProcessor2 implements Serializable { 
    private static final long serialVersionUID = 3071125481526170241L; 

    private static Logger log = LoggerFactory.getLogger("DataProcessor"); 

    public static void main(String[] args) { 
     final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR); 
     DataProcessorContextFactory3 factory = new DataProcessorContextFactory3(); 
     JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(sparkCheckPointDir, factory); 

     // Start the process 
     jssc.start(); 
     jssc.awaitTermination(); 
    } 

} 

class DataProcessorContextFactory3 implements JavaStreamingContextFactory, Serializable { 
    private static final long serialVersionUID = 6070911284191531450L; 

    private static Logger logger = LoggerFactory.getLogger(DataProcessorContextFactory.class); 

    DataProcessorContextFactory3() { 
    } 

    @Override 
    public JavaStreamingContext create() { 
     logger.debug("creating new context..!"); 

     final String brokers = ApplicationProperties.getProperty(Consts.KAFKA_BROKERS_NAME); 
     final String topic = ApplicationProperties.getProperty(Consts.KAFKA_TOPIC_NAME); 
     final String app = "app1"; 
     final String offset = ApplicationProperties.getProperty(Consts.KAFKA_CONSUMER_OFFSET, "largest"); 

     logger.debug("Data processing configuration. brokers={}, topic={}, app={}, offset={}", brokers, topic, app, 
       offset); 
     if (StringUtils.isBlank(brokers) || StringUtils.isBlank(topic) || StringUtils.isBlank(app)) { 
      System.err.println("Usage: DataProcessor <brokers> <topic>\n" + Consts.KAFKA_BROKERS_NAME 
        + " is a list of one or more Kafka brokers separated by comma\n" + Consts.KAFKA_TOPIC_NAME 
        + " is a kafka topic to consume from \n\n\n"); 
      System.exit(1); 
     } 
     final String majorVersion = "1.0"; 
     final String minorVersion = "3"; 
     final String version = majorVersion + "." + minorVersion; 
     final String applicationName = "DataProcessor-" + topic + "-" + version; 
     // for dev environment 
     SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(applicationName); 
     // for cluster environment 
     //SparkConf sparkConf = new SparkConf().setAppName(applicationName); 
     final long sparkBatchDuration = Long 
       .valueOf(ApplicationProperties.getProperty(Consts.SPARK_BATCH_DURATION, "10")); 

     final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR); 

     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchDuration)); 
     logger.debug("setting checkpoint directory={}", sparkCheckPointDir); 
     jssc.checkpoint(sparkCheckPointDir); 

     HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(","))); 

     HashMap<String, String> kafkaParams = new HashMap<String, String>(); 
     kafkaParams.put("metadata.broker.list", brokers); 
     kafkaParams.put("auto.offset.reset", offset); 
     kafkaParams.put("group.id", "app1"); 

//   @formatter:off 
      JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, 
        String.class, 
        String.class, 
        StringDecoder.class, 
        StringDecoder.class, 
        kafkaParams, 
        topicsSet 
      ); 
//   @formatter:on 
     processRDD(messages, app); 
     return jssc; 
    } 

    private void processRDD(JavaPairInputDStream<String, String> messages, final String app) { 
     JavaDStream<MsgStruct> rdd = messages.map(new MessageProcessFunction()); 

     rdd.foreachRDD(new Function<JavaRDD<MsgStruct>, Void>() { 

      private static final long serialVersionUID = 250647626267731218L; 

      @Override 
      public Void call(JavaRDD<MsgStruct> currentRdd) throws Exception { 
       if (!currentRdd.isEmpty()) { 
        logger.debug("Receive RDD. Create JobDispatcherFunction at HOST={}", FunctionUtil.getHostName()); 
        currentRdd.foreachPartition(new VoidFunction<Iterator<MsgStruct>>() { 

         @Override 
         public void call(Iterator<MsgStruct> arg0) throws Exception { 
          while(arg0.hasNext()){ 
           System.out.println(arg0.next().toString()); 
          } 
         } 
        }); 
       } else { 
        logger.debug("Current RDD is empty."); 
       } 
       return null; 
      } 
     }); 
    } 
    public static class MessageProcessFunction implements Function<Tuple2<String, String>, MsgStruct> { 
     @Override 
     public MsgStruct call(Tuple2<String, String> data) throws Exception { 
      String message = data._2(); 
      System.out.println("message:"+message); 
      return MsgStruct.parse(message); 
     } 

    } 
    public static class MsgStruct implements Serializable{ 
     private String message; 
     public static MsgStruct parse(String msg){ 
      MsgStruct m = new MsgStruct(); 
      m.message = msg; 
      return m; 
     } 
     public String toString(){ 
      return "content inside="+message; 
     } 
    } 

} 

答えて

3

各Sparkパーティションが毎回同じSparkワーカーによって処理されることは想定できません。各バッチ間隔で、各パーティションのOffsetRangeごとにSparkタスクが作成され、処理のためにクラスタに送信され、使用可能なワーカーに着陸します。

パーティションのローカリティを探しています。 partition locality that the direct kafka consumer supportsは、SparkとKafkaの配備が同じ場所にある場合に処理されるオフセット範囲を含むkafkaホストです。それは私が頻繁に見ていない展開トポロジです。

ホストのローカリティを設定する必要がある場合は、Apache SamzaまたはKafka Streamsを参照してください。

3

Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)によれば、パーティションの明示的なマッピングをホストに指定することができます。

2つのホスト(h1とh2)があり、カフカのトピックtopic-nameに3つのパーティションがあるとします。次の重要なコードは、指定されたパーティションをJavaのホストにマップする方法を示しています。

Map<TopicPartition, String> partitionMapToHost = new HashMap<>(); 
// partition 0 -> h1, partition 1 and 2 -> h2 
partitionMapToHost.put(new TopicPartition("topic-name", 0), "h1"); 
partitionMapToHost.put(new TopicPartition("topic-name", 1), "h2"); 
partitionMapToHost.put(new TopicPartition("topic-name", 2), "h2"); 
List<String> topicCollection = Arrays.asList("topic-name"); 
Map<String, Object> kafkaParams = new HasMap<>(); 
kafkaParams.put("bootstrap.servers", "10.0.0.2:9092,10.0.0.3:9092"); 
kafkaParams.put("group.id", "group-id-name"); 
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
JavaInputDStream<ConsumerRecord<String, String>> records = KafkaUtils.createDirectStream(jssc, 
    LocationStrategies.PreferFixed(partitionMapToHost), // PreferFixed is the key 
    ConsumerStrategies.Subscribe(topicCollection, kafkaParams)); 

また、均等に利用できる執行全体のパーティションを配布LocationStrategies.PreferConsistent()を使用して、指定されたパーティションのみが指定されたエグゼキュータによって消費されることを保証することができます。

関連する問題