integrate midi stuff #34

Open · wants to merge 1 commit into base: main
264 changes: 226 additions & 38 deletions basic_pitch/inference.py
@@ -20,19 +20,16 @@
import json
import os
import pathlib
import wave
from typing import Dict, List, Optional, Sequence, Tuple, Union

from tensorflow import Tensor, signal, keras, saved_model
import numpy as np
import librosa
import numpy as np
import pretty_midi
import pyaudio
from numpy import ndarray
from tensorflow import Tensor, signal, keras, saved_model

from basic_pitch.constants import (
    AUDIO_SAMPLE_RATE,
    AUDIO_N_SAMPLES,
    ANNOTATIONS_FPS,
    FFT_HOP,
)
from basic_pitch import ICASSP_2022_MODEL_PATH, note_creation as infer
from basic_pitch.commandline_printing import (
    entertaining_waiting,
@@ -41,9 +38,15 @@
    file_saved_confirmation,
    failed_to_save,
)
from basic_pitch.constants import (
    AUDIO_SAMPLE_RATE,
    AUDIO_N_SAMPLES,
    ANNOTATIONS_FPS,
    FFT_HOP,
)


def window_audio_file(audio_original: Tensor, hop_size: int) -> Tuple[Tensor, List[Dict[str, int]]]:
def window_audio_file(audio_original: ndarray, hop_size: int) -> Tuple[Tensor, List[Dict[str, int]]]:
"""
Pad appropriately an audio file, and return as
windowed signal, with window length = AUDIO_N_SAMPLES
@@ -71,7 +74,7 @@ def window_audio_file(...)


def get_audio_input(
    audio_path: Union[pathlib.Path, str], overlap_len: int, hop_size: int
) -> Tuple[Tensor, List[Dict[str, int]], int]:
    """
    Read wave file (as mono), pad appropriately, and return as
@@ -122,7 +125,7 @@ def unwrap_output(...)


def run_inference(
    audio_path: Union[pathlib.Path, str], model: keras.Model, debug_file: Optional[pathlib.Path] = None
) -> Dict[str, np.array]:
    """Run the model on the input audio path.

@@ -160,6 +163,52 @@ def run_inference(...)
    return unwrapped_output


def run_inference_rt(
    audio_data: ndarray,
    model: keras.Model,
    debug_file: Optional[pathlib.Path] = None,
) -> Dict[str, np.array]:
    """Run the model on a buffer of audio samples.

    Args:
        audio_data: The audio samples to run inference on, as a 1-D float array.
        model: A loaded keras model to run inference with.
        debug_file: An optional path to output debug data to. Useful for testing/verification.

    Returns:
        A dictionary with the notes, onsets and contours from model inference.
    """
    # Overlap consecutive windows by 5 frames.
    n_overlapping_frames = 5
    overlap_len = n_overlapping_frames * FFT_HOP
    hop_size = AUDIO_N_SAMPLES - overlap_len

    audio_length = audio_data.shape[0]
    # The buffer length is passed as the window hop, so a buffer of exactly
    # AUDIO_N_SAMPLES samples produces a single window per call.
    audio_windowed, window_times = window_audio_file(audio_data, audio_length)

    output = model(audio_windowed)
    unwrapped_output = {k: unwrap_output(output[k], audio_length, n_overlapping_frames) for k in output}

    if debug_file:
        with open(debug_file, "a") as f:
            json.dump(
                {
                    "audio_windowed": audio_windowed.numpy().tolist(),
                    "audio_original_length": audio_length,
                    "hop_size_samples": hop_size,
                    "overlap_length_samples": overlap_len,
                    "unwrapped_output": {k: v.tolist() for k, v in unwrapped_output.items()},
                },
                f,
            )

    return unwrapped_output
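
A minimal sketch (not part of this diff) of how run_inference_rt might be driven offline, assuming a buffer of exactly AUDIO_N_SAMPLES samples at basic_pitch's AUDIO_SAMPLE_RATE; the synthetic sine input is a placeholder:

import numpy as np
from tensorflow import saved_model

from basic_pitch import ICASSP_2022_MODEL_PATH
from basic_pitch.constants import AUDIO_N_SAMPLES, AUDIO_SAMPLE_RATE
from basic_pitch.inference import run_inference_rt

model = saved_model.load(str(ICASSP_2022_MODEL_PATH))

# Synthetic A4 sine, one model window long (placeholder input).
t = np.arange(AUDIO_N_SAMPLES, dtype=np.float32) / AUDIO_SAMPLE_RATE
buffer = 0.5 * np.sin(2.0 * np.pi * 440.0 * t).astype(np.float32)

output = run_inference_rt(buffer, model)
print({k: v.shape for k, v in output.items()})  # note / onset / contour arrays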


class OutputExtensions(enum.Enum):
MIDI = "mid"
MODEL_OUTPUT_NPZ = "npz"
@@ -200,7 +249,7 @@ def verify_output_dir(...)


def build_output_path(
    audio_path: Union[pathlib.Path, str], output_directory: Union[pathlib.Path, str], output_type: OutputExtensions
) -> pathlib.Path:
    """Create an output path and make sure it doesn't already exist.

@@ -235,7 +284,7 @@ def build_output_path(...)


def save_note_events(
    note_events: List[Tuple[float, float, int, float, Optional[List[int]]]], save_path: Union[pathlib.Path, str]
) -> None:
    """Save note events to file

@@ -256,16 +305,16 @@ def save_note_events(...)


def predict(
    audio_path: Union[pathlib.Path, str],
    model_or_model_path: Union[keras.Model, pathlib.Path, str] = ICASSP_2022_MODEL_PATH,
    onset_threshold: float = 0.5,
    frame_threshold: float = 0.3,
    minimum_note_length: float = 58,
    minimum_frequency: Optional[float] = None,
    maximum_frequency: Optional[float] = None,
    multiple_pitch_bends: bool = False,
    melodia_trick: bool = True,
    debug_file: Optional[pathlib.Path] = None,
) -> Tuple[Dict[str, np.array], pretty_midi.PrettyMIDI, List[Tuple[float, float, int, float, Optional[List[int]]]]]:
    """Run a single prediction.

@@ -334,22 +383,161 @@ def predict(...)
    return model_output, midi_data, note_events
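
For comparison with the realtime path added below, a minimal sketch of the existing file-based entry point; 'example.wav' and 'example.mid' are placeholder paths, not files from this repo:

from basic_pitch import ICASSP_2022_MODEL_PATH
from basic_pitch.inference import predict

model_output, midi_data, note_events = predict('example.wav', ICASSP_2022_MODEL_PATH)
midi_data.write('example.mid')  # midi_data is a pretty_midi.PrettyMIDI object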


def predict_rt(
    model_or_model_path: Union[keras.Model, pathlib.Path, str] = ICASSP_2022_MODEL_PATH,
    onset_threshold: float = 0.5,
    frame_threshold: float = 0.3,
    minimum_note_length: float = 58,
    minimum_frequency: Optional[float] = None,
    maximum_frequency: Optional[float] = None,
    multiple_pitch_bends: bool = False,
    melodia_trick: bool = True,
) -> None:
    """Record audio from the default input device and predict notes on it in
    chunks, printing note events as they are detected. The raw recording is
    saved to inference.wav and debug data is appended to inference.json.

    Args:
        model_or_model_path: Path to load the Keras saved model from. Can be local or on GCS.
        onset_threshold: Minimum energy required for an onset to be considered present.
        frame_threshold: Minimum energy requirement for a frame to be considered present.
        minimum_note_length: The minimum allowed note length, in milliseconds.
        minimum_frequency: Minimum allowed output frequency, in Hz. If None, all frequencies are used.
        maximum_frequency: Maximum allowed output frequency, in Hz. If None, all frequencies are used.
        multiple_pitch_bends: If True, allow overlapping notes in midi file to have pitch bends.
        melodia_trick: Use the melodia post-processing step.
    """

    # It's convenient to be able to pass in a keras saved model so if
    # someone wants to place this function in a loop,
    # the model doesn't have to be reloaded every function call
    if isinstance(model_or_model_path, (pathlib.Path, str)):
        model = saved_model.load(str(model_or_model_path))
    else:
        model = model_or_model_path

    sample_format = pyaudio.paInt16  # 16 bits per sample
    channels = 1
    fs = AUDIO_SAMPLE_RATE  # Record at the sample rate the model expects
    seconds = 600  # Record for at most 10 minutes
    sound_filename = pathlib.Path('inference.wav')
    debug_file = pathlib.Path('inference.json')

    p = pyaudio.PyAudio()  # Create an interface to PortAudio

    print('Recording')

    # Overlap consecutive windows by 5 frames
    n_overlapping_frames = 5
    overlap_len = n_overlapping_frames * FFT_HOP
    hop_size = AUDIO_N_SAMPLES - overlap_len

    stream = p.open(format=sample_format,
                    channels=channels,
                    rate=fs,
                    frames_per_buffer=hop_size,
                    input=True)

    prev_data = np.zeros((int(overlap_len),), dtype=np.float32)

    # Initialize array to store frames
    frames = []

    # Read up to `seconds` seconds of audio, one hop at a time
    for i in range(0, int(fs / hop_size * seconds)):
        data = stream.read(hop_size)
        frames.append(data)

        data = librosa.util.buf_to_float(data)
        flatness = np.mean(librosa.feature.spectral_flatness(y=data))

        # Diagnostics printed alongside each note event below.
        data_nz = np.count_nonzero(data)
        data_e5 = (np.abs(data) < 1e-05).sum()
        # Spectral flatness approaches 1 for noise-like audio and 0 for tonal
        # audio; skip chunks that are unlikely to contain pitched content.
        if flatness > 0.001:
            continue

        # Prepend the tail of the previous chunk so consecutive windows overlap.
        data = np.concatenate([prev_data, data])
        prev_data = data[-overlap_len - 1:-1]

        model_output = run_inference_rt(audio_data=data, model=model, debug_file=debug_file)

        min_note_len = int(np.round(minimum_note_length / 1000 * (AUDIO_SAMPLE_RATE / FFT_HOP)))
        midi_data, note_events = infer.model_output_to_notes(
            model_output,
            onset_thresh=onset_threshold,
            frame_thresh=frame_threshold,
            min_note_len=min_note_len,  # convert to frames
            min_freq=minimum_frequency,
            max_freq=maximum_frequency,
            multiple_pitch_bends=multiple_pitch_bends,
            melodia_trick=melodia_trick,
        )

        for start_time, end_time, pitch, amplitude, pitch_bends in note_events:
            note_data = (
                int(pitch),
                float(amplitude),
                float(start_time),
                float(end_time),
                data_nz,
                data_e5,
                flatness,
            )
            print(f'{note_data}')

        with open(debug_file, "a") as f:
            json.dump(
                {
                    "min_note_length": min_note_len,
                    "onset_thresh": onset_threshold,
                    "frame_thresh": frame_threshold,
                    "estimated_notes": [
                        (
                            float(start_time),
                            float(end_time),
                            int(pitch),
                            float(amplitude),
                            [int(b) for b in pitch_bends] if pitch_bends else None,
                        )
                        for start_time, end_time, pitch, amplitude, pitch_bends in note_events
                    ],
                },
                f,
            )

    # Stop and close the stream
    stream.stop_stream()
    stream.close()
    # Terminate the PortAudio interface
    p.terminate()

    print('Finished recording')

    # Save the recorded data as a WAV file
    wf = wave.open(str(sound_filename), 'wb')
    wf.setnchannels(channels)
    wf.setsampwidth(p.get_sample_size(sample_format))
    wf.setframerate(fs)
    wf.writeframes(b''.join(frames))
    wf.close()
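
A small worked example of the chunking arithmetic used by predict_rt above. The concrete numbers assume basic_pitch's published constants (22050 Hz sample rate, FFT hop of 256 samples); re-derive from basic_pitch.constants if those differ:

import numpy as np

from basic_pitch.constants import AUDIO_N_SAMPLES, AUDIO_SAMPLE_RATE, FFT_HOP

n_overlapping_frames = 5
overlap_len = n_overlapping_frames * FFT_HOP  # 5 * 256 = 1280 samples
hop_size = AUDIO_N_SAMPLES - overlap_len      # samples read from the stream per iteration

# Each chunk handed to the model is the previous chunk's tail (overlap_len
# samples) plus one fresh stream read (hop_size samples): exactly one window.
chunk_len = overlap_len + hop_size
assert chunk_len == AUDIO_N_SAMPLES

# The default minimum_note_length of 58 ms converts to frames the same way
# predict_rt does it: round(0.058 * 22050 / 256) = 5 frames.
min_note_len = int(np.round(58 / 1000 * (AUDIO_SAMPLE_RATE / FFT_HOP)))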


def predict_and_save(
    audio_path_list: Sequence[Union[pathlib.Path, str]],
    output_directory: Union[pathlib.Path, str],
    save_midi: bool,
    sonify_midi: bool,
    save_model_outputs: bool,
    save_notes: bool,
    model_path: Union[pathlib.Path, str] = ICASSP_2022_MODEL_PATH,
    onset_threshold: float = 0.5,
    frame_threshold: float = 0.3,
    minimum_note_length: float = 58,
    minimum_frequency: Optional[float] = None,
    maximum_frequency: Optional[float] = None,
    multiple_pitch_bends: bool = False,
    melodia_trick: bool = True,
    debug_file: Optional[pathlib.Path] = None,
) -> None:
    """Make a prediction and save the results to file.

2 changes: 1 addition & 1 deletion setup.cfg
@@ -40,7 +40,7 @@ install_requires =
    resampy>=0.2.2
    scipy>=1.4.1
    tensorflow>=2.4.1,<2.7.0
    typing_extensions
    typing_extensions>=3.7,<3.11

[options.entry_points]
console_scripts =
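A quick local sanity check that the resolved typing_extensions version satisfies the new pin (assumes Python 3.8+ for importlib.metadata):

from importlib.metadata import version

print(version('typing_extensions'))  # expect >= 3.7 and < 3.11, per the pin above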
5 changes: 5 additions & 0 deletions tests/test_inference.py
@@ -47,6 +47,11 @@ def test_predict(self) -> None:
        assert all(note_pitch_max)
        assert isinstance(note_events, list)

    def test_predict_rt(self) -> None:
        inference.predict_rt(
            ICASSP_2022_MODEL_PATH,
        )
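
As committed, test_predict_rt opens a live PyAudio input stream and records for up to ten minutes, so it will hang on machines with no audio input device. One possible guard, sketched here as a standalone pytest function with an illustrative skip condition that is not part of this diff:

import pytest
import pyaudio

from basic_pitch import ICASSP_2022_MODEL_PATH, inference


def test_predict_rt() -> None:
    # Skip when no default input device is available (e.g. on CI).
    pa = pyaudio.PyAudio()
    try:
        pa.get_default_input_device_info()
    except IOError:
        pytest.skip('no default audio input device available')
    finally:
        pa.terminate()
    inference.predict_rt(ICASSP_2022_MODEL_PATH)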

    def test_predict_with_saves(self) -> None:
        test_audio_path = RESOURCES_PATH / "vocadito_10.wav"
        with tempfile.TemporaryDirectory() as tmpdir: