Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added range query. #158

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cyaron/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from random import choice, randint, random, randrange, uniform

#from .visual import visualize
# from .visual import visualize
from . import log
from .compare import Compare
from .consts import *
Expand All @@ -21,3 +21,4 @@
from .string import String
from .utils import *
from .vector import Vector
from .query import RangeQuery
138 changes: 138 additions & 0 deletions cyaron/query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""
This module provides a `RangeQuery` class for generating queries
based on limits of each dimension.

Classes:
RangeQuery: A class for generating random queries.

Usage:
n = randint(1, 10)
q = randint(1, 10)
Q = Query.random(q, [(1, n)])
"""

import random
from enum import IntEnum
from typing import Optional, Union, Tuple, List

from .utils import list_like


class RangeQueryRandomMode(IntEnum):
less = 0 # disallow l = r
allow_equal = 1 # allow l = r


class RangeQuery:
"""A class for generating random queries."""
result: List[Tuple[List[int], List[int]]]

def __init__(self):
self.result = []

def __len__(self):
return len(self.result)

def __getitem__(self, item):
return self.result[item]

def __str__(self):
"""__str__(self) -> str
Return a string to output the queries.
The string contains all the queries with l and r in a row, splits with "\\n".
"""
return self.to_str()

def to_str(self):
"""
Return a string to output the queries.
The string contains all the queries with l and r in a row, splits with "\\n".
"""
res = ''
for l, r, in self.result:
l_to_str = [str(x) for x in l]
r_to_str = [str(x) for x in r]
res += ' '.join(l_to_str) + ' ' + ' '.join(r_to_str) + '\n'
return res[:-1] # remove the last '\n'

@staticmethod
def random(
num: int = 1,
position_range: Optional[List[Union[int, Tuple[int, int]]]] = None,
mode: RangeQueryRandomMode = RangeQueryRandomMode.allow_equal,
):
"""
Generate `num` random queries with dimension limit.
Args:
num: the number of queries
position_range: a list of limits for each dimension
single number x represents range [1, x]
list [x, y] or tuple (x, y) represents range [x, y]
mode: the mode queries generate, see Enum Class RangeQueryRandomMode
"""
if position_range is None:
position_range = [10]

ret = RangeQuery()

if not list_like(position_range):
raise TypeError("the 2nd param must be a list or tuple")

for _ in range(num):
ret.result.append(RangeQuery.get_one_query(position_range, mode))
return ret

@staticmethod
def get_one_query(
position_range: Optional[List[Union[int, Tuple[int, int]]]] = None,
mode: RangeQueryRandomMode = RangeQueryRandomMode.allow_equal,
) -> Tuple[List[int], List[int]]:
"""
Generate a pair of query lists (query_l, query_r) based on the given position ranges and mode.
Args:
position_range (Optional[List[Union[int, Tuple[int, int]]]]): A list of position ranges. Each element can be:
- An integer, which will be treated as a range from 1 to that integer.
- A tuple of two integers, representing the lower and upper bounds of the range.
mode (RangeQueryRandomMode): The mode for generating the queries. It can be:
- RangeQueryRandomMode.allow_equal: Allow the generated l and r to be equal.
- RangeQueryRandomMode.less: Ensure that l and r are not equal.
Returns:
Tuple[List[int], List[int]]: A tuple containing two lists:
- query_l: A list of starting positions.
- query_r: A list of ending positions.
Raises:
ValueError: If the upper-bound is smaller than the lower-bound.
ValueError: If the mode is set to less but the upper-bound is equal to the lower-bound.
"""
dimension = len(position_range)
query_l: List[int] = []
query_r: List[int] = []
for i in range(dimension):
cur_range: Tuple[int, int]
if isinstance(position_range[i], int):
cur_range = (1, position_range[i])
elif len(position_range[i]) == 1:
cur_range = (1, position_range[i][0])
else:
cur_range = position_range[i]

if cur_range[0] > cur_range[1]:
raise ValueError(
"upper-bound should be larger than lower-bound")
if mode == RangeQueryRandomMode.less and cur_range[0] == cur_range[
1]:
raise ValueError(
"mode is set to less but upper-bound is equal to lower-bound"
)

l = random.randint(cur_range[0], cur_range[1])
r = random.randint(cur_range[0], cur_range[1])
# Expected complexity is O(log(1 / V))
while mode == RangeQueryRandomMode.less and l == r:
l = random.randint(cur_range[0], cur_range[1])
r = random.randint(cur_range[0], cur_range[1])
if l > r:
l, r = r, l
query_l.append(l)
query_r.append(r)
return (query_l, query_r)
1 change: 1 addition & 0 deletions cyaron/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
from .compare_test import TestCompare
from .graph_test import TestGraph
from .vector_test import TestVector
from .range_query_test import TestRangeQuery
from .general_test import TestGeneral
113 changes: 113 additions & 0 deletions cyaron/tests/range_query_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import unittest
import random
from cyaron.query import *
from cyaron.vector import *


def valid_query(l, r, mode: RangeQueryRandomMode, limits) -> bool:
if len(l) != len(r) or len(l) != len(limits):
return False
dimension = len(l)
for i in range(dimension):
cur_limit = limits[i]
if isinstance(cur_limit, int):
cur_limit = (1, cur_limit)
elif len(limits[i]) == 1:
cur_limit = (1, cur_limit[0])
if l[i] > r[i] or (l[i] == r[i] and mode == RangeQueryRandomMode.less):
print("bound", l[i], r[i])
return False
if not (cur_limit[0] <= l[i] <= r[i] <= cur_limit[1]):
print("limit", cur_limit[0], cur_limit[1], l[i], r[i])
return False
return True


class TestRangeQuery(unittest.TestCase):

def test_allow_equal_v1(self):
dimension = random.randint(1, 10)
limits = Vector.random(dimension, [(1, 1000)]) # n1, n2 ...
Q = RangeQuery.random(10**5, limits)
self.assertEqual(len(Q), 10**5)
for i in range(10**5):
self.assertTrue(
valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.allow_equal,
limits))

def test_allow_equal_v2_throw(self):
dimension = random.randint(1, 10)
limits = Vector.random(dimension, [(1, 1000), (1, 1000)]) # n1, n2 ...
conflict = False
for i in range(dimension):
conflict = conflict or limits[i][0] > limits[i][1]
throw = False
try:
Q = RangeQuery.random(10**5, limits)
self.assertEqual(len(Q), 10**5)
for i in range(10**5):
self.assertTrue(
valid_query(Q[i][0], Q[i][1],
RangeQueryRandomMode.allow_equal, limits))
except:
throw = True

self.assertEqual(throw, conflict)

def test_allow_equal_v2_no_throw(self):
dimension = random.randint(1, 10)
limits = Vector.random(dimension, [(1, 1000), (1, 1000)]) # n1, n2 ...
for i in range(dimension):
if limits[i][0] > limits[i][1]:
limits[i][0], limits[i][1] = limits[i][1], limits[i][0]
Q = RangeQuery.random(10**5, limits)
self.assertEqual(len(Q), 10**5)
for i in range(10**5):
self.assertTrue(
valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.allow_equal,
limits))

def test_less_v1(self):
dimension = random.randint(1, 10)
limits = Vector.random(dimension, [(2, 1000)]) # n1, n2 ...
Q = RangeQuery.random(10**5, limits, RangeQueryRandomMode.less)
self.assertEqual(len(Q), 10**5)
for i in range(10**5):
self.assertTrue(
valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.less,
limits))

def test_less_v2_throw(self):
dimension = random.randint(1, 10)
limits = Vector.random(dimension, [(1, 1000), (1, 1000)]) # n1, n2 ...
conflict = False
for i in range(dimension):
conflict = conflict or limits[i][0] >= limits[i][1]
throw = False
try:
Q = RangeQuery.random(10**5, limits, RangeQueryRandomMode.less)
self.assertEqual(len(Q), 10**5)
for i in range(10**5):
self.assertTrue(
valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.less,
limits))
except:
throw = True

self.assertEqual(throw, conflict)

def test_less_v2_no_throw(self):
dimension = random.randint(1, 10)
limits = Vector.random(dimension, [(1, 1000), (1, 1000)]) # n1, n2 ...
for i in range(dimension):
while limits[i][0] == limits[i][1]:
limits[i][0] = random.randint(1, 1000)
limits[i][1] = random.randint(1, 1000)
if limits[i][0] > limits[i][1]:
limits[i][0], limits[i][1] = limits[i][1], limits[i][0]
Q = RangeQuery.random(10**5, limits, RangeQueryRandomMode.less)
self.assertEqual(len(Q), 10**5)
for i in range(10**5):
self.assertTrue(
valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.less,
limits))