2016-05-23 31 views
0

コンテキスト春のバッチカスタム完了ポリシー

我々は外部のものから私たちのDBにローカライズされた国の名前(異なる言語の国名のすなわち翻訳)を複製バッチジョブを持っています。 1つのチャンク(つまり、最初のチャンク - アンドラのすべての翻訳、次のチャンク、U.A.E.のすべての翻訳など)で1つの国のすべてのローカライズされた国名を処理することでした。私たちは、国のために利用できる翻訳の合計数を提供するために、いくつかのOracleの分析関数は+外部データを読み込むためのJdbcCursorItemReaderを使用します。

select country_code, language_code, localized_name, COUNT(1) OVER(PARTITION BY c_lng.country_code) as lng_count 
from EXT_COUNTRY_LNG c_lng 
order by c_lng.countty_code, c_lng.language_code 

ような何か問題

だから塊で、この入力を切断するのは簡単になります。停止チャンクlng_countで指定された行の正確な量を読み取り、次の行で新しい行を開始すると、実際にはそれほど単純ではないように見えます:(

最初に試すものはカスタム完了ポリシーです。問題は、 ItemReaderで読んだ最後の項目にアクセスすることはできません。明示的にそれを読者のコンテキストに置き、ポリシーに戻す必要があります。追加のリーダーの変更/リーダーリスナーの追加が必要なため、好きではありません。さらに、私は、同じアイテムが前後に直列化/逆直列化されているのが好きではありません。そして、私はJobContext/StepContextがそのようなデータのためのよい場所であるように感じません。

我々はこのようなソリューションになってしまうので、最後に...

をそこにも、このようなデータのためのより良い場所のように見えますRepeatContextだが、私はそれが簡単にに取得することができませんでした:

@Bean(name = "localizedCountryNamesStep") 
@JobScope 
public Step insertCountryStep(
     final StepBuilderFactory stepBuilderFactory, 
     final MasterdataCountryNameReader countryNameReader, 
     final MasterdataCountryNameProcessor countryNameProcessor, 
     final MasterdataCountryNameWriter writer) { 
    /* Use the same fixed-commit policy, but update it's chunk size dynamically */ 
    final SimpleCompletionPolicy policy = new SimpleCompletionPolicy(); 
    return stepBuilderFactory.get("localizedCountryNamesStep") 
      .<ExtCountryLng, LocalizedCountryName> chunk(policy) 
      .reader(countryNameReader) 
      .listener(new ItemReadListener<ExtCountryLng>() { 

       @Override 
       public void beforeRead() { 
        // do nothing 
       } 

       @Override 
       public void afterRead(final ExtCountryLng item) { 
        /* Update the cunk size after every read: consequent reads 
        inside the same country = same chunk do nothing since lngCount is always the same there */ 
        policy.setChunkSize(item.getLngCount()); 
       } 

       @Override 
       public void onReadError(final Exception ex) { 
        // do nothing 
       } 
      }) 
      .processor(countryNameProcessor) 
      .writer(writer) 
      .faultTolerant() 
      .skip(RuntimeException.class) 
      .skipLimit(Integer.MAX_VALUE) // Batch does not support unlimited skip 
      .retryLimit(0) // this solution disables only retry, but not recover 
      .build(); 
} 

動作していますが、コードを最小限変更する必要がありますが、まだまだ私にとっては醜いです。だから私は、すべての必要な情報が既にItemReaderで利用可能なときにバネバッチでダイナミックチャンクサイズを実行する別のエレガントな方法があるのだろうかと疑問に思っていますか?

+0

afterReadはchunkksizeを変更するための適切な場所ではなく、次のチャンクで有効にするためにafterWriteに配置します。 –

+0

論理的にはafterWriteが正しく発音されますが、1)チャックを書き込んだ後はその情報はありません余分なDBクエリなし2)最初のチャンクのサイズは何とか決定されるべきです - 別の追加のDBクエリ? –

+0

