Skip to content

Commit

Permalink
feat(trace): add attributes to traces
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkness4 committed Jul 13, 2024
1 parent c8dc8bc commit 9675441
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 18 deletions.
32 changes: 27 additions & 5 deletions fc2/fc2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"nhooyr.io/websocket"
)
Expand Down Expand Up @@ -83,7 +85,9 @@ func (f *FC2) Watch(ctx context.Context) (*GetMetaData, error) {
}
}

ctx, span := otel.Tracer(tracerName).Start(ctx, "fc2.Watch")
ctx, span := otel.Tracer(tracerName).Start(ctx, "fc2.Watch", trace.WithAttributes(
attribute.String("channelID", f.channelID),
))
defer span.End()

span.AddEvent("getting metadata")
Expand Down Expand Up @@ -343,7 +347,12 @@ func (f *FC2) HandleWS(
fnameStream string,
fnameChat string,
) error {
ctx, span := otel.Tracer(tracerName).Start(ctx, "fc2.HandleWS")
ctx, span := otel.Tracer(tracerName).Start(ctx, "fc2.HandleWS", trace.WithAttributes(
attribute.String("channelID", f.channelID),
attribute.String("wsURL", wsURL),
attribute.String("fnameStream", fnameStream),
attribute.String("fnameChat", fnameChat),
))
defer span.End()

msgChan := make(chan *WSResponse, msgBufMax)
Expand Down Expand Up @@ -398,7 +407,11 @@ func (f *FC2) HandleWS(
})

