diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b97de95 --- /dev/null +++ b/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = docs +BUILDDIR = build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/README.md b/README.md index aace01b..67f4208 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ # osrparse, a python parser for osu! replays -This is a parser for osu! replay files (.osr) as described by . +This is a parser for the ``.osr`` format for osu! replay files, as described by [the wiki](https://osu.ppy.sh/wiki/en/Client/File_formats/Osr_%28file_format%29). ## Installation @@ -15,140 +15,47 @@ pip install osrparse ## Documentation -### Parsing +Please see the full documentation for a comprehensive guide: . A quickstart follows below for the impatient, but you should read the full documentation if you are at all confused. -To parse a replay from a filepath: +### Quickstart ```python -from osrparse import parse_replay_file +from osrparse import Replay, parse_replay_data +# parse from a path +replay = Replay.from_path("path/to/osr.osr") -# returns a Replay object -replay = parse_replay_file("path/to/osr.osr") -``` - -To parse a replay from an lzma string (such as the one returned from the `/get_replay` osu! api endpoint): - -```python -from osrparse import parse_replay - -# returns a Replay object that only has a `play_data` attribute -replay = parse_replay(lzma_string, pure_lzma=True) -``` +# or from an opened file object +with open("path/to/osr.osr") as f: + replay = Replay.from_file(f) -Note that if you use the `/get_replay` endpoint to retrieve a replay, you must decode the response before passing it to osrparse, as the response is encoded in base 64 by default. +# or from a string +with open("path/to/osr.osr") as f: + replay_string = f.read() +replay = Replay.from_string(replay_string) -### Dumping +# a replay has various attributes +r = replay +print(r.mode, r.game_version, r.beatmap_hash, r.username, + r.r_hash, r.count_300, r.count_100, r.count_50, r.count_geki, + r.count_miss, r.score, r.max_combo, r.perfect, r.mods, + r.life_bar_graph, r.timestamp, r.r_data, r.r_id, r.rng_seed) -Existing `Replay` objects can be "dumped" back to a `.osr` file: +# parse the replay data from api v1's /get_replay endpoint +lzma_string = retrieve_from_api() +replay_data = parse_replay_data(lzma_string) +# replay_data is a list of ReplayEvents -```python +# write a replay back to a path +replay.write_path("path/to/osr.osr") -replay.dump("path/to/osr.osr") # or to an opened file object with open("path/to/osr.osr") as f: - replay.dump(f) -``` - -You can also edit osr files by parsing a replay, editing an attribute, and dumping it back to its file: - -```python -replay = parse_replay_file("path/to/osr.osr") -replay.player_name = "fake username" -replay.dump(""path/to/osr.osr") -``` - -### Attributes - -`Replay` objects have the following attibutes: - -```python -self.game_mode # GameMode enum -self.game_version # int -self.beatmap_hash # str -self.player_name # str -self.replay_hash # str -self.number_300s # int -self.number_100s # int -self.number_50s # int -self.gekis # int -self.katus # int -self.misses # int -self.score # int -self.max_combo # int -self.is_perfect_combo # bool -self.mod_combination # Mod enum -self.life_bar_graph # str, currently unparsed -self.timestamp # datetime.datetime object -# list of either ReplayEventOsu, ReplayEventTaiko, ReplayEventCatch, -# or ReplayEventMania objects, depending on self.game_mode -self.play_data -``` - -`ReplayEventOsu` objects have the following attributes: - -```python -self.time_delta # int, time since previous event in milliseconds -self.x # float, x axis location -self.y # float, y axis location -self.keys # Key enum, keys pressed -``` - -`ReplayEventTaiko` objects have the following attributes: - -```python -self.time_delta # int, time since previous event in milliseconds -self.x # float, x axis location -self.keys # KeyTaiko enum, keys pressed -``` + replay.write_file(f) -`ReplayEventCatch` objects have the following attributes: +# or to a string +packed = replay.pack() -```python -self.time_delta # int, time since previous event in milliseconds -self.x # float, x axis location -self.dashing # bool, whether the player was dashing or not -``` - -`ReplayEventMania` objects have the following attributes: - -```python -self.time_delta # int, time since previous event in milliseconds -self.keys # KeyMania enum -``` - -The `Key` enums used in the above `ReplayEvent`s are defined as follows: - -```python -class Key(IntFlag): - M1 = 1 << 0 - M2 = 1 << 1 - K1 = 1 << 2 - K2 = 1 << 3 - SMOKE = 1 << 4 - -class KeyTaiko(IntFlag): - LEFT_DON = 1 << 0 - LEFT_KAT = 1 << 1 - RIGHT_DON = 1 << 2 - RIGHT_KAT = 1 << 3 - -class KeyMania(IntFlag): - K1 = 1 << 0 - K2 = 1 << 1 - K3 = 1 << 2 - K4 = 1 << 3 - K5 = 1 << 4 - K6 = 1 << 5 - K7 = 1 << 6 - K8 = 1 << 7 - K9 = 1 << 8 - K10 = 1 << 9 - K11 = 1 << 10 - K12 = 1 << 11 - K13 = 1 << 12 - K14 = 1 << 13 - K15 = 1 << 14 - K16 = 1 << 15 - K17 = 1 << 16 - K18 = 1 << 17 +# edited attributes are saved +replay.username = "fake username" +replay.write_path("path/to/new_osr.osr") ``` diff --git a/docs/appendix.rst b/docs/appendix.rst new file mode 100644 index 0000000..3ba71f2 --- /dev/null +++ b/docs/appendix.rst @@ -0,0 +1,12 @@ +Appendix +======== + +Replay +------ +.. automodule:: osrparse.replay + :members: + +Utils +----- +.. automodule:: osrparse.utils + :members: diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..54d8fbd --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,66 @@ +# Configuration file for the Sphinx documentation builder. +# +# This file only contains a selection of the most common options. For a full +# list see the documentation: +# https://www.sphinx-doc.org/en/master/usage/configuration.html + +from osrparse import __version__ + +project = "osrparse" +copyright = "2022, Kevin Lim, Liam DeVoe" +author = "Kevin Lim, Liam DeVoe" +release = "v" + __version__ +version = "v" + __version__ +master_doc = 'index' + +# https://www.sphinx-doc.org/en/master/usage/configuration.html#confval-html_show_copyright +html_show_copyright = False +# https://www.sphinx-doc.org/en/master/usage/configuration.html#confval-html_show_sphinx +html_show_sphinx = False + +extensions = [ + "sphinx.ext.autodoc", + "sphinx.ext.napoleon", + "sphinx.ext.intersphinx", + "sphinx.ext.viewcode", + "sphinx.ext.todo" +] + +intersphinx_mapping = {"python": ("https://docs.python.org/3", None)} +# https://stackoverflow.com/a/37210251 +autodoc_member_order = "bysource" + +html_theme = "furo" + +# Add any paths that contain templates here, relative to this directory. +templates_path = ["_templates"] + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +# This pattern also affects html_static_path and html_extra_path. +exclude_patterns = [] + + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +# html_static_path = ["_static"] + +# references that we want to use easily in any file +rst_prolog = """ +.. |Replay| replace:: :class:`~osrparse.replay.Replay` +.. |from_path| replace:: :func:`Replay.from_path() ` +.. |from_file| replace:: :func:`Replay.from_file() ` +.. |from_string| replace:: :func:`Replay.from_string() ` +.. |write_path| replace:: :func:`Replay.write_path() ` +.. |write_file| replace:: :func:`Replay.write_file() ` +.. |pack| replace:: :func:`Replay.pack() ` +.. |parse_replay_data| replace:: :func:`parse_replay_data() ` + +.. |br| raw:: html + +
+""" + +# linebreak workaround documented here +# https://stackoverflow.com/a/9664844/12164878 diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..74b4c1f --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,50 @@ +osrparse +========== + +osrparse is a parser for the ``.osr`` format, as described `on the osu! wiki `__. + +osrparse is maintained by: + +* `tybug `__ +* `kszlim `__ + +Installation +------------ + +osrparse can be installed from pip: + +.. code-block:: console + + $ pip install osrparse + +Links +----- + +| Github: https://github.com/kszlim/osu-replay-parser +| Documentation: https://kevin-lim.ca/osu-replay-parser/ + + +.. + couple notes about these toctrees - the first toctree is so our sidebar has + a link back to the index page. the ``self`` keyword comes with its share of + issues (https://github.com/sphinx-doc/sphinx/issues/2103), but none that matter + that much to us. It's better than using ``index`` which works but generates + many warnings when building. + + Hidden toctrees appear on the sidebar but not as text on the table of contents + displayed on this page. + +Contents +-------- + +.. toctree:: + :hidden: + + self + +.. toctree:: + :maxdepth: 2 + + parsing-replays + writing-replays + appendix diff --git a/docs/parsing-replays.rst b/docs/parsing-replays.rst new file mode 100644 index 0000000..e2aa3c0 --- /dev/null +++ b/docs/parsing-replays.rst @@ -0,0 +1,52 @@ +Parsing Replays +=============== + +Creating a Replay +----------------- + +Depending on the type of data you have, a |Replay| can be created multiple ways, using either one of |from_path|, |from_file|, or |from_string|: + +.. code-block:: python + + from osrparse import Replay + # from a path + replay = Replay.from_path("path/to/osr.osr") + + # or from an opened file object + with open("path/to/osr.osr") as f: + replay = Replay.from_file(f) + + # or from a string + with open("path/to/osr.osr") as f: + replay_string = f.read() + replay = Replay.from_string(replay_string) + +Most likely, you will be using |from_path| to create a |Replay|. + +Parsing Just Replay Data +------------------------ + +Unfortunately, the `/get_replay `__ endpoint of `osu!api v1 `__ does not return the full contents of a replay, but only the replay data potion. This means that you cannot create a full replay from the response of this endpoint. + +For this, we provide |parse_replay_data|, a function that takes the response of this endpoint and returns List[:class:`~osrparse.utils.ReplayEvent`] (ie, the parsed replay data): + +.. code-block:: python + + from osrparse import parse_replay_data + import base64 + import lzma + + lzma_string = retrieve_from_api() + replay_data = parse_replay_data(lzma_string) + assert isinstance(replay_data[0], ReplayEvent) + + # or parse an already decoded lzma string + lzma_string = retrieve_from_api() + lzma_string = base64.b64decode(lzma_string) + replay_data = parse_replay_data(lzma_string, decoded=True) + + # or parse an already decoded and decompressed lzma string + lzma_string = retrieve_from_api() + lzma_string = base64.b64decode(lzma_string) + lzma_string = lzma.decompress(lzma_string).decode("ascii") + replay_data = parse_replay_data(lzma_string, decompressed=True) diff --git a/docs/writing-replays.rst b/docs/writing-replays.rst new file mode 100644 index 0000000..d160160 --- /dev/null +++ b/docs/writing-replays.rst @@ -0,0 +1,34 @@ +Writing Replays +=============== + +Writing a Replay +---------------- + +Just as replays can be parsed from a path, file, or string, they can also be written back to a path, file, or string, with |write_path|, |write_file|, and |pack| respectively: + + +.. code-block:: python + + replay.write_path("path/to/new_osr.osr") + + # or to an opened file object + with open("path/to/new_osr.osr") as f: + replay.write_file(f) + + # or to a string + packed = replay.pack() + +Editing a Replay +---------------- + +The writing facilities of osrparse can be used to parse a replay, edit some or all of its attributes, and write it back to its file. The result is an edited replay. + +For instance, to change the username of a replay: + +.. code-block:: python + + from osrparse import Replay + + replay = Replay.from_path("path/to/osr.osr") + replay.username = "fake username" + replay.write_path("path/to/osr.osr") diff --git a/osrparse/__init__.py b/osrparse/__init__.py index 7109dfa..6881b95 100644 --- a/osrparse/__init__.py +++ b/osrparse/__init__.py @@ -1,12 +1,10 @@ from osrparse.utils import (GameMode, Mod, Key, ReplayEvent, ReplayEventOsu, ReplayEventTaiko, ReplayEventMania, ReplayEventCatch, KeyTaiko, KeyMania) -from osrparse.parse import parse_replay_file, parse_replay -from osrparse.replay import Replay +from osrparse.replay import Replay, parse_replay_data -__version__ = "5.0.0" +__version__ = "6.0.0" -__all__ = ["GameMode", "Mod", "parse_replay_file", "parse_replay", - "Replay", "ReplayEvent", "Key", +__all__ = ["GameMode", "Mod", "Replay", "ReplayEvent", "Key", "ReplayEventOsu", "ReplayEventTaiko", "ReplayEventMania", - "ReplayEventCatch", "KeyTaiko", "KeyMania"] + "ReplayEventCatch", "KeyTaiko", "KeyMania", "parse_replay_data"] diff --git a/osrparse/dump.py b/osrparse/dump.py deleted file mode 100644 index 4d9b9c3..0000000 --- a/osrparse/dump.py +++ /dev/null @@ -1,94 +0,0 @@ -import lzma -import struct - -from osrparse.utils import (ReplayEventOsu, ReplayEventTaiko, ReplayEventCatch, - ReplayEventMania) - -def pack_byte(data: int): - return struct.pack("> 7 - - if (i == 0 and byte & 0x40 == 0) or (i == -1 and byte & 0x40 != 0): - r.append(byte) - return b"".join(map(pack_byte, r)) - - r.append(0x80 | byte) - -def pack_string(data: str): - if data: - return pack_byte(11) + pack_ULEB128(data) + data.encode("utf-8") - return pack_byte(11) + pack_byte(0) - -def dump_timestamp(replay): - # windows ticks starts at year 0001, in contrast to unix time (1970). - # 62135596800 is the number of seconds between these two years and is added - # to account for this difference. - # The factor of 10000000 converts seconds to ticks. - ticks = (62135596800 + replay.timestamp.timestamp()) * 10000000 - ticks = int(ticks) - return pack_long(ticks) - - -def dump_replay_data(replay): - replay_data = "" - for event in replay.play_data: - if isinstance(event, ReplayEventOsu): - replay_data += f"{event.time_delta}|{event.x}|{event.y}|{event.keys.value}," - elif isinstance(event, ReplayEventTaiko): - replay_data += f"{event.time_delta}|{event.x}|0|{event.keys.value}," - elif isinstance(event, ReplayEventCatch): - replay_data += f"{event.time_delta}|{event.x}|0|{int(event.dashing)}," - elif isinstance(event, ReplayEventMania): - replay_data += f"{event.time_delta}|{event.keys}|0|0," - - filters = [{"id": lzma.FILTER_LZMA1, "dict_size": 1 << 21, "mode": lzma.MODE_FAST}] - compressed = lzma.compress(replay_data.encode("ascii"), format=lzma.FORMAT_ALONE, filters=filters) - - return pack_int(len(compressed)) + compressed - - -def dump_replay(replay): - data = b"" - - data += pack_byte(replay.game_mode.value) - data += pack_int(replay.game_version) - data += pack_string(replay.beatmap_hash) - - data += pack_string(replay.player_name) - data += pack_string(replay.replay_hash) - - data += pack_short(replay.number_300s) - data += pack_short(replay.number_100s) - data += pack_short(replay.number_50s) - data += pack_short(replay.gekis) - data += pack_short(replay.katus) - data += pack_short(replay.misses) - - data += pack_int(replay.score) - data += pack_short(replay.max_combo) - data += pack_byte(replay.is_perfect_combo) - - data += pack_int(replay.mod_combination.value) - data += pack_string(replay.life_bar_graph) - data += dump_timestamp(replay) - - data += dump_replay_data(replay) - data += pack_long(replay.replay_id) - - return data diff --git a/osrparse/parse.py b/osrparse/parse.py deleted file mode 100644 index 7285a32..0000000 --- a/osrparse/parse.py +++ /dev/null @@ -1,51 +0,0 @@ -import os -from typing import Union - -from osrparse.replay import Replay - -def parse_replay(replay_data: str, pure_lzma: bool = False, decompressed_lzma: bool = False) -> Replay: - """ - Parses a Replay from the given replay data. - - Args: - String replay_data: The replay data from either parsing an osr file or from the api get_replay endpoint. - Boolean pure_lzma: Whether replay_data conatins the entirety of an osr file, or only the lzma compressed - data containing the cursor movements and keyboard presses of the player. - If replay data was loaded from an osr, this value should be False, as an osr contains - more information than just the lzma, such as username and game version (see - https://osu.ppy.sh/help/wiki/osu!_File_Formats/Osr_(file_format)). If replay data - was retrieved from the api, this value should be True, as the api only - returns the lzma data (see https://github.com/ppy/osu-api/wiki#apiget_replay) - Boolean decompressed_lzma: Whether replay_data is compressed lzma, or decompressed - (and decoded to ascii) lzma. For example, the following calls are equivalent: - ``` - >>> osrparse.parse_replay(lzma_string, pure_lzma=True) - ``` - and - ``` - >>> lzma_string = lzma.decompress(lzma_string).decode("ascii") - >>> osrparse.parse_replay(lzma_string, pure_lzma=True, decompressed_lzma=True) - ``` - This parameter only has an affect if ``pure_lzma`` is ``True``. - Returns: - A Replay object with the fields specific in the Replay's init method. If pure_lzma is False, all fields will - be filled (nonnull). If pure_lzma is True, only the play_data will be filled. - """ - - return Replay(replay_data, pure_lzma, decompressed_lzma) - -def parse_replay_file(replay_path: Union[os.PathLike, str], pure_lzma: bool = False) -> Replay: - """ - Parses a Replay from the file at the given path. - - Args: - [String or Path]: A pathlike object representing the absolute path to the file to parse data from. - Boolean pure_lzma: False if the file contains data equivalent to an osr file (or is itself an osr file), - and True if the file contains only lzma data. See parse_replay documentation for - more information on the difference between these two and how each affect the - fields in the final Replay object. - """ - - with open(replay_path, 'rb') as f: - data = f.read() - return parse_replay(data, pure_lzma) diff --git a/osrparse/replay.py b/osrparse/replay.py index 4940e36..5852d13 100644 --- a/osrparse/replay.py +++ b/osrparse/replay.py @@ -1,79 +1,25 @@ import lzma import struct from datetime import datetime, timezone, timedelta -from typing import List -from io import TextIOWrapper +from typing import List, Optional +import base64 +from dataclasses import dataclass from osrparse.utils import (Mod, GameMode, ReplayEvent, ReplayEventOsu, - ReplayEventCatch, ReplayEventMania, ReplayEventTaiko) -from osrparse.dump import dump_replay + ReplayEventCatch, ReplayEventMania, ReplayEventTaiko, Key, KeyMania, + KeyTaiko, LifeBarState) -class Replay: - # first version with rng seed value added as the last frame in the lzma data - LAST_FRAME_SEED_VERSION = 20130319 - _BYTE = 1 - _SHORT = 2 - _INT = 4 - _LONG = 8 - - def __init__(self, replay_data: List[ReplayEvent], pure_lzma: bool, decompressed_lzma: bool): + +class _Unpacker: + """ + Helper class for dealing with the ``.osr`` format. Not intended to be used + by consumers. + """ + def __init__(self, replay_data): + self.replay_data = replay_data self.offset = 0 - self.game_mode = None - self.game_version = None - self.beatmap_hash = None - self.player_name = None - self.replay_hash = None - self.number_300s = None - self.number_100s = None - self.number_50s = None - self.gekis = None - self.katus = None - self.misses = None - self.score = None - self.max_combo = None - self.is_perfect_combo = None - self.mod_combination = None - self.life_bar_graph = None - self.timestamp = None - self.play_data = None - self.replay_id = None - self.replay_length = None - self._parse_replay_and_initialize_fields(replay_data, pure_lzma, decompressed_lzma) - - def _parse_replay_and_initialize_fields(self, replay_data, pure_lzma, decompressed_lzma): - if pure_lzma: - self.data_from_lmza(replay_data, decompressed_lzma) - return - self._parse_game_mode_and_version(replay_data) - self._parse_beatmap_hash(replay_data) - self._parse_player_name(replay_data) - self._parse_replay_hash(replay_data) - self._parse_score_stats(replay_data) - self._parse_life_bar_graph(replay_data) - self._parse_timestamp_and_replay_length(replay_data) - self._parse_play_data(replay_data) - self._parse_replay_id(replay_data) - - def _parse_game_mode_and_version(self, replay_data): - format_specifier = "= self.LAST_FRAME_SEED_VERSION and self.play_data: - if self.play_data[-1].time_delta != -12345: - pass - # I've disabled this warning temporarily as it turns out that - # many replays (perhaps all replays in non-std gamemodes?) don't - # have an RNG seed value even after the expected version, so - # this was more of an annoying false positive than anything. - - # print("The RNG seed value was expected in the last frame, but was not found. " - # f"\nGame Version: {self.game_version}, version threshold: " - # f"{self.LAST_FRAME_SEED_VERSION}, replay hash: {self.replay_hash}") - else: - del self.play_data[-1] - - def data_from_lmza(self, lzma_string, decompressed_lzma): - if decompressed_lzma: - # replay data is already decompressed and decoded. - # Remove last character (comma) so splitting works, same below - datastring = lzma_string[:-1] - else: - datastring = lzma.decompress(lzma_string, format=lzma.FORMAT_AUTO).decode('ascii')[:-1] - events = [eventstring.split('|') for eventstring in datastring.split(',')] - self.play_data = [ReplayEventOsu(int(event[0]), float(event[1]), float(event[2]), int(event[3])) for event in events] + @staticmethod + def parse_replay_data(replay_data_str, mode): + # remove trailing comma to make splitting easier + replay_data_str = replay_data_str[:-1] + events = [event.split('|') for event in replay_data_str.split(',')] - if self.play_data[-1].time_delta == -12345: - del self.play_data[-1] + rng_seed = None + play_data = [] + for event in events: + time_delta = int(event[0]) + x = event[1] + y = event[2] + keys = int(event[3]) - def _parse_replay_id(self, replay_data): - format_specifier = "> 7 + + if (i == 0 and byte & 0x40 == 0) or (i == -1 and byte & 0x40 != 0): + r.append(byte) + return b"".join(map(self.pack_byte, r)) + + r.append(0x80 | byte) + + def pack_string(self, data): + if data: + return (self.pack_byte(11) + self.pack_ULEB128(data) + + data.encode("utf-8")) + return self.pack_byte(11) + self.pack_byte(0) + + def pack_timestamp(self): + # windows ticks starts at year 0001, in contrast to unix time (1970). + # 62135596800 is the number of seconds between these two years and is + # added to account for this difference. + # The factor of 10000000 converts seconds to ticks. + + ticks = (62135596800 + self.replay.timestamp.timestamp()) * 10000000 + ticks = int(ticks) + return self.pack_long(ticks) + + def pack_life_bar(self): + data = "" + if self.replay.life_bar_graph is None: + return self.pack_string(data) + + for state in self.replay.life_bar_graph: + life = state.life + # store 0 or 1 instead of 0.0 or 1.0 + if int(life) == life: + life = int(state.life) + + data += f"{state.time}|{life}," + + return self.pack_string(data) + + def pack_replay_data(self): + data = "" + for event in self.replay.replay_data: + t = event.time_delta + if isinstance(event, ReplayEventOsu): + data += f"{t}|{event.x}|{event.y}|{event.keys.value}," + elif isinstance(event, ReplayEventTaiko): + data += f"{t}|{event.x}|0|{event.keys.value}," + elif isinstance(event, ReplayEventCatch): + data += f"{t}|{event.x}|0|{int(event.dashing)}," + elif isinstance(event, ReplayEventMania): + data += f"{t}|{event.keys.value}|0|0," + + if self.replay.rng_seed: + data += f"-12345|0|0|{self.replay.rng_seed}," + + filters = [ + { + "id": lzma.FILTER_LZMA1, + "dict_size": self.dict_size, + "mode": self.mode + } + ] + + data = data.encode("ascii") + compressed = lzma.compress(data, format=lzma.FORMAT_ALONE, + filters=filters) + + return self.pack_int(len(compressed)) + compressed + + def pack(self): + r = self.replay + data = b"" + + data += self.pack_byte(r.mode.value) + data += self.pack_int(r.game_version) + data += self.pack_string(r.beatmap_hash) + data += self.pack_string(r.username) + data += self.pack_string(r.replay_hash) + data += self.pack_short(r.count_300) + data += self.pack_short(r.count_100) + data += self.pack_short(r.count_50) + data += self.pack_short(r.count_geki) + data += self.pack_short(r.count_katu) + data += self.pack_short(r.count_miss) + data += self.pack_int(r.score) + data += self.pack_short(r.max_combo) + data += self.pack_byte(r.perfect) + data += self.pack_int(r.mods.value) + data += self.pack_life_bar() + data += self.pack_timestamp() + data += self.pack_replay_data() + data += self.pack_long(r.replay_id) + + return data + + +@dataclass +class Replay: + """ + A replay found in a ``.osr`` file, or following the osr format. To create a + replay, you likely want ``Replay.from_path``, ``Replay.from_file``, or + ``Replay.from_string``. + + Attributes + ---------- + mode: GameMode + The game mode this replay was played on. + game_version: int + The game version this replay was played on. + beatmap_hash: str + The hash of the beatmap this replay was played on. + username: str + The user that played this replay. + replay_hash: + The hash of this replay. + count_300: int + The number of 300 judgments in this replay. + count_100: int + The number of 100 judgments in this replay. + count_50: int + The number of 50 judgments in this replay. + count_geki: int + The number of geki judgments in this replay. + count_katu: int + The number of katu judgments in this replay. + count_miss: int + The number of misses in this replay. + score: int + The score of this replay. + max_combo: int + The maximum combo attained in this replay. + perfect: bool + Whether this replay was perfect or not. + mods: Mod + The mods this replay was played with. + life_bar_graph: Optional[List[LifeBarState]] + The life bar of this replay over time. + replay_data: List[ReplayEvent] + The replay data of the replay, including cursor position and keys + pressed. + replay_id: int + The replay id of this replay, or 0 if not submitted. + rng_seed: Optional[int] + The rng seed of this replay, or ``None`` if not present (typically not + present on older replays). + """ + mode: GameMode + game_version: int + beatmap_hash: str + username: str + replay_hash: str + count_300: int + count_100: int + count_50: int + count_geki: int + count_katu: int + count_miss: int + score: int + max_combo: int + perfect: bool + mods: Mod + life_bar_graph: Optional[List[LifeBarState]] + timestamp: datetime + replay_data: List[ReplayEvent] + replay_id: int + rng_seed: Optional[int] + + @staticmethod + def from_path(path): + """ + Creates a new ``Replay`` object from the ``.osr`` file at the given + ``path``. + + Parameters + ---------- + path: str or os.PathLike + The path to the osr file to read from. + + Returns + ------- + Replay + The parsed replay object. + """ + with open(path, "rb") as f: + return Replay.from_file(f) + + @staticmethod + def from_file(file): + """ + Creates a new ``Replay`` object from an open file object. + + Parameters + ---------- + file: file-like + The file object to read from. + + Returns + ------- + Replay + The parsed replay object. + """ + data = file.read() + return Replay.from_string(data) + + @staticmethod + def from_string(data): + """ + Creates a new ``Replay`` object from a string containing ``.osr`` data. + + Parameters + ---------- + data: str + The data to parse. + + Returns + ------- + Replay + The parsed replay object. + """ + return _Unpacker(data).unpack() + + def write_path(self, path, *, dict_size=None, mode=None): + """ + Writes the replay to the given ``path``. + + Parameters + ---------- + path: str or os.PathLike + The path to where to write the replay. + + Notes + ----- + This uses the current values of any attributes, and so can be used to + create an edited version of a replay, by first reading a replay, editing + an attribute, then writing the replay back to its file. + """ + with open(path, "wb") as f: + self.write_file(f, dict_size=dict_size, mode=mode) + + def write_file(self, file, *, dict_size=None, mode=None): + """ + Writes the replay to an open file object. + + Parameters + ---------- + file: file-like + The file object to write to. + """ + packed = self.pack(dict_size=dict_size, mode=mode) + file.write(packed) + + def pack(self, *, dict_size=None, mode=None): + """ + Returns the text representing this ``Replay``, in ``.osr`` format. + The text returned by this method is suitable for writing to a file as a + valid ``.osr`` file. + + Returns + ------- + str + The text representing this ``Replay``, in ``.osr`` format. + """ + return _Packer(self, dict_size=dict_size, mode=mode).pack() + + +def parse_replay_data(data_string, *, decoded=False, decompressed=False, + mode=GameMode.STD) -> List[ReplayEvent]: + """ + Parses the replay data portion of a replay from a string. This method is + siutable for use with the replay data returned by api v1's ``/get_replay`` + endpoint, for instance. + + Parameters + ---------- + data_string: str or bytes + The replay data to parse. + decoded: bool + Whether ``data_string`` has already been decoded from a b64 + representation. Api v1 returns a base 64 encoded string, for instance. + decompressed: bool + Whether ``data_string`` has already been both decompressed from lzma, + and decoded to ascii. + |br| + For instance, the following two calls are equivalent: + ``` + >>> parse_replay_data(lzma_string, decoded=True) + >>> ... + >>> lzma_string = lzma.decompress(lzma_string).decode("ascii") + >>> parse_replay_data(lzma_string, decompressed=True) + ``` + |br| + If ``decompressed`` is ``True``, ``decoded`` is automatically set to + ``True`` as well (ie, if ``decompressed`` is ``True``, we will assume + ``data_string`` is not base 64 encoded). + mode: GameMode + What mode to parse the replay data as. + """ + # assume the data is already decoded if it's been decompressed + if not decoded and not decompressed: + data_string = base64.b64decode(data_string) + if not decompressed: + data_string = lzma.decompress(data_string, format=lzma.FORMAT_AUTO) + data_string = data_string.decode("ascii") + (replay_data, _seed) = _Unpacker.parse_replay_data(data_string, mode) + return replay_data diff --git a/osrparse/utils.py b/osrparse/utils.py index 25aa36a..6f0c42e 100644 --- a/osrparse/utils.py +++ b/osrparse/utils.py @@ -1,13 +1,19 @@ from enum import Enum, IntFlag -import abc +from dataclasses import dataclass class GameMode(Enum): + """ + An osu! game mode. + """ STD = 0 TAIKO = 1 CTB = 2 MANIA = 3 class Mod(IntFlag): + """ + An osu! mod, or combination of mods. + """ NoMod = 0 NoFail = 1 << 0 Easy = 1 << 1 @@ -42,6 +48,10 @@ class Mod(IntFlag): Mirror = 1 << 30 class Key(IntFlag): + """ + A key that can be pressed during osu!standard gameplay - mouse 1 and 2, key + 1 and 2, and smoke. + """ M1 = 1 << 0 M2 = 1 << 1 K1 = 1 << 2 @@ -49,12 +59,18 @@ class Key(IntFlag): SMOKE = 1 << 4 class KeyTaiko(IntFlag): + """ + A key that can be pressed during osu!taiko gameplay. + """ LEFT_DON = 1 << 0 LEFT_KAT = 1 << 1 RIGHT_DON = 1 << 2 RIGHT_KAT = 1 << 3 class KeyMania(IntFlag): + """ + A key that can be pressed during osu!mania gameplay + """ K1 = 1 << 0 K2 = 1 << 1 K3 = 1 << 2 @@ -78,72 +94,102 @@ class KeyMania(IntFlag): # the reference I used for non-std replay events below: # https://github.com/kszlim/osu-replay-parser/pull/27#issuecomment-845679072. -class ReplayEvent(abc.ABC): - def __init__(self, time_delta: int): - self.time_delta = time_delta +@dataclass +class ReplayEvent: + """ + Base class for an event (ie a frame) in a replay. - @abc.abstractmethod - def _members(self): - pass - - def __eq__(self, other): - if not isinstance(other, ReplayEvent): - return False - return all(m1 == m2 for m1, m2 in zip(self._members(), other._members())) - - def __hash__(self): - return hash(self._members()) + Attributes + ---------- + time_delta: int + The time since the previous event (ie frame). + """ + time_delta: int +@dataclass class ReplayEventOsu(ReplayEvent): - def __init__(self, time_delta: int, x: float, y: float, - keys: int): - super().__init__(time_delta) - self.x = x - self.y = y - self.keys = Key(keys) - - def __str__(self): - return (f"{self.time_delta} ({self.x}, {self.y}) " - f"{self.keys}") - - def _members(self): - return (self.time_delta, self.x, self.y, self.keys) - + """ + A single frame in an osu!standard replay. + + Attributes + ---------- + time_delta: int + The time since the previous event (ie frame). + x: float + The x position of the cursor. + y: float + The y position of the cursor. + keys: Key + The keys pressed. + """ + x: float + y: float + keys: Key + +@dataclass class ReplayEventTaiko(ReplayEvent): - def __init__(self, time_delta: int, x: int, keys: int): - super().__init__(time_delta) - # we have no idea what this is supposed to represent. It's always one - # of 0, 320, or 640, depending on ``keys``. Leaving untouched for now. - self.x = x - self.keys = KeyTaiko(keys) - - def __str__(self): - return f"{self.time_delta} {self.x} {self.keys}" - - def _members(self): - return (self.time_delta, self.x, self.keys) - + """ + A single frame in an osu!taiko replay. + + Attributes + ---------- + time_delta: int + The time since the previous event (ie frame). + x: int + Unknown what this represents. Always one of 0, 320, or 640, depending on + ``keys``. + keys: KeyTaiko + The keys pressed. + """ + # we have no idea what this is supposed to represent. It's always one of 0, + # 320, or 640, depending on `keys`. Leaving untouched for now. + x: int + keys: KeyTaiko + +@dataclass class ReplayEventCatch(ReplayEvent): - def __init__(self, time_delta: int, x: int, keys: int): - super().__init__(time_delta) - self.x = x - self.dashing = keys == 1 - - def __str__(self): - return f"{self.time_delta} {self.x} {self.dashing}" - - def _members(self): - return (self.time_delta, self.x, self.dashing) - + """ + A single frame in an osu!catch replay. + + Attributes + ---------- + time_delta: int + The time since the previous event (ie frame). + x: float + The x position of the player. + dashing: bool + Whether we are dashing or not. + """ + x: float + dashing: bool + +@dataclass class ReplayEventMania(ReplayEvent): - def __init__(self, time_delta: int, x: int): - super().__init__(time_delta) - # no, this isn't a typo. osu! really stores keys pressed inside ``x`` - # for mania. - self.keys = KeyMania(x) - - def __str__(self): - return f"{self.time_delta} {self.keys}" - - def _members(self): - return (self.time_delta, self.keys) + """ + A single frame in an osu!mania replay. + + Attributes + ---------- + time_delta: int + The time since the previous event (ie frame). + keys: KeyMania + The keys pressed. + """ + keys: KeyMania + +@dataclass +class LifeBarState: + """ + A state of the lifebar shown on the results screen, at a particular point in + time. + + Attributes + ---------- + time: int + The time, in ms, this life bar state corresponds to in the replay. + The time since the previous event (ie frame). + life: float + The amount of life at this life bar state. + """ + time: int + life: float diff --git a/tests/test_dumping.py b/tests/test_dumping.py index 7ecaf53..fd9caf0 100644 --- a/tests/test_dumping.py +++ b/tests/test_dumping.py @@ -2,7 +2,7 @@ from unittest import TestCase from tempfile import TemporaryDirectory -from osrparse import parse_replay_file +from osrparse import Replay RES = Path(__file__).parent / "resources" @@ -11,21 +11,21 @@ class TestDumping(TestCase): @classmethod def setUpClass(cls): - cls.replay = parse_replay_file(RES / "replay.osr") + cls.replay = Replay.from_path(RES / "replay.osr") def test_dumping(self): with TemporaryDirectory() as tempdir: r2_path = Path(tempdir) / "dumped.osr" - self.replay.dump(r2_path) - r2 = parse_replay_file(r2_path) + self.replay.write_path(r2_path) + r2 = Replay.from_path(r2_path) # `replay_length` is intentionally not tested for equality here, as the # length of the compressed replay data may change after dumping due to # varying lzma settings. - attrs = ["game_mode", "game_version", "beatmap_hash", "player_name", - "replay_hash", "number_300s", "number_100s", "number_50s", "gekis", - "katus", "misses", "score", "max_combo", "is_perfect_combo", - "mod_combination", "life_bar_graph", "timestamp", "play_data", + attrs = ["mode", "game_version", "beatmap_hash", "username", + "replay_hash", "count_300", "count_100", "count_50", "count_geki", + "count_katu", "count_miss", "score", "max_combo", "perfect", + "mods", "life_bar_graph", "timestamp", "replay_data", "replay_id"] for attr in attrs: self.assertEqual(getattr(self.replay, attr), getattr(r2, attr), diff --git a/tests/test_replay.py b/tests/test_replay.py index 75d3962..e293f18 100644 --- a/tests/test_replay.py +++ b/tests/test_replay.py @@ -1,8 +1,8 @@ from pathlib import Path from unittest import TestCase from datetime import datetime, timezone -from osrparse import (parse_replay, parse_replay_file, ReplayEventOsu, GameMode, - Mod, ReplayEventTaiko, ReplayEventCatch, ReplayEventMania) +from osrparse import (ReplayEventOsu, GameMode, Mod, ReplayEventTaiko, + ReplayEventCatch, ReplayEventMania, Replay) RES = Path(__file__).parent / "resources" @@ -14,13 +14,13 @@ def setUpClass(cls): replay1_path = RES / "replay.osr" with open(replay1_path, "rb") as f: data = f.read() - cls._replays = [parse_replay(data, pure_lzma=False), parse_replay_file(replay1_path)] - cls._combination_replay = parse_replay_file(RES / "replay2.osr") - cls._old_replayid_replay = parse_replay_file(RES / "replay_old_replayid.osr") + cls._replays = [Replay.from_string(data), Replay.from_path(replay1_path)] + cls._combination_replay = Replay.from_path(RES / "replay2.osr") + cls._old_replayid_replay = Replay.from_path(RES / "replay_old_replayid.osr") def test_replay_mode(self): for replay in self._replays: - self.assertEqual(replay.game_mode, GameMode.STD, "Game mode is incorrect") + self.assertEqual(replay.mode, GameMode.STD, "Game mode is incorrect") def test_game_version(self): for replay in self._replays: @@ -32,16 +32,16 @@ def test_beatmap_hash(self): def test_player_name(self): for replay in self._replays: - self.assertEqual(replay.player_name, "Cookiezi", "Player name is incorrect") + self.assertEqual(replay.username, "Cookiezi", "Player name is incorrect") def test_number_hits(self): for replay in self._replays: - self.assertEqual(replay.number_300s, 1982, "Number of 300s is wrong") - self.assertEqual(replay.number_100s, 1, "Number of 100s is wrong") - self.assertEqual(replay.number_50s, 0, "Number of 50s is wrong") - self.assertEqual(replay.gekis, 250, "Number of gekis is wrong") - self.assertEqual(replay.katus, 1, "Number of katus is wrong") - self.assertEqual(replay.misses, 0, "Number of misses is wrong") + self.assertEqual(replay.count_300, 1982, "Number of 300s is wrong") + self.assertEqual(replay.count_100, 1, "Number of 100s is wrong") + self.assertEqual(replay.count_50, 0, "Number of 50s is wrong") + self.assertEqual(replay.count_geki, 250, "Number of gekis is wrong") + self.assertEqual(replay.count_katu, 1, "Number of katus is wrong") + self.assertEqual(replay.count_miss, 0, "Number of misses is wrong") def test_max_combo(self): for replay in self._replays: @@ -49,14 +49,14 @@ def test_max_combo(self): def test_is_perfect_combo(self): for replay in self._replays: - self.assertEqual(replay.is_perfect_combo, True, "is_perfect_combo is wrong") + self.assertEqual(replay.perfect, True, "is_perfect_combo is wrong") def test_nomod(self): for replay in self._replays: - self.assertEqual(replay.mod_combination, Mod.NoMod, "Mod combination is wrong") + self.assertEqual(replay.mods, Mod.NoMod, "Mod combination is wrong") def test_mod_combination(self): - self.assertEqual(self._combination_replay.mod_combination, Mod.Hidden | Mod.HardRock, "Mod combination is wrong") + self.assertEqual(self._combination_replay.mods, Mod.Hidden | Mod.HardRock, "Mod combination is wrong") def test_timestamp(self): for replay in self._replays: @@ -64,8 +64,8 @@ def test_timestamp(self): def test_play_data(self): for replay in self._replays: - self.assertIsInstance(replay.play_data[0], ReplayEventOsu, "Replay data is wrong") - self.assertEqual(len(replay.play_data), 17500, "Replay data is wrong") + self.assertIsInstance(replay.replay_data[0], ReplayEventOsu, "Replay data is wrong") + self.assertEqual(len(replay.replay_data), 17500, "Replay data is wrong") def test_replay_id(self): for replay in self._replays: @@ -78,31 +78,31 @@ class TestTaikoReplay(TestCase): @classmethod def setUpClass(cls): - cls.replay = parse_replay_file(RES / "taiko.osr") + cls.replay = Replay.from_path(RES / "taiko.osr") def test_play_data(self): - play_data = self.replay.play_data - self.assertIsInstance(play_data[0], ReplayEventTaiko, "Replay data is wrong") - self.assertEqual(len(play_data), 17475, "Replay data is wrong") + replay_data = self.replay.replay_data + self.assertIsInstance(replay_data[0], ReplayEventTaiko, "Replay data is wrong") + self.assertEqual(len(replay_data), 17475, "Replay data is wrong") class TestCatchReplay(TestCase): @classmethod def setUpClass(cls): - cls.replay = parse_replay_file(RES / "ctb.osr") + cls.replay = Replay.from_path(RES / "ctb.osr") def test_play_data(self): - play_data = self.replay.play_data - self.assertIsInstance(play_data[0], ReplayEventCatch, "Replay data is wrong") - self.assertEqual(len(play_data), 10439, "Replay data is wrong") + replay_data = self.replay.replay_data + self.assertIsInstance(replay_data[0], ReplayEventCatch, "Replay data is wrong") + self.assertEqual(len(replay_data), 10439, "Replay data is wrong") class TestManiaReplay(TestCase): @classmethod def setUpClass(cls): - cls.replay = parse_replay_file(RES / "mania.osr") + cls.replay = Replay.from_path(RES / "mania.osr") def test_play_data(self): - play_data = self.replay.play_data + play_data = self.replay.replay_data self.assertIsInstance(play_data[0], ReplayEventMania, "Replay data is wrong") self.assertEqual(len(play_data), 17432, "Replay data is wrong")