Skip to content

Commit

Permalink
Feat/detect voice activity (#25)
Browse files Browse the repository at this point in the history
Closes #18

* Implemented new channels playingAudioCh, userBeginSpeakingCh and
audioInterruptCh to determine when person began to speak while playing
audio, and setting an interrupt flag so the audio stops playing
* Implemented new go routine to set this interrupt channel
* Changed the sendAudio function to read from the interrupt channel and
cut the audio play. Also the sendAudio is executed as a new go routine
so it does paralel the play and the audio process from asterisk.
  • Loading branch information
felipem1210 authored Oct 9, 2024
1 parent f2e0c8f commit b587029
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 162 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/go-audio/audio v1.0.0
github.com/go-audio/wav v1.1.0
github.com/gofrs/uuid v4.4.0+incompatible
github.com/gorilla/websocket v1.5.0
github.com/mattn/go-sqlite3 v1.14.22
github.com/mdp/qrterminal v1.0.1
github.com/pemistahl/lingua-go v1.4.0
Expand All @@ -17,6 +18,7 @@ require (
github.com/spf13/cobra v1.8.1
github.com/zaf/resample v1.5.0
go.mau.fi/whatsmeow v0.0.0-20240625083845-6acab596dd8c
golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f
google.golang.org/protobuf v1.34.1
)

Expand All @@ -34,7 +36,6 @@ require (
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
Expand All @@ -53,7 +54,6 @@ require (
go.mau.fi/util v0.4.1 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
Expand Down
250 changes: 92 additions & 158 deletions packages/audiosocket/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package audiosocketserver

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"log"
Expand All @@ -16,11 +14,8 @@ import (
"github.com/CyCoreSystems/audiosocket"
"github.com/felipem1210/freetalkbot/packages/assistants"
"github.com/felipem1210/freetalkbot/packages/common"
"github.com/go-audio/audio"
"github.com/go-audio/wav"
"github.com/gofrs/uuid"
"github.com/pkg/errors"
"github.com/zaf/resample"
)

const (
Expand All @@ -36,19 +31,18 @@ const (
slinChunkSize = 320 // 8000Hz * 20ms * 2 bytes

silenceThreshold = 500 // Silence threshold
silenceDuration = 3 * time.Second // Minimum duration of silence
silenceDuration = 2 * time.Second // Minimum duration of silence
MaxCallDuration = 2 * time.Minute // MaxCallDuration is the maximum amount of time to allow a call to be up before it is terminated.
)

var (
audioData []byte
id uuid.UUID
err error
ctx context.Context
cancel context.CancelFunc
assistantLanguage string
language string
openaiClient common.OpenaiClient
audioData []byte
id uuid.UUID
err error
ctx context.Context
cancel context.CancelFunc
language string
openaiClient common.OpenaiClient
)

// ErrHangup indicates that the call should be terminated or has been terminated
Expand Down Expand Up @@ -86,7 +80,6 @@ func listen(ctx context.Context) error {

// Handle processes a call
func Handle(pCtx context.Context, c net.Conn) {
assistantLanguage = os.Getenv("ASSISTANT_LANGUAGE")
var transcription string

ctx, cancel = context.WithTimeout(pCtx, MaxCallDuration)
Expand All @@ -99,12 +92,17 @@ func Handle(pCtx context.Context, c net.Conn) {
slog.Info("Begin call process", "callId", id.String())

// Channel to signal end of user speaking
userEndSpeaking := make(chan bool)
playingAudioCh := make(chan bool, 20)
// Channel to send audio data
audioDataCh := make(chan []byte)
// Channel to detect interrupt
audioInterruptCh := make(chan bool, 20)

defer close(userEndSpeaking)
playingAudioCh <- false

defer close(playingAudioCh)
defer close(audioDataCh)
defer close(audioInterruptCh)

// Configure the call timer
callTimer := time.NewTimer(MaxCallDuration)
Expand All @@ -124,10 +122,9 @@ func Handle(pCtx context.Context, c net.Conn) {
default:
// Start listening for user speech
slog.Debug("receiving audio", "callId", id.String())
go processFromAsterisk(cancel, c, userEndSpeaking, audioDataCh)
go processFromAsterisk(cancel, c, playingAudioCh, audioDataCh, audioInterruptCh)

// Wait for user to stop speaking
<-userEndSpeaking
// Getting audio data from the user
audioData = <-audioDataCh
slog.Debug("user stopped speaking", "callId", id.String())
start := time.Now()
Expand Down Expand Up @@ -157,7 +154,7 @@ func Handle(pCtx context.Context, c net.Conn) {
go deleteFile(inputAudioFile)
}

if language == "" && os.Getenv("ASSISTANT_TOOL") == "rasa" {
if language == "" {
language = common.DetectLanguage(transcription)
slog.Debug(fmt.Sprintf("detected language: %s", language), "sender", id.String())
}
Expand Down Expand Up @@ -192,36 +189,52 @@ func Handle(pCtx context.Context, c net.Conn) {
}
slog.Debug(fmt.Sprintf("completed to create the response in %s", time.Since(start).Round(time.Second).String()), "callId", id.String())
go deleteFile(responseAudioFile)
sendAudio(c, audioData)
go sendAudio(c, audioData, audioInterruptCh, playingAudioCh)
}
}
i++
}
}

func choosePicoTtsLanguage(language string) string {
switch language {
case "en":
return "en-US"
case "es":
return "es-ES"
case "fr":
return "fr-FR"
case "de":
return "de-DE"
case "it":
return "it-IT"
case "pt":
return "pt-PT"
default:
return "en-US"
// setInterruptChannel sets the interrupt channel to true when the user starts speaking and the response from IA is playing
func setInterruptChannel(audioInterruptCh chan bool, playingAudioCh chan bool, userBeginSpeakingCh chan bool, done chan bool) {
flag1 := false
flag2 := false
for {
select {
case playingAudio := <-playingAudioCh:
flag1 = playingAudio
case uBp := <-userBeginSpeakingCh:
flag2 = uBp
case <-done:
return
default:
time.Sleep(1000 * time.Millisecond) // Wait until receiving to channels
}
// If the user starts speaking and the response from IA is playing, set audioInterruptCh to true
if flag1 && flag2 {
slog.Debug("Recibed true in playingAudio and userBeginSpeaking, setting audioInterruptCh to true", "callId", id.String())
audioInterruptCh <- true
userBeginSpeakingCh <- false
}
}
}

func processFromAsterisk(cancel context.CancelFunc, c net.Conn, userEndSpeaking chan bool, audioDataCh chan []byte) {
// processFromAsterisk processes audio data from the Asterisk server
func processFromAsterisk(cancel context.CancelFunc, c net.Conn, playingAudioCh chan bool, audioDataCh chan []byte, audioInterruptCh chan bool) {
var silenceStart time.Time
var messageData []byte
detectingSilence := false
userBeginSpeaking := false
alreadyUserBeginSpeaking := false
done := make(chan bool)
userBeginSpeakingCh := make(chan bool, 1)
userBeginSpeakingCh <- false

defer close(userBeginSpeakingCh)
defer close(done)

go setInterruptChannel(audioInterruptCh, playingAudioCh, userBeginSpeakingCh, done)

for {
m, err := audiosocket.NextMessage(c)
Expand All @@ -236,150 +249,71 @@ func processFromAsterisk(cancel context.CancelFunc, c net.Conn, userEndSpeaking
return
}
switch m.Kind() {
case audiosocket.KindHangup:
slog.Debug("audiosocket received hangup command", "callId", id.String())
userEndSpeaking <- true
return
case audiosocket.KindError:
slog.Warn("Packet loss when sending to audiosocket", "callId", id.String())
case audiosocket.KindSlin:
// Store audio data to send it later in audioDataCh
messageData = append(messageData, m.Payload()...)
var volume float64
// Check if there is audio data, indicating the user is speaking
if inputAudioFormat == "g711" {
volume = calculateVolumeG711(m.Payload(), inputAudioCodec)
} else {
volume = calculateVolumePCM16(m.Payload())
}
// Check if volume is bigger than silenceTheshold, indicating the user is speaking
// It detects when user starts speaking, so it can interrupt the response from IA
if volume < silenceThreshold {
if !detectingSilence {
silenceStart = time.Now()
detectingSilence = true
} else if time.Since(silenceStart) >= silenceDuration {
slog.Debug("Detected silence", "callId", id.String())
userEndSpeaking <- true
audioDataCh <- messageData
return
if userBeginSpeaking {
if !detectingSilence {
silenceStart = time.Now()
detectingSilence = true
} else if time.Since(silenceStart) >= silenceDuration {
slog.Debug("Detected silence", "callId", id.String())
audioDataCh <- messageData
return
}
}
} else {
detectingSilence = false
userBeginSpeaking = true
if !alreadyUserBeginSpeaking {
userBeginSpeakingCh <- true
alreadyUserBeginSpeaking = true
}
}
}
}
}

func handleWavFile(filePath string) ([]byte, error) {
// Open the input WAV file
file, err := os.Open(filePath)
if err != nil {
slog.ErrorContext(ctx, "failed to open input file", slog.Any("error", err), "callId", id.String())
return nil, err
}
defer file.Close()

// Get the WAV file sample rate
header := make([]byte, 44)
_, err = file.Read(header)
if err != nil {
slog.ErrorContext(ctx, "failed to read WAV header", slog.Any("error", err), "callId", id.String())
return nil, err
}
wavSampleRate := binary.LittleEndian.Uint32(header[24:28])

data, err := io.ReadAll(file)
if err != nil {
slog.ErrorContext(ctx, "failed to read file data", slog.Any("error", err), "callId", id.String())
return nil, err
}
_, err = file.Seek(0, io.SeekStart)
if err != nil {
slog.ErrorContext(ctx, "failed to seek file", slog.Any("error", err), "callId", id.String())
return nil, err
}

// Create a new resampler to convert the WAV file to PCM 16bit lineat 8kHz Mono
var out bytes.Buffer

resampler, err := resample.New(&out, float64(wavSampleRate), 8000, 1, 3, 6)
if err != nil {
slog.ErrorContext(ctx, "failed to create resampler", slog.Any("error", err), "callId", id.String())
return nil, err
}
_, err = resampler.Write(data[44:])
if err != nil {
slog.ErrorContext(ctx, "resampling write failed", slog.Any("error", err), "callId", id.String())
return nil, err
}
err = resampler.Close()
if err != nil {
slog.ErrorContext(ctx, "failed to close resampler", slog.Any("error", err), "callId", id.String())
return nil, err
}

return out.Bytes(), nil
}

// sendAudio sends audio data to the Asterisk server
func sendAudio(w io.Writer, data []byte) error {
func sendAudio(w io.Writer, data []byte, audioInterruptCh chan bool, playingAudioCh chan bool) error {
var i, chunks int
playingAudioCh <- true
t := time.NewTicker(20 * time.Millisecond)
defer t.Stop()

for range t.C {
if i >= len(data) {
return nil
}
var chunkLen = slinChunkSize
if i+slinChunkSize > len(data) {
chunkLen = len(data) - i
}
if _, err := w.Write(audiosocket.SlinMessage(data[i : i+chunkLen])); err != nil {
return errors.Wrap(err, "failed to write chunk to audiosocket")
select {
case audioInterrupt := <-audioInterruptCh:
if audioInterrupt {
slog.Debug("audio interrupted because user doesn't want to hear me anymore", "callId", id.String())
playingAudioCh <- false
return nil
}
default:
if i >= len(data) {
slog.Debug("audio send finished", "callId", id.String())
playingAudioCh <- false
return nil
}
var chunkLen = slinChunkSize
if i+slinChunkSize > len(data) {
chunkLen = len(data) - i
}
if _, err := w.Write(audiosocket.SlinMessage(data[i : i+chunkLen])); err != nil {
return errors.Wrap(err, "failed to write chunk to audiosocket")
}
chunks++
i += chunkLen
}
chunks++
i += chunkLen
}
return nil
}

// saveToWAV saved data into a wav file.
func saveToWAV(audioData []byte, filename string) error {
// Create output file
outFile, err := os.Create(filename)
if err != nil {
slog.ErrorContext(ctx, "failed to open output wav file", slog.Any("error", err), "callId", id.String())
return err
}
defer outFile.Close()

// Create new wav encoder
enc := wav.NewEncoder(outFile, 8000, 16, 1, 1)

// Convert []byte audio data into a format that the WAV encoder can understand
buf := &audio.IntBuffer{
Format: &audio.Format{
SampleRate: 8000,
NumChannels: 1,
},
Data: make([]int, len(audioData)/2),
}

for i := 0; i < len(audioData)/2; i++ {
buf.Data[i] = int(int16(audioData[2*i]) | int16(audioData[2*i+1])<<8)
}

// Write the PCM audio data to the WAV encoder
if err := enc.Write(buf); err != nil {
slog.ErrorContext(ctx, "failed to write audio data to wav encoder", slog.Any("error", err), "callId", id.String())
return err
}

// Close the encoder to ensure all data is written
if err := enc.Close(); err != nil {
slog.ErrorContext(ctx, "failed to close wav encoder", slog.Any("error", err), "callId", id.String())
return err
}

return nil
}
Loading

0 comments on commit b587029

Please sign in to comment.