Skip to content

Commit

Permalink
Fix seeking support to work with the new threading model
Browse files Browse the repository at this point in the history
Now with 100% less getting stuck.
  • Loading branch information
katajakasa committed Nov 11, 2023
1 parent 3e1aa0f commit 1dec78b
Show file tree
Hide file tree
Showing 21 changed files with 667 additions and 570 deletions.
446 changes: 223 additions & 223 deletions Doxyfile

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions include/kitchensink/internal/audio/kitaudio.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#ifndef KITAUDIO_H
#define KITAUDIO_H

#include "kitchensink/internal/kitdecoder.h"
#include "kitchensink/internal/kittimer.h"
#include "kitchensink/kitconfig.h"
#include "kitchensink/kitsource.h"
#include "kitchensink/internal/kitdecoder.h"

KIT_LOCAL Kit_Decoder* Kit_CreateAudioDecoder(const Kit_Source *src, int stream_index);
KIT_LOCAL int Kit_GetAudioDecoderData(Kit_Decoder *dec, unsigned char *buf, int len);
KIT_LOCAL Kit_Decoder* Kit_CreateAudioDecoder(const Kit_Source *src, Kit_Timer *sync_timer, int stream_index);
KIT_LOCAL int Kit_GetAudioDecoderData(Kit_Decoder *dec, unsigned char *buf, size_t len);
KIT_LOCAL int Kit_GetAudioDecoderOutputFormat(const Kit_Decoder *dec, Kit_AudioOutputFormat *output);

#endif // KITAUDIO_H
12 changes: 6 additions & 6 deletions include/kitchensink/internal/kitdecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,23 @@
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>

#include "kitchensink/kitformat.h"
#include "kitchensink/kitcodec.h"
#include "kitchensink/kitconfig.h"
#include "kitchensink/kitformat.h"
#include "kitchensink/kitsource.h"
#include "kitpacketbuffer.h"
#include "kittimer.h"

typedef struct Kit_Decoder Kit_Decoder;

typedef bool (*dec_input_cb)(const Kit_Decoder *decoder, const AVPacket *packet);
typedef bool (*dec_decode_cb)(const Kit_Decoder *decoder);
typedef bool (*dec_decode_cb)(const Kit_Decoder *decoder, double *pts);
typedef void (*dec_flush_cb)(Kit_Decoder *decoder);
typedef void (*dec_signal_cb)(Kit_Decoder *decoder);
typedef void (*dec_close_cb)(Kit_Decoder *decoder);

struct Kit_Decoder {
double clock_sync; ///< Sync source for current stream
Kit_Timer *sync_timer;
double clock_pos; ///< Current pts for the stream
AVRational aspect_ratio; ///< Aspect ratio for the current frame (may change frame-to-frame)
AVCodecContext *codec_ctx; ///< FFMpeg internal: Codec context
Expand All @@ -37,6 +38,7 @@ struct Kit_Decoder {

KIT_LOCAL Kit_Decoder* Kit_CreateDecoder(
AVStream *stream,
Kit_Timer *sync_timer,
int thread_count,
dec_input_cb dec_input,
dec_decode_cb dec_decode,
Expand All @@ -50,11 +52,9 @@ KIT_LOCAL void Kit_CloseDecoder(Kit_Decoder **dec);
KIT_LOCAL int Kit_GetDecoderStreamIndex(const Kit_Decoder *decoder);
KIT_LOCAL int Kit_GetDecoderCodecInfo(const Kit_Decoder *decoder, Kit_Codec *codec);

KIT_LOCAL void Kit_ChangeDecoderClockSync(Kit_Decoder *decoder, double sync);
KIT_LOCAL double Kit_GetDecoderPTS(const Kit_Decoder *decoder);
KIT_LOCAL double Kit_GetDecoderDuration(const Kit_Decoder *decoder);

KIT_LOCAL bool Kit_RunDecoder(const Kit_Decoder *decoder);
KIT_LOCAL bool Kit_RunDecoder(const Kit_Decoder *decoder, double *pts);
KIT_LOCAL bool Kit_AddDecoderPacket(const Kit_Decoder *decoder, const AVPacket *packet);
KIT_LOCAL void Kit_ClearDecoderBuffers(Kit_Decoder *decoder);
KIT_LOCAL void Kit_SignalDecoder(Kit_Decoder *decoder);
Expand Down
2 changes: 2 additions & 0 deletions include/kitchensink/internal/kitdemuxer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ KIT_LOCAL void Kit_CloseDemuxer(Kit_Demuxer **demuxer);
KIT_LOCAL bool Kit_RunDemuxer(Kit_Demuxer *demuxer);
KIT_LOCAL Kit_PacketBuffer* Kit_GetDemuxerPacketBuffer(const Kit_Demuxer *demuxer, KitBufferIndex buffer_index);
KIT_LOCAL void Kit_ClearDemuxerBuffers(const Kit_Demuxer *demuxer);
KIT_LOCAL void Kit_SignalDemuxer(const Kit_Demuxer *demuxer);
KIT_LOCAL bool Kit_DemuxerSeek(Kit_Demuxer *demuxer, int64_t seek_target);

#endif // KITDEMUXER_H
10 changes: 4 additions & 6 deletions include/kitchensink/internal/kitdemuxerthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@ typedef struct Kit_DemuxerThread {
Kit_Demuxer *demuxer;
SDL_Thread *thread;
SDL_atomic_t run;
SDL_atomic_t seek;
int64_t seek_target;
} Kit_DemuxerThread;

KIT_LOCAL Kit_DemuxerThread* Kit_CreateDemuxerThread(
const Kit_Source *src,
int video_index,
int audio_index,
int subtitle_index
);
KIT_LOCAL Kit_DemuxerThread* Kit_CreateDemuxerThread(Kit_Demuxer *demuxer);
KIT_LOCAL void Kit_CloseDemuxerThread(Kit_DemuxerThread **demuxer);

KIT_LOCAL void Kit_SeekDemuxerThread(Kit_DemuxerThread *demuxer_thread, int64_t seek_target);
KIT_LOCAL Kit_PacketBuffer* Kit_GetDemuxerThreadPacketBuffer(const Kit_DemuxerThread *demuxer_thread, KitBufferIndex buffer_index);

KIT_LOCAL void Kit_StartDemuxerThread(Kit_DemuxerThread *demuxer_thread);
Expand Down
8 changes: 7 additions & 1 deletion include/kitchensink/internal/kitpacketbuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ typedef void* (*buf_obj_alloc)();
typedef void (*buf_obj_unref)(void *obj);
typedef void (*buf_obj_free)(void **obj);
typedef void (*buf_obj_move)(void *dst, void *src);
typedef void (*buf_obj_ref)(void *dst, void *src);

typedef struct Kit_PacketBuffer Kit_PacketBuffer;

Expand All @@ -16,7 +17,8 @@ KIT_LOCAL Kit_PacketBuffer* Kit_CreatePacketBuffer(
buf_obj_alloc alloc_cb,
buf_obj_unref unref_cb,
buf_obj_free free_cb,
buf_obj_move move_cb
buf_obj_move move_cb,
buf_obj_ref ref_cb
);
KIT_LOCAL void Kit_FreePacketBuffer(Kit_PacketBuffer **buffer);

Expand All @@ -30,4 +32,8 @@ KIT_LOCAL void Kit_FlushPacketBuffer(Kit_PacketBuffer *buffer);
KIT_LOCAL bool Kit_WritePacketBuffer(Kit_PacketBuffer *buffer, void *src);
KIT_LOCAL bool Kit_ReadPacketBuffer(Kit_PacketBuffer *buffer, void *dst, int timeout);

KIT_LOCAL bool Kit_BeginPacketBufferRead(Kit_PacketBuffer *buffer, void *dst, int timeout);
KIT_LOCAL void Kit_FinishPacketBufferRead(Kit_PacketBuffer *buffer);
KIT_LOCAL void Kit_CancelPacketBufferRead(Kit_PacketBuffer *buffer);

#endif // KITFRAMESTREAM_H
5 changes: 3 additions & 2 deletions include/kitchensink/internal/subtitle/kitsubtitle.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

#include <SDL_render.h>

#include "kitchensink/internal/kitdecoder.h"
#include "kitchensink/internal/kittimer.h"
#include "kitchensink/kitconfig.h"
#include "kitchensink/kitsource.h"
#include "kitchensink/internal/kitdecoder.h"

KIT_LOCAL Kit_Decoder* Kit_CreateSubtitleDecoder(
const Kit_Source *src, int stream_index, int video_w, int video_h, int screen_w, int screen_h);
const Kit_Source *src, Kit_Timer *sync_timer, int stream_index, int video_w, int video_h, int screen_w, int screen_h);
KIT_LOCAL void Kit_GetSubtitleDecoderTexture(const Kit_Decoder *dec, SDL_Texture *texture, double sync_ts);
KIT_LOCAL void Kit_SetSubtitleDecoderSize(const Kit_Decoder *dec, int w, int h);
KIT_LOCAL int Kit_GetSubtitleDecoderInfo(
Expand Down
3 changes: 3 additions & 0 deletions include/kitchensink/internal/subtitle/kitsubtitlepacket.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ KIT_LOCAL void Kit_SetSubtitlePacketData(
KIT_LOCAL void Kit_MoveSubtitlePacketRefs(Kit_SubtitlePacket *dst, Kit_SubtitlePacket *src);
KIT_LOCAL void Kit_DelSubtitlePacketRefs(Kit_SubtitlePacket *packet);

// Not implemented
KIT_LOCAL void Kit_CreateSubtitlePacketRef(Kit_SubtitlePacket *dst, Kit_SubtitlePacket * src);


#endif // KITSUBTITLEPACKET_H
5 changes: 3 additions & 2 deletions include/kitchensink/internal/video/kitvideo.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

#include <SDL_render.h>

#include "kitchensink/internal/kitdecoder.h"
#include "kitchensink/internal/kittimer.h"
#include "kitchensink/kitconfig.h"
#include "kitchensink/kitsource.h"
#include "kitchensink/internal/kitdecoder.h"

KIT_LOCAL Kit_Decoder* Kit_CreateVideoDecoder(const Kit_Source *src, int stream_index);
KIT_LOCAL Kit_Decoder* Kit_CreateVideoDecoder(const Kit_Source *src, Kit_Timer *sync_timer,int stream_index);
KIT_LOCAL int Kit_GetVideoDecoderData(Kit_Decoder *dec, SDL_Texture *texture, SDL_Rect *area);
int Kit_GetVideoDecoderOutputFormat(const Kit_Decoder *dec, Kit_VideoOutputFormat *output);

Expand Down
147 changes: 77 additions & 70 deletions src/internal/audio/kitaudio.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@
#include "kitchensink/kiterror.h"
#include "kitchensink/internal/kitlibstate.h"
#include "kitchensink/internal/utils/kithelpers.h"
#include "kitchensink/internal/utils/kitlog.h"
#include "kitchensink/internal/audio/kitaudio.h"
#include "kitchensink/internal/audio/kitaudioutils.h"

#define KIT_AUDIO_SYNC_THRESHOLD 0.05
#define KIT_AUDIO_EARLY_FAIL 5
#define KIT_AUDIO_EARLY_THRESHOLD 0.01
#define KIT_AUDIO_LATE_THRESHOLD 0.05

#define SAMPLE_BYTES(audio_decoder) (audio_decoder->output.channels * audio_decoder->output.bytes)

typedef struct Kit_AudioDecoder {
SwrContext *swr; ///< Audio resampler context
AVFrame *in_frame; ///< Temporary AVFrame for audio decoding purposes
AVFrame *out_frame; ///< Temporary AVFrame fur audio resampling purposes
AVFrame *current; ///< Audio packet we are currently reading from
int left; ///< How much data we have left in the current packet
Kit_PacketBuffer *buffer; ///< Packet ringbuffer for decoded audio packets
Kit_AudioOutputFormat output; ///< Output audio format description
} Kit_AudioDecoder;
Expand Down Expand Up @@ -46,16 +50,22 @@ static void dec_read_audio(const Kit_Decoder *dec) {
swr_convert_frame(audio_decoder->swr, audio_decoder->out_frame, audio_decoder->in_frame);
av_frame_copy_props(audio_decoder->out_frame, audio_decoder->in_frame);

// Write audio packet to packet buffer. This may block!
// No need to av_packet_unref, since Kit_WritePacketBuffer will move the refs.
Kit_WritePacketBuffer(audio_decoder->buffer, audio_decoder->out_frame);
// Save bytes left in the frame. Misuse crop values, since they are only used for video packets by ffmpeg.
audio_decoder->out_frame->crop_top = SAMPLE_BYTES(audio_decoder) * audio_decoder->out_frame->nb_samples;
audio_decoder->out_frame->crop_bottom = audio_decoder->out_frame->crop_top;

// Write video packet to packet buffer. This may block!
// if write succeeds, no need to av_packet_unref, since Kit_WritePacketBuffer will move the refs.
// If write fails, unref the packet. Fails should only happen if we are closing or seeking, so it is fine.
if (!Kit_WritePacketBuffer(audio_decoder->buffer, audio_decoder->out_frame)) {
av_frame_unref(audio_decoder->out_frame);
}
}

static void dec_flush_audio_cb(Kit_Decoder *decoder) {
assert(decoder);
Kit_AudioDecoder *audio_decoder = decoder->userdata;
Kit_FlushPacketBuffer(audio_decoder->buffer);
av_frame_unref(audio_decoder->current);
}

static void dec_signal_audio_cb(Kit_Decoder *decoder) {
Expand All @@ -70,12 +80,13 @@ static bool dec_input_audio_cb(const Kit_Decoder *dec, const AVPacket *in_packet
return avcodec_send_packet(dec->codec_ctx, in_packet) == 0;
}

static bool dec_decode_audio_cb(const Kit_Decoder *dec) {
assert(dec != NULL);
Kit_AudioDecoder *audio_decoder = dec->userdata;
static bool dec_decode_audio_cb(const Kit_Decoder *decoder, double *pts) {
assert(decoder != NULL);
Kit_AudioDecoder *audio_decoder = decoder->userdata;

if(avcodec_receive_frame(dec->codec_ctx, audio_decoder->in_frame) == 0) {
dec_read_audio(dec);
if(avcodec_receive_frame(decoder->codec_ctx, audio_decoder->in_frame) == 0) {
*pts = audio_decoder->in_frame->best_effort_timestamp * av_q2d(decoder->stream->time_base);
dec_read_audio(decoder);
av_frame_unref(audio_decoder->in_frame);
return true;
}
Expand All @@ -88,20 +99,15 @@ static void dec_close_audio_cb(Kit_Decoder *ref) {
return;
assert(ref->userdata);
Kit_AudioDecoder *audio_dec = ref->userdata;
if(audio_dec->in_frame != NULL)
av_frame_free(&audio_dec->in_frame);
if(audio_dec->out_frame != NULL)
av_frame_free(&audio_dec->out_frame);
if(audio_dec->current != NULL)
av_frame_free(&audio_dec->current);
if(audio_dec->swr != NULL)
swr_free(&audio_dec->swr);
if(audio_dec->buffer != NULL)
Kit_FreePacketBuffer(&audio_dec->buffer);
av_frame_free(&audio_dec->in_frame);
av_frame_free(&audio_dec->out_frame);
av_frame_free(&audio_dec->current);
swr_free(&audio_dec->swr);
Kit_FreePacketBuffer(&audio_dec->buffer);
free(audio_dec);
}

Kit_Decoder* Kit_CreateAudioDecoder(const Kit_Source *src, int stream_index) {
Kit_Decoder* Kit_CreateAudioDecoder(const Kit_Source *src, Kit_Timer *sync_timer, int stream_index) {
assert(src != NULL);

const Kit_LibraryState *state = Kit_GetLibraryState();
Expand Down Expand Up @@ -130,6 +136,7 @@ Kit_Decoder* Kit_CreateAudioDecoder(const Kit_Source *src, int stream_index) {
}
if((decoder = Kit_CreateDecoder(
stream,
sync_timer,
state->thread_count,
dec_input_audio_cb,
dec_decode_audio_cb,
Expand Down Expand Up @@ -157,7 +164,8 @@ Kit_Decoder* Kit_CreateAudioDecoder(const Kit_Source *src, int stream_index) {
(buf_obj_alloc) av_frame_alloc,
(buf_obj_unref) av_frame_unref,
(buf_obj_free) av_frame_free,
(buf_obj_move) av_frame_move_ref)) == NULL) {
(buf_obj_move) av_frame_move_ref,
(buf_obj_ref) av_frame_ref)) == NULL) {
Kit_SetError("Unable to create an output buffer for stream %d", stream_index);
goto exit_current;
}
Expand Down Expand Up @@ -194,7 +202,6 @@ Kit_Decoder* Kit_CreateAudioDecoder(const Kit_Source *src, int stream_index) {
audio_decoder->swr = swr;
audio_decoder->buffer = buffer;
audio_decoder->output = output;
audio_decoder->left = 0;
return decoder;

exit_swr:
Expand All @@ -220,69 +227,69 @@ static double Kit_GetCurrentPTS(const Kit_Decoder *decoder) {
return audio_decoder->current->best_effort_timestamp * av_q2d(decoder->stream->time_base);
}

#define PACKET_SIZE(audio_decoder) (audio_decoder->current->nb_samples * audio_decoder->output.channels * audio_decoder->output.bytes)

/**
* Get a new packet from the audio decoder output and bump it to the current packet slot.
*/
static bool Kit_PopAudioPacket(Kit_Decoder *decoder) {
Kit_AudioDecoder *audio_decoder = decoder->userdata;
av_frame_unref(audio_decoder->current);
if(!Kit_ReadPacketBuffer(audio_decoder->buffer, audio_decoder->current, 0)) {
return false;
}
audio_decoder->left = PACKET_SIZE(audio_decoder);
return true;
}

double Kit_GetClipTime(const Kit_AudioOutputFormat *output, size_t bytes) {
int bytes_per_sample = output->bytes * output->channels;
double bytes_per_second = bytes_per_sample * output->sample_rate;
return ((double)bytes) / bytes_per_second;
}

int Kit_GetAudioDecoderData(Kit_Decoder *decoder, unsigned char *buf, int len) {
int Kit_GetAudioDecoderData(Kit_Decoder *decoder, unsigned char *buf, size_t len) {
assert(decoder != NULL);

Kit_AudioDecoder *audio_decoder = decoder->userdata;
int pos, ret = 0;
double pts;
int pos;
int ret = 0;
size_t *size;
size_t *left;
double sync_ts;

// Immediately bail if nothing is requested.
if(len <= 0)
return 0;
if(!Kit_BeginPacketBufferRead(audio_decoder->buffer, audio_decoder->current, 0))
return 0;

// If we have no data left in current buffer, try to get some more from the decoder output.
if(audio_decoder->left <= 0)
if(!Kit_PopAudioPacket(decoder))
// If packet should not yet be played, stop here and wait.
// If packet should have already been played, skip it and try to find a better packet.
decoder->clock_pos = Kit_GetCurrentPTS(decoder);
sync_ts = Kit_GetTimerElapsed(decoder->sync_timer);

// If packet is far too early, the stream jumped or was seeked. Skip packets until we something valid.
while(decoder->clock_pos > sync_ts + KIT_AUDIO_EARLY_FAIL) {
//LOG("[AUDIO] FAIL-EARLY: pts = %lf < %lf + %lf\n", decoder->clock_pos, sync_ts, KIT_AUDIO_LATE_THRESHOLD);
av_frame_unref(audio_decoder->current);
Kit_FinishPacketBufferRead(audio_decoder->buffer);
if(!Kit_BeginPacketBufferRead(audio_decoder->buffer, audio_decoder->current, 0))
return 0;

// Get the presentation timestamp of the current frame, and set the sync clock if it was not yet set.
pts = Kit_GetCurrentPTS(decoder);
if(decoder->clock_sync < 0) {
decoder->clock_sync = Kit_GetSystemTime() + pts;
decoder->clock_pos = Kit_GetCurrentPTS(decoder);
}

// If packet should not yet be played, stop here and wait.
// If packet should have already been played, skip it and try to find a better packet.
sync_ts = Kit_GetSystemTime() - decoder->clock_sync;
if(pts > sync_ts + KIT_AUDIO_SYNC_THRESHOLD) {
// Packet is too early, wait.
if(decoder->clock_pos > sync_ts + KIT_AUDIO_EARLY_THRESHOLD) {
//LOG("[AUDIO] EARLY pts = %lf > %lf + %lf\n", decoder->clock_pos, sync_ts, KIT_AUDIO_EARLY_THRESHOLD);
av_frame_unref(audio_decoder->current);
Kit_CancelPacketBufferRead(audio_decoder->buffer);
return 0;
}
while(pts < sync_ts - KIT_AUDIO_SYNC_THRESHOLD) {
if(!Kit_PopAudioPacket(decoder))

// Packet is too late, skip packets until we see something reasonable.
while(decoder->clock_pos < sync_ts - KIT_AUDIO_LATE_THRESHOLD) {
//LOG("[AUDIO] LATE: pts = %lf < %lf + %lf\n", decoder->clock_pos, sync_ts, KIT_AUDIO_LATE_THRESHOLD);
av_frame_unref(audio_decoder->current);
Kit_FinishPacketBufferRead(audio_decoder->buffer);
if(!Kit_BeginPacketBufferRead(audio_decoder->buffer, audio_decoder->current, 0))
return 0;
pts = Kit_GetCurrentPTS(decoder);
decoder->clock_pos = Kit_GetCurrentPTS(decoder);
}
//LOG("[AUDIO] >>> SYNC!: pts = %lf, sync = %lf\n", decoder->clock_pos, sync_ts);

decoder->clock_pos = pts;
if(audio_decoder->left) {
ret = (len > audio_decoder->left) ? audio_decoder->left : len;
pos = PACKET_SIZE(audio_decoder) - audio_decoder->left;
size = &audio_decoder->current->crop_top;
left = &audio_decoder->current->crop_bottom;
if(*left) {
ret = (len > *left) ? *left : len;
pos = *size - *left;
memcpy(buf, audio_decoder->current->data[0] + pos, ret);
audio_decoder->left -= ret;
decoder->clock_pos += Kit_GetClipTime(&audio_decoder->output, pos);
*left -= ret;
}

av_frame_unref(audio_decoder->current);
if(*left) {
Kit_CancelPacketBufferRead(audio_decoder->buffer);
} else {
Kit_FinishPacketBufferRead(audio_decoder->buffer);
}
return ret;
}
Loading

0 comments on commit 1dec78b

Please sign in to comment.