-
Notifications
You must be signed in to change notification settings - Fork 0
/
genetic_optimizer.py
250 lines (206 loc) · 10.4 KB
/
genetic_optimizer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
import json
import os
import random
import copy
import time
import strategy_text_generator
def generate_initial_population(parameters, population_size):
    """
    Build the first generation by drawing a random value for every parameter.

    Each candidate maps a parameter name to a dict that repeats the parameter's 'type' and 'space'
    (plus its bounds or options) and stores the randomly drawn value under 'default'.
    Parameters whose type is not 'int', 'float', 'categorical' or 'boolean' only get 'type' and
    'space' (no 'default' is drawn for them).

    :param parameters: dict of parameters to generate (from strategy file)
    :param population_size: int (number of candidates in the population)
    :return: list of dicts (population, which is a list of candidates)
    """
    population = []
    for _ in range(population_size):
        candidate = {}
        for name, spec in parameters.items():
            kind = spec['type']
            gene = {'type': kind, 'space': spec['space']}
            if kind == 'int':
                gene['low'] = spec['low']
                gene['high'] = spec['high']
                # randint includes both bounds
                gene['default'] = random.randint(spec['low'], spec['high'])
            elif kind == 'float':
                gene['low'] = spec['low']
                gene['high'] = spec['high']
                drawn = random.uniform(spec['low'], spec['high'])
                decimals = spec['decimals']
                # keep the value on the grid given by the parameter's 'decimals'
                gene['default'] = round(drawn, decimals)
                gene['decimals'] = decimals
            elif kind == 'categorical':
                # the options list is stored by reference, not copied
                gene['options'] = spec['options']
                gene['default'] = random.choice(spec['options'])
            elif kind == 'boolean':
                gene['default'] = random.choice([True, False])
            candidate[name] = gene
        population.append(candidate)
    return population
def generate_strategy_text_population(strategy_class, population):
    """
    Write one generated strategy source file per candidate.

    Candidate number i is rendered with strategy_text_generator.generate_text and saved as
    user_data/strategies/new_<strategy_class><i>.py.

    :param strategy_class: str (name of the strategy class)
    :param population: list of dicts (candidates to render)
    :return: str (user_data/strategies/new_<strategy_class>.py; note that this path carries no
        candidate index, so it is not one of the files written by this function)
    """
    strategies_dir = "user_data/strategies"
    for index, candidate in enumerate(population):
        target = f"{strategies_dir}/new_{strategy_class}{index}.py"
        # The target is opened (and therefore truncated) before the text is generated.
        with open(target, "w") as handle:
            source, _ = strategy_text_generator.generate_text(strategy_class, candidate, index)
            handle.write(source)
    return f"{strategies_dir}/new_{strategy_class}.py"
def evaluate_population(population, classname, timeframe='1h', timerange='20240101-20240405',
                        result_wait_seconds=30.0):
    """
    Evaluate the population. It runs one freqtrade backtest (through docker compose) covering the
    strategies New<classname>0 .. New<classname>N-1 and stores each strategy's 'profit_total' in the
    matching candidate under the key 'loss'. Very time consuming.

    The result files read here (.last_result.json and the backtest file it names) are deleted once
    they have been consumed.

    :param population: list of dicts
    :param classname: str (name of the strategy class)
    :param timeframe: str (timeframe for backtesting, e.g. '5m', '1h', '1d')
    :param timerange: str (value passed to backtesting as --timerange, e.g. '20240101-20240405')
    :param result_wait_seconds: float (how long to wait for the result file to show up after the
        backtesting command has returned)
    :return: list of dicts (population with loss; the same list object, updated in place)
    :raises FileNotFoundError: if no result file appears within result_wait_seconds
    """
    results_dir = "user_data/backtest_results"
    marker_path = f"{results_dir}/.last_result.json"
    classname = f'New{classname}'
    names_string = ' '.join(f'{classname}{i}' for i in range(len(population)))
    exit_status = os.system(
        f"docker compose run --rm freqtrade backtesting --strategy-list {names_string} "
        f"--timerange {timerange} --timeframe {timeframe}")
    # os.system only returns once the command has finished, so the result file should already be
    # there. Allow a short, sleeping grace period and then fail loudly instead of spinning forever
    # when backtesting did not produce a result.
    deadline = time.monotonic() + result_wait_seconds
    while not os.path.exists(marker_path):
        if time.monotonic() >= deadline:
            raise FileNotFoundError(
                f"{marker_path} was not created by the backtesting command (exit status {exit_status})")
        time.sleep(0.1)
    with open(marker_path, "r") as file:
        name = json.load(file)['latest_backtest']
    # Remove the marker so that the next evaluation cannot pick up this run's result by mistake.
    os.remove(marker_path)
    result_path = f"{results_dir}/{name}"
    with open(result_path, "r") as file:
        comparison = json.load(file)['strategy_comparison']
    # NOTE(review): assumes 'strategy_comparison' lists the strategies in the order they were passed
    # to --strategy-list - confirm against the freqtrade result format.
    for index, candidate in enumerate(population):
        candidate['loss'] = comparison[index].get('profit_total')
    os.remove(result_path)
    return population
