From ab68206dc57e5dce5a10531455891a45311f83c3 Mon Sep 17 00:00:00 2001 From: jmacd Date: Sun, 3 Nov 2019 09:22:25 -0800 Subject: [PATCH] Combine multiple commits. --- .circleci/config.yml | 16 +++ LICENSE | 201 ++++++++++++++++++++++++++++++++++++ README.md | 57 +++++++++- benchmark_test.go | 72 +++++++++++++ doc.go | 22 ++++ frequency_test.go | 143 +++++++++++++++++++++++++ go.mod | 5 + go.sum | 11 ++ internal/sampleheap.go | 57 ++++++++++ internal/sampleheap_test.go | 58 +++++++++++ simple/doc.go | 4 + simple/simple.go | 65 ++++++++++++ simple/simple_test.go | 41 ++++++++ varopt.go | 143 +++++++++++++------------ varopt_test.go | 140 ++++++++++++++++++++----- weighted_test.go | 82 +++++++++++++++ 16 files changed, 1029 insertions(+), 88 deletions(-) create mode 100644 .circleci/config.yml create mode 100644 LICENSE create mode 100644 benchmark_test.go create mode 100644 doc.go create mode 100644 frequency_test.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/sampleheap.go create mode 100644 internal/sampleheap_test.go create mode 100644 simple/doc.go create mode 100644 simple/simple.go create mode 100644 simple/simple_test.go create mode 100644 weighted_test.go diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..3b06f3c --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,16 @@ +version: 2 + +jobs: + test: + working_directory: ~/go/src/github.com/lightstep/varopt + docker: + - image: circleci/golang:1.15 + steps: + - checkout + - run: go test -v ./... + +workflows: + version: 2 + test: + jobs: + - test diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md index c9770ee..131b6b3 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,58 @@ +[![Docs](https://godoc.org/github.com/lightstep/varopt?status.svg)](https://godoc.org/github.com/lightstep/varopt) + +# VarOpt Sampling Algorithm + This is an implementation of VarOpt, an unbiased weighted sampling algorithm described in the paper [Stream sampling for variance-optimal -estimation of subset sums](https://arxiv.org/pdf/0803.0473.pdf). +estimation of subset sums](https://arxiv.org/pdf/0803.0473.pdf) (2008) +by Edith Cohen, Nick Duffield, Haim Kaplan, Carsten Lund, and Mikkel +Thorup. + +VarOpt is a reservoir-type sampler that maintains a fixed-size sample +and provides a mechanism for merging unequal-weight samples. + +This repository also includes a simple reservoir sampling algorithm, +often useful in conjunction with weighed reservoir sampling, that +implements Algorithm R from [Random sampling with a +reservoir](https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R) +(1985) by Jeffrey Vitter. + +## Usage: Natural Weights + +A typical use of VarOpt sampling is to estimate network flows using +sample packets. In this use-case, the weight applied to each sample +is the size of the packet. Because VarOpt computes an unbiased +sample, sample data points can be summarized along secondary +dimensions. For example, we can select a subset of sampled packets +according to a secondary attribute, sum the sample weights, and the +result is expected to equal the size of packets corresponding to the +secondary attribute from the original population. + +See [weighted_test.go](https://github.com/lightstep/varopt/blob/master/weighted_test.go) for an example. + +## Usage: Inverse-probability Weights + +Another use for VarOpt sampling uses inverse-probability weights to +estimate frequencies while simultaneously controlling sample +diversity. Suppose a sequence of observations can be naturally +categorized into N different buckets. The goal in this case is to +compute a sample where each bucket is well represented, while +maintaining frequency estimates. + +In this use-case, the weight assigned to each observation is the +inverse probability of the bucket it belongs to. The result of +weighted sampling with inverse-probability weights is a uniform +expectated value; in this example we expect an equal number of +observations falling into each bucket. Each observation represents a +frequency of its sample weight (computed by VarOpt) divided by its +original weight (the inverse-probability). + +See [frequency_test.go](https://github.com/lightstep/varopt/blob/master/frequency_test.go) for an example. + +## Usage: Merging Samples + +VarOpt supports merging independently collected samples one +observation at a time. This is useful for building distributed +sampling schemes. In this use-case, each node in a distributed system +computes a weighted sample. To combine samples, simply input all the +observations and their corresponding weights into a new VarOpt sample. \ No newline at end of file diff --git a/benchmark_test.go b/benchmark_test.go new file mode 100644 index 0000000..131a08d --- /dev/null +++ b/benchmark_test.go @@ -0,0 +1,72 @@ +// Copyright 2019, LightStep Inc. +// +// The benchmark results point to a performance drop when the +// largeHeap starts to be used because of interface conversions in and +// out of the heap, primarily due to the heap interface. This +// suggests room for improvement by avoiding the built-in heap. + +/* +BenchmarkAdd_Norm_100-8 37540165 32.1 ns/op 8 B/op 0 allocs/op +BenchmarkAdd_Norm_10000-8 39850280 30.6 ns/op 8 B/op 0 allocs/op +BenchmarkAdd_Norm_1000000-8 7958835 183 ns/op 52 B/op 0 allocs/op +BenchmarkAdd_Exp_100-8 41565934 28.5 ns/op 8 B/op 0 allocs/op +BenchmarkAdd_Exp_10000-8 43622184 29.2 ns/op 8 B/op 0 allocs/op +BenchmarkAdd_Exp_1000000-8 8103663 220 ns/op 55 B/op 0 allocs/op +*/ + +package varopt_test + +import ( + "math" + "math/rand" + "testing" + + "github.com/lightstep/varopt" +) + +func normValue(rnd *rand.Rand) float64 { + return 1 + math.Abs(rnd.NormFloat64()) +} + +func expValue(rnd *rand.Rand) float64 { + return rnd.ExpFloat64() +} + +func BenchmarkAdd_Norm_100(b *testing.B) { + benchmarkAdd(b, 100, normValue) +} + +func BenchmarkAdd_Norm_10000(b *testing.B) { + benchmarkAdd(b, 10000, normValue) +} + +func BenchmarkAdd_Norm_1000000(b *testing.B) { + benchmarkAdd(b, 1000000, normValue) +} + +func BenchmarkAdd_Exp_100(b *testing.B) { + benchmarkAdd(b, 100, expValue) +} + +func BenchmarkAdd_Exp_10000(b *testing.B) { + benchmarkAdd(b, 10000, expValue) +} + +func BenchmarkAdd_Exp_1000000(b *testing.B) { + benchmarkAdd(b, 1000000, expValue) +} + +func benchmarkAdd(b *testing.B, size int, f func(rnd *rand.Rand) float64) { + b.ReportAllocs() + rnd := rand.New(rand.NewSource(3331)) + v := varopt.New(size, rnd) + weights := make([]float64, b.N) + for i := 0; i < b.N; i++ { + weights[i] = f(rnd) + } + + b.StartTimer() + for i := 0; i < b.N; i++ { + v.Add(nil, weights[i]) + } +} diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..fe0e2e6 --- /dev/null +++ b/doc.go @@ -0,0 +1,22 @@ +// Copyright 2019, LightStep Inc. + +/* +Package varopt contains an implementation of VarOpt, an unbiased weighted +sampling algorithm described in the paper "Stream sampling for +variance-optimal estimation of subset sums" +https://arxiv.org/pdf/0803.0473.pdf (2008), by Edith Cohen, Nick +Duffield, Haim Kaplan, Carsten Lund, and Mikkel Thorup. + +VarOpt is a reservoir-type sampler that maintains a fixed-size sample +and provides a mechanism for merging unequal-weight samples. + +This package also includes a simple reservoir sampling algorithm, +often useful in conjunction with weighed reservoir sampling, using +Algorithm R from "Random sampling with a +reservoir", https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R +(1985), by Jeffrey Vitter. + +See https://github.com/lightstep/varopt/blob/master/README.md for +more detail. +*/ +package varopt diff --git a/frequency_test.go b/frequency_test.go new file mode 100644 index 0000000..e4b3429 --- /dev/null +++ b/frequency_test.go @@ -0,0 +1,143 @@ +// Copyright 2019, LightStep Inc. + +package varopt_test + +import ( + "fmt" + "math" + "math/rand" + + "github.com/lightstep/varopt" +) + +type curve struct { + color string + mean float64 + stddev float64 +} + +type point struct { + color int + xvalue float64 +} + +var colors = []curve{ + {color: "red", mean: 10, stddev: 15}, + {color: "green", mean: 30, stddev: 10}, + {color: "blue", mean: 50, stddev: 20}, +} + +// This example shows how to use Varopt sampling to estimate +// frequencies with the use of inverse probability weights. The use +// of inverse probability creates a uniform expected value, in this of +// the number of sample points per second. +// +// While the number of expected points per second is uniform, the +// output sample weights are expected to match the original +// frequencies. +func ExampleVaropt_GetOriginalWeight() { + // Number of points. + const totalCount = 1e6 + + // Relative size of the sample. + const sampleRatio = 0.01 + + // Ensure this test is deterministic. + rnd := rand.New(rand.NewSource(104729)) + + // Construct a timeseries consisting of three colored signals, + // for x=0 to x=60 seconds. + var points []point + + // origCounts stores the original signals at second granularity. + origCounts := make([][]int, len(colors)) + for i := range colors { + origCounts[i] = make([]int, 60) + } + + // Construct the signals by choosing a random color, then + // using its Gaussian to compute a timestamp. + for len(points) < totalCount { + choose := rnd.Intn(len(colors)) + series := colors[choose] + xvalue := rnd.NormFloat64()*series.stddev + series.mean + + if xvalue < 0 || xvalue > 60 { + continue + } + origCounts[choose][int(math.Floor(xvalue))]++ + points = append(points, point{ + color: choose, + xvalue: xvalue, + }) + } + + // Compute the total number of points per second. This will be + // used to establish the per-second probability. + xcount := make([]int, 60) + for _, point := range points { + xcount[int(math.Floor(point.xvalue))]++ + } + + // Compute the sample with using the inverse probability as a + // weight. This ensures a uniform distribution of points in each + // second. + sampleSize := int(sampleRatio * float64(totalCount)) + sampler := varopt.New(sampleSize, rnd) + for _, point := range points { + second := int(math.Floor(point.xvalue)) + prob := float64(xcount[second]) / float64(totalCount) + sampler.Add(point, 1/prob) + } + + // sampleCounts stores the reconstructed signals. + sampleCounts := make([][]float64, len(colors)) + for i := range colors { + sampleCounts[i] = make([]float64, 60) + } + + // pointCounts stores the number of points per second. + pointCounts := make([]int, 60) + + // Reconstruct the signals using the output sample weights. + // The effective count of each sample point is its output + // weight divided by its original weight. + for i := 0; i < sampler.Size(); i++ { + sample, weight := sampler.Get(i) + origWeight := sampler.GetOriginalWeight(i) + point := sample.(point) + second := int(math.Floor(point.xvalue)) + sampleCounts[point.color][second] += (weight / origWeight) + pointCounts[second]++ + } + + // Compute standard deviation of sample points per second. + sum := 0.0 + mean := float64(sampleSize) / 60 + for s := 0; s < 60; s++ { + e := float64(pointCounts[s]) - mean + sum += e * e + } + stddev := math.Sqrt(sum / (60 - 1)) + + fmt.Printf("Samples per second mean %.2f\n", mean) + fmt.Printf("Samples per second standard deviation %.2f\n", stddev) + + // Compute mean absolute percentage error between sampleCounts + // and origCounts for each signal. + for c := range colors { + mae := 0.0 + for s := 0; s < 60; s++ { + mae += math.Abs(sampleCounts[c][s]-float64(origCounts[c][s])) / float64(origCounts[c][s]) + } + mae /= 60 + fmt.Printf("Mean absolute percentage error (%s) = %.2f%%\n", colors[c].color, mae*100) + } + + // Output: + // Samples per second mean 166.67 + // Samples per second standard deviation 13.75 + // Mean absolute percentage error (red) = 25.16% + // Mean absolute percentage error (green) = 14.30% + // Mean absolute percentage error (blue) = 14.23% +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e61e0b6 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module github.com/lightstep/varopt + +go 1.15 + +require github.com/stretchr/testify v1.4.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8fdee58 --- /dev/null +++ b/go.sum @@ -0,0 +1,11 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/sampleheap.go b/internal/sampleheap.go new file mode 100644 index 0000000..c3d82a5 --- /dev/null +++ b/internal/sampleheap.go @@ -0,0 +1,57 @@ +// Copyright 2019, LightStep Inc. + +package internal + +type Vsample struct { + Sample interface{} + Weight float64 +} + +type SampleHeap []Vsample + +func (sh *SampleHeap) Push(v Vsample) { + l := append(*sh, v) + n := len(l) - 1 + + // This copies the body of heap.up(). + j := n + for { + i := (j - 1) / 2 // parent + if i == j || l[j].Weight >= l[i].Weight { + break + } + l[i], l[j] = l[j], l[i] + j = i + } + + *sh = l +} + +func (sh *SampleHeap) Pop() Vsample { + l := *sh + n := len(l) - 1 + result := l[0] + l[0] = l[n] + l = l[:n] + + // This copies the body of heap.down(). + i := 0 + for { + j1 := 2*i + 1 + if j1 >= n || j1 < 0 { // j1 < 0 after int overflow + break + } + j := j1 // left child + if j2 := j1 + 1; j2 < n && l[j2].Weight < l[j1].Weight { + j = j2 // = 2*i + 2 // right child + } + if l[j].Weight >= l[i].Weight { + break + } + l[i], l[j] = l[j], l[i] + i = j + } + + *sh = l + return result +} diff --git a/internal/sampleheap_test.go b/internal/sampleheap_test.go new file mode 100644 index 0000000..50615c0 --- /dev/null +++ b/internal/sampleheap_test.go @@ -0,0 +1,58 @@ +// Copyright 2019, LightStep Inc. + +package internal_test + +import ( + "container/heap" + "math/rand" + "testing" + + "github.com/lightstep/varopt/internal" + "github.com/stretchr/testify/require" +) + +type simpleHeap []float64 + +func (s *simpleHeap) Len() int { + return len(*s) +} + +func (s *simpleHeap) Swap(i, j int) { + (*s)[i], (*s)[j] = (*s)[j], (*s)[i] +} + +func (s *simpleHeap) Less(i, j int) bool { + return (*s)[i] < (*s)[j] +} + +func (s *simpleHeap) Push(x interface{}) { + *s = append(*s, x.(float64)) +} + +func (s *simpleHeap) Pop() interface{} { + old := *s + n := len(old) + x := old[n-1] + *s = old[0 : n-1] + return x +} + +func TestLargeHeap(t *testing.T) { + var L internal.SampleHeap + var S simpleHeap + + for i := 0; i < 1e6; i++ { + v := rand.NormFloat64() + L.Push(internal.Vsample{Weight: v}) + heap.Push(&S, v) + } + + for len(S) > 0 { + v1 := heap.Pop(&S).(float64) + v2 := L.Pop().Weight + + require.Equal(t, v1, v2) + } + + require.Equal(t, 0, len(L)) +} diff --git a/simple/doc.go b/simple/doc.go new file mode 100644 index 0000000..0d79ebe --- /dev/null +++ b/simple/doc.go @@ -0,0 +1,4 @@ +// Copyright 2019, LightStep Inc. + +/* Package simple implements an unweighted reservoir sampling algorithm. */ +package simple diff --git a/simple/simple.go b/simple/simple.go new file mode 100644 index 0000000..e3fb340 --- /dev/null +++ b/simple/simple.go @@ -0,0 +1,65 @@ +// Copyright 2019, LightStep Inc. + +package simple + +import ( + "math/rand" + + "github.com/lightstep/varopt" +) + +// Simple implements unweighted reservoir sampling using Algorithm R +// from "Random sampling with a reservoir" by Jeffrey Vitter (1985) +// https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R +type Simple struct { + capacity int + observed int + buffer []varopt.Sample + rnd *rand.Rand +} + +// New returns a simple reservoir sampler with given capacity +// (i.e., reservoir size) and random number generator. +func New(capacity int, rnd *rand.Rand) *Simple { + return &Simple{ + capacity: capacity, + rnd: rnd, + } +} + +// Add considers a new observation for the sample. Items have unit +// weight. +func (s *Simple) Add(span varopt.Sample) { + s.observed++ + + if s.buffer == nil { + s.buffer = make([]varopt.Sample, 0, s.capacity) + } + + if len(s.buffer) < s.capacity { + s.buffer = append(s.buffer, span) + return + } + + // Give this a capacity/observed chance of replacing an existing entry. + index := s.rnd.Intn(s.observed) + if index < s.capacity { + s.buffer[index] = span + } +} + +// Get returns the i'th selected item from the sample. +func (s *Simple) Get(i int) varopt.Sample { + return s.buffer[i] +} + +// Size returns the number of items in the sample. If the reservoir is +// full, Size() equals Capacity(). +func (s *Simple) Size() int { + return len(s.buffer) +} + +// Count returns the number of items that were observed. +func (s *Simple) Count() int { + return s.observed +} diff --git a/simple/simple_test.go b/simple/simple_test.go new file mode 100644 index 0000000..4ea9592 --- /dev/null +++ b/simple/simple_test.go @@ -0,0 +1,41 @@ +// Copyright 2019, LightStep Inc. + +package simple_test + +import ( + "math/rand" + "testing" + + "github.com/lightstep/varopt/simple" + "github.com/stretchr/testify/require" +) + +type iRec int + +func TestSimple(t *testing.T) { + const ( + popSize = 1e6 + sampleProb = 0.1 + sampleSize int = popSize * sampleProb + epsilon = 0.01 + ) + + rnd := rand.New(rand.NewSource(17167)) + + ss := simple.New(sampleSize, rnd) + + psum := 0. + for i := 0; i < popSize; i++ { + ss.Add(iRec(i)) + psum += float64(i) + } + + require.Equal(t, ss.Size(), sampleSize) + + ssum := 0.0 + for i := 0; i < sampleSize; i++ { + ssum += float64(ss.Get(i).(iRec)) + } + + require.InEpsilon(t, ssum/float64(ss.Size()), psum/popSize, epsilon) +} diff --git a/varopt.go b/varopt.go index d3cd2f1..dbe605b 100644 --- a/varopt.go +++ b/varopt.go @@ -1,25 +1,32 @@ -// Stream sampling for variance-optimal estimation of subset sums -// Edith Cohen, Nick Duffield, Haim Kaplan, Carsten Lund, Mikkel Thorup -// 2008 -// https://arxiv.org/pdf/0803.0473.pdf +// Copyright 2019, LightStep Inc. package varopt import ( - "container/heap" "fmt" + "math" "math/rand" + + "github.com/lightstep/varopt/internal" ) +// Varopt implements the algorithm from Stream sampling for +// variance-optimal estimation of subset sums Edith Cohen, Nick +// Duffield, Haim Kaplan, Carsten Lund, Mikkel Thorup 2008 +// +// https://arxiv.org/pdf/0803.0473.pdf type Varopt struct { - // Large-weight items - L largeHeap + // Random number generator + rnd *rand.Rand + + // Large-weight items stored in a min-heap. + L internal.SampleHeap // Light-weight items. - T []vsample + T []internal.Vsample // Temporary buffer. - X []vsample + X []internal.Vsample // Current threshold tau float64 @@ -31,40 +38,56 @@ type Varopt struct { totalWeight float64 } -type vsample struct { - sample Sample - weight float64 -} +// Sample is an empty interface that represents a sample item. +// Sampling algorithms treat these as opaque, as their weight is +// passed in separately. +type Sample interface{} -type largeHeap []vsample +var ErrInvalidWeight = fmt.Errorf("Negative, Zero, Inf or NaN weight") -func NewVaropt(capacity int) *Varopt { - v := InitVaropt(capacity) - return &v -} - -func InitVaropt(capacity int) Varopt { - return Varopt{ +// New returns a new Varopt sampler with given capacity (i.e., +// reservoir size) and random number generator. +func New(capacity int, rnd *rand.Rand) *Varopt { + return &Varopt{ capacity: capacity, + rnd: rnd, + L: make(internal.SampleHeap, 0, capacity), + T: make(internal.SampleHeap, 0, capacity), } } -func (s *Varopt) Add(sample Sample, weight float64) { - individual := vsample{ - sample: sample, - weight: weight, +// Reset returns the sampler to its initial state, maintaining its +// capacity and random number source. +func (s *Varopt) Reset() { + s.L = s.L[:0] + s.T = s.T[:0] + s.X = s.X[:0] + s.tau = 0 + s.totalCount = 0 + s.totalWeight = 0 +} + +// Add considers a new observation for the sample with given weight. +// If there is an item ejected from the sample as a result, the item +// is returned to allow re-use of memory. +// +// An error will be returned if the weight is either negative or NaN. +func (s *Varopt) Add(sample Sample, weight float64) (Sample, error) { + individual := internal.Vsample{ + Sample: sample, + Weight: weight, } - if weight <= 0 { - panic(fmt.Sprint("Invalid weight <= 0: ", weight)) + if weight <= 0 || math.IsNaN(weight) || math.IsInf(weight, 1) { + return nil, ErrInvalidWeight } s.totalCount++ s.totalWeight += weight if s.Size() < s.capacity { - heap.Push(&s.L, individual) - return + s.L.Push(individual) + return nil, nil } // the X <- {} step from the paper is not done here, @@ -73,16 +96,16 @@ func (s *Varopt) Add(sample Sample, weight float64) { W := s.tau * float64(len(s.T)) if weight > s.tau { - heap.Push(&s.L, individual) + s.L.Push(individual) } else { s.X = append(s.X, individual) W += weight } - for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].weight { - h := heap.Pop(&s.L).(vsample) + for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].Weight { + h := s.L.Pop() s.X = append(s.X, h) - W += h.weight + W += h.Weight } s.tau = W / float64(len(s.T)+len(s.X)-1) @@ -90,27 +113,31 @@ func (s *Varopt) Add(sample Sample, weight float64) { d := 0 for d < len(s.X) && r >= 0 { - wxd := s.X[d].weight + wxd := s.X[d].Weight r -= (1 - wxd/s.tau) d++ } + var eject Sample if r < 0 { if d < len(s.X) { s.X[d], s.X[len(s.X)-1] = s.X[len(s.X)-1], s.X[d] } + eject = s.X[len(s.X)-1].Sample s.X = s.X[:len(s.X)-1] } else { - ti := rand.Intn(len(s.T)) + ti := s.rnd.Intn(len(s.T)) s.T[ti], s.T[len(s.T)-1] = s.T[len(s.T)-1], s.T[ti] + eject = s.T[len(s.T)-1].Sample s.T = s.T[:len(s.T)-1] } s.T = append(s.T, s.X...) s.X = s.X[:0] + return eject, nil } func (s *Varopt) uniform() float64 { for { - r := rand.Float64() + r := s.rnd.Float64() if r != 0.0 { return r } @@ -122,60 +149,48 @@ func (s *Varopt) uniform() float64 { // GetOriginalWeight(i). func (s *Varopt) Get(i int) (Sample, float64) { if i < len(s.L) { - return s.L[i].sample, s.L[i].weight + return s.L[i].Sample, s.L[i].Weight } - return s.T[i-len(s.L)].sample, s.tau + return s.T[i-len(s.L)].Sample, s.tau } +// GetOriginalWeight returns the original input weight of the sample +// item that was passed to Add(). This can be useful for computing a +// frequency from the adjusted sample weight. func (s *Varopt) GetOriginalWeight(i int) float64 { if i < len(s.L) { - return s.L[i].weight + return s.L[i].Weight } - return s.T[i-len(s.L)].weight + return s.T[i-len(s.L)].Weight } +// Capacity returns the size of the reservoir. This is the maximum +// size of the sample. func (s *Varopt) Capacity() int { return s.capacity } +// Size returns the current number of items in the sample. If the +// reservoir is full, this returns Capacity(). func (s *Varopt) Size() int { return len(s.L) + len(s.T) } +// TotalWeight returns the sum of weights that were passed to Add(). func (s *Varopt) TotalWeight() float64 { return s.totalWeight } +// TotalCount returns the number of calls to Add(). func (s *Varopt) TotalCount() int { return s.totalCount } +// Tau returns the current large-weight threshold. Weights larger +// than Tau() carry their exact weight in the sample. See the VarOpt +// paper for details. func (s *Varopt) Tau() float64 { return s.tau } - -func (b largeHeap) Len() int { - return len(b) -} - -func (b largeHeap) Swap(i, j int) { - b[i], b[j] = b[j], b[i] -} - -func (b largeHeap) Less(i, j int) bool { - return b[i].weight < b[j].weight -} - -func (b *largeHeap) Push(x interface{}) { - *b = append(*b, x.(vsample)) -} - -func (b *largeHeap) Pop() interface{} { - old := *b - n := len(old) - x := old[n-1] - *b = old[0 : n-1] - return x -} diff --git a/varopt_test.go b/varopt_test.go index 4a19b80..a56c691 100644 --- a/varopt_test.go +++ b/varopt_test.go @@ -1,9 +1,14 @@ +// Copyright 2019, LightStep Inc. + package varopt_test import ( "math" + "math/rand" "testing" + "github.com/lightstep/varopt" + "github.com/lightstep/varopt/simple" "github.com/stretchr/testify/require" ) @@ -11,18 +16,15 @@ import ( // There are odd and even numbers, in equal amount // There are last-digits 0-9 in equal amount // -// Much like simple_test.go, we will test the mean is correct and, -// because unbiased, also the odd/even and last-digit-0-9 groupings -// will be balanced. +// We will test the mean is correct and, because unbiased, also the +// odd/even and last-digit-0-9 groupings will be balanced. const ( numBlocks = 100 popSize = 1e7 sampleProb = 0.001 sampleSize int = popSize * sampleProb - // TODO epsilon is somewhat variable b/c we're using the - // static rand w/o a fixed seed for the test. - epsilon = 0.06 + epsilon = 0.08 ) func TestUnbiased(t *testing.T) { @@ -53,7 +55,7 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { extra = popSize - bigSize*numBig - smallSize*numSmall ) - population := make([]Sample, popSize) + population := make([]varopt.Sample, popSize) psum := 0.0 @@ -67,17 +69,17 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { // population[i], population[j] = population[j], population[i] // }) - smallBlocks := make([][]Sample, numSmall) - bigBlocks := make([][]Sample, numBig) + smallBlocks := make([][]varopt.Sample, numSmall) + bigBlocks := make([][]varopt.Sample, numBig) for i := 0; i < numSmall; i++ { - smallBlocks[i] = make([]Sample, smallSize) + smallBlocks[i] = make([]varopt.Sample, smallSize) } for i := 0; i < numBig; i++ { if i == 0 { - bigBlocks[0] = make([]Sample, bigSize+extra) + bigBlocks[0] = make([]varopt.Sample, bigSize+extra) } else { - bigBlocks[i] = make([]Sample, bigSize) + bigBlocks[i] = make([]varopt.Sample, bigSize) } } @@ -97,22 +99,23 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { require.Equal(t, len(population), pos) maxDiff := 0.0 + rnd := rand.New(rand.NewSource(98887)) - func(allBlockLists ...[][][]Sample) { + func(allBlockLists ...[][][]varopt.Sample) { for _, blockLists := range allBlockLists { - varopt := NewVaropt(sampleSize) + vsample := varopt.New(sampleSize, rnd) for _, blockList := range blockLists { for _, block := range blockList { - simple := NewSimple(sampleSize) + ss := simple.New(sampleSize, rnd) for _, s := range block { - simple.Add(s) + ss.Add(s) } - weight := simple.Weight() - for i := 0; i < simple.Size(); i++ { - varopt.Add(simple.Get(i), weight) + weight := float64(ss.Count()) / float64(ss.Size()) + for i := 0; i < ss.Size(); i++ { + vsample.Add(ss.Get(i), weight) } } } @@ -121,8 +124,8 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { odd := 0.0 even := 0.0 - for i := 0; i < varopt.Size(); i++ { - v, w := varopt.Get(i) + for i := 0; i < vsample.Size(); i++ { + v, w := vsample.Get(i) vi := v.(int) if vi%2 == 0 { even++ @@ -140,7 +143,98 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { require.InEpsilon(t, odd, even, epsilon) } }( - [][][]Sample{bigBlocks, smallBlocks}, - [][][]Sample{smallBlocks, bigBlocks}, + [][][]varopt.Sample{bigBlocks, smallBlocks}, + [][][]varopt.Sample{smallBlocks, bigBlocks}, ) } + +func TestInvalidWeight(t *testing.T) { + rnd := rand.New(rand.NewSource(98887)) + v := varopt.New(1, rnd) + + _, err := v.Add(nil, math.NaN()) + require.Equal(t, err, varopt.ErrInvalidWeight) + + _, err = v.Add(nil, -1) + require.Equal(t, err, varopt.ErrInvalidWeight) + + _, err = v.Add(nil, 0) + require.Equal(t, err, varopt.ErrInvalidWeight) +} + +func TestReset(t *testing.T) { + const capacity = 10 + const insert = 100 + rnd := rand.New(rand.NewSource(98887)) + v := varopt.New(capacity, rnd) + + sum := 0. + for i := 1.; i <= insert; i++ { + v.Add(nil, i) + sum += i + } + + require.Equal(t, capacity, v.Size()) + require.Equal(t, insert, v.TotalCount()) + require.Equal(t, sum, v.TotalWeight()) + require.Less(t, 0., v.Tau()) + + v.Reset() + + require.Equal(t, 0, v.Size()) + require.Equal(t, 0, v.TotalCount()) + require.Equal(t, 0., v.TotalWeight()) + require.Equal(t, 0., v.Tau()) +} + +func TestEject(t *testing.T) { + const capacity = 100 + const rounds = 10000 + const maxvalue = 10000 + + entries := make([]int, capacity+1) + freelist := make([]*int, capacity+1) + + for i := range entries { + freelist[i] = &entries[i] + } + + // Make two deterministically equal samplers + rnd1 := rand.New(rand.NewSource(98887)) + rnd2 := rand.New(rand.NewSource(98887)) + vsrc := rand.New(rand.NewSource(98887)) + + expected := varopt.New(capacity, rnd1) + ejector := varopt.New(capacity, rnd2) + + for i := 0; i < rounds; i++ { + value := vsrc.Intn(maxvalue) + weight := vsrc.ExpFloat64() + + _, _ = expected.Add(&value, weight) + + lastitem := len(freelist) - 1 + item := freelist[lastitem] + freelist = freelist[:lastitem] + + *item = value + eject, _ := ejector.Add(item, weight) + + if eject != nil { + freelist = append(freelist, eject.(*int)) + } + } + + require.Equal(t, expected.Size(), ejector.Size()) + require.Equal(t, expected.TotalCount(), ejector.TotalCount()) + require.Equal(t, expected.TotalWeight(), ejector.TotalWeight()) + require.Equal(t, expected.Tau(), ejector.Tau()) + + for i := 0; i < capacity; i++ { + expectItem, expectWeight := expected.Get(i) + ejectItem, ejectWeight := expected.Get(i) + + require.Equal(t, *expectItem.(*int), *ejectItem.(*int)) + require.Equal(t, expectWeight, ejectWeight) + } +} diff --git a/weighted_test.go b/weighted_test.go new file mode 100644 index 0000000..1e82043 --- /dev/null +++ b/weighted_test.go @@ -0,0 +1,82 @@ +// Copyright 2019, LightStep Inc. + +package varopt_test + +import ( + "fmt" + "math" + "math/rand" + + "github.com/lightstep/varopt" +) + +type packet struct { + size int + color string + protocol string +} + +func ExampleNew() { + const totalPackets = 1e6 + const sampleRatio = 0.01 + + colors := []string{"red", "green", "blue"} + protocols := []string{"http", "tcp", "udp"} + + sizeByColor := map[string]int{} + sizeByProtocol := map[string]int{} + trueTotalWeight := 0.0 + + rnd := rand.New(rand.NewSource(32491)) + sampler := varopt.New(totalPackets*sampleRatio, rnd) + + for i := 0; i < totalPackets; i++ { + packet := packet{ + size: 1 + rnd.Intn(100000), + color: colors[rnd.Intn(len(colors))], + protocol: protocols[rnd.Intn(len(protocols))], + } + + sizeByColor[packet.color] += packet.size + sizeByProtocol[packet.protocol] += packet.size + trueTotalWeight += float64(packet.size) + + sampler.Add(packet, float64(packet.size)) + } + + estSizeByColor := map[string]float64{} + estSizeByProtocol := map[string]float64{} + estTotalWeight := 0.0 + + for i := 0; i < sampler.Size(); i++ { + sample, weight := sampler.Get(i) + packet := sample.(packet) + estSizeByColor[packet.color] += weight + estSizeByProtocol[packet.protocol] += weight + estTotalWeight += weight + } + + // Compute mean average percentage error for colors + colorMape := 0.0 + for _, c := range colors { + colorMape += math.Abs(float64(sizeByColor[c])-estSizeByColor[c]) / float64(sizeByColor[c]) + } + colorMape /= float64(len(colors)) + + // Compute mean average percentage error for protocols + protocolMape := 0.0 + for _, p := range protocols { + protocolMape += math.Abs(float64(sizeByProtocol[p])-estSizeByProtocol[p]) / float64(sizeByProtocol[p]) + } + protocolMape /= float64(len(protocols)) + + // Compute total sum error percentage + fmt.Printf("Total sum error %.2g%%\n", 100*math.Abs(estTotalWeight-trueTotalWeight)/trueTotalWeight) + fmt.Printf("Color mean absolute percentage error %.2f%%\n", 100*colorMape) + fmt.Printf("Protocol mean absolute percentage error %.2f%%\n", 100*protocolMape) + + // Output: + // Total sum error 2.4e-11% + // Color mean absolute percentage error 0.73% + // Protocol mean absolute percentage error 1.62% +}