プロセスの前にターゲットテーブルを拭き取りますか?それともこれはただの仕事ですか? –

答えて

2

最も簡単な方法は、単純に国別にステップを分割することです。そうすれば、各国は独自のステップを踏み出すことになります。また、パフォーマンスを向上させるために各国間でスレッドを作成することもできます。

単一のリーダーである必要がある場合は、PeekableItemReaderデリゲートをラップし、SimpleCompletionPolicyを拡張して目的を達成できます。

public class CountryPeekingCompletionPolicyReader extends SimpleCompletionPolicy implements ItemReader<CountrySpecificItem> { 

    private PeekableItemReader<? extends CountrySpecificItem> delegate; 

    private CountrySpecificItem currentReadItem = null; 

    @Override 
    public CountrySpecificItem read() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception { 
     currentReadItem = delegate.read(); 
     return currentReadItem; 
    } 

    @Override 
    public RepeatContext start(final RepeatContext context) { 
     return new ComparisonPolicyTerminationContext(context); 
    } 

    protected class ComparisonPolicyTerminationContext extends SimpleTerminationContext { 

     public ComparisonPolicyTerminationContext(final RepeatContext context) { 
      super(context); 
     } 

     @Override 
     public boolean isComplete() { 
      final CountrySpecificItem nextReadItem = delegate.peek(); 

      // logic to check if same country 
      if (currentReadItem.isSameCountry(nextReadItem)) { 
       return false; 
      } 

      return true; 
     } 
    } 
} 

は、その後、あなたの文脈では、定義します

<batch:tasklet> 
    <batch:chunk chunk-completion-policy="countrySpecificCompletionPolicy" reader="countrySpecificCompletionPolicy" writer="someWriter" /> 
</batch:tasklet> 

<bean id="countrySpecificCompletionPolicy" class="CountryPeekingCompletionPolicyReader"> 
    <property name="delegate" ref="peekableReader" /> 
</bean> 


<bean id="peekableReader" class="YourPeekableItemReader" /> 

編集:はあなたの問題で戻って考えると、パーティショニングは、クリーンなアプローチとして私を打ちます。 partitioned stepを使用すると、各ItemReader(確かにscope="step"にする)には、ステップ実行コンテキストから単一のcountryNameが渡されます。はい、実行コンテキストのマップ(国ごとに1つのエントリ)と最大の作業単位に対応するのに十分なハードコーディングされたコミット間隔を構築するには、カスタムPartitionerクラスが必要ですが、その後はすべてが非常に定型的です。各スレーブステップは1つのチャンクに過ぎないので、再起動は問題を起こす可能性のあるすべての国にとって相対的な風となるはずです。

+0

これは私たちが実際に始めたところです:)しかし、それは私の考えです。(私が間違っていれば修正します)そのような分割は実際にSpring Batchの主なコンセプトに反しています。通常は正確なアイテムで作業する必要があります。読者にバッチの機能を組み込んではいけません。これにより、状況をよりきめ細かく制御できます。 私のパーティションに合わせても、完璧な読者が完璧な戦略を立ててもうまくいくでしょうが、それでもカスタム実装が必要です。 もう少し答えを待つつもりですが、これは受け入れられます;) –

+0

各パーティションが自国をカバーしている場合は、コミット間隔を非常に大きく設定して、コミットが最大の国であっても確実にカバーできるようにすることができます。つまり、「純粋な」春のバッチ・アプローチは、単一のリーダ/ライター、(おそらく500室の何かから)意味をなすチャンク・サイズ、そして中途半端な失敗からの再起動可能性と再処理可能性です。私は実際にはもっと "真の北"になるもう一つの考えを持っていて、まもなく私の答えを編集します。 –

+0

このソリューションを実装しようとしました。私は次のエラーがありました:Beanプロパティ 'delegate'は書き込み可能ではないか、または無効なセッターメソッドを持っています。 setterのパラメータ型がgetterの戻り値の型と一致していますか?それを修正する方法はありますか? –

関連する問題