2016-10-11 5 views
0

マルチノードスパーククラスタがあり、マスタが存在するノードに自分のsparkプログラムを送信しています。Spark ClusterのスレーブノードでSystem.envと値が読み取られない

ジョブがスレーブノードにサブミットされるとき、HOSTNAMEパラメータはヌル値を与えています。ここでは、プロパティがnullとして読み込まれている行があります。

System.getenv(HOSTNAME)がスレーブノードから読み取られていません。

 System.out.println("line 76 System.getenv(HOSTNAME)=" + System.getenv("HOSTNAME")); 

AUDIT_USER、AUDIT_PASSWORDも読み込み時にnullです(両方ともプロパティファイルにあります)。

私が1つのノードでジョブをサブミットすると、これらのパラメータに問題はありません。しかし、6ノードのスタンドアロンモードでジョブを送信すると、この問題が発生します。

私は、すべてのノードのプロパティファイルに同じフォルダを作成しました。

ここに私のコードです。 System.envがnullを与えず、プロパティがnullである理由を教えてください。

package com.fb.cpd.myapp; 

import java.io.Serializable; 
import java.sql.ResultSet; 
import java.sql.SQLException; 
import java.sql.Statement; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Properties; 
import java.util.UUID; 
import java.util.concurrent.Future; 

import org.apache.commons.configuration.ConfigurationConverter; 
import org.apache.commons.configuration.ConfigurationException; 
import org.apache.commons.configuration.PropertiesConfiguration; 
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.Producer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.clients.producer.RecordMetadata; 
import org.apache.log4j.LogManager; 
import org.apache.log4j.Logger; 
import org.apache.spark.TaskContext; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import kafka.common.TopicAndPartition; 
import kafka.message.MessageAndMetadata; 
import kafka.serializer.DefaultDecoder; 
import kafka.serializer.StringDecoder; 

public class GenericLogic implements Serializable { 
    /** 
    * 
    */ 
    private static final long serialVersionUID = 1L; 
    private static final Logger logger = LogManager.getLogger(GenericLogic.class); 
    private PropertiesConfiguration props; 
    private Producer<String, String> producer = null; 
    private Future<RecordMetadata> receipt = null; 
    private RecordMetadata receiptInfo = null; 
    private ConnectToRDBMS auditor = null; 
    private ConnectToRDBMS df = null; 

    private static String myId = null; 

    private Map<TopicAndPartition, Long> getOffsets(String topic) throws SQLException { 
     String appName = "myapp"; 
     String TopicName = topic; 
     Map<TopicAndPartition, Long> topicMap = new HashMap<>(); // 
     System.out.println("line 64 before making connection"); 

     try { 
      props = new PropertiesConfiguration("/app/lock/conf/empty.properties"); 
     } catch (ConfigurationException e) { // TODO Auto-generated catch block 
      System.out.println("Line 70"); 
      e.printStackTrace(); 
     } 

     try { 
      System.out.println("line 76 System.getenv(HOSTNAME)=" + System.getenv("HOSTNAME")); 
      auditor = new ConnectToRDBMS(System.getenv("HOSTNAME"), "lockSparkCollector", null, null, null, null, null, 
        0, props.getString("AUDIT_USER"), props.getString("AUDIT_PASSWORD"), 
        props.getString("AUDIT_DB_URL")); 
     } catch (SQLException e) { 
      logger.error("ASSERT: run() ERROR CONNECTING TO AUDIT DB " + e.getMessage()); 
     } 
     System.out.println("line 64 after making connection"); 

     Statement stmt = null; 

     String query = "select va_application, topic_name, partition_id, from_offset,until_offset from lock_spark_offsets where va_application = " 
       + "'" + appName + "'" + " and topic_name= " + "'" + TopicName + "'"; 
     System.out.println("query" + query); 
     System.out.println("before query exection"); 
     try { 
      stmt = auditor.dbConnection.createStatement(); 
      System.out.println("line 81"); 

      ResultSet rs = stmt.executeQuery(query); 
      System.out.println("line 83"); 
      while (rs.next()) { 
       System.out.println("pass 1 of Resultset"); 
       System.out.println("getOffsets=" + topic.trim() + " " + rs.getInt("partition_id") + " " 
         + rs.getString("until_offset") + " " + rs.getString("until_offset")); 
       Integer partition = rs.getInt("partition_id"); 

       TopicAndPartition tp = new TopicAndPartition(topic.trim(), partition); 
       System.out.println("102"); 
       Long.parseLong(rs.getString("until_offset")); 
       topicMap.put(tp, Long.parseLong(rs.getString("until_offset"))); 
       System.out.println("105"); 

      } 
      System.out.println("after populating topic map"); 

     } catch (

     SQLException e) { 
      System.out.println("printing exception"); 
      e.printStackTrace(); 
     } finally { 
      if (stmt != null) { 
       System.out.println("closing statement"); 
       stmt.close(); 
      } 
     } 
     return topicMap; 
    } 

