2016-06-27 11 views
0

私はRDDからの入力を受け取り、drlファイルからの読み込みルールをいくつか実行するsparkプログラムに取り組んでいます。SparkとDroolsの統合(drlファイルからルールを読む)

私は私が働いていないということです、それは私に与え理由見当もつかないオブジェクトのHzの属性が0であるところはどこでも、それは1

によってカウンタ属性をインクリメントする必要があることをルールを作ったDRLファイルで

ストリーム内のすべてのデータについて0の出力を返します(はい、hz属性が0のデータがあり、yesの場合、すべての属性を出力してカウンタが0であることも確認できます)

私はKieSessionFactory Gitのハブプロジェクトで見つけたクラスhttps://github.com/mganta/sprue/blob/master/src/main/java/com/cloudera/sprue/KieSessionFactory.java

しかし、私はこの部分が問題は、drlファイルからのみ読み取り、ルールを適用することです。以下

は私のScalaのコードである:(私は問題があると思う部分をマークしたが、最初のDRLファイルをご覧くださいしている)

package com.streams.Scala_Consumer 

import org.apache.kafka.clients.consumer.ConsumerConfig 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.{ SparkConf, SparkContext } 
import org.apache.spark.SparkContext._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.dstream.{ DStream, InputDStream, ConstantInputDStream } 
import org.apache.spark.streaming.kafka.v09.KafkaUtils 
import org.apache.spark.streaming.{ Seconds, StreamingContext } 
import org.apache.spark.sql.functions.avg 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.streaming.kafka.producer._ 
import org.apache.kafka.common.serialization.{ Deserializer, Serializer } 
import org.apache.kafka.common.serialization.StringSerializer 
import org.kie.api.runtime.StatelessKieSession 
//import KieSessionFactory.getKieSession; 
//import Sensor 

object scala_consumer extends Serializable { 

// schema for sensor data 
class Sensor(resid_1: String, date_1: String, time_1: String, hz_1: Double, disp_1: Double, flo_1: Double, sedPPM_1: Double, psi_1: Double, chlPPM_1: Double, counter_1: Int) extends Serializable 
{ 
var resid = resid_1 
var date = date_1 
var time = time_1 
var hz = hz_1 
var disp = disp_1 
var flo = flo_1 
var sedPPM = sedPPM_1 
var psi = psi_1 
var chlPPM = chlPPM_1 
var counter = counter_1 

def IncrementCounter (param: Int) = 
{ 
    counter = counter + param 
} 
} 

// function to parse line of sensor data into Sensor class 
def parseSensor(str: String): Sensor = { 
    val p = str.split(",") 
    //println("printing p: " + p) 
    new Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8).toDouble, 0) 
} 

var counter = 0 
val timeout = 10 // Terminate after N seconds 
val batchSeconds = 2 // Size of batch intervals 

