2016-04-08 9 views
1

私のレールアプリケーションでは、遅延作業ジョブワーカーからCSVファイルを行ごとに解析して新しいものを保存するサービスがあります連絡先テーブルに記録します。ブロックの最後ではなく新しいレコードを1つずつ保存してコミットします

最初の連絡先を保存するとすぐに、連絡先テーブルがポストグルでロックされ、すべての行の解析が完了するまでロックが解除されません。すべての行を実行するので、新しい連絡先レコードを保存しているか、既存のレコードを更新していますが、コミットされていません。テーブルがロックされているため、他のユーザーは連絡先レコードを作成できません。

サービスがCSVのすべての行を終了すると、新しい連絡先レコードがすべてコミットされます。データベースに表示され、連絡先テーブルはロックされなくなります。

最後まですべてを保持するのではなく、処理中のCSVファイルの各行に対して、ループのたびにロックを保存、コミット、解放することは可能ですか?ここで

クラスです:

class CsvParsingService 

    attr_accessor :csv_file, :contact 

    def initialize(csv_file) 
    @csv_file = csv_file 
    @contact = nil 
    end 

    def perform 
    process_csv 
    csv_file.finish_import! 
    end 

    def process_csv 
    parser = ::ImportData::SmartCsvParser.new(csv_file.file_url) 

    parser.each do |smart_row| 
     csv_file.increment!(:total_parsed_records) 
     begin 
     self.contact = process_row(smart_row) 
     rescue => e 
     row_parse_error(smart_row, e) 
     end 
    end 
    rescue => e # parser error or unexpected error 
    csv_file.save_import_error(e) 
    end 

    private 

    def process_row(smart_row) 
    new_contact, existing_records = smart_row.to_contact 
    self.contact = ContactMergingService.new(csv_file.user, new_contact, existing_records).perform 
    init_contact_info self.contact 

    if contact_valid? 
     save_imported_contact(new_contact) 
    else 
     reject_imported_contact(new_contact, smart_row) 
    end 
    end 

    def contact_valid? 
    self.contact.first_name || self.contact.last_name || 
     self.contact.email_addresses.first || self.contact.phone_numbers.first 
    end 

    def save_imported_contact(new_contact) 
    self.contact.save! 
    csv_file.increment!(:total_imported_records) 
    log_processed_contacts new_contact 
    end 

    def reject_imported_contact(new_contact, smart_row) 
    csv_file.increment!(:total_failed_records) 
    csv_file.invalid_records.create!(
     original_row: smart_row.row.to_csv, 
     contact_errors: ["Contact rejected. Missing name, email or phone number"] 
    ) 
    log_processed_contacts new_contact 
    false 
    end 

    def row_parse_error(smart_row, e) 
    csv_file.increment!(:total_failed_records) 
    csv_file.invalid_records.create!(
     original_row: smart_row.row.to_csv, 
     contact_errors: contact.try(:errors).try(:full_messages) || [e.inspect] 
    ) 
    end 

    def init_contact_info(contact) 
    unless contact.persisted? 
     contact.user = csv_file.user 
     contact.created_by_user = csv_file.user 
     contact.import_source = csv_file 
    end 
    contact.required_salutations_to_set = true # will be used for envelope/letter saluation 
    end 

    def log_processed_contacts(new_contact) 
    Rails.logger.info(
     "[CSV.parsing] Records parsed:: parsed: #{csv_file.total_parsed_records}"\ 
     " : imported: #{csv_file.total_imported_records} : failed: "\ 
     "#{csv_file.total_failed_records}" 
    ) 
    Rails.logger.info(
     "[CSV.parsing] Contact- New : #{new_contact.email_addresses.map(&:email)}"\ 
     " : #{new_contact.first_name} : #{new_contact.last_name} "\ 
     "#{new_contact.phone_numbers.map(&:number)} :: Old : "\ 
     "#{self.contact.email_addresses.map(&:email)} :"\ 
     "#{self.contact.phone_numbers.map(&:number)}\n" 
    ) 
    end 

end 
+1

'CsvParsingService'はどうやって呼びますか? –

+0

DelayedJobワーカーは 'CsvParsingService.new(csv_file).perform' –

+1

を実行して、各レコードをARトランザクション –

答えて

2

@SeanHuberは正しい軌道に乗っていました。我々はstate-machine_activerecordの宝石を使用しており、作業員はをuploaded州からprocessing州に移し、CsvParsingServiceを呼び出します。

デフォルトでは、state-machine_activerecordtransaction内のすべてのトランジションをラップします。つまり、CsvParsingServiceがデータベースに加えられたすべての変更は、移行が完了するまでコミットされませんでした。

ソリューションはここでオプションuse_transactions: false

でステート・マシンを定義することですが、作業者である:ここでは

class ImportCsvFileWorker 
    def self.perform(csv_file_id) 
    csv_file = CsvFile.find(csv_file_id) 

    csv_file.import! 
    csv_file.send_report! 
    end 
end 

は適切に構成された状態マシンとCsvFileモデルです:

require "import_data/smart_csv_parser" 

class CsvFile < ActiveRecord::Base 
    TwiceImportError = Class.new(StandardError) 
    ReportBeforeImportError = Class.new(StandardError) 

    belongs_to :user 
    has_many :invalid_records, class_name: '::CsvFile::InvalidRecord', dependent: :destroy 
    has_many :contacts, as: :import_source 

    mount_uploader :file, CsvUploader 

    attr_accessor :file_url 

    def filename 
    self[:file] 
    end 

    state_machine initial: :uploaded, use_transactions: false do 
    state :processing 
    state :imported 

    event :start_import! do 
     transition uploaded: :processing 
    end 
    after_transition :uploaded => :processing, do: :parse_data! 

    event :finish_import! do 
     transition processing: :imported 
    end 
    end 

    def import!(file_url=nil) 
    if file_url.nil? 
     file_url = Rails.env.development? ? file.path : file.url 
    end 

    self.file_url = file_url 
    raise TwiceImportError, "cannot import same file twice" unless uploaded? 

    start_import! 
    end 

    def import_failed? 
    import_result[:error].present? 
    end 

    def send_report! 
    raise ReportBeforeImportError, 'please #import! before reporting' unless imported? 
    Mailer.delay.csv_import_report(self) 
    end 

    def save_import_error(exception) 
    import_result[:error_class] = exception.class.to_s 
    import_result[:error] = exception.message 
    import_result[:backtrace] = exception.backtrace 
    import_result_will_change! 
    save(validate: false) 
    end 

    private 

    def parse_data! 
    binding.pry 
    CsvParsingService.new(self).perform 
    end 

    def initialize(*args, &block) 
    super(*args, &block) # NOTE: This *must* be called, otherwise states won't get initialized 
    end 

end 
関連する問題