skip to Main Content

My app have an import function that’ll execute a Sidekiq Worker and import a bunch of CSV rows, saving them to my database. This works fine when I execute Sidekiq in my local machine, but when I deploy the code to production Sidekiq will execute the job correctly only once. When I use the import function a second time the job goes directly to the history pile in Sidekiq, and the logic inside the worker is never executed. It’s really strange because it throws no error and it’s like the job was executed correctly. For staging I’m using Redis in AWS Elastic Cache.

redis_version: 5.0.6

rails, "5.0.7"
sidekiq, "6.0.5"
sidekiq-failures, "1.0.0"
sidekiq-history, "0.0.11"
sidekiq-limit_fetch, "3.4.0"
sidekiq-pro, "5.0.1"
sidekiq-unique-jobs, "6.0.15"

I would appreciate any tips related to problems you faced before similar to this, or anything else I can do to debug this problem. I already ran

Sidekiq.redis { |conn| conn.ping }
=> "PONG"

So looks like Redis is connected ok.

Project Worker

# frozen_string_literal: true

class ImportWorker
  include Sidekiq::Worker
  sidekiq_options queue: "import_worker", lock: :until_executed, retry: false

  def perform(import_id)
    import = Import.find_by(id: import_id)
    return if import.blank?

    path = import.file.expiring_url(10)
    file = open(path)

    csv = CSV.parse(file.read, headers: true)
    import.update!( number_of_lines_in_csv: csv.size,
                    import_started_at: DateTime.now)

    created_transactions = []
    csv.each do |row|
      guid = row["TransactionUniqueId"]
      next if guid.blank?

      existing_transaction = Transaction.find_by(transaction_unique_id: guid)
      next if existing_transaction.present?

      attributes = Transaction.convert_attributes(import, row).merge(imported_at: Time.now)

      transaction = Transaction.create!(attributes)
      created_transactions << [transaction.id, guid]
      Rails.logger.info "Transaction #{row["TransactionUniqueId"]} created."
    end

    import.update!(import_finished_at: DateTime.now,
                   imported:           true)
    send_mail(import_id, created_transactions)
  end

  def send_mail(import_id, created_transactions)
    ["[email protected]", "[email protected]"].each do |email|
      ImportTransactionsMailer.import_processed(import_id, email, created_transactions).deliver
    end
  end
end

Edit 1: Sorry, I forgot to say that I’m using Cloud66 to deploy my app, if this help in any way.

2

Answers


  1. Chosen as BEST ANSWER

    I found what the problem was. So, I was triggering my ImportWorker in a after_create hook in my Import model as bellow.

    # frozen_string_literal: true
    
    class Import < ApplicationRecord
      has_many :transactions
      belongs_to :admin_user
    
      has_attached_file :file, s3_protocol: :https
      validates_attachment_content_type :file, content_type: ["text/plain",
                                                              "text/csv",
                                                              "application/vnd.ms-excel",
                                                              "application/octet-stream"]
      validates :file, attachment_presence: true
      has_paper_trail
      after_create :run_import_in_background
    
      def run_import_in_background
        ImportWorker.perform_async(id)
      end
    end
    
    

    But when the Worker executed the first lines to find the Import

    class ImportWorker
      include Sidekiq::Worker
      sidekiq_options queue: "import_worker", lock: :until_executed, retry: false
    
      def perform(import_id)
        import = Import.find_by(id: import_id)
        return if import.blank?
    ...
    

    if the import was nil it should return. The problem is, I assumed that the import would never be nil, because this was triggered from an after_create hook, but it actually was coming as nil. When I changed my return line to raise StandardError.new("Empty import object.") if import.blank? the worker started to fail.

    So I also changed my worker sidekiq_options from retry: false to retry: 3 and in the second try the worker executed ok, because it could now find the Import with the specified id. So I think this is some kind of sync problem between the after_create hook and Sidekiq. This might be related to using S3 gem with this setup too. Saving the file in S3 can be causing some delay in saving the object in the DB.

    You can see the final Worker code bellow.

    # frozen_string_literal: true
    
    class ImportWorker
      include Sidekiq::Worker
      sidekiq_options queue: "import_worker", lock: :until_executed, retry: 3
    
      def perform(import_id)
        import = Import.find_by(id: import_id)
        raise StandardError.new("Empty import object.") if import.blank?
    ...
    

  2. sidekiq_options queue: "import_worker", lock: :until_executed, retry: false
    

    What happens if an error occurs? Will the job will be discarded but the uniqueness lock will remain, preventing further jobs from enqueuing?

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search