    public void setDefaultProperties() { 
     FileChangedReloadingStrategy strategy = new FileChangedReloadingStrategy(); 
     strategy.setRefreshDelay(10000); 
     System.out.println("Line 45"); 
     // supply the properties file. 
     try { 
      props = new PropertiesConfiguration("/app/lock/conf/empty.properties"); 
     } catch (ConfigurationException e) { 
      // TODO Auto-generated catch block 
      System.out.println("Line 51"); 
      e.printStackTrace(); 
     } 
     props.setReloadingStrategy(strategy); 
     System.out.println("Line 56"); 

     // Producer configs 
     if (!props.containsKey("acks")) { 
      props.setProperty("acks", "1"); 
     } 

     if (!props.containsKey("retries")) { 
      props.setProperty("retries", "1000"); 
     } 

     if (!props.containsKey("compression.type")) { 
      props.setProperty("compression.type", "gzip"); 
     } 

     if (!props.containsKey("request.timeout.ms")) { 
      props.setProperty("request.timeout.ms", "600000"); 
     } 

     if (!props.containsKey("batch.size")) { 
      props.setProperty("batch.size", "32768"); 
     } 

     if (!props.containsKey("buffer.memory")) { 
      props.setProperty("buffer.memory", "134217728"); 
     } 

     if (!props.containsKey("block.on.buffer.full")) { 
      props.setProperty("block.on.buffer.full", "true"); 
     } 

     if (!props.containsKey("SHUTDOWN")) { 
      props.setProperty("SHUTDOWN", "false"); 
     } 

     if (!props.containsKey("producer.topic")) { 
      props.setProperty("producer.topic", "mytopic1"); 
     } 

     Properties producer_props = ConfigurationConverter.getProperties(props); 

     producer_props.setProperty("bootstrap.servers", props.getString("target.bootstrap.servers")); 
     producer_props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     producer_props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // ???? 

     this.producer = new KafkaProducer<String, String>(producer_props); 
     System.out.println("Line 107"); 

    } 

    public void PublishMessages(String st) { 

     try { 
      System.out.println("Line 111"); 
      String key = UUID.randomUUID().toString().replace("-", ""); 
      System.out.println("Started Producing..."); 

      receipt = producer.send(new ProducerRecord<String, String>(props.getString("producer.topic"), key, // Key 
        st)); 
      System.out.println("After Completion of Producing Producing"); 
     } catch (Exception e) { 
      e.printStackTrace(); 
      System.out.println("Exception in PublishMessages "); 
     } 

    } 

    public void DBConnect() { 
     try { 
      auditor = new ConnectToRDBMS(System.getenv("HOSTNAME"), "myapp", props.getString("consumer.topic"), null, 
        null, null, null, 0, props.getString("AUDIT_USER"), props.getString("AUDIT_PASSWORD"), 
        props.getString("AUDIT_DB_URL")); 
     } catch (SQLException e) { 
      logger.error("ASSERT: run() ERROR CONNECTING TO AUDIT DB " + e.getMessage()); 
      return; 
     } 
    } 

    private void writeToDB(Long startTime, Integer partnId, String fromOffset, String untilOffset, Integer count) { 
     this.auditor.audit(startTime, partnId, fromOffset, untilOffset, count); 

    } 