def mutate_candidate(candidate, mutation_rate=1.0):
    """
    Mutate a candidate.

    Works on a deep copy, so the given candidate is left untouched. Every entry that is a parameter
    dict (a dict holding a 'default' value) is selected with probability mutation_rate and, when
    selected, gets a new random 'default' drawn from its own bounds/options. Entries that are not
    parameter dicts (for example a numeric 'loss') and parameters of an unknown type are copied
    unchanged.

    :param candidate: dict
    :param mutation_rate: float (probability, per parameter, of drawing a new value; the default 1.0
        re-draws every parameter)
    :return: dict (new candidate)
    """
    mutated_candidate = copy.deepcopy(candidate)
    for spec in mutated_candidate.values():
        if not isinstance(spec, dict) or 'default' not in spec:
            continue
        # With the default rate no extra random number is consumed, so the sequence of random draws
        # is exactly one draw per parameter.
        if mutation_rate < 1.0 and random.random() >= mutation_rate:
            continue
        kind = spec['type']
        if kind == 'int':
            spec['default'] = random.randint(spec['low'], spec['high'])
        elif kind == 'float':
            spec['default'] = round(random.uniform(spec['low'], spec['high']), spec['decimals'])
        elif kind == 'categorical':
            spec['default'] = random.choice(spec['options'])
        elif kind == 'boolean':
            spec['default'] = random.choice([True, False])
    return mutated_candidate
def crossover_candidates(candidate1, candidate2):
    """
    Crossover two candidates (single-point).

    The keys of candidate1, in their iteration order, are split at a random point: entries before
    the point come from candidate1, the remaining ones from candidate2. The point is drawn from
    0..len(candidate1) inclusive, so the child can also be a full copy of either parent.
    Inherited values are deep-copied, so the child never shares a parameter dict with a parent.

    :param candidate1: dict
    :param candidate2: dict (must contain every key of candidate1, otherwise KeyError is raised)
    :return: dict (new candidate)
    """
    keys = list(candidate1)
    crossover_point = random.randint(0, len(keys))
    new_candidate = {}
    for position, key in enumerate(keys):
        donor = candidate1 if position < crossover_point else candidate2
        # Copy instead of aliasing: a later in-place change of the child must not leak into the
        # parent (or into siblings bred from the same parent).
        new_candidate[key] = copy.deepcopy(donor[key])
    return new_candidate
