diff --git a/go.mod b/go.mod index 29f3841..77b75af 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/go-audio/audio v1.0.0 github.com/go-audio/wav v1.1.0 github.com/gofrs/uuid v4.4.0+incompatible + github.com/gorilla/websocket v1.5.0 github.com/mattn/go-sqlite3 v1.14.22 github.com/mdp/qrterminal v1.0.1 github.com/pemistahl/lingua-go v1.4.0 @@ -17,6 +18,7 @@ require ( github.com/spf13/cobra v1.8.1 github.com/zaf/resample v1.5.0 go.mau.fi/whatsmeow v0.0.0-20240625083845-6acab596dd8c + golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f google.golang.org/protobuf v1.34.1 ) @@ -34,7 +36,6 @@ require ( github.com/go-playground/validator/v10 v10.20.0 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/gorilla/websocket v1.5.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect @@ -53,7 +54,6 @@ require ( go.mau.fi/util v0.4.1 // indirect golang.org/x/arch v0.8.0 // indirect golang.org/x/crypto v0.23.0 // indirect - golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect diff --git a/packages/audiosocket/main.go b/packages/audiosocket/main.go index 78fadbe..1310070 100644 --- a/packages/audiosocket/main.go +++ b/packages/audiosocket/main.go @@ -1,9 +1,7 @@ package audiosocketserver import ( - "bytes" "context" - "encoding/binary" "fmt" "io" "log" @@ -16,11 +14,8 @@ import ( "github.com/CyCoreSystems/audiosocket" "github.com/felipem1210/freetalkbot/packages/assistants" "github.com/felipem1210/freetalkbot/packages/common" - "github.com/go-audio/audio" - "github.com/go-audio/wav" "github.com/gofrs/uuid" "github.com/pkg/errors" - "github.com/zaf/resample" ) const ( @@ -36,19 +31,18 @@ const ( slinChunkSize = 320 // 8000Hz * 20ms * 2 bytes silenceThreshold = 500 // Silence threshold - silenceDuration = 3 * time.Second // Minimum duration of silence + silenceDuration = 2 * time.Second // Minimum duration of silence MaxCallDuration = 2 * time.Minute // MaxCallDuration is the maximum amount of time to allow a call to be up before it is terminated. ) var ( - audioData []byte - id uuid.UUID - err error - ctx context.Context - cancel context.CancelFunc - assistantLanguage string - language string - openaiClient common.OpenaiClient + audioData []byte + id uuid.UUID + err error + ctx context.Context + cancel context.CancelFunc + language string + openaiClient common.OpenaiClient ) // ErrHangup indicates that the call should be terminated or has been terminated @@ -86,7 +80,6 @@ func listen(ctx context.Context) error { // Handle processes a call func Handle(pCtx context.Context, c net.Conn) { - assistantLanguage = os.Getenv("ASSISTANT_LANGUAGE") var transcription string ctx, cancel = context.WithTimeout(pCtx, MaxCallDuration) @@ -99,12 +92,17 @@ func Handle(pCtx context.Context, c net.Conn) { slog.Info("Begin call process", "callId", id.String()) // Channel to signal end of user speaking - userEndSpeaking := make(chan bool) + playingAudioCh := make(chan bool, 20) // Channel to send audio data audioDataCh := make(chan []byte) + // Channel to detect interrupt + audioInterruptCh := make(chan bool, 20) - defer close(userEndSpeaking) + playingAudioCh <- false + + defer close(playingAudioCh) defer close(audioDataCh) + defer close(audioInterruptCh) // Configure the call timer callTimer := time.NewTimer(MaxCallDuration) @@ -124,10 +122,9 @@ func Handle(pCtx context.Context, c net.Conn) { default: // Start listening for user speech slog.Debug("receiving audio", "callId", id.String()) - go processFromAsterisk(cancel, c, userEndSpeaking, audioDataCh) + go processFromAsterisk(cancel, c, playingAudioCh, audioDataCh, audioInterruptCh) - // Wait for user to stop speaking - <-userEndSpeaking + // Getting audio data from the user audioData = <-audioDataCh slog.Debug("user stopped speaking", "callId", id.String()) start := time.Now() @@ -157,7 +154,7 @@ func Handle(pCtx context.Context, c net.Conn) { go deleteFile(inputAudioFile) } - if language == "" && os.Getenv("ASSISTANT_TOOL") == "rasa" { + if language == "" { language = common.DetectLanguage(transcription) slog.Debug(fmt.Sprintf("detected language: %s", language), "sender", id.String()) } @@ -192,36 +189,52 @@ func Handle(pCtx context.Context, c net.Conn) { } slog.Debug(fmt.Sprintf("completed to create the response in %s", time.Since(start).Round(time.Second).String()), "callId", id.String()) go deleteFile(responseAudioFile) - sendAudio(c, audioData) + go sendAudio(c, audioData, audioInterruptCh, playingAudioCh) } } i++ } } -func choosePicoTtsLanguage(language string) string { - switch language { - case "en": - return "en-US" - case "es": - return "es-ES" - case "fr": - return "fr-FR" - case "de": - return "de-DE" - case "it": - return "it-IT" - case "pt": - return "pt-PT" - default: - return "en-US" +// setInterruptChannel sets the interrupt channel to true when the user starts speaking and the response from IA is playing +func setInterruptChannel(audioInterruptCh chan bool, playingAudioCh chan bool, userBeginSpeakingCh chan bool, done chan bool) { + flag1 := false + flag2 := false + for { + select { + case playingAudio := <-playingAudioCh: + flag1 = playingAudio + case uBp := <-userBeginSpeakingCh: + flag2 = uBp + case <-done: + return + default: + time.Sleep(1000 * time.Millisecond) // Wait until receiving to channels + } + // If the user starts speaking and the response from IA is playing, set audioInterruptCh to true + if flag1 && flag2 { + slog.Debug("Recibed true in playingAudio and userBeginSpeaking, setting audioInterruptCh to true", "callId", id.String()) + audioInterruptCh <- true + userBeginSpeakingCh <- false + } } } -func processFromAsterisk(cancel context.CancelFunc, c net.Conn, userEndSpeaking chan bool, audioDataCh chan []byte) { +// processFromAsterisk processes audio data from the Asterisk server +func processFromAsterisk(cancel context.CancelFunc, c net.Conn, playingAudioCh chan bool, audioDataCh chan []byte, audioInterruptCh chan bool) { var silenceStart time.Time var messageData []byte detectingSilence := false + userBeginSpeaking := false + alreadyUserBeginSpeaking := false + done := make(chan bool) + userBeginSpeakingCh := make(chan bool, 1) + userBeginSpeakingCh <- false + + defer close(userBeginSpeakingCh) + defer close(done) + + go setInterruptChannel(audioInterruptCh, playingAudioCh, userBeginSpeakingCh, done) for { m, err := audiosocket.NextMessage(c) @@ -236,150 +249,71 @@ func processFromAsterisk(cancel context.CancelFunc, c net.Conn, userEndSpeaking return } switch m.Kind() { - case audiosocket.KindHangup: - slog.Debug("audiosocket received hangup command", "callId", id.String()) - userEndSpeaking <- true - return case audiosocket.KindError: slog.Warn("Packet loss when sending to audiosocket", "callId", id.String()) case audiosocket.KindSlin: // Store audio data to send it later in audioDataCh messageData = append(messageData, m.Payload()...) var volume float64 - // Check if there is audio data, indicating the user is speaking if inputAudioFormat == "g711" { volume = calculateVolumeG711(m.Payload(), inputAudioCodec) } else { volume = calculateVolumePCM16(m.Payload()) } + // Check if volume is bigger than silenceTheshold, indicating the user is speaking + // It detects when user starts speaking, so it can interrupt the response from IA if volume < silenceThreshold { - if !detectingSilence { - silenceStart = time.Now() - detectingSilence = true - } else if time.Since(silenceStart) >= silenceDuration { - slog.Debug("Detected silence", "callId", id.String()) - userEndSpeaking <- true - audioDataCh <- messageData - return + if userBeginSpeaking { + if !detectingSilence { + silenceStart = time.Now() + detectingSilence = true + } else if time.Since(silenceStart) >= silenceDuration { + slog.Debug("Detected silence", "callId", id.String()) + audioDataCh <- messageData + return + } } } else { - detectingSilence = false + userBeginSpeaking = true + if !alreadyUserBeginSpeaking { + userBeginSpeakingCh <- true + alreadyUserBeginSpeaking = true + } } } } } -func handleWavFile(filePath string) ([]byte, error) { - // Open the input WAV file - file, err := os.Open(filePath) - if err != nil { - slog.ErrorContext(ctx, "failed to open input file", slog.Any("error", err), "callId", id.String()) - return nil, err - } - defer file.Close() - - // Get the WAV file sample rate - header := make([]byte, 44) - _, err = file.Read(header) - if err != nil { - slog.ErrorContext(ctx, "failed to read WAV header", slog.Any("error", err), "callId", id.String()) - return nil, err - } - wavSampleRate := binary.LittleEndian.Uint32(header[24:28]) - - data, err := io.ReadAll(file) - if err != nil { - slog.ErrorContext(ctx, "failed to read file data", slog.Any("error", err), "callId", id.String()) - return nil, err - } - _, err = file.Seek(0, io.SeekStart) - if err != nil { - slog.ErrorContext(ctx, "failed to seek file", slog.Any("error", err), "callId", id.String()) - return nil, err - } - - // Create a new resampler to convert the WAV file to PCM 16bit lineat 8kHz Mono - var out bytes.Buffer - - resampler, err := resample.New(&out, float64(wavSampleRate), 8000, 1, 3, 6) - if err != nil { - slog.ErrorContext(ctx, "failed to create resampler", slog.Any("error", err), "callId", id.String()) - return nil, err - } - _, err = resampler.Write(data[44:]) - if err != nil { - slog.ErrorContext(ctx, "resampling write failed", slog.Any("error", err), "callId", id.String()) - return nil, err - } - err = resampler.Close() - if err != nil { - slog.ErrorContext(ctx, "failed to close resampler", slog.Any("error", err), "callId", id.String()) - return nil, err - } - - return out.Bytes(), nil -} - // sendAudio sends audio data to the Asterisk server -func sendAudio(w io.Writer, data []byte) error { +func sendAudio(w io.Writer, data []byte, audioInterruptCh chan bool, playingAudioCh chan bool) error { var i, chunks int + playingAudioCh <- true t := time.NewTicker(20 * time.Millisecond) defer t.Stop() - for range t.C { - if i >= len(data) { - return nil - } - var chunkLen = slinChunkSize - if i+slinChunkSize > len(data) { - chunkLen = len(data) - i - } - if _, err := w.Write(audiosocket.SlinMessage(data[i : i+chunkLen])); err != nil { - return errors.Wrap(err, "failed to write chunk to audiosocket") + select { + case audioInterrupt := <-audioInterruptCh: + if audioInterrupt { + slog.Debug("audio interrupted because user doesn't want to hear me anymore", "callId", id.String()) + playingAudioCh <- false + return nil + } + default: + if i >= len(data) { + slog.Debug("audio send finished", "callId", id.String()) + playingAudioCh <- false + return nil + } + var chunkLen = slinChunkSize + if i+slinChunkSize > len(data) { + chunkLen = len(data) - i + } + if _, err := w.Write(audiosocket.SlinMessage(data[i : i+chunkLen])); err != nil { + return errors.Wrap(err, "failed to write chunk to audiosocket") + } + chunks++ + i += chunkLen } - chunks++ - i += chunkLen } return nil } - -// saveToWAV saved data into a wav file. -func saveToWAV(audioData []byte, filename string) error { - // Create output file - outFile, err := os.Create(filename) - if err != nil { - slog.ErrorContext(ctx, "failed to open output wav file", slog.Any("error", err), "callId", id.String()) - return err - } - defer outFile.Close() - - // Create new wav encoder - enc := wav.NewEncoder(outFile, 8000, 16, 1, 1) - - // Convert []byte audio data into a format that the WAV encoder can understand - buf := &audio.IntBuffer{ - Format: &audio.Format{ - SampleRate: 8000, - NumChannels: 1, - }, - Data: make([]int, len(audioData)/2), - } - - for i := 0; i < len(audioData)/2; i++ { - buf.Data[i] = int(int16(audioData[2*i]) | int16(audioData[2*i+1])<<8) - } - - // Write the PCM audio data to the WAV encoder - if err := enc.Write(buf); err != nil { - slog.ErrorContext(ctx, "failed to write audio data to wav encoder", slog.Any("error", err), "callId", id.String()) - return err - } - - // Close the encoder to ensure all data is written - if err := enc.Close(); err != nil { - slog.ErrorContext(ctx, "failed to close wav encoder", slog.Any("error", err), "callId", id.String()) - return err - } - - return nil -} diff --git a/packages/audiosocket/utils.go b/packages/audiosocket/utils.go index a9697b4..301fd87 100644 --- a/packages/audiosocket/utils.go +++ b/packages/audiosocket/utils.go @@ -1,14 +1,19 @@ package audiosocketserver import ( + "bytes" "encoding/binary" "fmt" + "io" "log/slog" "math" "net" "os" "github.com/CyCoreSystems/audiosocket" + "github.com/go-audio/audio" + "github.com/go-audio/wav" + "github.com/zaf/resample" ) // Calculate the volume of the audio data. This is done by calculating the amplitude of the audio data wave. @@ -96,7 +101,7 @@ func calculateVolumeG711(buffer []byte, codec string) float64 { // delete a file func deleteFile(filename string) { if err := os.Remove(filename); err != nil { - slog.Error(fmt.Sprintf("Failed to delete file:", err), "callId", id.String()) + slog.Error(fmt.Sprintf("Failed to delete file: %s", err), "callId", id.String()) } } @@ -105,8 +110,121 @@ func sendHangupSignal(c net.Conn) { language = "" hangupMessage := audiosocket.HangupMessage() if _, err := c.Write(hangupMessage); err != nil { - slog.Error(fmt.Sprintf("Failed to send hangup signal:", err), "callId", id.String()) + slog.Error(fmt.Sprintf("Failed to send hangup signal: %s", err), "callId", id.String()) } else { slog.Info("Hangup signal sent successfully", "callId", id.String()) } } + +// choosePicoTtsLanguage chooses the language for the Pico TTS engine +func choosePicoTtsLanguage(language string) string { + switch language { + case "en": + return "en-US" + case "es": + return "es-ES" + case "fr": + return "fr-FR" + case "de": + return "de-DE" + case "it": + return "it-IT" + // case "pt": + // return "pt-PT" + default: + return "en-US" + } +} + +// saveToWAV saved data into a wav file. +func saveToWAV(audioData []byte, filename string) error { + // Create output file + outFile, err := os.Create(filename) + if err != nil { + slog.ErrorContext(ctx, "failed to open output wav file", slog.Any("error", err), "callId", id.String()) + return err + } + defer outFile.Close() + + // Create new wav encoder + enc := wav.NewEncoder(outFile, 8000, 16, 1, 1) + + // Convert []byte audio data into a format that the WAV encoder can understand + buf := &audio.IntBuffer{ + Format: &audio.Format{ + SampleRate: 8000, + NumChannels: 1, + }, + Data: make([]int, len(audioData)/2), + } + + for i := 0; i < len(audioData)/2; i++ { + buf.Data[i] = int(int16(audioData[2*i]) | int16(audioData[2*i+1])<<8) + } + + // Write the PCM audio data to the WAV encoder + if err := enc.Write(buf); err != nil { + slog.ErrorContext(ctx, "failed to write audio data to wav encoder", slog.Any("error", err), "callId", id.String()) + return err + } + + // Close the encoder to ensure all data is written + if err := enc.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close wav encoder", slog.Any("error", err), "callId", id.String()) + return err + } + + return nil +} + +// function to process a wav file and convert it to []byte PCM 16bit linear 8kHz Mono +func handleWavFile(filePath string) ([]byte, error) { + // Open the input WAV file + file, err := os.Open(filePath) + if err != nil { + slog.ErrorContext(ctx, "failed to open input file", slog.Any("error", err), "callId", id.String()) + return nil, err + } + defer file.Close() + + // Get the WAV file sample rate + header := make([]byte, 44) + _, err = file.Read(header) + if err != nil { + slog.ErrorContext(ctx, "failed to read WAV header", slog.Any("error", err), "callId", id.String()) + return nil, err + } + wavSampleRate := binary.LittleEndian.Uint32(header[24:28]) + + data, err := io.ReadAll(file) + if err != nil { + slog.ErrorContext(ctx, "failed to read file data", slog.Any("error", err), "callId", id.String()) + return nil, err + } + _, err = file.Seek(0, io.SeekStart) + if err != nil { + slog.ErrorContext(ctx, "failed to seek file", slog.Any("error", err), "callId", id.String()) + return nil, err + } + + // Create a new resampler to convert the WAV file to PCM 16bit linear 8kHz Mono + var out bytes.Buffer + + resampler, err := resample.New(&out, float64(wavSampleRate), 8000, 1, 3, 6) + if err != nil { + slog.ErrorContext(ctx, "failed to create resampler", slog.Any("error", err), "callId", id.String()) + return nil, err + } + _, err = resampler.Write(data[44:]) + if err != nil { + slog.ErrorContext(ctx, "resampling write failed", slog.Any("error", err), "callId", id.String()) + return nil, err + } + err = resampler.Close() + if err != nil { + slog.ErrorContext(ctx, "failed to close resampler", slog.Any("error", err), "callId", id.String()) + return nil, err + } + + return out.Bytes(), nil +}