Skip to content

Commit

Permalink
bug fixes
Browse files Browse the repository at this point in the history
- bump script version to 5.3
- rename `deviceCode` to `device_code`
- fix `session_id` in script responses being bytes
- fix api cdm close
  • Loading branch information
Puyodead1 committed Jul 24, 2024
1 parent 0c152ef commit 55a7be8
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 86 deletions.
12 changes: 6 additions & 6 deletions getwvkeys/download/getwvkeys.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(
auth: str,
verbose: bool = False,
force: bool = False,
deviceCode: str = "",
device_code: str = "",
_headers: dict[str, str] = headers,
**kwargs,
) -> None:
Expand All @@ -64,7 +64,7 @@ def __init__(
self.auth = auth
self.verbose = verbose
self.force = force
self.deviceCode = deviceCode
self.device_code = device_code

self.baseurl = "https://getwvkeys.cc" if API_URL == "__getwvkeys_api_url__" else API_URL
self.api_url = self.baseurl + "/pywidevine"
Expand All @@ -73,7 +73,7 @@ def __init__(
def generate_request(self):
if self.verbose:
print("[+] Generating License Request ")
data = {"pssh": self.pssh, "deviceCode": self.deviceCode, "force": self.force, "license_url": self.url}
data = {"pssh": self.pssh, "device_code": self.device_code, "force": self.force, "license_url": self.url}
header = {"X-API-Key": self.auth, "Content-Type": "application/json"}
r = requests.post(self.api_url, json=data, headers=header)
if not r.ok:
Expand Down Expand Up @@ -108,7 +108,7 @@ def decrypter(self, license_response):
"response": license_response,
"license_url": self.url,
"headers": self.headers,
"deviceCode": self.deviceCode,
"device_code": self.device_code,
"force": self.force,
"session_id": self.session_id,
}
Expand Down Expand Up @@ -187,7 +187,7 @@ def main(self):
default=False,
action="store_true",
)
parser.add_argument("--deviceCode", "-d", default="", help="Use custom device", required=False)
parser.add_argument("--device_code", "-d", default="", help="Use custom device", required=False)
parser.add_argument("--version", "-V", help="Print version and exit", action="store_true")

args = parser.parse_args()
Expand All @@ -211,7 +211,7 @@ def main(self):
if len(sys.argv) == 1:
parser.print_help()
print()
args.deviceCode = ""
args.device_code = ""
args.verbose = False