    /** 
    * 
    * @param jsc 
    * @param topicSet 
    * @throws Exception 
    */ 
    public static void main(String[] args) { 
     String topicNames = "MySourceTopic"; 
     GenericLogic ec = new GenericLogic(); 
     Map<TopicAndPartition, Long> topicMap = null; 
     try { 

      topicMap = ec.getOffsets("MySourceTopic"); 

     } catch (SQLException e1) { 
      // TODO Auto-generated catch block 
      e1.printStackTrace(); 
     } 

     boolean clusterMode = false; 

     Integer batchDuration = Integer.parseInt("30000"); 
     JavaSparkContext sparkConf = new JavaSparkContext("abcd.net:7077", "Kafka-Spark-Integration"); 

     sparkConf.getConf().set("spark.local.ip", "lock-dt-a4d.xyz.com"); 
     sparkConf.getConf().set("spark.eventLog.enabled", "false"); 
     sparkConf.getConf().set("spark.shuffle.blockTransferService", "nio"); 

     JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(10000)); 
     Map<String, String> kafkaParams = new HashMap<String, String>(); 
     String pollInterval = "10000"; 
     String zookeeper = "lock-dt-a5d.xyz.com:2181,lock-dt-a6d.xyz.com:2181"; 

     kafkaParams.put("metadata.broker.list", "lock-dt-a5d.xyz.com:9092,lock-dt-a6d.xyz.com:9092"); 
     kafkaParams.put("group.id", "Consumer"); 
     kafkaParams.put("client.id", "Consumer"); 
     kafkaParams.put("zookeeper.connect", zookeeper); 

     JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc, String.class, byte[].class, 
       StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, topicMap, 
       (Function<MessageAndMetadata<String, byte[]>, byte[]>) MessageAndMetadata::message); 

     directKafkaStream.foreachRDD(rdd -> { 
      if (rdd.isEmpty()) { 
       System.out.println("No events polled in last " + pollInterval + " milli seconds"); 
       return; 
      } 

      rdd.foreachPartition(itr -> { 
       Integer partnId = TaskContext.get().partitionId(); 
       Long systime = System.nanoTime(); 
       Map<String, String> hmap = new HashMap<String, String>(); 

       GenericLogic ec2 = new GenericLogic(); 
       ec2.setDefaultProperties(); 
       ec2.DBConnect(); 

       try { 

        while (itr.hasNext()) { 
         System.out.println("232"); 
        } 

       } catch (Exception e) { 
        logger.error(e.getMessage(), e); 
       } 

      }); 
     }); 
     jsc.start(); 
     jsc.awaitTermination(); 
    } 

} 
+1

ここで、構成ファイルuはHOSTNAMEをエクスポートしますか? –

+1

あなたは同じプロパティファイルをworker mach inesも? –

+0

私はすべてのノードにプロパティファイルを持っています。 HOSTNAMEは一般的なものでなければなりません。プロパティファイルで指定する必要はありません。ではない? – AKC

答えて

0

あなたは、私たちは、すべてのノードのOSを教えてくださいすることができますし、マスターノード上で注意することはHOSTNAMEをエクスポートしていることを保証している場合。お使いのOSの詳細をお知らせいただければ、あなたの質問に答えてください。

あなたのコンテキストに関係なく、情報だけに関連している可能性があります。System.getenv( "HOSTNAME")は、すべてのプラットフォーム(UbuntuやMacなど)でホスト名を提供していない可能性があります。

HOSTNAMEをエクスポートしない方が良いでしょう。

注:既に小道具が空でも空でもないことを確認しているとしますか? デバッグしていない場合、プロパティファイルがロードされているかどうかをチェックし、ロードされていれば空のプロパティファイルではないため、ファイルからプロパティをロードしています。

環境変数だけでなく、プロパティも返されない場合は、プロパティファイルまたはその相対位置が別のコンピュータにある可能性があります 別のコンピュータに配置された正確なコピーでない場合は、また、Linuxに適したファイルであることを確認してください(Windowsでは書いたり編集したりLinuxに入れないでください)。

0

私はstart-slaves.shでsalvesを起動しました これは問題です。