-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathconcurrency_with_ruby.rb
80 lines (66 loc) · 2.77 KB
/
concurrency_with_ruby.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
require 'thread'
require 'monitor'
require 'net/http'
# Our sample set of currencies
currencies = ['ARS','AUD','CAD','CNY','DEM','EUR','GBP','HKD','ILS','INR','USD','XAG','XAU']
# Set a finite number of simultaneous worker threads that can run
thread_count = 5
# Create an array to keep track of threads
threads = Array.new(thread_count)
# Create a work queue for the producer to give work to the consumer
work_queue = SizedQueue.new(thread_count)
# Add a monitor so we can notify when a thread finishes and we can schedule a new one
threads.extend(MonitorMixin)
# Add a condition variable on the monitored array to tell the consumer to check the thread array
threads_available = threads.new_cond
# Add a variable to tell the consumer that we are done producing work
sysexit = false
consumer_thread = Thread.new do
loop do
# Stop looping when the producer is finished producing work
break if sysexit & work_queue.length == 0
found_index = nil
# The MonitorMixin requires us to obtain a lock on the threads array in case
# a different thread may try to make changes to it.
threads.synchronize do
# First, wait on an available spot in the threads array. This fires every
# time a signal is sent to the "threads_available" variable
threads_available.wait_while do
threads.select { |thread| thread.nil? || thread.status == false ||
thread["finished"].nil? == false}.length == 0
end
# Once an available spot is found, get the index of that spot so we may
# use it for the new thread
found_index = threads.rindex { |thread| thread.nil? || thread.status == false ||
thread["finished"].nil? == false }
end
# Get a new unit of work from the work queue
currency = work_queue.pop
# Pass the currency variable to the new thread so it can use it as a parameter to go
# get the exchange rates
threads[found_index] = Thread.new(currency) do
puts Net::HTTP.get("download.finance.yahoo.com","/d/quotes.csv?e=.csv&f=sl1d1t1&s=USD#{currency}=X")
# When this thread is finished, mark it as such so the consumer knows it is a
# free spot in the array.
Thread.current["finished"] = true
# Tell the consumer to check the thread array
threads.synchronize do
threads_available.signal
end
end
end
end
producer_thread = Thread.new do
# For each currency we need to download...
currencies.each do |currency|
# Put the currency on the work queue
work_queue << currency
# Tell the consumer to check the thread array so it can attempt to schedule the
# next job if a free spot exists.
threads.synchronize do
threads_available.signal
end
end
# Tell the consumer that we are finished downloading currencies
sysexit = true
end