def genetic_algorithm(parameters, population_size, generations, strategy_class='Diamond', timeframe='1h',
                      timerange='20240101-20240405'):
    """
    Genetic algorithm to optimize strategy parameters.

    Every generation the candidates are written out as strategy files, backtested and ranked by the
    'loss' value stored by evaluate_population. The ranking is descending, i.e. the candidate with
    the largest 'loss' is treated as the best one. The next generation is then bred from the ranked
    population: per step one elite (taken in rank order), one mutant of a random candidate and one
    crossover of two random candidates, padded with further mutants up to population_size.

    After the last generation the per-generation results are printed and saved to
    reports/<strategy_class>.json (the reports directory is created if it is missing).

    :param parameters: dict of parameters (from strategy file)
    :param population_size: int (number of candidates in the population)
    :param generations: int (number of generations)
    :param strategy_class: str (name of the strategy class)
    :param timeframe: str (timeframe for backtesting, e.g. '5m', '1h', '1d')
    :param timerange: str (timerange handed to evaluate_population, e.g. '20240101-20240405')
    :return: dict of parameters and loss of the best candidate (None if generations is less than 1)
    """
    started = time.time()
    populations = []
    best_losses = []
    avg_losses = []
    best_candidates = []
    population = generate_initial_population(parameters, population_size)
    for generation in range(generations):
        print("Generation", generation, time.time() - started)
        # The text generator works on its own copy of the (loss-free) candidates.
        generate_strategy_text_population(strategy_class, copy.deepcopy(population))
        population = evaluate_population(population, strategy_class, timeframe, timerange)
        population = sorted(population, key=lambda candidate: -candidate['loss'])

        # Take the statistics from the population that was just evaluated. Offspring bred below
        # start out with a 'loss' inherited from their parents, so measuring after breeding would
        # report values that belong to other candidates.
        avg_loss = sum(candidate['loss'] for candidate in population) / len(population)
        print(population[0]['loss'], avg_loss)
        # update report lists
        best_losses.append(population[0]['loss'])
        avg_losses.append(avg_loss)
        best_candidates.append(copy.deepcopy(population[0]))
        # save the population to print it at the end
        populations.append(copy.deepcopy(population))

        if generation == generations - 1:
            final_time = time.time() - started
            print("Losses of the populations")
            for pop in populations:
                print([candidate['loss'] for candidate in pop])
            print("Best candidates of the population")
            for pop in populations:
                print(pop[0])
            print("Losses of the best candidates")
            for pop in populations:
                print(pop[0]['loss'])
            print("Average losses")
            for value in avg_losses:
                print(value)
            # Without the directory the dump below would fail and the whole run would be lost.
            os.makedirs("reports", exist_ok=True)
            with open(f"reports/{strategy_class}.json", "w") as file:
                json.dump({'losses': best_losses, 'avg_losses': avg_losses, 'best_candidates': best_candidates,
                           'final_time': final_time}, file)
            return population[0]

        # Breed the next generation from the ranked population.
        offspring = []
        for rank in range(population_size // 3):
            offspring.append(population[rank])
            offspring.append(mutate_candidate(population[random.randint(0, population_size - 1)]))
            offspring.append(crossover_candidates(population[random.randint(0, population_size - 1)],
                                                  population[random.randint(0, population_size - 1)]))
        while len(offspring) < population_size:
            offspring.append(mutate_candidate(population[random.randint(0, population_size - 1)]))

        # Give every candidate of the next generation its own copy and drop the outdated 'loss';
        # each of them is evaluated again at the start of the next iteration.
        population = []
        for child in offspring:
            fresh = copy.deepcopy(child)
            fresh.pop('loss', None)
            population.append(fresh)
def delete_new_strategy_files():
    """
    Remove generated files: every entry whose name starts with new_ is deleted, first from
    user_data/strategies and then from user_data/backtest_results.
    """
    for directory in ('user_data/strategies', 'user_data/backtest_results'):
        generated = [entry for entry in os.listdir(directory) if entry.startswith('new_')]
        for entry in generated:
            os.remove(f'{directory}/{entry}')
if __name__ == "__main__":
    # Read the tunable parameters and the timeframe from the strategy file
    strategy_class = 'Diamond'
    strategy_file = "user_data/strategies/diamond_strategy.py"
    parameters, timeframe = strategy_text_generator.parse_parameters(strategy_file)
    print(parameters, timeframe, sep='\n')
    # 20 candidates per generation, 10 generations; the timerange keeps its default value
    best_candidate = genetic_algorithm(
        parameters,
        population_size=20,
        generations=10,
        strategy_class=strategy_class,
        timeframe=timeframe,
    )
    print('Final result:', best_candidate, sep='\n')
    # Clean up the generated new_* files
    delete_new_strategy_files()