-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmarket_monitor.py
195 lines (162 loc) · 6.7 KB
/
market_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import logging
import simpleaudio
import logging
import threading
import time
import static_data
import models
import spider
import strategy
from logging.handlers import TimedRotatingFileHandler
# Module-wide logger through which every market notice is written.
logger = logging.getLogger('thecryptoyou')
logger.setLevel(logging.DEBUG)

# Notices are written to a file that rolls over at midnight; the suffix
# attribute is overridden with a compact YYYYMMDD pattern.
log_format = "%(asctime)s - %(message)s"
formatter = logging.Formatter(log_format)
handler = TimedRotatingFileHandler(
    "./logs/market_notice.log",
    when="midnight",
    interval=1,
)
handler.suffix = "%Y%m%d"
handler.setFormatter(formatter)
# The logger accepts DEBUG, but this handler only lets INFO and above through.
handler.setLevel(logging.INFO)
logger.addHandler(handler)
def play_music(level):
    """Play the alert sound for a notice and block until playback finishes.

    Args:
        level: Notice level. ``static_data.NOTICE_LEVEL_LV1`` selects
            ``music/win.wav``; any other value selects ``music/gold.wav``.
    """
    if level == static_data.NOTICE_LEVEL_LV1:
        wave_path = "music/win.wav"
    else:
        wave_path = "music/gold.wav"
    # Load only the file that is actually going to be played.
    wave_obj = simpleaudio.WaveObject.from_wave_file(wave_path)
    play_obj = wave_obj.play()
    # Block until the clip ends without spinning in a tight polling loop.
    play_obj.wait_done()
# Fallback token prices in USDT (snapshot from 2022-01-04). They are used
# until AutoUpdateCurrentPrice overwrites them with freshly fetched values.
currentBABYPrice = 0.714445
currentMILKPrice = 0.001
def get_price(originPrice):
    """Convert a raw integer price with 18 implied decimals to a 2-decimal string.

    The last 18 digits of ``originPrice`` are the fractional part; only the
    first two of them are kept (truncated, not rounded).

    Args:
        originPrice: The raw price as a string of digits (an int is accepted
            and converted with ``str``).

    Returns:
        A string such as ``"1234.56"``. Values below one unit come back with a
        leading zero (``"0.05"``) so the result can always be passed to
        ``float``.
    """
    # Pad to at least 19 digits so there is always one integer digit in front
    # of the 18 fractional ones. Without the padding, inputs shorter than 18
    # digits produced negative slice bounds and a wildly wrong price.
    digits = str(originPrice).zfill(19)
    return digits[:-18] + "." + digits[-18:-16]
def getTotalScore(level, totalAttr, mainAttr, subAttr):
    """Return the level-weighted score of a baby.

    Each attribute is scaled by a weight that grows linearly with ``level``
    and the three weighted terms are summed.

    Args:
        level: Level used to scale all three weights.
        totalAttr: Total attribute value.
        mainAttr: Main attribute value.
        subAttr: Sub attribute value.

    Returns:
        The combined score as a number.
    """
    main_weight = (level * 0.05) + 0.5
    sub_weight = (level * 0.01) + 0.1
    total_weight = 1 + (0.1 * level)
    return main_weight * mainAttr + sub_weight * subAttr + totalAttr * total_weight
# De-duplication state used by notice(), both keyed by the listing's nftId:
#   noticedInfo: nftId -> USDT price recorded when the listing was last logged
#   noticedSty:  nftId -> list of strategy names already logged for that listing
noticedInfo = {}
noticedSty = {}
def notice(data, baby, babyLevelOnePriceWithUSDT, styPrice, styName, noticeLevel):
    """Play the alert sound and log a listing that matched a strategy.

    A listing is logged only once per strategy while its USDT price stays
    within 50 of the price recorded at the previous log entry.

    Args:
        data: Market entry; the keys 'price', 'token' and 'nftId' are read.
        baby: Baby object built from the entry (level, name, rarity, nftIndex
            and the get*Attr() accessors are read).
        babyLevelOnePriceWithUSDT: Level-one equivalent price in USDT.
        styPrice: Price threshold of the matching strategy.
        styName: Name of the matching strategy.
        noticeLevel: Notice level forwarded to play_music().
    """
    # NOTE(review): the sound is played before the duplicate check below, so a
    # listing that was already logged still triggers audio on every call.
    # Confirm whether that is intended.
    play_music(noticeLevel)

    score_text = '%.2f' % getTotalScore(
        baby.level, baby.getTotalAttr(), baby.getMainAttr(), baby.getSubAttr())
    price = get_price(data['price'])

    token = data['token']
    if token == 'BABY':
        usdt_price = float(price) * currentBABYPrice
    elif token == 'MILK':
        usdt_price = float(price) * currentMILKPrice
    else:
        usdt_price = 0

    ratio = babyLevelOnePriceWithUSDT / styPrice
    usdt_text = '%.2f' % usdt_price
    ratio_text = '%.2f' % (ratio * 100)

    # The further the price is below the threshold, the longer the banner.
    extra_blocks = int((1 / ratio) * 10)
    banner = "*****" * (1 + max(extra_blocks, 0))

    nft_id = data['nftId']
    already_reported = (
        nft_id in noticedInfo
        and (usdt_price - 50) <= noticedInfo[nft_id] <= (usdt_price + 50)
        and styName in noticedSty[nft_id]
    )
    if already_reported:
        return

    # Remember this strategy and price so the next identical hit is skipped.
    noticedSty.setdefault(nft_id, []).append(styName)
    noticedInfo[nft_id] = usdt_price

    message = "".join([
        "\n", banner,
        "\n", styName,
        "\n Price : ", price, " ", token, "(", usdt_text, "USDT)", " ", ratio_text, "%",
        "\n Baby Name :", baby.name,
        "\n Baby RARITY :", baby.rarity,
        "\n Main Attr(", static_data.ATTRS_MAP[static_data.MAIN_ATTR_MAP[baby.nftIndex]], ") :",
        str(baby.getMainAttr()),
        "\n Sub Attr(", static_data.ATTRS_MAP[static_data.SUB_ATTR_MAP[baby.nftIndex]], ") :",
        str(baby.getSubAttr()),
        "\n Level : ", str(baby.level),
        "\n TotalAttr : ", str(baby.getTotalAttr()),
        "\n TotalScore : ", score_text,
        "\n ID : ", "https://thecryptoyou.io/market/", nft_id, "\n",
    ])
    logger.info(message)
