Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make SmarterCSV thread-safe #277

Closed
wants to merge 2 commits into from
Closed

Conversation

jpcamara
Copy link
Contributor

@jpcamara jpcamara commented Apr 5, 2024

  • SmarterCSV used instance variables on a module, so they were shared across all threads

  • When different threads ran SmarterCSV, they could overwrite the instance values that were set, and effectively corrupt eachothers data. The simplest way to show this was to run a full process call from multiple threads - each threads data gets aggregated together into one large result

  • By using an instance, and passing it around, we simulate an instance approach while maintaining the current API

  • We also expose a thread-local set of instance variables which should provide backwards compatibility if someone were to directly access the global instance variables on the SmarterCSV module (which a spec did, and that's how I noticed this behavior)

Copy link

codecov bot commented Apr 5, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 100.00%. Comparing base (07160c1) to head (0ad475f).

❗ Current head 0ad475f differs from pull request most recent head 7f74f78. Consider uploading reports for the commit 7f74f78 to get more accurate results

Additional details and impacted files
@@            Coverage Diff            @@
##              main      #277   +/-   ##
=========================================
  Coverage   100.00%   100.00%           
=========================================
  Files           11        11           
  Lines          380       379    -1     
=========================================
- Hits           380       379    -1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

}
}

it 'at least returns the right number of results from each thread' do
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you run this same spec on master, you'll see something like this (it's possible it could produce non-deterministic results)

Failures:

  1) thread safety checks at least returns the right number of results from each thread
     Failure/Error: expect(d[1].size).to eq(correct_sizes[d[0]])

       expected: 5
            got: 16

       (compared using ==)
     # ./spec/features/threading_spec.rb:22:in `block (3 levels) in <top (required)>'
     # ./spec/features/threading_spec.rb:21:in `each'
     # ./spec/features/threading_spec.rb:21:in `block (2 levels) in <top (required)>'

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jpcamara thank you for finding this issue, and your PR!

Rakefile Outdated Show resolved Hide resolved
* SmarterCSV used instance variables on a module, so they were shared across all threads

* When different threads ran SmarterCSV, they could overwrite the instance values that were set, and effectively corrupt eachothers day. The simplest way to show this was to run a full `process` call from multiple threads - each threads data gets aggregated together into one large result

* By using an instance, and passing it around, we simulate a insance approach while maintaining the current API