try:
Expand Down
118 changes: 60 additions & 58 deletions getwvkeys/libraries.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def __init__(
self,
gwvk: GetWVKeys,
user_id,
deviceCode: str,
device_code: str,
# TODO: we really shouldn't do this, but vinetrimmer doesn't send license urls without modifications
license_url="VINETRIMMER",
pssh=None,
Expand All @@ -207,7 +207,7 @@ def __init__(
self.gwvk = gwvk
self.license_url = license_url
self.headers = headers
self.device_code = deviceCode
self.device_code = device_code
self.force = force
self.time = int(time.time())
self.content_keys: list[CachedKey] = list()
Expand All @@ -219,7 +219,7 @@ def __init__(
if self.proxy and isinstance(self.proxy, str):
self.proxy = {"http": self.proxy, "https": self.proxy}
self.store_request = {}
self.session_id = session_id
self.session_id = bytes.fromhex(session_id) if session_id else None
self.disable_privacy = disable_privacy

try:
Expand Down Expand Up @@ -250,7 +250,7 @@ def _cache_keys(self, dv=False):
"license_url": self.license_url,
"added_at": self.time,
"keys": list(),
"session_id": self.session_id,
"session_id": self.session_id.hex(),
}
for key in self.content_keys:
# s = urlsplit(self.license_url)
Expand Down Expand Up @@ -285,44 +285,44 @@ def post_data(license_url: str, headers: dict[str, str] | None, data: bytes, pro
except ConnectionError as e:
raise BadRequest(f"Connection error: {e.args[0].reason}")

def external_license(self, method, params, web=False):
entry = next((entry for entry in config.EXTERNAL_API_DEVICES if entry["deviceCode"] == self.device_code), None)
if not entry:
raise BadRequest("Invalid device code")
api = entry["url"]
payload = {"method": method, "params": params, "token": entry["token"]}
r = requests.post(api, headers=self.headers, json=payload, proxies=self.proxy)
if r.status_code != 200:
if "message" in r.text:
raise Exception(f"Error: {r.json()['message']}")
raise Exception(f"Unknown Error: [{r.status_code}] {r.text}")
if method == "GetChallenge":
d = r.json()
if entry["version"] == 2:
challenge = d["message"]["challenge"]
self.session_id = d["message"]["session_id"]
else:
challenge = d["challenge"]
self.session_id = d["session_id"]
if not web:
return jsonify({"challenge": challenge, "session_id": self.session_id})
return challenge
elif method == "GetKeys":
d = r.json()
if entry["version"] == 2:
keys = d["message"]["keys"]
else:
keys = d["keys"]
for x in keys:
kid = x["kid"]
key = x["key"]
self.content_keys.append(
CachedKey(kid, self.time, self.user_id, self.license_url, "{}:{}".format(kid, key))
)
elif method == "GetKeysX":
raise NotImplemented()
else:
raise Exception("Unknown method")
# def external_license(self, method, params, web=False):
# entry = next((entry for entry in config.EXTERNAL_API_DEVICES if entry["device_code"] == self.device_code), None)
# if not entry:
# raise BadRequest("Invalid device code")
# api = entry["url"]
# payload = {"method": method, "params": params, "token": entry["token"]}
# r = requests.post(api, headers=self.headers, json=payload, proxies=self.proxy)
# if r.status_code != 200:
# if "message" in r.text:
# raise Exception(f"Error: {r.json()['message']}")
# raise Exception(f"Unknown Error: [{r.status_code}] {r.text}")
# if method == "GetChallenge":
# d = r.json()
# if entry["version"] == 2:
# challenge = d["message"]["challenge"]
# self.session_id = d["message"]["session_id"]
# else:
# challenge = d["challenge"]
# self.session_id = d["session_id"]
# if not web:
# return jsonify({"challenge": challenge, "session_id": self.session_id})
# return challenge
# elif method == "GetKeys":
# d = r.json()
# if entry["version"] == 2:
# keys = d["message"]["keys"]
# else:
# keys = d["keys"]
# for x in keys:
# kid = x["kid"]
# key = x["key"]
# self.content_keys.append(
# CachedKey(kid, self.time, self.user_id, self.license_url, "{}:{}".format(kid, key))
# )
# elif method == "GetKeysX":
# raise NotImplemented()
# else:
# raise Exception("Unknown method")

def main(self, curl=False):
# Search for cached keys first
Expand All @@ -343,7 +343,7 @@ def main(self, curl=False):
except (Exception,):
self.headers = self.yamldomagic(self.headers)

# if is_custom_device_key(self.deviceCode):
# if is_custom_device_key(self.device_code):
# if not self.server_certificate:
# try:
# self.server_certificate = self.post_data(
Expand Down Expand Up @@ -382,22 +382,22 @@ def main(self, curl=False):
cdm = sessions[(self.user_id, self.device_code)] = Cdm.from_device(device)

try:
session_id = cdm.open()
self.session_id = cdm.open()
except TooManySessions as e:
raise InternalServerError("Too many open sessions, please try again in a few minutes")

privacy_mode = False

if self.service_certificate:
cdm.set_service_certificate(session_id=session_id, certificate=self.service_certificate)
cdm.set_service_certificate(session_id=self.session_id, certificate=self.service_certificate)
privacy_mode = True
# elif not self.disable_privacy:
# cdm.set_service_certificate(session_id=session_id, certificate=common_privacy_cert)
# privacy_mode = True

try:
license_request = cdm.get_license_challenge(
session_id=session_id, pssh=self.pssh, license_type="STREAMING", privacy_mode=privacy_mode
session_id=self.session_id, pssh=self.pssh, license_type="STREAMING", privacy_mode=privacy_mode
)
except InvalidInitData as e:
logger.exception(e)
Expand All @@ -409,7 +409,7 @@ def main(self, curl=False):
license_response = self.post_data(self.license_url, self.headers, license_request, self.proxy)

try:
cdm.parse_license(session_id=session_id, license_message=license_response)
cdm.parse_license(session_id=self.session_id, license_message=license_response)
except InvalidLicenseMessage as e:
logger.exception(e)
raise BadRequest("Invalid license message")
Expand All @@ -421,7 +421,7 @@ def main(self, curl=False):
raise BadRequest("Signature mismatch")

try:
keys = cdm.get_keys(session_id=session_id, type_="CONTENT")
keys = cdm.get_keys(session_id=self.session_id, type_="CONTENT")
except ValueError as e:
logger.exception(e)
raise BadRequest("Failed to get keys")
Expand All @@ -432,7 +432,7 @@ def main(self, curl=False):
# caching
data = self._cache_keys()
# close the session
cdm.close(session_id=session_id)
cdm.close(session_id=self.session_id)
if curl:
return jsonify(data)
return render_template("success.html", page_title="Success", results=data)
Expand Down Expand Up @@ -479,22 +479,22 @@ def api(self):
cdm = sessions[(self.user_id, self.device_code)] = Cdm.from_device(device)

try:
session_id = cdm.open()
self.session_id = cdm.open()
except TooManySessions as e:
raise InternalServerError("Too many open sessions, please try again in a few minutes")

privacy_mode = False

if self.service_certificate:
cdm.set_service_certificate(session_id=session_id, certificate=self.service_certificate)
cdm.set_service_certificate(session_id=self.session_id, certificate=self.service_certificate)
privacy_mode = True
# elif not self.disable_privacy:
# cdm.set_service_certificate(session_id=session_id, certificate=common_privacy_cert)
# privacy_mode = True

try:
license_request = cdm.get_license_challenge(
session_id=session_id, pssh=self.pssh, license_type="STREAMING", privacy_mode=privacy_mode
session_id=self.session_id, pssh=self.pssh, license_type="STREAMING", privacy_mode=privacy_mode
)
except InvalidInitData as e:
logger.exception(e)
Expand All @@ -503,10 +503,12 @@ def api(self):
logger.exception(e)
raise BadRequest("Invalid license type")

return jsonify({"challenge": base64.b64encode(license_request).decode(), "session_id": self.session_id})
return jsonify(
{"challenge": base64.b64encode(license_request).decode(), "session_id": self.session_id.hex()}
)

if is_custom_device_key(self.device_code):
params = {"cdmkeyresponse": self.license_response, "session_id": self.session_id}
params = {"cdmkeyresponse": self.license_response, "session_id": self.session_id.hex()}
self.external_license("GetKeys", params=params)
output = self._cache_keys()
return jsonify(output)
Expand All @@ -519,7 +521,7 @@ def api(self):
raise BadRequest("Session not found, did you generate a challenge first?")

try:
cdm.parse_license(session_id=session_id, license_message=self.license_response)
cdm.parse_license(session_id=self.session_id, license_message=self.license_response)
except InvalidLicenseMessage as e:
logger.exception(e)
raise BadRequest("Invalid license message")
Expand All @@ -531,7 +533,7 @@ def api(self):
raise BadRequest("Signature mismatch")

try:
keys = cdm.get_keys(session_id=session_id, type_="CONTENT")
keys = cdm.get_keys(session_id=self.session_id, type_="CONTENT")
except ValueError as e:
logger.exception(e)
raise BadRequest("Failed to get keys")
Expand All @@ -542,12 +544,12 @@ def api(self):
# caching
output = self._cache_keys()
# close the session
cdm.close_session()
cdm.close(session_id=self.session_id)
return jsonify(output)

# def vinetrimmer(self, library: Library):
# if self.response is None:
# wvdecrypt = WvDecrypt(self.pssh, deviceconfig.DeviceConfig(library, self.deviceCode))
# wvdecrypt = WvDecrypt(self.pssh, deviceconfig.DeviceConfig(library, self.device_code))
# challenge = wvdecrypt.create_challenge()
# if len(sessions) > config.MAX_SESSIONS:
# # remove the oldest session
Expand Down
Loading

0 comments on commit 55a7be8

Please sign in to comment.