From 13ae33010a593d9e9d7de564213ab9c3df5819ac Mon Sep 17 00:00:00 2001 From: Sergey Stepanov Date: Thu, 31 Aug 2023 15:34:48 +0300 Subject: [PATCH] Use generic audio callback in caged apps --- pkg/worker/caged/app/app.go | 22 ++++ pkg/worker/caged/caged.go | 25 ++--- pkg/worker/caged/libretro/frontend.go | 100 ++++++------------ pkg/worker/caged/libretro/frontend_test.go | 6 +- .../caged/libretro/nanoarch/nanoarch.go | 11 +- pkg/worker/caged/libretro/recording.go | 12 ++- pkg/worker/coordinatorhandlers.go | 4 + pkg/worker/recorder/recorder.go | 2 +- pkg/worker/recorder/recorder_test.go | 4 +- pkg/worker/recorder/wavstream.go | 2 +- pkg/worker/room.go | 18 ++-- 11 files changed, 98 insertions(+), 108 deletions(-) create mode 100644 pkg/worker/caged/app/app.go diff --git a/pkg/worker/caged/app/app.go b/pkg/worker/caged/app/app.go new file mode 100644 index 000000000..c14e1ee30 --- /dev/null +++ b/pkg/worker/caged/app/app.go @@ -0,0 +1,22 @@ +package app + +import "unsafe" + +type App interface { + AudioSampleRate() int + Init() error + ViewportSize() (int, int) + Start() + Close() + + SetAudioCb(func(Audio)) +} + +type Audio struct { + Data []byte + Duration int64 +} + +func (a Audio) ToPCM() []int16 { + return unsafe.Slice((*int16)(unsafe.Pointer(unsafe.SliceData(a.Data))), len(a.Data)>>1) +} diff --git a/pkg/worker/caged/caged.go b/pkg/worker/caged/caged.go index af4fa5f2c..2328d96f1 100644 --- a/pkg/worker/caged/caged.go +++ b/pkg/worker/caged/caged.go @@ -6,19 +6,12 @@ import ( "github.com/giongto35/cloud-game/v3/pkg/config" "github.com/giongto35/cloud-game/v3/pkg/logger" + "github.com/giongto35/cloud-game/v3/pkg/worker/caged/app" "github.com/giongto35/cloud-game/v3/pkg/worker/caged/libretro" ) -type App interface { - AudioSampleRate() int - Init() error - ViewportSize() (int, int) - Start() - Close() -} - type Manager struct { - list map[ModName]App + list map[ModName]app.App log *logger.Logger } @@ -27,18 +20,18 @@ type ModName string const Libretro ModName = "libretro" func NewManager(log *logger.Logger) *Manager { - return &Manager{log: log, list: make(map[ModName]App)} + return &Manager{log: log, list: make(map[ModName]app.App)} } -func (m *Manager) Get(name ModName) App { return m.list[name] } +func (m *Manager) Get(name ModName) app.App { return m.list[name] } func (m *Manager) Load(name ModName, conf any) error { if name == Libretro { - app, err := m.loadLibretro(conf) + caged, err := m.loadLibretro(conf) if err != nil { return err } - m.list[name] = app + m.list[name] = caged } return nil } @@ -60,9 +53,9 @@ func (m *Manager) loadLibretro(conf any) (*libretro.Caged, error) { Recording: r.Interface().(config.Recording), } - app := libretro.Cage(c, m.log) - if err := app.Init(); err != nil { + caged := libretro.Cage(c, m.log) + if err := caged.Init(); err != nil { return nil, err } - return &app, nil + return &caged, nil } diff --git a/pkg/worker/caged/libretro/frontend.go b/pkg/worker/caged/libretro/frontend.go index be244c5cf..23a1580ae 100644 --- a/pkg/worker/caged/libretro/frontend.go +++ b/pkg/worker/caged/libretro/frontend.go @@ -8,20 +8,22 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "github.com/giongto35/cloud-game/v3/pkg/config" "github.com/giongto35/cloud-game/v3/pkg/logger" "github.com/giongto35/cloud-game/v3/pkg/os" + "github.com/giongto35/cloud-game/v3/pkg/worker/caged/app" "github.com/giongto35/cloud-game/v3/pkg/worker/caged/libretro/image" "github.com/giongto35/cloud-game/v3/pkg/worker/caged/libretro/nanoarch" ) type Emulator interface { - // SetAudio sets the audio callback - SetAudio(func(*GameAudio)) + // SetAudioCb sets the audio callback + SetAudioCb(func(app.Audio)) // SetVideo sets the video callback SetVideo(func(*GameFrame)) - Audio() func(*GameAudio) + Audio() func(app.Audio) Video() func(*GameFrame) LoadCore(name string) LoadGame(path string) error @@ -53,34 +55,22 @@ type Emulator interface { } type Frontend struct { - onVideo func(*GameFrame) - onAudio func(*GameAudio) - - input InputState - + canvas *image.Canvas conf config.Emulator + done chan struct{} + input InputState + log *logger.Logger + nano *nanoarch.Nanoarch + onAudio func(app.Audio) + onVideo func(*GameFrame) storage Storage + th int // draw threads + vw, vh int // out frame size - // out frame size - vw, vh int - // draw threads - th int + mu sync.Mutex - Canvas *image.Canvas DisableCanvasPool bool - - done chan struct{} - log *logger.Logger - - frames int - - started chan struct{} - - nano *nanoarch.Nanoarch - - SaveOnClose bool - - mu sync.Mutex + SaveOnClose bool } type ( @@ -88,10 +78,6 @@ type ( Data *image.Frame Duration time.Duration } - GameAudio struct { - Data *[]int16 - Duration time.Duration - } InputEvent struct { RawState []byte } @@ -114,14 +100,9 @@ const ( dpadAxes = 4 ) -const rawAudioBuffer = 4096 // 4K -var ( - audioCopyPool sync.Pool - audioPool sync.Pool -) - var ( - noAudio = func(*GameAudio) {} + audioPool sync.Pool + noAudio = func(app.Audio) {} noVideo = func(*GameFrame) {} videoPool sync.Pool ) @@ -163,7 +144,6 @@ func NewFrontend(conf config.Emulator, log *logger.Logger) (*Frontend, error) { log: log, onAudio: noAudio, onVideo: noVideo, - started: make(chan struct{}, 1), storage: store, th: conf.Threads, } @@ -189,29 +169,17 @@ func (f *Frontend) LoadCore(emu string) { f.mu.Unlock() } -func (f *Frontend) handleAudio(data []int16, samples int) { - sampleRate := f.nano.AudioSampleRate() - - dst, _ := audioCopyPool.Get().(*[]int16) - if dst == nil { - x := make([]int16, rawAudioBuffer) - dst = &x - } - xx := (*dst)[:samples] - copy(xx, data) - - // 1600 = x / 1000 * 48000 * 2 - estimate := float64(samples) / float64(sampleRate<<1) * 1000000000 - - fr, _ := audioPool.Get().(*GameAudio) +func (f *Frontend) handleAudio(audio unsafe.Pointer, samples int) { + fr, _ := audioPool.Get().(*app.Audio) if fr == nil { - fr = &GameAudio{} + fr = &app.Audio{} } - fr.Data = &xx - fr.Duration = time.Duration(estimate) // used in recordings - f.onAudio(fr) + // !to look if we need a copy + fr.Data = unsafe.Slice((*byte)(audio), samples<<1) + // due to audio buffering for opus fixed frames and const duration up in the hierarchy, + // we skip Duration here + f.onAudio(*fr) audioPool.Put(fr) - audioCopyPool.Put(dst) } func (f *Frontend) handleVideo(data []byte, delta int64, fi nanoarch.FrameInfo) { @@ -225,10 +193,10 @@ func (f *Frontend) handleVideo(data []byte, delta int64, fi nanoarch.FrameInfo) } // !to fix possible nil pointer dereference // when the internal pool can be nil during first Get??? - fr.Data = f.Canvas.Draw(pixFmt, rot, fi.W, fi.H, fi.Packed, bpp, data, f.th) + fr.Data = f.canvas.Draw(pixFmt, rot, fi.W, fi.H, fi.Packed, bpp, data, f.th) fr.Duration = time.Duration(delta) f.onVideo(fr) - f.Canvas.Put(fr.Data) + f.canvas.Put(fr.Data) videoPool.Put(fr) } @@ -236,8 +204,8 @@ func (f *Frontend) Shutdown() { f.log.Debug().Msgf("run loop cleanup") f.mu.Lock() f.nano.Shutdown() - f.Canvas.Clear() - f.SetAudio(noAudio) + f.canvas.Clear() + f.SetAudioCb(noAudio) f.SetVideo(noVideo) f.mu.Unlock() f.log.Debug().Msgf("run loop finished") @@ -290,7 +258,7 @@ func (f *Frontend) Start() { } func (f *Frontend) FrameSize() (int, int) { return f.nano.GeometryBase() } -func (f *Frontend) Audio() func(*GameAudio) { return f.onAudio } +func (f *Frontend) Audio() func(app.Audio) { return f.onAudio } func (f *Frontend) Video() func(*GameFrame) { return f.onVideo } func (f *Frontend) FPS() int { return f.nano.VideoFramerate() } func (f *Frontend) HashPath() string { return f.storage.GetSavePath() } @@ -304,16 +272,16 @@ func (f *Frontend) RestoreGameState() error { return f.Load() } func (f *Frontend) IsPortrait() bool { return f.nano.IsPortrait() } func (f *Frontend) SaveGameState() error { return f.Save() } func (f *Frontend) Scale(factor int) { w, h := f.ViewportSize(); f.SetViewport(w, h, factor) } -func (f *Frontend) SetAudio(ff func(*GameAudio)) { f.onAudio = ff } +func (f *Frontend) SetAudioCb(cb func(app.Audio)) { f.onAudio = cb } func (f *Frontend) SetSessionId(name string) { f.storage.SetMainSaveName(name) } func (f *Frontend) SetViewport(width int, height int, scale int) { f.mu.Lock() f.vw, f.vh = width, height mw, mh := f.nano.GeometryMax() size := mw * scale * mh * scale - f.Canvas = image.NewCanvas(width, height, size) + f.canvas = image.NewCanvas(width, height, size) if f.DisableCanvasPool { - f.Canvas.SetEnabled(false) + f.canvas.SetEnabled(false) } f.mu.Unlock() } diff --git a/pkg/worker/caged/libretro/frontend_test.go b/pkg/worker/caged/libretro/frontend_test.go index 1d6ad0755..245b27ab4 100644 --- a/pkg/worker/caged/libretro/frontend_test.go +++ b/pkg/worker/caged/libretro/frontend_test.go @@ -15,6 +15,7 @@ import ( "github.com/giongto35/cloud-game/v3/pkg/config" "github.com/giongto35/cloud-game/v3/pkg/logger" + "github.com/giongto35/cloud-game/v3/pkg/worker/caged/app" "github.com/giongto35/cloud-game/v3/pkg/worker/caged/libretro/nanoarch" ) @@ -75,7 +76,6 @@ func GetEmulatorMock(room string, system string) *EmulatorMock { done: make(chan struct{}), th: conf.Emulator.Threads, log: l2, - started: make(chan struct{}), SaveOnClose: false, }, @@ -101,7 +101,7 @@ func GetDefaultFrontend(room string, system string, rom string) *EmulatorMock { mock := GetEmulatorMock(room, system) mock.loadRom(rom) mock.SetVideo(func(_ *GameFrame) {}) - mock.SetAudio(func(_ *GameAudio) {}) + mock.SetAudioCb(func(app.Audio) {}) return mock } @@ -358,7 +358,7 @@ func TestStateConcurrency(t *testing.T) { t.Errorf("It seems that rom video frame was empty, which is strange!") } }) - mock.SetAudio(func(_ *GameAudio) {}) + mock.SetAudioCb(func(app.Audio) {}) t.Logf("Random seed is [%v]\n", test.seed) t.Logf("Save path is [%v]\n", mock.paths.save) diff --git a/pkg/worker/caged/libretro/nanoarch/nanoarch.go b/pkg/worker/caged/libretro/nanoarch/nanoarch.go index 4a7ec0fa5..861fcc951 100644 --- a/pkg/worker/caged/libretro/nanoarch/nanoarch.go +++ b/pkg/worker/caged/libretro/nanoarch/nanoarch.go @@ -37,7 +37,7 @@ type Nanoarch struct { Handlers struct { OnDpad func(port uint, axis uint) (shift int16) OnKeyPress func(port uint, key int) int - OnAudio func(data []int16, frames int) + OnAudio func(ptr unsafe.Pointer, frames int) OnVideo func(data []byte, delta int64, fi FrameInfo) } LastFrameTime int64 @@ -102,12 +102,12 @@ var Nan0 = Nanoarch{ Handlers: struct { OnDpad func(uint, uint) int16 OnKeyPress func(uint, int) int - OnAudio func([]int16, int) + OnAudio func(unsafe.Pointer, int) OnVideo func([]byte, int64, FrameInfo) }{ OnDpad: func(uint, uint) int16 { return 0 }, OnKeyPress: func(uint, int) int { return 0 }, - OnAudio: func([]int16, int) {}, + OnAudio: func(unsafe.Pointer, int) {}, OnVideo: func([]byte, int64, FrameInfo) {}, }, } @@ -635,10 +635,7 @@ func coreAudioSampleBatch(data unsafe.Pointer, frames C.size_t) C.size_t { } return frames } - - samples := int(frames) << 1 - Nan0.Handlers.OnAudio(unsafe.Slice((*int16)(data), samples), samples) - + Nan0.Handlers.OnAudio(data, int(frames)<<1) return frames } diff --git a/pkg/worker/caged/libretro/recording.go b/pkg/worker/caged/libretro/recording.go index f0a490be9..a0d7be428 100644 --- a/pkg/worker/caged/libretro/recording.go +++ b/pkg/worker/caged/libretro/recording.go @@ -1,8 +1,11 @@ package libretro import ( + "time" + "github.com/giongto35/cloud-game/v3/pkg/config" "github.com/giongto35/cloud-game/v3/pkg/logger" + "github.com/giongto35/cloud-game/v3/pkg/worker/caged/app" "github.com/giongto35/cloud-game/v3/pkg/worker/recorder" ) @@ -27,10 +30,13 @@ func WithRecording(fe Emulator, rec bool, user string, game string, conf config. return rr } -func (r *RecordingFrontend) SetAudio(fn func(*GameAudio)) { - r.Emulator.SetAudio(func(audio *GameAudio) { +func (r *RecordingFrontend) SetAudioCb(fn func(app.Audio)) { + r.Emulator.SetAudioCb(func(audio app.Audio) { if r.IsRecording() { - r.rec.WriteAudio(recorder.Audio{Samples: audio.Data, Duration: audio.Duration}) + pcm := audio.ToPCM() + // example: 1600 = x / 1000 * 48000 * 2 + l := time.Duration(float64(len(pcm)) / float64(r.AudioSampleRate()<<1) * 1000000000) + r.rec.WriteAudio(recorder.Audio{Samples: pcm, Duration: l}) } fn(audio) }) diff --git a/pkg/worker/coordinatorhandlers.go b/pkg/worker/coordinatorhandlers.go index 40805dd36..3229c2ea3 100644 --- a/pkg/worker/coordinatorhandlers.go +++ b/pkg/worker/coordinatorhandlers.go @@ -2,6 +2,7 @@ package worker import ( "encoding/base64" + "unsafe" "github.com/giongto35/cloud-game/v3/pkg/api" "github.com/giongto35/cloud-game/v3/pkg/com" @@ -31,6 +32,9 @@ func emulator(wtf any) *libretro.Caged { return wtf.(*libretro.Caged) } func recorder(wtf any) *libretro.RecordingFrontend { return (emulator(wtf).Emulator).(*libretro.RecordingFrontend) } +func unwrapAudio(a []byte) []int16 { + return unsafe.Slice((*int16)(unsafe.Pointer(unsafe.SliceData(a))), len(a)/2) +} func (c *coordinator) HandleWebrtcInit(rq api.WebrtcInitRequest[com.Uid], w *Worker, connApi *webrtc.ApiFactory) api.Out { peer := webrtc.New(c.log, connApi) diff --git a/pkg/worker/recorder/recorder.go b/pkg/worker/recorder/recorder.go index fdcbcea79..24edc4c8d 100644 --- a/pkg/worker/recorder/recorder.go +++ b/pkg/worker/recorder/recorder.go @@ -56,7 +56,7 @@ type videoStream interface { type ( Audio struct { - Samples *[]int16 + Samples []int16 Duration time.Duration } Video struct { diff --git a/pkg/worker/recorder/recorder_test.go b/pkg/worker/recorder/recorder_test.go index 9816434a6..e44417491 100644 --- a/pkg/worker/recorder/recorder_test.go +++ b/pkg/worker/recorder/recorder_test.go @@ -53,7 +53,7 @@ func TestName(t *testing.T) { imgWg.Done() }() go func() { - recorder.WriteAudio(Audio{&[]int16{0, 0, 0, 0, 0, 1, 11, 11, 11, 1}, 1}) + recorder.WriteAudio(Audio{[]int16{0, 0, 0, 0, 0, 1, 11, 11, 11, 1}, 1}) audioWg.Done() }() } @@ -125,7 +125,7 @@ func benchmarkRecorder(w, h int, comp int, b *testing.B) { ticks.Done() }() go func() { - recorder.WriteAudio(Audio{&samples, 1}) + recorder.WriteAudio(Audio{samples, 1}) atomic.AddInt64(&bytes, int64(len(samples)*2)) ticks.Done() }() diff --git a/pkg/worker/recorder/wavstream.go b/pkg/worker/recorder/wavstream.go index f63ea70e4..7b4e0b097 100644 --- a/pkg/worker/recorder/wavstream.go +++ b/pkg/worker/recorder/wavstream.go @@ -52,7 +52,7 @@ func (w *wavStream) Close() (err error) { } func (w *wavStream) Write(data Audio) { - pcm := *data.Samples + pcm := data.Samples bs := make([]byte, len(pcm)*2) // int & 0xFF + (int >> 8) & 0xFF for i, ln := 0, len(pcm); i < ln; i++ { diff --git a/pkg/worker/room.go b/pkg/worker/room.go index 47780f7dc..03e4a1d92 100644 --- a/pkg/worker/room.go +++ b/pkg/worker/room.go @@ -8,7 +8,7 @@ import ( "github.com/giongto35/cloud-game/v3/pkg/config" "github.com/giongto35/cloud-game/v3/pkg/logger" "github.com/giongto35/cloud-game/v3/pkg/network/webrtc" - "github.com/giongto35/cloud-game/v3/pkg/worker/caged" + "github.com/giongto35/cloud-game/v3/pkg/worker/caged/app" "github.com/giongto35/cloud-game/v3/pkg/worker/caged/libretro" "github.com/giongto35/cloud-game/v3/pkg/worker/encoder" "github.com/giongto35/cloud-game/v3/pkg/worker/encoder/h264" @@ -16,7 +16,7 @@ import ( ) type AppRoom interface { - App() caged.App + App() app.App Close() Id() string StartApp() @@ -26,7 +26,7 @@ type AppRoom interface { } type Room struct { - app caged.App + app app.App id string log *logger.Logger users com.NetMap[Session] @@ -36,7 +36,7 @@ type Room struct { HandleClose func(self *Room) } -func NewRoom(id string, app caged.App, conf config.WorkerConfig, log *logger.Logger) *Room { +func NewRoom(id string, app app.App, conf config.WorkerConfig, log *logger.Logger) *Room { room := &Room{id: id, app: app, users: com.NewNetMap[Session](), log: log} w, h := app.ViewportSize() room.initVideo(w, h, conf.Encoder.Video) @@ -44,9 +44,9 @@ func NewRoom(id string, app caged.App, conf config.WorkerConfig, log *logger.Log return room } -func (r *Room) App() caged.App { return r.app } -func (r *Room) Id() string { return r.id } -func (r *Room) StartApp() { r.app.Start() } +func (r *Room) App() app.App { return r.app } +func (r *Room) Id() string { return r.id } +func (r *Room) StartApp() { r.app.Start() } func (r *Room) OnUserConnect(user Session) bool { r.log.Debug().Str("user", user.Id().String()).Msg("User has joined the room") @@ -92,8 +92,8 @@ func (r *Room) initAudio(srcHz int, conf config.Audio) { } frameDur := time.Duration(conf.Frame) * time.Millisecond - emulator(r.app).SetAudio(func(raw *libretro.GameAudio) { - buf.write(*raw.Data, func(pcm samples) { + emulator(r.app).SetAudioCb(func(raw app.Audio) { + buf.write(unwrapAudio(raw.Data), func(pcm samples) { data, err := opus_.Encode(pcm) audioPool.Put((*[]int16)(&pcm)) if err != nil {