g.Go(func() error {
ctx, span := otel.Tracer(tracerName).Start(ctx, "fc2.HandleWS.download")
ctx, span := otel.Tracer(tracerName).
Start(ctx, "fc2.HandleWS.download", trace.WithAttributes(
attribute.String("channelID", f.channelID),
attribute.String("wsURL", wsURL),
))
defer span.End()

playlistChan := make(chan *Playlist)
Expand Down Expand Up @@ -521,7 +534,10 @@ func (f *FC2) HandleWS(

func (f *FC2) downloadStream(ctx context.Context, playlists <-chan *Playlist, fName string) error {
// TODO: This function requires serious documentation.
ctx, span := otel.Tracer(tracerName).Start(ctx, "fc2.downloadStream")
ctx, span := otel.Tracer(tracerName).Start(ctx, "fc2.downloadStream", trace.WithAttributes(
attribute.String("channelID", f.channelID),
attribute.String("fName", fName),
))
defer span.End()

file, err := os.Create(fName)
Expand Down Expand Up @@ -559,6 +575,10 @@ func (f *FC2) downloadStream(ctx context.Context, playlists <-chan *Playlist, fN
}

f.log.Info().Any("playlist", playlist).Msg("received new HLS info")
span.AddEvent("playlist received", trace.WithAttributes(
attribute.String("url", playlist.URL),
attribute.Int("mode", playlist.Mode),
))
downloader := hls.NewDownloader(
f.Client,
f.log,
Expand Down Expand Up @@ -740,7 +760,9 @@ func (f *FC2) FetchPlaylist(
msgChan chan *WSResponse,
verbose bool,
) (*Playlist, error) {
ctx, span := otel.Tracer(tracerName).Start(ctx, "fc2.FetchPlaylist")
ctx, span := otel.Tracer(tracerName).Start(ctx, "fc2.FetchPlaylist", trace.WithAttributes(
attribute.String("channelID", f.channelID),
))
defer span.End()

expectedMode := int(f.params.Quality) + int(f.params.Latency) - 1
Expand Down
3 changes: 0 additions & 3 deletions fc2/fc2_livestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ func (ls *LiveStream) GetWebSocketURL(ctx context.Context) (string, error) {
return "", err
}
defer resp.Body.Close()
span.AddEvent("response received")

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
Expand All @@ -282,8 +281,6 @@ func (ls *LiveStream) GetWebSocketURL(ctx context.Context) (string, error) {
return "", err
}

span.AddEvent("response body received")

info := GetControlServerResponse{}
if err := json.Unmarshal(body, &info); err != nil {
ls.log.Error().Str("body", string(body)).Msg("failed to decode body")
Expand Down
10 changes: 8 additions & 2 deletions fc2/fc2_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"nhooyr.io/websocket"
)

Expand Down Expand Up @@ -61,7 +63,9 @@ func NewWebSocket(

// Dial connects to the WebSocket server.
func (w *WebSocket) Dial(ctx context.Context) (*websocket.Conn, error) {
ctx, span := otel.Tracer(tracerName).Start(ctx, "ws.Dial")
ctx, span := otel.Tracer(tracerName).Start(ctx, "ws.Dial", trace.WithAttributes(
attribute.String("url", w.url),
))
defer span.End()
// Connect to the websocket server
conn, _, err := websocket.Dial(ctx, w.url, &websocket.DialOptions{
Expand All @@ -82,7 +86,9 @@ func (w *WebSocket) GetHLSInformation(
conn *websocket.Conn,
msgChan <-chan *WSResponse,
) (*HLSInformation, error) {
ctx, span := otel.Tracer(tracerName).Start(ctx, "ws.GetHLSInformation")
ctx, span := otel.Tracer(tracerName).Start(ctx, "ws.GetHLSInformation", trace.WithAttributes(
attribute.String("url", w.url),
))
defer span.End()
msgObj, err := w.sendMessageAndWaitResponse(
ctx,
Expand Down
14 changes: 12 additions & 2 deletions hls/hls_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const tracerName = "hls"
Expand Down Expand Up @@ -152,7 +154,11 @@ func (hls *Downloader) fillQueue(
urlChan chan<- string,
checkpoint Checkpoint,
) (newCheckpoint Checkpoint, err error) {
ctx, span := otel.Tracer(tracerName).Start(ctx, "hls.fillQueue")
ctx, span := otel.Tracer(tracerName).Start(ctx, "hls.fillQueue", trace.WithAttributes(
attribute.String("lastFragmentName", checkpoint.LastFragmentName),
attribute.String("lastFragmentTime", checkpoint.LastFragmentTime.String()),
attribute.Bool("useTimeBasedSorting", checkpoint.UseTimeBasedSorting),
))
defer span.End()

// Used for termination
Expand Down Expand Up @@ -323,7 +329,11 @@ func (hls *Downloader) Read(
out chan<- []byte,
checkpoint Checkpoint,
) (newCheckpoint Checkpoint, err error) {
ctx, span := otel.Tracer(tracerName).Start(ctx, "hls.Read")
ctx, span := otel.Tracer(tracerName).Start(ctx, "hls.Read", trace.WithAttributes(
attribute.String("lastFragmentName", checkpoint.LastFragmentName),
attribute.String("lastFragmentTime", checkpoint.LastFragmentTime.String()),
attribute.Bool("useTimeBasedSorting", checkpoint.UseTimeBasedSorting),
))
defer span.End()

errChan := make(chan error, 1)
Expand Down
17 changes: 14 additions & 3 deletions video/concat/concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import "C"
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -94,11 +95,21 @@ func applyOptions(opts []Option) *Options {

// Do concat multiple video streams.
func Do(ctx context.Context, output string, inputs []string, opts ...Option) error {
ctx, span := otel.Tracer(tracerName).Start(ctx, "concat.Do")
defer span.End()

o := applyOptions(opts)

attrs := make([]attribute.KeyValue, 0, len(inputs))
for idx, input := range inputs {
attrs = append(attrs, attribute.String(fmt.Sprintf("input%d", idx), input))
}
attrs = append(attrs, attribute.String("output", output))
attrs = append(attrs, attribute.Bool("audioOnly", o.audioOnly == 1))
attrs = append(attrs, attribute.Bool("numbered", o.numbered))
attrs = append(attrs, attribute.Bool("ignoreSingle", o.ignoreSingle))

ctx, span := otel.Tracer(tracerName).
Start(ctx, "concat.Do", trace.WithAttributes(attrs...))
defer span.End()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down
10 changes: 9 additions & 1 deletion video/concat/remux_intermediate_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package concat

import (
"context"
"fmt"
"io"
"os"
"sync"
Expand All @@ -12,6 +13,8 @@ import (
"github.com/Darkness4/fc2-live-dl-go/utils"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// remuxMixedTS remuxes mixed TS/AAC files into intermediate format.
Expand All @@ -20,7 +23,12 @@ func remuxMixedTS(
files []string,
opts ...Option,
) (intermediates []string, useFIFO bool, err error) {
ctx, span := otel.Tracer(tracerName).Start(ctx, "concat.remuxMixedTS")
attrs := make([]attribute.KeyValue, 0, len(files))
for idx, file := range files {
attrs = append(attrs, attribute.String(fmt.Sprintf("input%d", idx), file))
}
ctx, span := otel.Tracer(tracerName).
Start(ctx, "concat.remuxMixedTS", trace.WithAttributes(attrs...))
defer span.End()

intermediates = make([]string, 0, len(files))
Expand Down
10 changes: 9 additions & 1 deletion video/concat/remux_intermediate_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ package concat

import (
"context"
"fmt"
"sync"

"github.com/Darkness4/fc2-live-dl-go/utils"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// remuxMixedTS remuxes mixed TS/AAC files into intermediate format.
Expand All @@ -17,7 +20,12 @@ func remuxMixedTS(
files []string,
opts ...Option,
) (intermediates []string, useFIFO bool, err error) {
ctx, span := otel.Tracer(tracerName).Start(ctx, "concat.Do")
attrs := make([]attribute.KeyValue, 0, len(files))
for idx, file := range files {
attrs = append(attrs, attribute.String(fmt.Sprintf("input%d", idx), file))
}
ctx, span := otel.Tracer(tracerName).
Start(ctx, "concat.remuxMixedTS", trace.WithAttributes(attrs...))
defer span.End()

intermediates = make([]string, 0, len(files))
Expand Down
10 changes: 9 additions & 1 deletion video/probe/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import "C"
import (
"context"
"errors"
"fmt"
"unsafe"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

const tracerName = "video/probe"
Expand Down Expand Up @@ -46,7 +49,12 @@ func applyOptions(opts []Option) *Options {

// Do probe multiple video streams.
func Do(inputs []string, opts ...Option) error {
_, span := otel.Tracer(tracerName).Start(context.Background(), "probe.Do")
attrs := make([]attribute.KeyValue, 0, len(inputs))
for idx, input := range inputs {
attrs = append(attrs, attribute.String(fmt.Sprintf("input%d", idx), input))
}
_, span := otel.Tracer(tracerName).
Start(context.Background(), "probe.Do", trace.WithAttributes(attrs...))
defer span.End()

o := applyOptions(opts)
Expand Down

0 comments on commit 9675441

Please sign in to comment.