Skip to content

Commit

Permalink
fix(dig): immediately stream responses (#35)
Browse files Browse the repository at this point in the history
Allows to couple `rbmk dig +short` with `rbmk pipe write NAME` to
immediately deliver addresses to the next stage.
  • Loading branch information
bassosimone authored Dec 7, 2024
1 parent 349287b commit 222c1c2
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
18 changes: 18 additions & 0 deletions pkg/cli/dig/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,24 @@ is printed to the stdout. All responses (including duplicates)
are included in the structured logs. This option is useful
for detecting DNS-based censorship in China and Iran.

Since v0.4.0, each response (including duplicates) is emitted to the
stdout as soon as it is received. This behaviour is particularly useful
when coupling `+short` with writing to an `rbmk pipe`:

```bash
# Waits for duplicates but immediately print addrs when available
(rbmk dig +short +udp=wait-duplicates example.com | rbmk pipe write addrs) &

# Print each unique address as soon as it is available
rbmk pipe read --writers 1 addrs | rbmk ipuniq

# Wait for writer to terminate
wait
```

This pattern ensures that we can process each address as soon as it
is available, even if we are waiting for duplicates.

## Examples

The following invocation resolves `www.example.com` IPv6 address
Expand Down
15 changes: 11 additions & 4 deletions pkg/cli/dig/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,6 @@ func (task *Task) Run(ctx context.Context) error {
if err != nil {
return fmt.Errorf("query round-trip failed: %w", err)
}
fmt.Fprintf(task.ResponseWriter, "\n;; Response:\n%s\n\n", response.String())
fmt.Fprintf(task.ShortWriter, "%s", task.formatShort(response))

// Explicitly close the connections in the pool
pool.Close()
Expand Down Expand Up @@ -224,7 +222,7 @@ func (task *Task) query(
) (*dns.Msg, error) {
// If we're not waiting for duplicates, our job is easy
if !task.WaitDuplicates {
return txp.Query(ctx, addr, query)
return task.streamResponse(txp.Query(ctx, addr, query))
}

// Otherwise, we need to reading duplicate responses
Expand All @@ -237,7 +235,7 @@ func (task *Task) query(
)
respch := txp.QueryWithDuplicates(ctx, addr, query)
for entry := range respch {
resp, err := entry.Msg, entry.Err
resp, err := task.streamResponse(entry.Msg, entry.Err)
once.Do(func() {
resp0, err0 = resp, err
})
Expand All @@ -248,6 +246,15 @@ func (task *Task) query(
return resp0, err0
}

// streamResponse contains common code to immediately stream a response.
func (task *Task) streamResponse(resp *dns.Msg, err error) (*dns.Msg, error) {
if resp != nil && err == nil {
fmt.Fprintf(task.ResponseWriter, "\n;; Response:\n%s\n\n", resp.String())
fmt.Fprintf(task.ShortWriter, "%s", task.formatShort(resp))
}
return resp, err
}

// formatShort returns a short string representation of the DNS response.
func (task *Task) formatShort(response *dns.Msg) string {
var builder strings.Builder
Expand Down

0 comments on commit 222c1c2

Please sign in to comment.