-
Notifications
You must be signed in to change notification settings - Fork 202
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add implementation of snapping mechanism
First draft implementation of the snapping mechanism. This addresses a vulnerability in the Laplace mechanism and its derivatives, stemming from floating-point numbers. The mechanism was proposed as a solution to this vulnerability by Ilya Mironov Paper link: https://www.microsoft.com/en-us/research/wp-content/uploads/2012/10/lsbs.pdf
- Loading branch information
Showing
4 changed files
with
339 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,241 @@ | ||
|
||
""" | ||
The Snapping mechanism in differential privacy, which eliminates a weakness to floating point errors in the classic | ||
Laplace mechanism with standard Laplace sampling. | ||
""" | ||
import math | ||
import secrets | ||
import struct | ||
from numbers import Real | ||
|
||
import crlibm | ||
import numpy as np | ||
|
||
from diffprivlib.mechanisms import DPMechanism | ||
from diffprivlib.mechanisms.base import TruncationAndFoldingMixin | ||
|
||
|
||
class Snapping(DPMechanism, TruncationAndFoldingMixin): | ||
r""" | ||
The Snapping mechanism for differential privacy. | ||
First proposed by Ilya Mironov [M12]_. | ||
It eliminates a vulnerability stemming from the representation of reals as floating-point numbers in implementations | ||
of the classic Laplace mechanism and its variants which use the inverse CDF of the Laplace distribution to sample | ||
it. It causes a high degree of reduction in the granularity of the output. | ||
Parameters | ||
---------- | ||
epsilon : float | ||
Privacy parameter :math:`\epsilon` for the mechanism. Must be in [0, ∞]. | ||
sensitivity : float | ||
The sensitivity of the mechanism. Must be in [0, ∞). | ||
lower : float | ||
The lower bound of the mechanism. | ||
upper : float | ||
The upper bound of the mechanism. | ||
References | ||
---------- | ||
.. [Mir12] Mironov, Ilya. "On significance of the least significant bits for differential privacy." Proceedings of | ||
the 2012 ACM conference on Computer and communications security (2012). | ||
""" | ||
def __init__(self, *, epsilon, sensitivity, lower, upper): | ||
super().__init__(epsilon=epsilon, delta=0.0) | ||
self._check_sensitivity(sensitivity) | ||
self.sensitivity = sensitivity | ||
TruncationAndFoldingMixin.__init__(self, lower=lower, upper=upper) | ||
self._bound = self._scale_bound() | ||
|
||
@classmethod | ||
def _check_sensitivity(cls, sensitivity): | ||
if not isinstance(sensitivity, Real): | ||
raise TypeError("Sensitivity must be numeric") | ||
|
||
if sensitivity < 0: | ||
raise ValueError("Sensitivity must be non-negative") | ||
return float(sensitivity) | ||
|
||
def _check_all(self, value): | ||
super()._check_all(value) | ||
TruncationAndFoldingMixin._check_all(self, value) | ||
self._check_sensitivity(sensitivity=self.sensitivity) | ||
|
||
if not isinstance(value, Real): | ||
raise TypeError("Value to be randomised must be a number") | ||
|
||
return True | ||
|
||
def _scale_bound(self): | ||
""" | ||
Scales the lower and upper bounds to be proportionate to sensitivity 1, and symmetrical about 0. | ||
For sensitivity 0, only centres the bound, as scaling up and down is not defined. | ||
Returns | ||
------- | ||
float | ||
A symmetric bound around 0 scaled to sensitivity 1 | ||
""" | ||
if self.sensitivity > 0: | ||
return (self.upper - self.lower) / 2.0 / self.sensitivity | ||
return (self.upper - self.lower) / 2.0 | ||
|
||
def _truncate(self, value): | ||
if value > self._bound: | ||
return self._bound | ||
if value < -self._bound: | ||
return -self._bound | ||
|
||
return value | ||
|
||
def bias(self, value): | ||
raise NotImplementedError | ||
|
||
def variance(self, value): | ||
raise NotImplementedError | ||
|
||
def mse(self, value): | ||
raise NotImplementedError | ||
|
||
def effective_epsilon(self): | ||
r""" | ||
Computes the effective value of :math:`\epsilon` that the Snapping mechanism guarantees compared to an | ||
equivalent Laplace mechanims based on the bounds and the machine epsilon. | ||
Defined in section 5.2 of [Mir12]_. | ||
Returns | ||
------- | ||
float | ||
The effective value of :math:`\epsilon` | ||
""" | ||
machine_epsilon = np.finfo(float).epsneg | ||
return self.epsilon + 12.0 * self._bound * self.epsilon + 2.0 * machine_epsilon | ||
|
||
def _scale_and_offset_value(self, value): | ||
""" | ||
Centre value around 0 with symmetric bound and scale to sensitivity 1 | ||
Parameters | ||
---------- | ||
value : float | ||
value to be scaled | ||
Returns | ||
------- | ||
float | ||
value offset to be centered on 0 and scaled to sensitivity 1 | ||
""" | ||
value_scaled = value / self.sensitivity | ||
return value_scaled - self._bound - (self.lower / self.sensitivity) | ||
|
||
def _reverse_scale_and_offset_value(self, value): | ||
return (value + self._bound) * self.sensitivity + self.lower | ||
|
||
def _get_nearest_power_of_2(self, x): | ||
def float_to_bits(d): | ||
s = struct.pack('>d', d) | ||
return struct.unpack('>q', s)[0] | ||
|
||
def bits_to_float(b): | ||
s = struct.pack('>q', b) | ||
return struct.unpack('>d', s)[0] | ||
|
||
bits = float_to_bits(x) | ||
if bits % (1 << 52) == 0: | ||
return x | ||
return bits_to_float(((bits >> 52) + 1) << 52) | ||
|
||
def _round_to_nearest_power_of_2(self, value): | ||
""" Performs the rounding step from [Mir12]_ with ties resolved towards +∞ | ||
Parameters | ||
---------- | ||
value : float | ||
Value to be rounded | ||
Returns | ||
------- | ||
float | ||
Rounded value | ||
""" | ||
if self.epsilon == float('inf'): # infinitely small rounding | ||
return value | ||
base = self._get_nearest_power_of_2(1.0 / self.epsilon) | ||
remainder = value % base | ||
if remainder > base / 2: | ||
return value - remainder + base | ||
if remainder == base / 2: | ||
return value + remainder | ||
return value - remainder | ||
|
||
def _sample_uniform(self): | ||
""" | ||
Uniformly sample the full domain of floating-point numbers between (0, 1), rather than only multiples of 2^-53. | ||
A uniform distribution over D ∩ (0, 1) can be generated by independently sampling an exponent | ||
from the geometric distribution with parameter .5 and a significand by drawing a uniform string from | ||
{0, 1}^52 [Mir12]_ | ||
Based on code recipe in Python standard library documentation [Py21]_. | ||
Returns | ||
------- | ||
float | ||
A value sampled from float in (0, 1) with probability proportional to the size of the infinite-precision | ||
real interval each float represents | ||
References | ||
---------- | ||
.. [Py21] The Python Standard Library. "random — Generate pseudo-random numbers", 2021 | ||
https://docs.python.org/3/library/random.html#recipes | ||
""" | ||
mantissa = 1 << 52 | secrets.randbits(52) | ||
exponent = -53 | ||
x = 0 | ||
while not x: | ||
x = secrets.randbits(32) | ||
exponent += x.bit_length() - 32 | ||
return math.ldexp(mantissa, exponent) | ||
|
||
def _sample_laplace(self): | ||
r""" | ||
Laplace inverse CDF random sampling implementation which uses full domain uniform sampling and exact log | ||
implementation from crlibm, as mentioned in [Mir12]_. | ||
Outputs a random value scaled according to privacy budget and sensitivity 1, as bounds and input are scaled to | ||
sensitivity 1 before Laplacian noise is added. | ||
Returns | ||
------- | ||
float | ||
Random value from Laplace distribution scaled according to :math:`\epsilon` | ||
""" | ||
sign = secrets.randbits(1) | ||
uniform = self._sample_uniform() | ||
laplace = (-1)**sign * 1.0 / self.epsilon * crlibm.log_rn(uniform) | ||
return laplace | ||
|
||
def randomise(self, value): | ||
"""Randomise `value` with the mechanism. | ||
Parameters | ||
---------- | ||
value : float | ||
The value to be randomised. | ||
Returns | ||
------- | ||
float | ||
The randomised value. | ||
""" | ||
self._check_all(value) | ||
if self.sensitivity > 0: | ||
value_scaled_offset = self._scale_and_offset_value(value) | ||
value_clamped = self._truncate(value_scaled_offset) | ||
laplace = self._sample_laplace() | ||
value_rounded = self._round_to_nearest_power_of_2(value_clamped + laplace) | ||
return self._reverse_scale_and_offset_value(self._truncate(value_rounded)) | ||
else: | ||
return self._truncate(value) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
import numpy as np | ||
from unittest import TestCase | ||
|
||
import pytest | ||
|
||
from diffprivlib.mechanisms import Snapping | ||
from diffprivlib.utils import global_seed | ||
|
||
|
||
class TestSnapping(TestCase): | ||
def setup_method(self, method): | ||
if method.__name__ .endswith("prob"): | ||
global_seed(314159) | ||
|
||
self.mech = Snapping | ||
|
||
def teardown_method(self, method): | ||
del self.mech | ||
|
||
def test_class(self): | ||
from diffprivlib.mechanisms import DPMechanism | ||
self.assertTrue(issubclass(Snapping, DPMechanism)) | ||
|
||
def test_neg_sensitivity(self): | ||
with self.assertRaises(ValueError): | ||
self.mech(epsilon=1, sensitivity=-1, lower=0, upper=1000) | ||
|
||
def test_str_sensitivity(self): | ||
with self.assertRaises(TypeError): | ||
self.mech(epsilon=1, sensitivity="1", lower=0, upper=1000) | ||
|
||
def test_zero_sensitivity(self): | ||
mech = self.mech(epsilon=1, sensitivity=0, lower=0, upper=1000) | ||
|
||
for i in range(1000): | ||
self.assertAlmostEqual(mech.randomise(1), 1) | ||
|
||
def test_neg_epsilon(self): | ||
with self.assertRaises(ValueError): | ||
self.mech(epsilon=-1, sensitivity=1, lower=0, upper=1000) | ||
|
||
def test_inf_epsilon(self): | ||
mech = self.mech(epsilon=float("inf"), sensitivity=1, lower=0, upper=1000) | ||
|
||
for i in range(1000): | ||
self.assertAlmostEqual(mech.randomise(1), 1) | ||
|
||
def test_complex_epsilon(self): | ||
with self.assertRaises(TypeError): | ||
self.mech(epsilon=1 + 2j, sensitivity=1, lower=0, upper=1000) | ||
|
||
def test_string_epsilon(self): | ||
with self.assertRaises(TypeError): | ||
self.mech(epsilon="Two", sensitivity=1, lower=0, upper=1000) | ||
|
||
def test_repr(self): | ||
repr_ = repr(self.mech(epsilon=1, sensitivity=1, lower=0, upper=1000)) | ||
self.assertIn(".Snapping(", repr_) | ||
|
||
def test_epsilon(self): | ||
mech = self.mech(epsilon=1, sensitivity=1, lower=0, upper=1000) | ||
self.assertIsNotNone(mech.randomise(1)) | ||
|
||
def test_non_numeric(self): | ||
mech = self.mech(epsilon=1, sensitivity=1, lower=0, upper=1000) | ||
with self.assertRaises(TypeError): | ||
mech.randomise("Hello") | ||
|
||
def test_zero_median_prob(self): | ||
mech = self.mech(epsilon=1, sensitivity=1, lower=0, upper=1000) | ||
vals = [] | ||
|
||
for i in range(10000): | ||
vals.append(mech.randomise(0)) | ||
|
||
median = float(np.median(vals)) | ||
self.assertAlmostEqual(np.abs(median), 0.0, delta=0.1) | ||
|
||
def test_neighbours_prob(self): | ||
epsilon = 1 | ||
runs = 10000 | ||
mech = self.mech(epsilon=epsilon, sensitivity=1, lower=0, upper=1000) | ||
count = [0, 0] | ||
|
||
for i in range(runs): | ||
val0 = mech.randomise(0) | ||
if val0 <= 0: | ||
count[0] += 1 | ||
|
||
val1 = mech.randomise(1) | ||
if val1 <= 0: | ||
count[1] += 1 | ||
|
||
self.assertGreater(count[0], count[1]) | ||
self.assertLessEqual(count[0] / runs, np.exp(epsilon) * count[1] / runs + 0.1) | ||
|