def isNotice(stgy, baby, babyLevelOnePriceWithUSDT):
    """Decide whether a baby satisfies a strategy and is priced low enough.

    Args:
        stgy: Strategy providing mainAttr, subAttr, score and noticePrice.
        baby: Baby providing getMainAttr(), getSubAttr() and getTotalAttr().
        babyLevelOnePriceWithUSDT: Level-one equivalent price in USDT; a value
            of 0 never triggers a notice.

    Returns:
        True when every attribute threshold is met and the non-zero price is
        at or below the strategy's notice price, otherwise False.
    """
    meets_thresholds = (
        baby.getMainAttr() >= stgy.mainAttr
        and baby.getSubAttr() >= stgy.subAttr
        and baby.getTotalAttr() >= stgy.score
    )
    if not meets_thresholds:
        return False
    if babyLevelOnePriceWithUSDT == 0:
        return False
    return bool(babyLevelOnePriceWithUSDT <= stgy.noticePrice)
# Amounts subtracted from a listing's price per baby level, as
# (BABY tokens, MILK tokens). Presumably the cumulative cost of levelling a
# baby up from level one - TODO confirm against the game rules.
_LEVEL_UPGRADE_COST = {
    1: (0, 0),
    2: (0, 20000),
    3: (0, 70000),
    4: (0, 220000),
    5: (1000, 220000),
}


def _level_one_price_usdt(level, price_usdt):
    """Return the listing price minus the level's token cost, or 0 for unknown levels."""
    cost = _LEVEL_UPGRADE_COST.get(level)
    if cost is None:
        return 0
    baby_qty, milk_qty = cost
    return price_usdt - (baby_qty * currentBABYPrice + milk_qty * currentMILKPrice)


def _notify_matches(strategies, data, baby, level_one_price, notice_level):
    """Call notice() for every strategy in ``strategies`` that the baby satisfies."""
    for stgy in strategies:
        name_match = stgy.babyName != "" and stgy.babyName == baby.name
        if not (name_match or stgy.rarity == baby.rarity):
            continue
        if isNotice(stgy, baby, level_one_price):
            notice(data, baby, level_one_price, stgy.noticePrice, stgy.name, notice_level)


def check(limit=5):
    """Scan the first entries of the market list and raise notices for bargains.

    For each inspected listing the price is converted to USDT, reduced by the
    per-level token cost, and then tested against both the normal and the
    level-one strategy lists.

    Args:
        limit: Maximum number of entries taken from the front of the list
            returned by ``spider.getMarketList()``. Defaults to 5.
    """
    listings = spider.getMarketList()
    if not listings:
        return

    # Slice instead of indexing 0..4 so a short list does not raise IndexError.
    for data in listings[:limit]:
        baby = models.Baby(data['info'])

        token = data['token']
        if token == 'BABY':
            price_usdt = float(get_price(data['price'])) * currentBABYPrice
        elif token == 'MILK':
            price_usdt = float(get_price(data['price'])) * currentMILKPrice
        else:
            # Unknown token: a price of 0 never triggers a notice.
            price_usdt = 0

        level_one_price = _level_one_price_usdt(baby.level, price_usdt)

        _notify_matches(strategy.strategies, data, baby, level_one_price,
                        static_data.NOTICE_LEVEL_NORMAL)
        _notify_matches(strategy.lv1Strategies, data, baby, level_one_price,
                        static_data.NOTICE_LEVEL_LV1)
def AutoUpdateCurrentPrice():
    """Refresh the module-level BABY and MILK prices every 300 seconds, forever.

    Prices returned by ``spider.getMarketPrice()`` are accepted only when both
    are positive. On any error the previous prices are kept and the failure is
    logged. This function never returns; it is meant to run in its own thread.
    """
    global currentBABYPrice
    global currentMILKPrice
    while True:
        try:
            babyP, milkP = spider.getMarketPrice()
            if babyP > 0 and milkP > 0:
                currentBABYPrice = babyP
                currentMILKPrice = milkP
        except Exception:
            # Keep running on stale prices, but record why the refresh failed
            # instead of discarding the error.
            logger.exception("Failed to refresh BABY/MILK prices; keeping previous values")
        time.sleep(300)
def AutoCheckMarket():
    """Run check() every 3 seconds, forever.

    A failing check does not stop the loop; the error is logged and the next
    cycle runs as usual. This function never returns; it is meant to run in
    its own thread.
    """
    while True:
        try:
            check()
        except Exception as e:
            # One line per failure, no traceback: this loop runs every few
            # seconds, so a persistent outage would otherwise flood the log.
            logger.warning("Market check failed: %r", e)
        time.sleep(3)
# Start both background workers as soon as the module is executed: one keeps
# the token prices fresh, the other polls the market. No daemon flag is set on
# either thread.
t = threading.Thread(target=AutoUpdateCurrentPrice)
t.start()
tAutoCheckoutMarket = threading.Thread(target=AutoCheckMarket)
tAutoCheckoutMarket.start()