私のレールアプリケーションでは、遅延作業ジョブワーカーからCSVファイルを行ごとに解析して新しいものを保存するサービスがあります連絡先テーブルに記録します。ブロックの最後ではなく新しいレコードを1つずつ保存してコミットします
最初の連絡先を保存するとすぐに、連絡先テーブルがポストグルでロックされ、すべての行の解析が完了するまでロックが解除されません。すべての行を実行するので、新しい連絡先レコードを保存しているか、既存のレコードを更新していますが、コミットされていません。テーブルがロックされているため、他のユーザーは連絡先レコードを作成できません。
サービスがCSVのすべての行を終了すると、新しい連絡先レコードがすべてコミットされます。データベースに表示され、連絡先テーブルはロックされなくなります。
最後まですべてを保持するのではなく、処理中のCSVファイルの各行に対して、ループのたびにロックを保存、コミット、解放することは可能ですか?ここで
クラスです:
class CsvParsingService
attr_accessor :csv_file, :contact
def initialize(csv_file)
@csv_file = csv_file
@contact = nil
end
def perform
process_csv
csv_file.finish_import!
end
def process_csv
parser = ::ImportData::SmartCsvParser.new(csv_file.file_url)
parser.each do |smart_row|
csv_file.increment!(:total_parsed_records)
begin
self.contact = process_row(smart_row)
rescue => e
row_parse_error(smart_row, e)
end
end
rescue => e # parser error or unexpected error
csv_file.save_import_error(e)
end
private
def process_row(smart_row)
new_contact, existing_records = smart_row.to_contact
self.contact = ContactMergingService.new(csv_file.user, new_contact, existing_records).perform
init_contact_info self.contact
if contact_valid?
save_imported_contact(new_contact)
else
reject_imported_contact(new_contact, smart_row)
end
end
def contact_valid?
self.contact.first_name || self.contact.last_name ||
self.contact.email_addresses.first || self.contact.phone_numbers.first
end
def save_imported_contact(new_contact)
self.contact.save!
csv_file.increment!(:total_imported_records)
log_processed_contacts new_contact
end
def reject_imported_contact(new_contact, smart_row)
csv_file.increment!(:total_failed_records)
csv_file.invalid_records.create!(
original_row: smart_row.row.to_csv,
contact_errors: ["Contact rejected. Missing name, email or phone number"]
)
log_processed_contacts new_contact
false
end
def row_parse_error(smart_row, e)
csv_file.increment!(:total_failed_records)
csv_file.invalid_records.create!(
original_row: smart_row.row.to_csv,
contact_errors: contact.try(:errors).try(:full_messages) || [e.inspect]
)
end
def init_contact_info(contact)
unless contact.persisted?
contact.user = csv_file.user
contact.created_by_user = csv_file.user
contact.import_source = csv_file
end
contact.required_salutations_to_set = true # will be used for envelope/letter saluation
end
def log_processed_contacts(new_contact)
Rails.logger.info(
"[CSV.parsing] Records parsed:: parsed: #{csv_file.total_parsed_records}"\
" : imported: #{csv_file.total_imported_records} : failed: "\
"#{csv_file.total_failed_records}"
)
Rails.logger.info(
"[CSV.parsing] Contact- New : #{new_contact.email_addresses.map(&:email)}"\
" : #{new_contact.first_name} : #{new_contact.last_name} "\
"#{new_contact.phone_numbers.map(&:number)} :: Old : "\
"#{self.contact.email_addresses.map(&:email)} :"\
"#{self.contact.phone_numbers.map(&:number)}\n"
)
end
end
'CsvParsingService'はどうやって呼びますか? –
DelayedJobワーカーは 'CsvParsingService.new(csv_file).perform' –
を実行して、各レコードをARトランザクション –