System.getenv and property values are not read on the slave nodes of a Spark cluster. I have a multi-node Spark cluster, and I submit my Spark program on the node where the master resides.
When the job is submitted to the slave nodes, the HOSTNAME parameter comes back null. System.getenv("HOSTNAME") is not being read on the slave nodes; this is the line where it comes back null:
System.out.println("line 76 System.getenv(HOSTNAME)=" + System.getenv("HOSTNAME"));
AUDIT_USER and AUDIT_PASSWORD are also null when read (both are defined in the properties file).
When I submit the job on a single node, these parameters come through fine. But when I submit the job in standalone mode across 6 nodes, this problem occurs.
I created the same folder with the properties file on all of the nodes.
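To show concretely what I mean by "not read", here is a minimal probe I can run against the same cluster (a sketch only: the master URL and the properties path are the same ones used in my real code below, and the partition count is arbitrary). Each task prints the environment variable and whether the properties file exists on that machine; the output lands in each executor's stdout log on the worker UI.
import java.io.File;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class EnvProbe {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("EnvProbe").setMaster("spark://abcd.net:7077");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // More partitions than workers, so every node should run at least one task.
        jsc.parallelize(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 12)
                .foreach(i -> System.out.println("HOSTNAME=" + System.getenv("HOSTNAME")
                        + " propsFileExists=" + new File("/app/lock/conf/empty.properties").exists()));
        jsc.stop();
    }
}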
Here is my code. Can you tell me why System.getenv and the properties come back null?
package com.fb.cpd.myapp;
import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
public class GenericLogic implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger logger = LogManager.getLogger(GenericLogic.class);
private PropertiesConfiguration props;
private Producer<String, String> producer = null;
private Future<RecordMetadata> receipt = null;
private RecordMetadata receiptInfo = null;
private ConnectToRDBMS auditor = null;
private ConnectToRDBMS df = null;
private static String myId = null;
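// Reads the stored Kafka offsets for this application and topic from the audit database.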
private Map<TopicAndPartition, Long> getOffsets(String topic) throws SQLException {
String appName = "myapp";
String TopicName = topic;
Map<TopicAndPartition, Long> topicMap = new HashMap<>();
System.out.println("line 64 before making connection");
try {
props = new PropertiesConfiguration("/app/lock/conf/empty.properties");
} catch (ConfigurationException e) { // TODO Auto-generated catch block
System.out.println("Line 70");
e.printStackTrace();
}
try {
System.out.println("line 76 System.getenv(HOSTNAME)=" + System.getenv("HOSTNAME"));
auditor = new ConnectToRDBMS(System.getenv("HOSTNAME"), "lockSparkCollector", null, null, null, null, null,
0, props.getString("AUDIT_USER"), props.getString("AUDIT_PASSWORD"),
props.getString("AUDIT_DB_URL"));
} catch (SQLException e) {
logger.error("ASSERT: run() ERROR CONNECTING TO AUDIT DB " + e.getMessage());
}
System.out.println("line 64 after making connection");
Statement stmt = null;
String query = "select va_application, topic_name, partition_id, from_offset,until_offset from lock_spark_offsets where va_application = "
+ "'" + appName + "'" + " and topic_name= " + "'" + TopicName + "'";
System.out.println("query" + query);
System.out.println("before query exection");
try {
stmt = auditor.dbConnection.createStatement();
System.out.println("line 81");
ResultSet rs = stmt.executeQuery(query);
System.out.println("line 83");
while (rs.next()) {
System.out.println("pass 1 of Resultset");
System.out.println("getOffsets=" + topic.trim() + " " + rs.getInt("partition_id") + " "
+ rs.getString("until_offset") + " " + rs.getString("until_offset"));
Integer partition = rs.getInt("partition_id");
TopicAndPartition tp = new TopicAndPartition(topic.trim(), partition);
System.out.println("102");
topicMap.put(tp, Long.parseLong(rs.getString("until_offset")));
System.out.println("105");
}
System.out.println("after populating topic map");
} catch (SQLException e) {
System.out.println("printing exception");
e.printStackTrace();
} finally {
if (stmt != null) {
System.out.println("closing statement");
stmt.close();
}
}
return topicMap;
}
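// Loads the properties file with a file-change reloading strategy, fills in
// default Kafka producer settings, and creates the producer.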
public void setDefaultProperties() {
FileChangedReloadingStrategy strategy = new FileChangedReloadingStrategy();
strategy.setRefreshDelay(10000);
System.out.println("Line 45");
// supply the properties file.
try {
props = new PropertiesConfiguration("/app/lock/conf/empty.properties");
} catch (ConfigurationException e) {
// TODO Auto-generated catch block
System.out.println("Line 51");
e.printStackTrace();
}
props.setReloadingStrategy(strategy);
System.out.println("Line 56");
// Producer configs
if (!props.containsKey("acks")) {
props.setProperty("acks", "1");
}
if (!props.containsKey("retries")) {
props.setProperty("retries", "1000");
}
if (!props.containsKey("compression.type")) {
props.setProperty("compression.type", "gzip");
}
if (!props.containsKey("request.timeout.ms")) {
props.setProperty("request.timeout.ms", "600000");
}
if (!props.containsKey("batch.size")) {
props.setProperty("batch.size", "32768");
}
if (!props.containsKey("buffer.memory")) {
props.setProperty("buffer.memory", "134217728");
}
if (!props.containsKey("block.on.buffer.full")) {
props.setProperty("block.on.buffer.full", "true");
}
if (!props.containsKey("SHUTDOWN")) {
props.setProperty("SHUTDOWN", "false");
}
if (!props.containsKey("producer.topic")) {
props.setProperty("producer.topic", "mytopic1");
}
Properties producer_props = ConfigurationConverter.getProperties(props);
producer_props.setProperty("bootstrap.servers", props.getString("target.bootstrap.servers"));
producer_props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer_props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // ????
this.producer = new KafkaProducer<String, String>(producer_props);
System.out.println("Line 107");
}
public void PublishMessages(String st) {
try {
System.out.println("Line 111");
String key = UUID.randomUUID().toString().replace("-", "");
System.out.println("Started Producing...");
receipt = producer.send(new ProducerRecord<String, String>(props.getString("producer.topic"), key, // Key
st));
System.out.println("After Completion of Producing Producing");
} catch (Exception e) {
e.printStackTrace();
System.out.println("Exception in PublishMessages ");
}
}
public void DBConnect() {
try {
auditor = new ConnectToRDBMS(System.getenv("HOSTNAME"), "myapp", props.getString("consumer.topic"), null,
null, null, null, 0, props.getString("AUDIT_USER"), props.getString("AUDIT_PASSWORD"),
props.getString("AUDIT_DB_URL"));
} catch (SQLException e) {
logger.error("ASSERT: run() ERROR CONNECTING TO AUDIT DB " + e.getMessage());
return;
}
}
private void writeToDB(Long startTime, Integer partnId, String fromOffset, String untilOffset, Integer count) {
this.auditor.audit(startTime, partnId, fromOffset, untilOffset, count);
}
/**
 * Entry point: loads the stored offsets for the source topic and starts the
 * Kafka direct stream job.
 */
public static void main(String[] args) {
String topicNames = "MySourceTopic";
GenericLogic ec = new GenericLogic();
Map<TopicAndPartition, Long> topicMap = null;
try {
topicMap = ec.getOffsets("MySourceTopic");
} catch (SQLException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
boolean clusterMode = false;
Integer batchDuration = Integer.parseInt("30000");
JavaSparkContext sparkConf = new JavaSparkContext("abcd.net:7077", "Kafka-Spark-Integration");
sparkConf.getConf().set("spark.local.ip", "lock-dt-a4d.xyz.com");
sparkConf.getConf().set("spark.eventLog.enabled", "false");
sparkConf.getConf().set("spark.shuffle.blockTransferService", "nio");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(10000));
Map<String, String> kafkaParams = new HashMap<String, String>();
String pollInterval = "10000";
String zookeeper = "lock-dt-a5d.xyz.com:2181,lock-dt-a6d.xyz.com:2181";
kafkaParams.put("metadata.broker.list", "lock-dt-a5d.xyz.com:9092,lock-dt-a6d.xyz.com:9092");
kafkaParams.put("group.id", "Consumer");
kafkaParams.put("client.id", "Consumer");
kafkaParams.put("zookeeper.connect", zookeeper);
JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc, String.class, byte[].class,
StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, topicMap,
(Function<MessageAndMetadata<String, byte[]>, byte[]>) MessageAndMetadata::message);
directKafkaStream.foreachRDD(rdd -> {
if (rdd.isEmpty()) {
System.out.println("No events polled in last " + pollInterval + " milli seconds");
return;
}
rdd.foreachPartition(itr -> {
Integer partnId = TaskContext.get().partitionId();
Long systime = System.nanoTime();
Map<String, String> hmap = new HashMap<String, String>();
GenericLogic ec2 = new GenericLogic();
ec2.setDefaultProperties();
ec2.DBConnect();
try {
while (itr.hasNext()) {
itr.next(); // consume the record; hasNext() alone would loop forever
System.out.println("232");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
});
});
jsc.start();
jsc.awaitTermination();
}
}
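In case it is relevant: instead of relying on the shell environment, I understand the values can also be handed to the executors explicitly. A sketch, assuming standalone mode (spark.executorEnv.<name> is the documented SparkConf prefix for setting an environment variable in every executor process; the values below are placeholders):
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ExplicitEnvExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Kafka-Spark-Integration")
                .setMaster("spark://abcd.net:7077")
                // Each spark.executorEnv.<name> entry becomes an environment
                // variable inside every executor launched for this application.
                .set("spark.executorEnv.HOSTNAME", "placeholder-value")
                .set("spark.executorEnv.AUDIT_USER", "placeholder-user")
                .set("spark.executorEnv.AUDIT_PASSWORD", "placeholder-password");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // ... build the streaming job as in the code above ...
        jsc.stop();
    }
}
Alternatively, exporting the variables in conf/spark-env.sh on each worker machine should make them visible to the worker daemon and, through it, to the executors it launches.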
Where do you export HOSTNAME, in a configuration file? –
Do you have the same properties file on the worker machines as well? –
I have the properties file on all the nodes. HOSTNAME should be a common one; it shouldn't need to be specified in the properties file, should it? – AKC