私はどこでもJVM内から自由にカスタム受信機を起動、停止する方法を発見しました。プロトタイプですが、ストレステストはしていません。レシーバスーパーバイザなどの階層はシャットダウンされませんが、カスタム受信者が再起動されるまでは本質的にアイドルになります。
シングルトンHashMap
を保持するオブジェクトを作成スパークAPI
- の意図を尊重しているようです。カスタムレシーバと希望のステータス(有効または無効)をマップに保持します。
;;;
case class IKodaTextSocketReceiverStatus(receiver:IKodaTextSocketReceiver,enabled:Boolean)
{
}
object IKodaTextSocketReceiver extends Logging
{
val receiverMap:mutable.HashMap[String,IKodaTextSocketReceiverStatus]= mutable.HashMap[String,IKodaTextSocketReceiverStatus]()
def restartReceiver(recId:String):Boolean=
{
if(receiverMap.get(recId).isDefined)
{
logger.info("Found existing receiver")
receiverMap.put(recId,new IKodaTextSocketReceiverStatus(receiverMap.get(recId).get.receiver,true))
receiverMap.get(recId).get.receiver.onStart()
true
}
else
{
false
}
}
}
有効なブール値はどこからでも設定できます。受信機の内側または外側のいずれか。私はreceive
メソッドから設定しました。onStart
メソッドは、受信側の有効なステータスをチェックします。それがfalseの場合、それは新しいスレッドで受け取る方法を再起動していない.....と沈黙があります:)
class IKodaTextSocketReceiver(host: String, port: Int, receiverId:String)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
IKodaTextSocketReceiver.receiverMap.put(receiverId,new IKodaTextSocketReceiverStatus(this,true))
def isEnabled(): Boolean =
{
val bo = IKodaTextSocketReceiver.receiverMap.get(receiverId)
if(bo.isDefined)
{
bo.get.enabled
}
else
{
false
}
}
def onStart() {
// Start the thread that receives data over a connection
if(isEnabled())
{
logger.info("Starting IKodaTextSocketReceiver")
if(!super.isStopped()) {
new Thread("IKodaTextSocketReceiver") {
override def run() {
receive()
}
}.start()
}
else
{
logger.info("Receiver disabled")
}
}
else
{
logger.warn("Restarting after stop set")
}
}
def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
var socket: Socket = null
var userInput: String = null
var keepReceiving=true;
try {
logger.info(s"Connecting to $host : $port")
socket = new Socket(host, port)
logger.info(s"Connected to $host : $port")
val reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput)
userInput = reader.readLine()
if(userInput.contains(StreamingConstants.IKODA_END_STREAM))
{
logger.info(s"${StreamingConstants.IKODA_END_STREAM}: Calling disabling receiver")
IKodaTextSocketReceiver.receiverMap.put(receiverId,new IKodaTextSocketReceiverStatus(this,false))
//stop("Exiting, hopefully with elegance and dignity.")
}
}
reader.close()
socket.close()
logger.info("Stopped receiving")
restart("Trying to connect again",keepReceiving)
} catch {
case e: java.net.ConnectException =>
restart(s"Error connecting to $host : $port"+e.getMessage,keepReceiving)
case t: Throwable =>
restart("Error receiving data"+t.getMessage,keepReceiving)
}
}
は、単にそれが
HashMap
に登録されているかどうかをチェックすることにより、カスタムの受信機を起動し
。そうでない場合は、新しいものを作成します。そうであれば、有効に設定されます。
def doStream(ip: String, port: Int): Unit = {
try {
if(!IKodaTextSocketReceiver.restartReceiver("MyFirstReceiver"))
{
val ssc = getSparkStreamingContext(fieldVariables)
ssc.checkpoint("./ikoda/cp")
logger.info("open stream to " + ip + " " + port)
val ikReceiver = new IKodaTextSocketReceiver(ip, port, "MyFirstReceiver")
val lines: DStream[String] = ssc.receiverStream(ikReceiver)
etc etc etc
悲しいことに、現在非難されています – Jake