私はRDDからの入力を受け取り、drlファイルからの読み込みルールをいくつか実行するsparkプログラムに取り組んでいます。SparkとDroolsの統合(drlファイルからルールを読む)
私は私が働いていないということです、それは私に与え理由見当もつかないオブジェクトのHzの属性が0であるところはどこでも、それは1によってカウンタ属性をインクリメントする必要があることをルールを作ったDRLファイルで
ストリーム内のすべてのデータについて0の出力を返します(はい、hz属性が0のデータがあり、yesの場合、すべての属性を出力してカウンタが0であることも確認できます)
私はKieSessionFactory Gitのハブプロジェクトで見つけたクラスhttps://github.com/mganta/sprue/blob/master/src/main/java/com/cloudera/sprue/KieSessionFactory.java
しかし、私はこの部分が問題は、drlファイルからのみ読み取り、ルールを適用することです。以下
は私のScalaのコードである:(私は問題があると思う部分をマークしたが、最初のDRLファイルをご覧くださいしている)package com.streams.Scala_Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.{ DStream, InputDStream, ConstantInputDStream }
import org.apache.spark.streaming.kafka.v09.KafkaUtils
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.sql.functions.avg
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.producer._
import org.apache.kafka.common.serialization.{ Deserializer, Serializer }
import org.apache.kafka.common.serialization.StringSerializer
import org.kie.api.runtime.StatelessKieSession
//import KieSessionFactory.getKieSession;
//import Sensor
object scala_consumer extends Serializable {
// schema for sensor data
class Sensor(resid_1: String, date_1: String, time_1: String, hz_1: Double, disp_1: Double, flo_1: Double, sedPPM_1: Double, psi_1: Double, chlPPM_1: Double, counter_1: Int) extends Serializable
{
var resid = resid_1
var date = date_1
var time = time_1
var hz = hz_1
var disp = disp_1
var flo = flo_1
var sedPPM = sedPPM_1
var psi = psi_1
var chlPPM = chlPPM_1
var counter = counter_1
def IncrementCounter (param: Int) =
{
counter = counter + param
}
}
// function to parse line of sensor data into Sensor class
def parseSensor(str: String): Sensor = {
val p = str.split(",")
//println("printing p: " + p)
new Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8).toDouble, 0)
}
var counter = 0
val timeout = 10 // Terminate after N seconds
val batchSeconds = 2 // Size of batch intervals
def main(args: Array[String]): Unit = {
val brokers = "maprdemo:9092" // not needed for MapR Streams, needed for Kafka
val groupId = "testgroup"
val offsetReset = "latest"
val batchInterval = "2"
val pollTimeout = "1000"
val topics = "/user/vipulrajan/streaming/original:sensor"
val topica = "/user/vipulrajan/streaming/fail:test"
val xlsFileName = "./src/main/Rules.drl"
val sparkConf = new SparkConf().setAppName("SensorStream").setMaster("local[1]").set("spark.testing.memory", "536870912")
.set("spark.streaming.backpressure.enabled", "true")
.set("spark.streaming.receiver.maxRate", Integer.toString(2000000))
.set("spark.streaming.kafka.maxRatePerPartition", Integer.toString(2000000));
val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
"spark.kafka.poll.time" -> pollTimeout
)
val producerConf = new ProducerConf(
bootstrapServers = brokers.split(",").toList
)
val messages = KafkaUtils.createDirectStream[String, String](ssc, kafkaParams, topicsSet)
val values: DStream[String] = messages.map(_._2)
println("message values received")
//values.print(10)
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************/////////////
values.foreachRDD(x => try{
print("did 1\n") //markers for manual and minor debugging
val myData = x.mapPartitions(s => {s.map(sens => {parseSensor(sens)})})
//myData.collect().foreach(println)
//println(youData.date)
print("did 2\n")
val evalData = myData.mapPartitions(s => {
val ksession = KieSessionFactory.getKieSession(xlsFileName)
val retData = s.map(sens => {ksession.execute(sens); sens;})
retData
})
evalData.foreach(t => {println(t.counter)})
print("did 3\n")
}
catch{case e1: ArrayIndexOutOfBoundsException => println("exception in line ")})
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************/////////////
println("filtered alert messages ")
// Start the computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()
}
}
DRLファイル
package droolsexample
import com.streams.Scala_Consumer.Sensor;
import scala.com.streams.Scala_Consumer.Sensor; //imported because my rules file lies in the src/main folder
//and code lies in src/main/scala
// declare any global variables here
dialect "java"
rule "Counter Incrementer"
when
sens : Sensor (hz == 0)
then
sens.IncrementCounter(1);
end
私はdrlファイルの代わりにxlsファイルを使用しようとしましたが、私はjavaクラスを作成しようとしました。
6/06/27 16:38:30.462実行者タスク起動ワーカー0 WARN AbstractKieModule:KieBase defaultKieBaseのファイルが見つかりません
カウンタ値を出力すると、すべてゼロになります。誰も救助する?
単純なルール 'rule xの場合、System.out.println(" Hello ");そのファイルには「終了」があり、発火していない場合は、知識ベースを正しく作成していない可能性があります。同じ簡単な名前の2つの異なるクラスをインポートすることは良いアイデアだと思いますか? – laune
私はそれらも個別にインポートしようとしました。また、私はちょうど( "こんにちは")どちらも動作しなかった印刷を試みました。私は申し訳ありませんが、私は知識ベースは何か、私はそれをGoogleになるかもしれませんが、リンクやリソースを持っている場合私は本当にここに投稿することができれば感謝するだろうか? –