-
Notifications
You must be signed in to change notification settings - Fork 8
/
get_fyers_access_token.py
101 lines (75 loc) · 3.67 KB
/
get_fyers_access_token.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import base64
import hmac
import os
import struct
import time
from urllib.parse import urlparse, parse_qs
import requests
from fyers_apiv3 import fyersModel
totp_key = "******" # totp_key (ex., "OMKRABCDCDVDFGECLWXK6OVB7T4DTKU5")
username = "******" # Fyers Client ID (ex., "TK01248")
pin = 1246 # four-digit PIN
client_id = "******" # App ID of the created app (ex., "L9NY305RTW-100")
secret_key = "******" # Secret ID of the created app
redirect_uri = "******" # Redircet URL you entered while creating the app (ex., "https://trade.fyers.in/api-login/redirect-uri/index.html")
def read_file():
try:
with open("fyers_token.txt", "r") as f:
token = f.read().strip()
return token
except FileNotFoundError:
return None
def write_file(token):
with open("fyers_token.txt", "w") as f:
f.write(token)
def totp(key, time_step=30, digits=6, digest="sha1"):
key = base64.b32decode(key.upper() + "=" * ((8 - len(key)) % 8))
counter = struct.pack(">Q", int(time.time() / time_step))
mac = hmac.new(key, counter, digest).digest()
offset = mac[-1] & 0x0F
binary = struct.unpack(">L", mac[offset : offset + 4])[0] & 0x7FFFFFFF
return str(binary)[-digits:].zfill(digits)
def get_token():
headers = {
"Accept": "application/json",
"Accept-Language": "en-US,en;q=0.9",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
}
s = requests.Session()
s.headers.update(headers)
data1 = f'{{"fy_id":"{base64.b64encode(f"{username}".encode()).decode()}","app_id":"2"}}'
r1 = s.post("https://api-t2.fyers.in/vagator/v2/send_login_otp_v2", data=data1)
request_key = r1.json()["request_key"]
data2 = f'{{"request_key":"{request_key}","otp":{totp(totp_key)}}}'
r2 = s.post("https://api-t2.fyers.in/vagator/v2/verify_otp", data=data2)
assert r2.status_code == 200, f"Error in r2:\n {r2.text}"
request_key = r2.json()["request_key"]
data3 = f'{{"request_key":"{request_key}","identity_type":"pin","identifier":"{base64.b64encode(f"{pin}".encode()).decode()}"}}'
r3 = s.post("https://api-t2.fyers.in/vagator/v2/verify_pin_v2", data=data3)
assert r3.status_code == 200, f"Error in r3:\n {r3.json()}"
headers = {"authorization": f"Bearer {r3.json()['data']['access_token']}", "content-type": "application/json; charset=UTF-8"}
data4 = f'{{"fyers_id":"{username}","app_id":"{client_id[:-4]}","redirect_uri":"{redirect_uri}","appType":"100","code_challenge":"","state":"abcdefg","scope":"","nonce":"","response_type":"code","create_cookie":true}}'
r4 = s.post("https://api.fyers.in/api/v2/token", headers=headers, data=data4)
assert r4.status_code == 308, f"Error in r4:\n {r4.json()}"
parsed = urlparse(r4.json()["Url"])
auth_code = parse_qs(parsed.query)["auth_code"][0]
session = fyersModel.SessionModel(client_id=client_id, secret_key=secret_key, redirect_uri=redirect_uri, response_type="code", grant_type="authorization_code")
session.set_token(auth_code)
response = session.generate_token()
return response["access_token"]
def get_profile(token):
fyers = fyersModel.FyersModel(client_id=client_id, token=token, log_path=os.getcwd())
return fyers.get_profile()
def main():
token = read_file()
if token is None:
token = get_token()
resp = get_profile(token)
if "error" in resp["s"] or "error" in resp["message"] or "expired" in resp["message"]:
token = get_token()
resp = get_profile(token)
write_file(token)
print("Fyers access token is saved in `fyers_token.txt` file.")
print(resp)
if __name__ == "__main__":
main()