-
Notifications
You must be signed in to change notification settings - Fork 28
/
api.py
177 lines (146 loc) · 5.52 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import http.server
import threading
import requests
import json
import time
import sys
token = None
def setup():
resp = requests.post('https://github.com/login/device/code', headers={
'accept': 'application/json',
'editor-version': 'Neovim/0.6.1',
'editor-plugin-version': 'copilot.vim/1.16.0',
'content-type': 'application/json',
'user-agent': 'GithubCopilot/1.155.0',
'accept-encoding': 'gzip,deflate,br'
}, data='{"client_id":"Iv1.b507a08c87ecfe98","scope":"read:user"}')
# Parse the response json, isolating the device_code, user_code, and verification_uri
resp_json = resp.json()
device_code = resp_json.get('device_code')
user_code = resp_json.get('user_code')
verification_uri = resp_json.get('verification_uri')
# Print the user code and verification uri
print(f'Please visit {verification_uri} and enter code {user_code} to authenticate.')
while True:
time.sleep(5)
resp = requests.post('https://github.com/login/oauth/access_token', headers={
'accept': 'application/json',
'editor-version': 'Neovim/0.6.1',
'editor-plugin-version': 'copilot.vim/1.16.0',
'content-type': 'application/json',
'user-agent': 'GithubCopilot/1.155.0',
'accept-encoding': 'gzip,deflate,br'
}, data=f'{{"client_id":"Iv1.b507a08c87ecfe98","device_code":"{device_code}","grant_type":"urn:ietf:params:oauth:grant-type:device_code"}}')
# Parse the response json, isolating the access_token
resp_json = resp.json()
access_token = resp_json.get('access_token')
if access_token:
break
# Save the access token to a file
with open('.copilot_token', 'w') as f:
f.write(access_token)
print('Authentication success!')
def get_token():
global token
# Check if the .copilot_token file exists
while True:
try:
with open('.copilot_token', 'r') as f:
access_token = f.read()
break
except FileNotFoundError:
setup()
# Get a session with the access token
resp = requests.get('https://api.github.com/copilot_internal/v2/token', headers={
'authorization': f'token {access_token}',
'editor-version': 'Neovim/0.6.1',
'editor-plugin-version': 'copilot.vim/1.16.0',
'user-agent': 'GithubCopilot/1.155.0'
})
# Parse the response json, isolating the token
resp_json = resp.json()
token = resp_json.get('token')
def token_thread():
global token
while True:
get_token()
time.sleep(25 * 60)
def copilot(prompt, language='python'):
global token
# If the token is None, get a new one
if token is None or is_token_invalid(token):
get_token()
try:
resp = requests.post('https://copilot-proxy.githubusercontent.com/v1/engines/copilot-codex/completions', headers={'authorization': f'Bearer {token}'}, json={
'prompt': prompt,
'suffix': '',
'max_tokens': 1000,
'temperature': 0,
'top_p': 1,
'n': 1,
'stop': ['\n'],
'nwo': 'github/copilot.vim',
'stream': True,
'extra': {
'language': language
}
})
except requests.exceptions.ConnectionError:
return ''
result = ''
# Parse the response text, splitting it by newlines
resp_text = resp.text.split('\n')
for line in resp_text:
# If the line contains a completion, print it
if line.startswith('data: {'):
# Parse the completion from the line as json
json_completion = json.loads(line[6:])
completion = json_completion.get('choices')[0].get('text')
if completion:
result += completion
else:
result += '\n'
return result
# Check if the token is invalid through the exp field
def is_token_invalid(token):
if token is None or 'exp' not in token or extract_exp_value(token) <= time.time():
return True
return False
def extract_exp_value(token):
pairs = token.split(';')
for pair in pairs:
key, value = pair.split('=')
if key.strip() == 'exp':
return int(value.strip())
return None
class HTTPRequestHandler(http.server.BaseHTTPRequestHandler):
def do_POST(self):
# Get the request body
content_length = int(self.headers['Content-Length'])
body = self.rfile.read(content_length)
# Parse the request body as json
body_json = json.loads(body)
# Get the prompt from the request body
prompt = body_json.get('prompt')
language = body_json.get('language', 'python')
# Get the completion from the copilot function
completion = copilot(prompt, language)
# Send the completion as the response
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(completion.encode())
def main():
# Every 25 minutes, get a new token
threading.Thread(target=token_thread).start()
# Get the port to listen on from the command line
if len(sys.argv) < 2:
port = 8080
else:
port = int(sys.argv[1])
# Start the http server
httpd = http.server.HTTPServer(('0.0.0.0', port), HTTPRequestHandler)
print(f'Listening on port 0.0.0.0:{port}...')
httpd.serve_forever()
if __name__ == '__main__':
main()