From c356b0d18867af74568226673d54ff6cad97e385 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Mon, 12 Aug 2024 15:29:35 -0700 Subject: [PATCH] Add package distinct implementing the CVM algorithm. (#18) --- distinct/distinct.go | 107 ++++++++++++++++++++++++++++++++++++++ distinct/distinct_test.go | 81 +++++++++++++++++++++++++++++ distinct/example_test.go | 43 +++++++++++++++ go.mod | 2 +- 4 files changed, 232 insertions(+), 1 deletion(-) create mode 100644 distinct/distinct.go create mode 100644 distinct/distinct_test.go create mode 100644 distinct/example_test.go diff --git a/distinct/distinct.go b/distinct/distinct.go new file mode 100644 index 0000000..392a33d --- /dev/null +++ b/distinct/distinct.go @@ -0,0 +1,107 @@ +// Package distinct implements the probabilistic distinct-elements counter +// algorithm of Chakraborty, Vinodchandran, and Meel as described in the paper +// "Distinct Elements in Streams" ([CVM]). +// +// [CVM]: https://arxiv.org/pdf/2301.10191 +package distinct + +import ( + crand "crypto/rand" + "fmt" + "math" + "math/rand/v2" + + "github.com/creachadair/mds/mapset" +) + +// A Counter estimates the number of distinct comparable elements that have +// been passed to its Add method using the CVM algorithm. +// +// Add elements to a counter using [Counter.Add] method; use [Counter.Count] to +// obtain the current estimate of the number of distinct elements observed. +type Counter[T comparable] struct { + buf mapset.Set[T] + cap int // maximum allowed size of buf + p float64 // eviction probability + rng *rand.Rand +} + +// NewCounter constructs a new empty distinct-elements counter using a buffer +// of at most size elements for estimation. +// +// A newly-constructed counter does not pre-allocate the full buffer size. It +// begins with a small buffer that grows as needed up to the limit. +func NewCounter[T comparable](size int) *Counter[T] { + var seed [32]byte + if _, err := crand.Read(seed[:]); err != nil { + panic(fmt.Sprintf("seed RNG: %v", err)) + } + return &Counter[T]{ + buf: make(mapset.Set[T]), + cap: size, + p: 1, + rng: rand.New(rand.NewChaCha8(seed)), + } +} + +// Len reports the number of elements currently buffered by c. +func (c *Counter[T]) Len() int { return c.buf.Len() } + +// Reset resets c to its initial state, as if freshly constructed. +// The internal buffer size limit remains unchanged. +func (c *Counter[T]) Reset() { c.buf.Clear(); c.p = 1 } + +// Add adds v to the counter. +func (c *Counter[T]) Add(v T) { + if c.p < 1 && c.rng.Float64() >= c.p { + c.buf.Remove(v) + return + } + c.buf.Add(v) + if c.buf.Len() >= c.cap { + // Instead of flipping a coin for each element, grab blocks of 64 random + // bits and use them directly, refilling only as needed. + var nb, rnd uint64 + + for elt := range c.buf { + if nb == 0 { + rnd = c.rng.Uint64() // refill + nb = 64 + } + if rnd&1 == 0 { + c.buf.Remove(elt) + } + rnd >>= 1 + nb-- + } + c.p /= 2 + } +} + +// Count returns the current estimate of the number of distinct elements +// observed by the counter. +func (c *Counter[T]) Count() int64 { return int64(float64(c.buf.Len()) / c.p) } + +// BufferSize returns a buffer size sufficient to ensure that a counter using +// this size will produce estimates within (1 ± ε) times the true count with +// probability (1 - δ), assuming the expected total number of elements to be +// counted is expSize. +// +// The suggested buffer size guarantees these constraints, but note that the +// estimate is very conservative. In practice, the actual estimates will +// usually be much more accurate. Empirically, values of ε and δ in the 0.05 +// range work well. +func BufferSize(ε, δ float64, expSize int) int { + if ε < 0 || ε > 1 { + panic(fmt.Sprintf("error bound out of range: %v", ε)) + } + if δ < 0 || δ > 1 { + panic(fmt.Sprintf("error rate out of range: %v", δ)) + } + if expSize <= 0 { + panic(fmt.Sprintf("expected size must be positive: %d", expSize)) + } + + v := math.Ceil((12 / (ε * ε)) * math.Log2((8*float64(expSize))/δ)) + return int(v) +} diff --git a/distinct/distinct_test.go b/distinct/distinct_test.go new file mode 100644 index 0000000..7d19b53 --- /dev/null +++ b/distinct/distinct_test.go @@ -0,0 +1,81 @@ +package distinct_test + +import ( + "flag" + "fmt" + "math" + "testing" + + "math/rand/v2" + + "github.com/creachadair/mds/distinct" + "github.com/creachadair/mds/mapset" +) + +var ( + errRate = flag.Float64("error-rate", 0.06, "Error rate") + failProb = flag.Float64("fail-probability", 0.02, "Failure probability") +) + +func fill(c *distinct.Counter[int], n int) mapset.Set[int] { + actual := mapset.New[int]() + for range n { + r := rand.Int() + actual.Add(r) + c.Add(r) + } + return actual +} + +func TestCounter(t *testing.T) { + t.Run("Empty", func(t *testing.T) { + // An empty counter should report no elements. + c := distinct.NewCounter[int](100) + if got := c.Count(); got != 0 { + t.Errorf("Empty count: got %d, want 0", got) + } + }) + + t.Run("Small", func(t *testing.T) { + // A counter that has seen fewer values than its buffer size should count + // perfectly. + c := distinct.NewCounter[int](100) + want := len(fill(c, 50)) + if got := c.Len(); got != want { + t.Errorf("Small count: got %d, want %d", got, want) + } + }) + + t.Logf("Error rate: %g%%", 100**errRate) + t.Logf("Failure probability: %g%%", 100**failProb) + for _, tc := range []int{9_999, 100_000, 543_210, 1_000_000, 1_048_576} { + name := fmt.Sprintf("Large/%d", tc) + t.Run(name, func(t *testing.T) { + size := distinct.BufferSize(*errRate, *failProb, tc) + t.Logf("Buffer size estimate: %d", size) + + c := distinct.NewCounter[int](size) + actual := fill(c, tc) + + t.Logf("Actual count: %d", actual.Len()) + t.Logf("Estimated count: %d", c.Count()) + t.Logf("Buffer size: %d", c.Len()) + + e := float64(c.Count()-int64(actual.Len())) / float64(actual.Len()) + t.Logf("Error: %.4g%%", 100*e) + + if math.Abs(e) > *errRate { + t.Errorf("Error rate = %f, want ≤ %f", e, *errRate) + } + if c.Len() > size { + t.Errorf("Buffer size is %d > %d", c.Len(), size) + } + + // After counting, a reset should leave the buffer empty. + c.Reset() + if got := c.Len(); got != 0 { + t.Errorf("After reset: buffer size is %d, want 0", got) + } + }) + } +} diff --git a/distinct/example_test.go b/distinct/example_test.go new file mode 100644 index 0000000..eebfc82 --- /dev/null +++ b/distinct/example_test.go @@ -0,0 +1,43 @@ +package distinct_test + +import ( + "fmt" + "math/rand/v2" + "os" + + "github.com/creachadair/mds/distinct" + "github.com/creachadair/mds/mapset" +) + +func Example() { + // Suggest how big a buffer we need to have an estimate within ± 10% of the + // true value with 95% probability, given we expect to see 60000 inputs. + bufSize := distinct.BufferSize(0.1, 0.05, 60000) + + // Construct a counter with the specified buffer size limit. + c := distinct.NewCounter[int](bufSize) + + // For demonstration purposes, keep track of the actual count. + // This will generally be impractical for "real" workloads. + var unique mapset.Set[int] + + // Observe some (50,000) random inputs... + for range 50000 { + r := rand.IntN(80000) + c.Add(r) + + unique.Add(r) + } + + fmt.Printf("Buffer limit: %d\n", bufSize) + fmt.Fprintf(os.Stderr, "Unique: %d\n", unique.Len()) + fmt.Fprintf(os.Stderr, "Estimate: %d\n", c.Count()) + fmt.Fprintf(os.Stderr, "Buffer used: %d\n", c.Len()) + + // N.B.: Counter results are intentionally omitted here. The exact values + // are not stable even if the RNG is fixed, because the counter uses map + // iteration during update. + + // Output: + // Buffer limit: 27834 +} diff --git a/go.mod b/go.mod index a76f745..7ce2541 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/creachadair/mds -go 1.21 +go 1.22 require github.com/google/go-cmp v0.6.0