def main(args: Array[String]): Unit = { 

    val brokers = "maprdemo:9092" // not needed for MapR Streams, needed for Kafka 
    val groupId = "testgroup" 
    val offsetReset = "latest" 
    val batchInterval = "2" 
    val pollTimeout = "1000" 
    val topics = "/user/vipulrajan/streaming/original:sensor" 
    val topica = "/user/vipulrajan/streaming/fail:test" 
    val xlsFileName = "./src/main/Rules.drl" 

    val sparkConf = new SparkConf().setAppName("SensorStream").setMaster("local[1]").set("spark.testing.memory", "536870912") 
                    .set("spark.streaming.backpressure.enabled", "true") 
            .set("spark.streaming.receiver.maxRate", Integer.toString(2000000)) 
            .set("spark.streaming.kafka.maxRatePerPartition", Integer.toString(2000000)); 

    val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt)) 

    // Create direct kafka stream with brokers and topics 
    val topicsSet = topics.split(",").toSet 
    val kafkaParams = Map[String, String](
     ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers, 
     ConsumerConfig.GROUP_ID_CONFIG -> groupId, 
     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> 
      "org.apache.kafka.common.serialization.StringDeserializer", 
     ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
      "org.apache.kafka.common.serialization.StringDeserializer", 
     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset, 
     ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false", 
     "spark.kafka.poll.time" -> pollTimeout 
    ) 

    val producerConf = new ProducerConf(
     bootstrapServers = brokers.split(",").toList 
    ) 

    val messages = KafkaUtils.createDirectStream[String, String](ssc, kafkaParams, topicsSet) 

    val values: DStream[String] = messages.map(_._2) 
    println("message values received") 
    //values.print(10) 
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************///////////// 
    values.foreachRDD(x => try{ 
           print("did 1\n")  //markers for manual and minor debugging 
           val myData = x.mapPartitions(s => {s.map(sens => {parseSensor(sens)})}) 
           //myData.collect().foreach(println) 
           //println(youData.date) 
           print("did 2\n") 
           val evalData = myData.mapPartitions(s => { 
           val ksession = KieSessionFactory.getKieSession(xlsFileName) 
           val retData = s.map(sens => {ksession.execute(sens); sens;}) 
           retData 
           }) 
           evalData.foreach(t => {println(t.counter)}) 
           print("did 3\n") 
           } 

    catch{case e1: ArrayIndexOutOfBoundsException => println("exception in line ")}) 
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************///////////// 
    println("filtered alert messages ") 

    // Start the computation 
    ssc.start() 
    // Wait for the computation to terminate 
    ssc.awaitTermination() 

} 
} 

DRLファイル

package droolsexample 

import com.streams.Scala_Consumer.Sensor; 
import scala.com.streams.Scala_Consumer.Sensor; //imported because my rules file lies in the src/main folder 
              //and code lies in src/main/scala 

// declare any global variables here 
dialect "java" 
rule "Counter Incrementer" 

when 
    sens : Sensor (hz == 0) 

then 
    sens.IncrementCounter(1); 
end 

私はdrlファイルの代わりにxlsファイルを使用しようとしましたが、私はjavaクラスを作成しようとしました。

6/06/27 16:38:30.462実行者タスク起動ワーカー0 WARN AbstractKieModule:KieBase defaultKieBaseのファイルが見つかりません

カウンタ値を出力すると、すべてゼロになります。誰も救助する?

+0

単純なルール 'rule xの場合、System.out.println(" Hello ");そのファイルには「終了」があり、発火していない場合は、知識ベースを正しく作成していない可能性があります。同じ簡単な名前の2つの異なるクラスをインポートすることは良いアイデアだと思いますか? – laune

+0

私はそれらも個別にインポートしようとしました。また、私はちょうど( "こんにちは")どちらも動作しなかった印刷を試みました。私は申し訳ありませんが、私は知識ベースは何か、私はそれをGoogleになるかもしれませんが、リンクやリソースを持っている場合私は本当にここに投稿することができれば感謝するだろうか? –

答えて

1

あなたがsparkを実行して実行するためにJARを渡すとき、plsはKIEなどの他の依存関係のJARも同じJARに含まれていることを確認してから、Spark-Submitで実行します。

代替は、2つの別々のプロジェクトを持っているあなたは、他のあなたのスパークプログラムをahve 1は、2本の瓶を持って、あなたは以下のようないくつかのこと、それを実行して、あなたのKIEのプロジェクトです:

NOHUP「--conf火花提出しますspark.driver.extraJavaOptions -Dlog4j.configuration = file:/log4j.properties "\ - キューabc \ - マスター糸\ --deploy-mode cluster \ --jars drools-kie-project-0.0。 1-SNAPSHOT.jar - クラスcom.abc.DroolsSparkJob SparkCallingDrools-0.0.1-SNAPSHOT.jar \ -inputfile/user/hive/warehouse/abc/* -output/user/hive/warehouse/drools-Op> app .log &

関連する問題