* We also expose a thread-local set of instance variables which should provide backwards compatibility if someone were to directly access the global instance variables on the SmarterCSV module (which a spec did, and that's how I noticed this behavior)
@tilo
Copy link
Owner

tilo commented Apr 9, 2024

Hi @jpcamara, can you help me understand the use case where you run into issues with threading?
I'd like to fully understand the scenario you're encountering.

In the systems where I've used SmarterCSV over the years, I typically have several Sidekiq workers:

  • one worker running SmarterCSV.process (optionally with chunking)
  • optionally one worker that processes the chunks
  • one worker that handles each line of the CSV

In systems where users upload CSV files, I upload/capture the file in S3, and then kick-off the Sidekiq worker to download and run SmarterCSV.process on the file, kicking-off the other workers in the process.

@jpcamara
Copy link
Contributor Author

jpcamara commented Apr 10, 2024

Hi @jpcamara, can you help me understand the use case where you run into issues with threading? I'd like to fully understand the scenario you're encountering.

In the systems where I've used SmarterCSV over the years, I typically have several Sidekiq workers:

  • one worker running SmarterCSV.process (optionally with chunking)
  • optionally one worker that processes the chunks
  • one worker that handles each line of the CSV

In systems where users upload CSV files, I upload/capture the file in S3, and then kick-off the Sidekiq worker to download and run SmarterCSV.process on the file, kicking-off the other workers in the process.

Hey @tilo! There are a few issues i'm encountering.

To start - I've committed more threading scenarios (in threading_spec.rb) that break on main. It now includes header specs, and specs for line counts and chunk counts. There are scenarios I could create to corrupt the other instance variables in use as well.

In testing my upgrade to the newest version (1.10.3) I saw these issues I could easily reproduce and I needed to hold off using it as a result. Most of my usages of SmarterCSV were still on 1.7.1 (https://github.com/tilo/smarter_csv/blob/v1.7.1/lib/smarter_csv.rb), which used instance variables but only in a minor way. They had threading issues as well, but because the main processing was not dependent on them it didn't really matter.

I use SmarterCSV in a couple different ways:

  • In some simple cases, I just process a CSV in its entirety
  • In most cases, I process a CSV in chunks. In some cases I split those chunks into separate jobs, sometimes I process the chunks inline.

I am processing from a variety of sources in all different CSV formats with different headers/delimiters/file sizes. I don't have one dedicated worker doing the SmarterCSV.process call, so multiple workers/threads are operating on CSVs - often at the same time.

As a result, it's very easy for me to encounter the scenarios I have laid out in the threading_spec.rb.

  1. When not chunking, it's easy to have SmarterCSV concatenate results from each thread together. That means that multiple sources of data can get mixed together incorrectly. This is obviously a big issue, particularly if you were ever processing user submitted data and produced results that merged multiple peoples data together
  2. When chunking, it's easy to mix headers together. Sometimes headers are mixed between threads, sometimes they're just nil because the Ruby thread scheduler swapped out while the @headers assignment was happening.
  3. The returned chunk_count and csv_line_counts are also wrong. That means that if you rely on chunk_count for any kind of additional metadata or processing, you will get incorrect results when running multiple threads.

In CRuby, 1 and 3 are easy to recreate. 2 takes a bit more work because of the thread scheduler only swapping ruby code around every 100ms, which is why in the spec I run it multiple times. But that just means it will happen, but just more intermittently. On Truffle or JRuby it'd be very easy to recreate - I tried it on JRuby and it happened without the extra iterations, and it was broken in more dramatic ways (headers are very broken when run across multiple threads).

I'm confident my usage of SmarterCSV is not unconventional, and using an instance approach completely removes the thread safety issues.

@jpcamara
Copy link
Contributor Author

Hi @tilo! Any thoughts on this?

@jpcamara
Copy link
Contributor Author

jpcamara commented May 29, 2024

I realized it's actually pretty easy to get 2 to happen, even on CRuby.

With the following CSVs, and the following code, you'll be able to easily recreate broken headers. Sometimes headers are swapped between threads, and sometimes they are nil and raise a NilClass error on zip. This is happening even when chunking files, it just requires process to be called on multiple threads for different files. If you run this code, you'll see something similar to the following:

Iteration[124]: Wrong header! a,b,c,d,e,f,g
Iteration[207]: Wrong header! h,i,j,k,l,m,n
Iteration[256]: Wrong header! a,b,c,d,e,f,g
Iteration[264]: Wrong header! a,b,c,d,e,f,g
Iteration[258]: Wrong header! a,b,c,d,e,f,g
Iteration[323]: Wrong header! h,i,j,k,l,m,n
Iteration[322]: Wrong header! a,b,c,d,e,f,g
Iteration[380]: Wrong header! a,b,c,d,e,f,g
Iteration[443]: Wrong header! h,i,j,k,l,m,n
Iteration[444]: undefined method `zip' for nil:NilClass
...

csv_one.csv

a,b,c,d,e,f,g
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7
1,2,3,4,5,6,7

csv_two.csv

h,i,j,k,l,m,n
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
8,9,10,11,12,13,14
require "concurrent-ruby"

def run_forever(pool_size: 10)
  pool = Concurrent::FixedThreadPool.new(pool_size)
  i = Concurrent::AtomicFixnum.new

  loop do
    pool.post do
      local_i = i.increment
      yield local_i
    end
  end
end

run_forever(pool_size: 10) do |i|
  if i.even?
    SmarterCSV.process('csv_one.csv', chunk_size: 5) do |chunk|
      raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[a b c d e f g]
    end
  else
    SmarterCSV.process('csv_two.csv', chunk_size: 5) do |chunk|
      raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[h i j k l m n]
    end
  end
rescue StandardError => e
  puts "Iteration[#{i}]: #{e.message}"
end

@jpcamara
Copy link
Contributor Author

jpcamara commented Jun 4, 2024

Sorry to be so noisy! This'll be my last comment on the issue for awhile. I'm writing a series on concurrency in ruby, so I share some of my thoughts on threading issues and class-level ivars in there: https://jpcamara.com/2024/06/04/your-ruby-programs.html. Just more context on this issue.

@contentfree
Copy link
Contributor

Confirming that we're seeing this with 1.10.3, too. Had a worker processing two CSVs (one more than a million rows) and had header/column corruption midway through processing. Please consider merging @tilo

@tilo
Copy link
Owner

tilo commented Jun 25, 2024

I'll have a look at the examples you provided

@tilo
Copy link
Owner

tilo commented Jul 2, 2024

I'm looking into it, but wanted to point out that when calling it concurrently,
the simplest fix is adding a mutex:


run_forever(pool_size: 10) do |i|
  if i.even?
    MUTEX.synchronize do
      SmarterCSV.process('csv_one.csv', chunk_size: 5) do |chunk|
        raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[a b c d e f g]
      end
    end
  else
    MUTEX.synchronize do
      SmarterCSV.process('csv_two.csv', chunk_size: 5) do |chunk|
        raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[h i j k l m n]
      end
    end
  end
rescue StandardError => e
  puts "Iteration[#{i}]: #{e.message}"
end

@jpcamara
Copy link
Contributor Author

jpcamara commented Jul 2, 2024

I'm looking into it, but wanted to point out that when calling it concurrently,

the simplest fix is adding a mutex:




run_forever(pool_size: 10) do |i|

  if i.even?

    MUTEX.synchronize do

      SmarterCSV.process('csv_one.csv', chunk_size: 5) do |chunk|

        raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[a b c d e f g]

      end

    end

  else

    MUTEX.synchronize do

      SmarterCSV.process('csv_two.csv', chunk_size: 5) do |chunk|

        raise "Wrong header! #{chunk.first.keys.join(',')}" if chunk.first.keys != %i[h i j k l m n]

      end

    end

  end

rescue StandardError => e

  puts "Iteration[#{i}]: #{e.message}"

end



My approach for handling thread safety issues in gems is to isolate that code to a "single_threaded" queue:

class UnsafeJob
  include Sidekiq::Job
  sidekiq_options queue: "single_threaded"

  def perform; end
end

Then I configure it to run with a concurrency of 1, and I instead run multiple processes or servers.

sidekiq -q single_threaded -c 1
# or put it in your config file
:concurrency: 1
:queues: 
  - single_threaded

This approach will work the same as a global mutex, but also allows horizontal scaling.

Using a global mutex technically works, but it means whichever thread acquires that mutex then completely hogs the GVL for any CSV processing, including bypassing the 100ms thread scheduler swap.

But if this was running on a web server like Puma, a mutex is the only reasonable option, like suggested.

@tilo
Copy link
Owner

tilo commented Jul 3, 2024

@contentfree @jpcamara please check out this PR: #279

@contentfree
Copy link
Contributor

contentfree commented Jul 3, 2024 via email

@tilo
Copy link
Owner

tilo commented Jul 3, 2024

FYI: The fix will be released soon from this PR

@jpcamara
Copy link
Contributor Author

jpcamara commented Jul 5, 2024

Closing in favor of this fix: #279

Thanks @tilo !

@jpcamara jpcamara closed this Jul 5, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants