Skip to content

Commit

Permalink
Make SmarterCSV thread-safe
Browse files Browse the repository at this point in the history
* SmarterCSV used instance variables on a module, so they were shared across all threads

* When different threads ran SmarterCSV, they could overwrite the instance values that were set, and effectively corrupt eachothers day. The simplest way to show this was to run a full `process` call from multiple threads - each threads data gets aggregated together into one large result

* By using an instance, and passing it around, we simulate a insance approach while maintaining the current API

* We also expose a thread-local set of instance variables which should provide backwards compatibility if someone were to directly access the global instance variables on the SmarterCSV module (which a spec did, and that's how I noticed this behavior)
  • Loading branch information
jpcamara committed Apr 9, 2024
1 parent 07160c1 commit 90ec4c5
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 137 deletions.
12 changes: 6 additions & 6 deletions lib/smarter_csv/auto_detection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ class << self
# If file has headers, then guesses column separator from headers.
# Otherwise guesses column separator from contents.
# Raises exception if none is found.
def guess_column_separator(filehandle, options)
skip_lines(filehandle, options)
def guess_column_separator(instance, filehandle, options)
skip_lines(instance, filehandle, options)

delimiters = [',', "\t", ';', ':', '|']

Expand All @@ -17,14 +17,14 @@ def guess_column_separator(filehandle, options)
candidates = Hash.new(0)
count = has_header ? 1 : 5
count.times do
line = readline_with_counts(filehandle, options)
line = readline_with_counts(instance, filehandle, options)
delimiters.each do |d|
candidates[d] += line.scan(d).count
end
rescue EOFError # short files
break
end
rewind(filehandle)
rewind(instance, filehandle)

if candidates.values.max == 0
# if the header only contains
Expand All @@ -37,7 +37,7 @@ def guess_column_separator(filehandle, options)
end

# limitation: this currently reads the whole file in before making a decision
def guess_line_ending(filehandle, options)
def guess_line_ending(instance, filehandle, options)
counts = {"\n" => 0, "\r" => 0, "\r\n" => 0}
quoted_char = false

Expand All @@ -62,7 +62,7 @@ def guess_line_ending(filehandle, options)
lines += 1
break if options[:auto_row_sep_chars] && options[:auto_row_sep_chars] > 0 && lines >= options[:auto_row_sep_chars]
end
rewind(filehandle)
rewind(instance, filehandle)

counts["\r"] += 1 if last_char == "\r"
# find the most frequent key/value pair:
Expand Down
18 changes: 9 additions & 9 deletions lib/smarter_csv/file_io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,23 @@ module SmarterCSV
class << self
protected

def readline_with_counts(filehandle, options)
def readline_with_counts(instance, filehandle, options)
line = filehandle.readline(options[:row_sep])
@file_line_count += 1
@csv_line_count += 1
line = remove_bom(line) if @csv_line_count == 1
instance.file_line_count += 1
instance.csv_line_count += 1
line = remove_bom(line) if instance.csv_line_count == 1
line
end

def skip_lines(filehandle, options)
def skip_lines(instance, filehandle, options)
options[:skip_lines].to_i.times do
readline_with_counts(filehandle, options)
readline_with_counts(instance, filehandle, options)
end
end

def rewind(filehandle)
@file_line_count = 0
@csv_line_count = 0
def rewind(instance, filehandle)
instance.file_line_count = 0
instance.csv_line_count = 0
filehandle.rewind
end

Expand Down
4 changes: 2 additions & 2 deletions lib/smarter_csv/hash_transformations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module SmarterCSV
class << self
def hash_transformations(hash, options)
def hash_transformations(instance, hash, options)
# there may be unmapped keys, or keys purposedly mapped to nil or an empty key..
# make sure we delete any key/value pairs from the hash, which the user wanted to delete:
remove_empty_values = options[:remove_empty_values] == true
Expand All @@ -13,7 +13,7 @@ def hash_transformations(hash, options)

hash.each_with_object({}) do |(k, v), new_hash|
next if k.nil? || k == '' || k == :""
next if remove_empty_values && (has_rails ? v.blank? : blank?(v))
next if remove_empty_values && (instance.has_rails ? v.blank? : blank?(v))
next if remove_zero_values && v.is_a?(String) && v =~ /^(0+|0+\.0+)$/ # values are Strings
next if remove_values_matching && v =~ remove_values_matching

Expand Down
10 changes: 5 additions & 5 deletions lib/smarter_csv/headers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@

module SmarterCSV
class << self
def process_headers(filehandle, options)
@raw_header = nil # header as it appears in the file
@headers = nil # the processed headers
def process_headers(instance, filehandle, options)
instance.raw_header = nil # header as it appears in the file
instance.headers = nil # the processed headers
header_array = []
file_header_size = nil

# if headers_in_file, get the headers -> We get the number of columns, even when user provided headers
if options[:headers_in_file] # extract the header line
# process the header line in the CSV file..
# the first line of a CSV file contains the header .. it might be commented out, so we need to read it anyhow
header_line = @raw_header = readline_with_counts(filehandle, options)
header_line = instance.raw_header = readline_with_counts(instance, filehandle, options)
header_line = preprocess_header_line(header_line, options)

file_header_array, file_header_size = parse(header_line, options)
file_header_array, file_header_size = parse(instance, header_line, options)

file_header_array = header_transformations(file_header_array, options)

Expand Down
10 changes: 5 additions & 5 deletions lib/smarter_csv/options_processing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ class << self
def process_options(given_options = {})
puts "User provided options:\n#{pp(given_options)}\n" if given_options[:verbose]

@options = DEFAULT_OPTIONS.dup.merge!(given_options)
options = DEFAULT_OPTIONS.dup.merge!(given_options)

# fix invalid input
@options[:invalid_byte_sequence] ||= ''
options[:invalid_byte_sequence] ||= ''

puts "Computed options:\n#{pp(@options)}\n" if @options[:verbose]
puts "Computed options:\n#{pp(options)}\n" if options[:verbose]

validate_options!(@options)
@options
validate_options!(options)
options
end

# NOTE: this is not called when "parse" methods are tested by themselves
Expand Down
4 changes: 2 additions & 2 deletions lib/smarter_csv/parse.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ class << self
###
### Thin wrapper around C-extension
###
def parse(line, options, header_size = nil)
def parse(instance, line, options, header_size = nil)
# puts "SmarterCSV.parse OPTIONS: #{options[:acceleration]}" if options[:verbose]

if options[:acceleration] && has_acceleration?
if options[:acceleration] && instance.has_acceleration?
# :nocov:
has_quotes = line =~ /#{options[:quote_char]}/
elements = parse_csv_line_c(line, options[:col_sep], options[:quote_char], header_size)
Expand Down
68 changes: 33 additions & 35 deletions lib/smarter_csv/smarter_csv.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ class KeyMappingError < SmarterCSVException; end

# first parameter: filename or input object which responds to readline method
def SmarterCSV.process(input, given_options = {}, &block) # rubocop:disable Lint/UnusedMethodArgument
initialize_variables
instance = initialize_variables
self.compatibility_instance = instance # backwards compatibility

options = process_options(given_options)

@enforce_utf8 = options[:force_utf8] || options[:file_encoding] !~ /utf-8/i
@verbose = options[:verbose]
instance.enforce_utf8 = options[:force_utf8] || options[:file_encoding] !~ /utf-8/i
instance.verbose = options[:verbose]

begin
fh = input.respond_to?(:readline) ? input : File.open(input, "r:#{options[:file_encoding]}")
Expand All @@ -27,24 +28,25 @@ def SmarterCSV.process(input, given_options = {}, &block) # rubocop:disable Lint
end

# auto-detect the row separator
options[:row_sep] = guess_line_ending(fh, options) if options[:row_sep]&.to_sym == :auto
options[:row_sep] = guess_line_ending(instance, fh, options) if options[:row_sep]&.to_sym == :auto
# attempt to auto-detect column separator
options[:col_sep] = guess_column_separator(fh, options) if options[:col_sep]&.to_sym == :auto
options[:col_sep] = guess_column_separator(instance, fh, options) if options[:col_sep]&.to_sym == :auto

skip_lines(fh, options)
skip_lines(instance, fh, options)

@headers, header_size = process_headers(fh, options)
@headerA = @headers # @headerA is deprecated, use @headers
headers, header_size = process_headers(instance, fh, options)
instance.headers = headers
instance.header_a = headers # headerA is deprecated, use headers

puts "Effective headers:\n#{pp(@headers)}\n" if @verbose
puts "Effective headers:\n#{pp(instance.headers)}\n" if instance.verbose

header_validations(@headers, options)
header_validations(instance.headers, options)

# in case we use chunking.. we'll need to set it up..
if options[:chunk_size].to_i > 0
use_chunks = true
chunk_size = options[:chunk_size].to_i
@chunk_count = 0
instance.chunk_count = 0
chunk = []
else
use_chunks = false
Expand All @@ -53,12 +55,12 @@ def SmarterCSV.process(input, given_options = {}, &block) # rubocop:disable Lint
# now on to processing all the rest of the lines in the CSV file:
# fh.each_line |line|
until fh.eof? # we can't use fh.readlines() here, because this would read the whole file into memory at once, and eof => true
line = readline_with_counts(fh, options)
line = readline_with_counts(instance, fh, options)

# replace invalid byte sequence in UTF-8 with question mark to avoid errors
line = enforce_utf8_encoding(line, options) if @enforce_utf8
line = enforce_utf8_encoding(line, options) if instance.enforce_utf8

print "processing file line %10d, csv line %10d\r" % [@file_line_count, @csv_line_count] if @verbose
print "processing file line %10d, csv line %10d\r" % [instance.file_line_count, instance.csv_line_count] if instance.verbose

next if options[:comment_regexp] && line =~ options[:comment_regexp] # ignore all comment lines if there are any

Expand All @@ -69,35 +71,35 @@ def SmarterCSV.process(input, given_options = {}, &block) # rubocop:disable Lint

while multiline
next_line = fh.readline(options[:row_sep])
next_line = enforce_utf8_encoding(next_line, options) if @enforce_utf8
next_line = enforce_utf8_encoding(next_line, options) if instance.enforce_utf8
line += next_line
@file_line_count += 1
instance.file_line_count += 1

break if fh.eof? # Exit loop if end of file is reached

multiline = count_quote_chars(line, options[:quote_char]).odd?
end

# :nocov:
if multiline && @verbose
print "\nline contains uneven number of quote chars so including content through file line %d\n" % @file_line_count
if multiline && instance.verbose
print "\nline contains uneven number of quote chars so including content through file line %d\n" % instance.file_line_count
end
# :nocov:

line.chomp!(options[:row_sep])

# --- SPLIT LINE & DATA TRANSFORMATIONS ------------------------------------------------------------
dataA, _data_size = parse(line, options, header_size)
dataA, _data_size = parse(instance, line, options, header_size)

dataA.map!{|x| x.strip} if options[:strip_whitespace]

# if all values are blank, then ignore this line
next if options[:remove_empty_hashes] && (dataA.empty? || blank?(dataA))

# --- HASH TRANSFORMATIONS ------------------------------------------------------------
hash = @headers.zip(dataA).to_h
hash = instance.headers.zip(dataA).to_h

hash = hash_transformations(hash, options)
hash = hash_transformations(instance, hash, options)

# --- HASH VALIDATIONS ----------------------------------------------------------------
# will go here, and be able to:
Expand All @@ -108,9 +110,9 @@ def SmarterCSV.process(input, given_options = {}, &block) # rubocop:disable Lint

next if options[:remove_empty_hashes] && hash.empty?

puts "CSV Line #{@file_line_count}: #{pp(hash)}" if @verbose == '2' # very verbose setting
puts "CSV Line #{instance.file_line_count}: #{pp(hash)}" if instance.verbose == '2' # very verbose setting
# optional adding of csv_line_number to the hash to help debugging
hash[:csv_line_number] = @csv_line_count if options[:with_line_numbers]
hash[:csv_line_number] = instance.csv_line_count if options[:with_line_numbers]

# process the chunks or the resulting hash
if use_chunks
Expand All @@ -121,9 +123,9 @@ def SmarterCSV.process(input, given_options = {}, &block) # rubocop:disable Lint
if block_given?
yield chunk # do something with the hashes in the chunk in the block
else
@result << chunk.dup # Append chunk to result (use .dup to keep a copy after we do chunk.clear)
instance.result << chunk.dup # Append chunk to result (use .dup to keep a copy after we do chunk.clear)
end
@chunk_count += 1
instance.chunk_count += 1
chunk.clear # re-initialize for next chunk of data
else
# the last chunk may contain partial data, which is handled below
Expand All @@ -134,33 +136,33 @@ def SmarterCSV.process(input, given_options = {}, &block) # rubocop:disable Lint
if block_given?
yield [hash] # do something with the hash in the block (better to use chunking here)
else
@result << hash
instance.result << hash
end
end
end

# print new line to retain last processing line message
print "\n" if @verbose
print "\n" if instance.verbose

# handling of last chunk:
if !chunk.nil? && chunk.size > 0
# do something with the chunk
if block_given?
yield chunk # do something with the hashes in the chunk in the block
else
@result << chunk.dup # Append chunk to result (use .dup to keep a copy after we do chunk.clear)
instance.result << chunk.dup # Append chunk to result (use .dup to keep a copy after we do chunk.clear)
end
@chunk_count += 1
instance.chunk_count += 1
# chunk = [] # initialize for next chunk of data
end
ensure
fh.close if fh.respond_to?(:close)
end

if block_given?
@chunk_count # when we do processing through a block we only care how many chunks we processed
instance.chunk_count # when we do processing through a block we only care how many chunks we processed
else
@result # returns either an Array of Hashes, or an Array of Arrays of Hashes (if in chunked mode)
instance.result # returns either an Array of Hashes, or an Array of Arrays of Hashes (if in chunked mode)
end
end

Expand All @@ -183,10 +185,6 @@ def count_quote_chars(line, quote_char)
count
end

def has_acceleration?
@has_acceleration ||= !!defined?(parse_csv_line_c)
end

protected

# SEE: https://github.com/rails/rails/blob/32015b6f369adc839c4f0955f2d9dce50c0b6123/activesupport/lib/active_support/core_ext/object/blank.rb#L121
Expand Down
Loading

0 comments on commit 90ec4c5

Please sign in to comment.