# blackjack.py
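# Monte Carlo methods for the blackjack example: first-visit MC prediction of a
# fixed player policy, Monte Carlo control with exploring starts, and off-policy
# prediction with ordinary and weighted importance sampling.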
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from matplotlib import cm
from tqdm import tqdm
rewards = {'win': 1, 'draw': 0, 'lose': -1}
actions = {'hits': 0, 'sticks': 1}
np.random.seed(27)
def get_card():
    # cards are drawn with replacement; jack, queen and king all count as 10
    card = np.random.randint(1, 14)
    return min(card, 10)


def get_card_value(card, player_usable_card):
    # an ace counts as 11 while it is usable, otherwise as 1
    if card == 1 and player_usable_card:
        return 11
    return card


def is_usable(card, player_sum):
    # an ace is usable if counting it as 11 does not bust the hand
    return card == 1 and player_sum < 11
# the fixed policy to evaluate (also the target policy): keep hitting until the
# player's sum reaches 20 or 21
def player_policy(player_sum, player_usable_card, dealer_showed_card):
    if player_sum == 20 or player_sum == 21:
        return actions['sticks']
    else:
        return actions['hits']


# the behavior policy: choose hits or sticks with equal probability
def behavior_policy(player_sum, player_usable_card, dealer_showed_card):
    if np.random.binomial(1, 0.5):
        return actions['sticks']
    return actions['hits']
def play(player_policy, initial_state=None, initial_action=None):
    game_trajectory = []
    # deal the player's and the dealer's initial cards
    player_sum = 0
    player_usable_card = False
    dealer_showed_card = 0
    dealer_hidden_card = 0
    if initial_state is None:
        while player_sum < 12:
            card = get_card()
            if not player_usable_card:
                player_usable_card = is_usable(card, player_sum)
            player_sum += get_card_value(card, is_usable(card, player_sum))
        dealer_showed_card = get_card()
        dealer_hidden_card = get_card()
    else:
        player_sum, player_usable_card, dealer_showed_card = initial_state
        dealer_hidden_card = get_card()
    dealer_sum = 0
    dealer_usable_card = is_usable(dealer_showed_card, dealer_sum)
    dealer_sum += get_card_value(dealer_showed_card, dealer_usable_card)
    if not dealer_usable_card:
        dealer_usable_card = is_usable(dealer_hidden_card, dealer_sum)
    dealer_sum += get_card_value(dealer_hidden_card, is_usable(dealer_hidden_card, dealer_sum))
    # game starts: player's turn
    state = []
    while True:
        state = [player_sum, player_usable_card, dealer_showed_card]
        if player_sum > 21:
            if player_usable_card:
                # count the usable ace as 1 instead of 11
                player_sum -= 10
                player_usable_card = False
            else:
                # player goes bust
                return (game_trajectory, rewards['lose'])
        else:
            if initial_action is not None:
                action = initial_action
                initial_action = None
            else:
                action = player_policy(player_sum, player_usable_card, dealer_showed_card)
            game_trajectory.append((state, action))
            if action == actions['sticks']:
                break
            card = get_card()
            if not player_usable_card:
                player_usable_card = is_usable(card, player_sum)
            player_sum += get_card_value(card, is_usable(card, player_sum))
    # dealer's turn (reached only when the player sticks)
    while True:
        if dealer_sum > 21:
            if dealer_usable_card:
                # count the usable ace as 1 instead of 11
                dealer_sum -= 10
                dealer_usable_card = False
            else:
                # dealer goes bust
                return (game_trajectory, rewards['win'])
        elif dealer_sum >= 17:
            break
        else:
            card = get_card()
            if not dealer_usable_card:
                dealer_usable_card = is_usable(card, dealer_sum)
            dealer_sum += get_card_value(card, is_usable(card, dealer_sum))
    # compare the final sums (reached only when both hands are 21 or less)
    state = [player_sum, player_usable_card, dealer_showed_card]
    if player_sum > dealer_sum:
        return (game_trajectory, rewards['win'])
    elif player_sum == dealer_sum:
        return (game_trajectory, rewards['draw'])
    else:
        return (game_trajectory, rewards['lose'])
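# A minimal usage sketch: roll out one episode under the fixed target policy and
# print the recorded trajectory. The helper name demo_single_episode is
# hypothetical; it only exercises play() and the actions dict defined above.
def demo_single_episode():
    trajectory, reward = play(player_policy)
    for (state, action) in trajectory:
        player_sum, usable_ace, dealer_card = state
        action_name = 'sticks' if action == actions['sticks'] else 'hits'
        print(f'player sum {player_sum:2d}, usable ace {usable_ace}, '
              f'dealer shows {dealer_card:2d} -> {action_name}')
    print(f'reward: {reward}')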
# first-visit Monte Carlo prediction
def first_visit_MC(episodes):
    states_usable_ace = np.zeros((10, 10))
    states_usable_ace_count = np.zeros((10, 10))
    states_no_usable_ace = np.zeros((10, 10))
    states_no_usable_ace_count = np.zeros((10, 10))
    for _ in tqdm(range(episodes)):
        game_trajectory, reward = play(player_policy)
        visited_usable_ace = np.full((10, 10), False)
        visited_no_usable_ace = np.full((10, 10), False)
        for (state, _) in game_trajectory:
            # the player's sum is in the range [12, 21]
            player_sum = state[0] - 12
            # the dealer's shown card is in the range [1, 10]
            dealer_card = state[2] - 1
            if state[1]:
                if not visited_usable_ace[player_sum, dealer_card]:
                    states_usable_ace[player_sum, dealer_card] += reward
                    states_usable_ace_count[player_sum, dealer_card] += 1
                    visited_usable_ace[player_sum, dealer_card] = True
            else:
                if not visited_no_usable_ace[player_sum, dealer_card]:
                    states_no_usable_ace[player_sum, dealer_card] += reward
                    states_no_usable_ace_count[player_sum, dealer_card] += 1
                    visited_no_usable_ace[player_sum, dealer_card] = True
    # avoid dividing by zero for states that were never visited
    states_usable_ace_count[states_usable_ace_count == 0] = 1
    states_no_usable_ace_count[states_no_usable_ace_count == 0] = 1
    return states_usable_ace / states_usable_ace_count, states_no_usable_ace / states_no_usable_ace_count
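# A minimal usage sketch: look up a single state value in the grids returned by
# first_visit_MC. The helper name lookup_state_value is hypothetical; it assumes
# the same index convention as above (row = player_sum - 12,
# column = dealer_showed_card - 1).
def lookup_state_value(values_usable_ace, values_no_usable_ace,
                       player_sum, usable_ace, dealer_showed_card):
    grid = values_usable_ace if usable_ace else values_no_usable_ace
    return grid[player_sum - 12, dealer_showed_card - 1]
# e.g. lookup_state_value(*first_visit_MC(10000), 20, False, 10)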
# Monte Carlo with Exploring Starts
def monte_carlo_ES(episodes):
    # indices: [player_sum, player_usable_card, dealer_showed_card, action]
    state_action_values = np.zeros((10, 2, 10, 2))
    state_action_pair_count = np.zeros((10, 2, 10, 2))

    # policy that is greedy with respect to the current action-value estimates
    def greedy_policy(player_sum, player_usable_card, dealer_showed_card):
        player_sum -= 12
        player_usable_card = int(player_usable_card)
        dealer_showed_card -= 1
        # average return per state-action pair; unvisited pairs are divided by 1
        # to avoid dividing by zero
        values = state_action_values[player_sum, player_usable_card, dealer_showed_card, :] / \
            np.where(state_action_pair_count[player_sum, player_usable_card, dealer_showed_card, :] == 0,
                     1, state_action_pair_count[player_sum, player_usable_card, dealer_showed_card, :])
        # break ties between equally valued actions at random
        return np.random.choice(np.argwhere(values == np.amax(values)).flatten().tolist())

    for eps in tqdm(range(episodes)):
        # exploring starts: every state-action pair can start an episode
        initial_state = [np.random.choice(range(12, 22)),
                         np.random.choice([True, False]),
                         np.random.choice(range(1, 11))]
        initial_action = np.random.choice([actions['hits'], actions['sticks']])
        # generate the first episode with the fixed player_policy, afterwards
        # follow the current greedy policy as it improves
        policy = greedy_policy if eps else player_policy
        game_trajectory, reward = play(policy, initial_state, initial_action)
        first_visit_set = set()
        for state, action in game_trajectory:
            player_sum = state[0] - 12
            player_usable_card = int(state[1])
            dealer_showed_card = state[2] - 1
            state_action = (player_sum, player_usable_card, dealer_showed_card, action)
            if state_action not in first_visit_set:
                first_visit_set.add(state_action)
                state_action_values[player_sum, player_usable_card, dealer_showed_card, action] += reward
                state_action_pair_count[player_sum, player_usable_card, dealer_showed_card, action] += 1
    # avoid dividing by zero for state-action pairs that were never visited
    state_action_pair_count[state_action_pair_count == 0] = 1
    return state_action_values / state_action_pair_count
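# A minimal usage sketch: recover the greedy action for one state from the
# action values returned by monte_carlo_ES. The helper name
# greedy_action_from_values is hypothetical; it mirrors the index convention
# used inside monte_carlo_ES (action 0 = hits, 1 = sticks).
def greedy_action_from_values(state_action_values, player_sum, usable_ace, dealer_showed_card):
    q = state_action_values[player_sum - 12, int(usable_ace), dealer_showed_card - 1, :]
    return int(np.argmax(q))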
# off-policy Monte Carlo prediction via importance sampling
def monte_carlo_off_policy(episodes):
    initial_state = [13, True, 2]
    # per-episode importance-sampling ratios and returns
    rhos = np.zeros(episodes)
    returns = np.zeros(episodes)
    for eps in range(0, episodes):
        game_trajectory, reward = play(behavior_policy, initial_state)
        rho = 1.0
        for (state, action) in game_trajectory:
            player_sum = state[0]
            player_usable_card = state[1]
            dealer_showed_card = state[2]
            if player_policy(player_sum, player_usable_card, dealer_showed_card) == action:
                # the target policy is deterministic and the behavior policy picks
                # each action with probability 0.5, so each matching step
                # multiplies rho by 1 / 0.5
                rho /= 0.5
            else:
                # the target policy would never take this action
                rho = 0
                break
        rhos[eps] = rho
        returns[eps] = reward
    weighted_returns = rhos * returns
    # running sums over the first n episodes
    weighted_returns = np.add.accumulate(weighted_returns)
    rhos = np.add.accumulate(rhos)
    ordinary_sampling = weighted_returns / np.arange(1, episodes + 1)
    with np.errstate(divide='ignore', invalid='ignore'):
        weighted_sampling = np.where(rhos != 0, weighted_returns / rhos, 0)
    return ordinary_sampling, weighted_sampling
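# A minimal sketch of the two estimators used above, written for a single batch
# of episodes: with per-episode ratios rho_i and returns G_i, ordinary importance
# sampling averages rho_i * G_i over the number of episodes, while weighted
# importance sampling divides by the sum of the ratios instead. The helper name
# importance_sampling_estimates is hypothetical.
def importance_sampling_estimates(rhos, returns):
    weighted_returns = np.asarray(rhos, dtype=float) * np.asarray(returns, dtype=float)
    ordinary = weighted_returns.sum() / len(returns)
    total_rho = float(np.sum(rhos))
    weighted = weighted_returns.sum() / total_rho if total_rho != 0 else 0.0
    return ordinary, weighted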
def first_visit_MC_plot():
    states_usable_ace_1, states_no_usable_ace_1 = first_visit_MC(10000)
    states_usable_ace_2, states_no_usable_ace_2 = first_visit_MC(500000)
    states = [states_usable_ace_1, states_usable_ace_2, states_no_usable_ace_1, states_no_usable_ace_2]
    titles = ['Usable ace, 10000 eps', 'Usable ace, 500000 eps',
              'No usable ace, 10000 eps', 'No usable ace, 500000 eps']
    fig, axes = plt.subplots(2, 2, figsize=(30, 22.5))
    plt.subplots_adjust(hspace=0.25, wspace=0.25)
    axes = axes.flatten()
    for state, ax, title in zip(states, axes, titles):
        pc = ax.pcolor(state, cmap=cm.coolwarm)
        ax.set_title(title, fontsize=27)
        ax.set_ylabel('Player sum', fontsize=20)
        y_ticks_loc = range(12, 22)
        y_start, y_end = ax.get_ylim()
        ax.set_yticks(np.arange(y_start, y_end) + 0.5, minor=False)
        ax.set_yticklabels(y_ticks_loc)
        ax.set_xlabel('Dealer showing', fontsize=20)
        x_ticks_loc = range(1, 11)
        x_start, x_end = ax.get_xlim()
        ax.set_xticks(np.arange(x_start, x_end) + 0.5, minor=False)
        ax.set_xticklabels(x_ticks_loc)
        fig.colorbar(pc, ax=ax)
    plt.savefig('./blackjack_first_visit_MC.png')
    plt.close()
def monte_carlo_ES_plot():
    state_action_values = monte_carlo_ES(500000)
    # optimal policy
    action_no_usable_ace = np.argmax(state_action_values[:, 0, :, :], axis=-1)
    action_usable_ace = np.argmax(state_action_values[:, 1, :, :], axis=-1)
    # optimal state-value function
    state_value_no_usable_ace = np.max(state_action_values[:, 0, :, :], axis=-1)
    state_value_usable_ace = np.max(state_action_values[:, 1, :, :], axis=-1)
    plots = [action_usable_ace, state_value_usable_ace, action_no_usable_ace, state_value_no_usable_ace]
    titles = ['Optimal policy, usable ace', 'Optimal state-value function, usable ace',
              'Optimal policy, no usable ace', 'Optimal state-value function, no usable ace']
    fig, axes = plt.subplots(2, 2, figsize=(30, 22.5))
    plt.subplots_adjust(hspace=0.25, wspace=0.25)
    axes = axes.flatten()
    for plot, ax, title in zip(plots, axes, titles):
        pc = ax.pcolor(plot, cmap=cm.coolwarm)
        ax.set_title(title, fontsize=27)
        ax.set_ylabel('Player sum', fontsize=20)
        y_ticks_loc = range(12, 22)
        y_start, y_end = ax.get_ylim()
        ax.set_yticks(np.arange(y_start, y_end) + 0.5, minor=False)
        ax.set_yticklabels(y_ticks_loc)
        ax.set_xlabel('Dealer showing', fontsize=20)
        x_ticks_loc = range(1, 11)
        x_start, x_end = ax.get_xlim()
        ax.set_xticks(np.arange(x_start, x_end) + 0.5, minor=False)
        ax.set_xticklabels(x_ticks_loc)
        fig.colorbar(pc, ax=ax)
    plt.savefig('./blackjack_monte_carlo_es.png')
    plt.close()
def monte_carlo_off_policy_plot():
    # true value of the state [13, True, 2] under the target policy
    # (approximately -0.27726; see Sutton & Barto, Example 5.4)
    true_value = -0.27726
    episodes = 10000
    runs = 100
    error_ordinary = np.zeros(episodes)
    error_weighted = np.zeros(episodes)
    for i in tqdm(range(0, runs)):
        ordinary_sampling, weighted_sampling = monte_carlo_off_policy(episodes)
        # get the squared error
        error_ordinary += np.power(ordinary_sampling - true_value, 2)
        error_weighted += np.power(weighted_sampling - true_value, 2)
    error_ordinary /= runs
    error_weighted /= runs
    plt.plot(np.arange(1, episodes + 1), error_ordinary, color='green', label='Ordinary Importance Sampling')
    plt.plot(np.arange(1, episodes + 1), error_weighted, color='red', label='Weighted Importance Sampling')
    plt.ylim(-0.1, 5)
    plt.xlabel('Episodes (log scale)')
    plt.ylabel(f'Mean square error\n(average over {runs} runs)')
    plt.xscale('log')
    plt.legend()
    plt.savefig('./blackjack_monte_carlo_off_policy.png')
    plt.close()


if __name__ == '__main__':
    first_visit_MC_plot()
    monte_carlo_ES_plot()
    monte_carlo_off_policy_plot()