2017-09-26 11 views
7

私は、リソースマネージャとしてYARNを使用し、2ノードでスパークジョブを実行しています。自分の条件が満たされていない場合は、意図的に手順を失敗する必要があるため、次の手順は構成ごとに実行されません。 これを達成するために、私はdynamoDBにログメッセージを挿入した後、カスタム例外をスローします。Spark、EMRでSparkExceptionをスローするときの不正な動作

正常に動作しますが、Dynamoのレコードが2回挿入されています。

以下は私のコードです。

if(<condition>) { 
    <method call to insert in dynamo> 
    throw new SparkException(<msg>); 
    return; 
} 

例外をスローする行を削除すると正常に動作しますが、手順は完了します。

ログメッセージを2度取得せずに、ステップを失敗させる方法を教えてください。

ありがとうございました。あなたのエラー状態がヒットし、二つの異なる執行で処理されたため、

よろしく、 Sorabh

答えて

2

は、おそらくあなたのダイナモメッセージが挿入された理由は二回でした。スパークは、その作業者の間で行われるべき作業を分けており、それらの作業者は知識を共有していません。

スパークステップをFAILにする必要があるかどうかはわかりませんが、sparkダイレクトを直接実行するのではなく、アプリケーションコードでその失敗ケースを追跡することをお勧めします。言い換えれば、エラーを検出してスパーク・ドライバーに戻すコードを書いてから、それを適切な方法で実行してください。

これを行う1つの方法は、アキュムレータを使用して、データの処理中に発生するエラーをカウントすることです。あなたがフィードバックを探している場合は、このアプローチについて

val accum = sc.longAccumulator("Error Counter") 
def doProcessing(a: String, b: String): String = { 
    if(condition) { 
    accum.add(1) 
    null 
    } 
    else { 
    doComputation(a, b) 
    } 
} 
val doProcessingUdf = udf(doProcessing _) 

df = df.withColumn("result", doProcessing($"a", $"b")) 

df.write.format(..).save(..) // Accumulator value not computed until an action occurs! 

if(accum.value > 0) { 
    // An error detected during computation! Do whatever needs to be done. 
    <insert dynamo message here> 
} 

一つの良いところはある:それは(私はスカラ座やデータフレームを仮定しているが、必要に応じてあなたはRDDのおよび/またはのpythonに適応することができます)おおよそ次のようになりますSpark UIでは、実行中のアキュムレータ値を表示することができます。参考までに、アキュムレータに関する文書は次のとおりです。 http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators

関連する問題