Skip to content

Commit

Permalink
Add package distinct implementing the CVM algorithm. (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
creachadair authored Aug 12, 2024
1 parent 0ce8166 commit c356b0d
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 1 deletion.
107 changes: 107 additions & 0 deletions distinct/distinct.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Package distinct implements the probabilistic distinct-elements counter
// algorithm of Chakraborty, Vinodchandran, and Meel as described in the paper
// "Distinct Elements in Streams" ([CVM]).
//
// [CVM]: https://arxiv.org/pdf/2301.10191
package distinct

import (
crand "crypto/rand"
"fmt"
"math"
"math/rand/v2"

"github.com/creachadair/mds/mapset"
)

// A Counter estimates the number of distinct comparable elements that have
// been passed to its Add method using the CVM algorithm.
//
// Add elements to a counter using [Counter.Add] method; use [Counter.Count] to
// obtain the current estimate of the number of distinct elements observed.
type Counter[T comparable] struct {
buf mapset.Set[T]
cap int // maximum allowed size of buf
p float64 // eviction probability
rng *rand.Rand
}

// NewCounter constructs a new empty distinct-elements counter using a buffer
// of at most size elements for estimation.
//
// A newly-constructed counter does not pre-allocate the full buffer size. It
// begins with a small buffer that grows as needed up to the limit.
func NewCounter[T comparable](size int) *Counter[T] {
var seed [32]byte
if _, err := crand.Read(seed[:]); err != nil {
panic(fmt.Sprintf("seed RNG: %v", err))
}
return &Counter[T]{
buf: make(mapset.Set[T]),
cap: size,
p: 1,
rng: rand.New(rand.NewChaCha8(seed)),
}
}

// Len reports the number of elements currently buffered by c.
func (c *Counter[T]) Len() int { return c.buf.Len() }

// Reset resets c to its initial state, as if freshly constructed.
// The internal buffer size limit remains unchanged.
func (c *Counter[T]) Reset() { c.buf.Clear(); c.p = 1 }

// Add adds v to the counter.
func (c *Counter[T]) Add(v T) {
if c.p < 1 && c.rng.Float64() >= c.p {
c.buf.Remove(v)
return
}
c.buf.Add(v)
if c.buf.Len() >= c.cap {
// Instead of flipping a coin for each element, grab blocks of 64 random
// bits and use them directly, refilling only as needed.
var nb, rnd uint64

for elt := range c.buf {
if nb == 0 {
rnd = c.rng.Uint64() // refill
nb = 64
}
if rnd&1 == 0 {
c.buf.Remove(elt)
}
rnd >>= 1
nb--
}
c.p /= 2
}
}

// Count returns the current estimate of the number of distinct elements
// observed by the counter.
func (c *Counter[T]) Count() int64 { return int64(float64(c.buf.Len()) / c.p) }

// BufferSize returns a buffer size sufficient to ensure that a counter using
// this size will produce estimates within (1 ± ε) times the true count with
// probability (1 - δ), assuming the expected total number of elements to be
// counted is expSize.
//
// The suggested buffer size guarantees these constraints, but note that the
// estimate is very conservative. In practice, the actual estimates will
// usually be much more accurate. Empirically, values of ε and δ in the 0.05
// range work well.
func BufferSize(ε, δ float64, expSize int) int {
if ε < 0 || ε > 1 {
panic(fmt.Sprintf("error bound out of range: %v", ε))
}
if δ < 0 || δ > 1 {
panic(fmt.Sprintf("error rate out of range: %v", δ))
}
if expSize <= 0 {
panic(fmt.Sprintf("expected size must be positive: %d", expSize))
}

v := math.Ceil((12 / (ε * ε)) * math.Log2((8*float64(expSize))/δ))
return int(v)
}
81 changes: 81 additions & 0 deletions distinct/distinct_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package distinct_test

import (
"flag"
"fmt"
"math"
"testing"

"math/rand/v2"

"github.com/creachadair/mds/distinct"
"github.com/creachadair/mds/mapset"
)

var (
errRate = flag.Float64("error-rate", 0.06, "Error rate")
failProb = flag.Float64("fail-probability", 0.02, "Failure probability")
)

func fill(c *distinct.Counter[int], n int) mapset.Set[int] {
actual := mapset.New[int]()
for range n {
r := rand.Int()
actual.Add(r)
c.Add(r)
}
return actual
}

func TestCounter(t *testing.T) {
t.Run("Empty", func(t *testing.T) {
// An empty counter should report no elements.
c := distinct.NewCounter[int](100)
if got := c.Count(); got != 0 {
t.Errorf("Empty count: got %d, want 0", got)
}
})

t.Run("Small", func(t *testing.T) {
// A counter that has seen fewer values than its buffer size should count
// perfectly.
c := distinct.NewCounter[int](100)
want := len(fill(c, 50))
if got := c.Len(); got != want {
t.Errorf("Small count: got %d, want %d", got, want)
}
})

t.Logf("Error rate: %g%%", 100**errRate)
t.Logf("Failure probability: %g%%", 100**failProb)
for _, tc := range []int{9_999, 100_000, 543_210, 1_000_000, 1_048_576} {
name := fmt.Sprintf("Large/%d", tc)
t.Run(name, func(t *testing.T) {
size := distinct.BufferSize(*errRate, *failProb, tc)
t.Logf("Buffer size estimate: %d", size)

c := distinct.NewCounter[int](size)
actual := fill(c, tc)

t.Logf("Actual count: %d", actual.Len())
t.Logf("Estimated count: %d", c.Count())
t.Logf("Buffer size: %d", c.Len())

e := float64(c.Count()-int64(actual.Len())) / float64(actual.Len())
t.Logf("Error: %.4g%%", 100*e)

if math.Abs(e) > *errRate {
t.Errorf("Error rate = %f, want ≤ %f", e, *errRate)
}
if c.Len() > size {
t.Errorf("Buffer size is %d > %d", c.Len(), size)
}

// After counting, a reset should leave the buffer empty.
c.Reset()
if got := c.Len(); got != 0 {
t.Errorf("After reset: buffer size is %d, want 0", got)
}
})
}
}
43 changes: 43 additions & 0 deletions distinct/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package distinct_test

import (
"fmt"
"math/rand/v2"
"os"

"github.com/creachadair/mds/distinct"
"github.com/creachadair/mds/mapset"
)

func Example() {
// Suggest how big a buffer we need to have an estimate within ± 10% of the
// true value with 95% probability, given we expect to see 60000 inputs.
bufSize := distinct.BufferSize(0.1, 0.05, 60000)

// Construct a counter with the specified buffer size limit.
c := distinct.NewCounter[int](bufSize)

// For demonstration purposes, keep track of the actual count.
// This will generally be impractical for "real" workloads.
var unique mapset.Set[int]

// Observe some (50,000) random inputs...
for range 50000 {
r := rand.IntN(80000)
c.Add(r)

unique.Add(r)
}

fmt.Printf("Buffer limit: %d\n", bufSize)
fmt.Fprintf(os.Stderr, "Unique: %d\n", unique.Len())
fmt.Fprintf(os.Stderr, "Estimate: %d\n", c.Count())
fmt.Fprintf(os.Stderr, "Buffer used: %d\n", c.Len())

// N.B.: Counter results are intentionally omitted here. The exact values
// are not stable even if the RNG is fixed, because the counter uses map
// iteration during update.

// Output:
// Buffer limit: 27834
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module github.com/creachadair/mds

go 1.21
go 1.22

require github.com/google/go-cmp v0.6.0

0 comments on commit c356b0d

Please sign in to comment.