Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: new proc to foster different size retention policy implementations #2463

Merged
merged 4 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions tests/waku_archive/test_retention_policy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ suite "Waku Archive - Retention policy":

let driver = newSqliteArchiveDriver()

let retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
let retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.new(capacity=capacity)
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()

## When
Expand Down Expand Up @@ -61,7 +61,7 @@ suite "Waku Archive - Retention policy":

let driver = newSqliteArchiveDriver()

let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit)
let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.new(size=sizeLimit)
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()

# make sure that the db is empty to before test begins
Expand Down Expand Up @@ -115,7 +115,7 @@ suite "Waku Archive - Retention policy":

let
driver = newSqliteArchiveDriver()
retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.new(capacity=capacity)

let messages = @[
fakeWakuMessage(contentTopic=DefaultContentTopic, ts=ts(0)),
Expand Down
4 changes: 4 additions & 0 deletions waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver,
limit: int):
Future[ArchiveDriverResult[void]] {.base, async.} = discard

method decreaseDatabaseSize*(driver: ArchiveDriver,
targetSizeInBytes: int64):
Future[ArchiveDriverResult[void]] {.base, async.} = discard

method close*(driver: ArchiveDriver):
Future[ArchiveDriverResult[void]] {.base, async.} = discard

34 changes: 34 additions & 0 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,40 @@ method deleteOldestMessagesNotWithinLimit*(

return ok()

method decreaseDatabaseSize*(driver: PostgresDriver,
targetSizeInBytes: int64):
Future[ArchiveDriverResult[void]] {.async.} =
## TODO: refactor this implementation and use partition management instead
## To remove 20% of the outdated data from database
const DeleteLimit = 0.80

## when db size overshoots the database limit, shread 20% of outdated messages
## get size of database
let dbSize = (await driver.getDatabaseSize()).valueOr:
return err("failed to get database size: " & $error)

## database size in bytes
let totalSizeOfDB: int64 = int64(dbSize)

if totalSizeOfDB < targetSizeInBytes:
return ok()

## to shread/delete messsges, get the total row/message count
let numMessages = (await driver.getMessagesCount()).valueOr:
return err("failed to get messages count: " & error)

## NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows
## if vacumming is done automatically then we aim to check DB size periodially for efficient
## retention policy implementation.

## 80% of the total messages are to be kept, delete others
let pageDeleteWindow = int(float(numMessages) * DeleteLimit)

(await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr:
return err("deleting oldest messages failed: " & error)

return ok()

method close*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =
## Close the database connection
Expand Down
5 changes: 5 additions & 0 deletions waku/waku_archive/driver/queue_driver/queue_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver,
# TODO: Implement this message_store method
return err("interface method not implemented")

method decreaseDatabaseSize*(driver: QueueDriver,
targetSizeInBytes: int64):
Future[ArchiveDriverResult[void]] {.async.} =
return err("interface method not implemented")

method close*(driver: QueueDriver):
Future[ArchiveDriverResult[void]] {.async.} =
return ok()
34 changes: 34 additions & 0 deletions waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,40 @@ method deleteOldestMessagesNotWithinLimit*(s: SqliteDriver,
Future[ArchiveDriverResult[void]] {.async.} =
return s.db.deleteOldestMessagesNotWithinLimit(limit)

method decreaseDatabaseSize*(driver: SqliteDriver,
targetSizeInBytes: int64):
Future[ArchiveDriverResult[void]] {.async.} =

## To remove 20% of the outdated data from database
const DeleteLimit = 0.80

## when db size overshoots the database limit, shread 20% of outdated messages
## get size of database
let dbSize = (await driver.getDatabaseSize()).valueOr:
return err("failed to get database size: " & $error)

## database size in bytes
let totalSizeOfDB: int64 = int64(dbSize)

if totalSizeOfDB < targetSizeInBytes:
return ok()

## to shread/delete messsges, get the total row/message count
let numMessages = (await driver.getMessagesCount()).valueOr:
return err("failed to get messages count: " & error)

## NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows
## if vacumming is done automatically then we aim to check DB size periodially for efficient
## retention policy implementation.

## 80% of the total messages are to be kept, delete others
let pageDeleteWindow = int(float(numMessages) * DeleteLimit)

(await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr:
return err("deleting oldest messages failed: " & error)

return ok()

method close*(s: SqliteDriver):
Future[ArchiveDriverResult[void]] {.async.} =
## Close the database connection
Expand Down
6 changes: 3 additions & 3 deletions waku/waku_archive/retention_policy/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ proc new*(T: type RetentionPolicy,
except ValueError:
return err("invalid time retention policy argument")

let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds)
let retPolicy: RetentionPolicy = TimeRetentionPolicy.new(retentionTimeSeconds)
return ok(some(retPolicy))

elif policy == "capacity":
Expand All @@ -49,7 +49,7 @@ proc new*(T: type RetentionPolicy,
except ValueError:
return err("invalid capacity retention policy argument")

let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.new(retentionCapacity)
return ok(some(retPolicy))

elif policy == "size":
Expand Down Expand Up @@ -85,7 +85,7 @@ proc new*(T: type RetentionPolicy,
if sizeQuantity <= 0:
return err("invalid size retention policy argument: a non-zero value is required")

let retPolicy: RetentionPolicy = SizeRetentionPolicy.init(sizeQuantity)
let retPolicy: RetentionPolicy = SizeRetentionPolicy.new(sizeQuantity)
return ok(some(retPolicy))

else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ proc calculateOverflowWindow(capacity: int, overflow: float): int =
proc calculateDeleteWindow(capacity: int, overflow: float): int =
calculateOverflowWindow(capacity, overflow) div 2

proc init*(T: type CapacityRetentionPolicy, capacity=DefaultCapacity): T =
proc new*(T: type CapacityRetentionPolicy, capacity=DefaultCapacity): T =
let
totalCapacity = calculateTotalCapacity(capacity, MaxOverflow)
deleteWindow = calculateDeleteWindow(capacity, MaxOverflow)
Expand Down
43 changes: 5 additions & 38 deletions waku/waku_archive/retention_policy/retention_policy_size.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ else:
{.push raises: [].}

import
std/times,
stew/results,
chronicles,
chronos,
os
chronos
import
../driver,
../retention_policy
Expand All @@ -19,51 +17,20 @@ logScope:
# default size is 30 GiB or 32212254720.0 in bytes
const DefaultRetentionSize*: int64 = 32212254720

# to remove 20% of the outdated data from database
const DeleteLimit = 0.80

type
# SizeRetentionPolicy implements auto delete as follows:
# - sizeLimit is the size in bytes the database can grow upto
# to reduce the size of the databases, remove the rows/number-of-messages
# DeleteLimit is the total number of messages to delete beyond this limit
# when the database size crosses the sizeLimit, then only a fraction of messages are kept,
# rest of the outdated message are deleted using deleteOldestMessagesNotWithinLimit(),
# upon deletion process the fragmented space is retrieve back using Vacuum process.
SizeRetentionPolicy* = ref object of RetentionPolicy
sizeLimit: int64
sizeLimit: int64

proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
proc new*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
SizeRetentionPolicy(
sizeLimit: size
)

method execute*(p: SizeRetentionPolicy,
driver: ArchiveDriver):
Future[RetentionPolicyResult[void]] {.async.} =
## when db size overshoots the database limit, shread 20% of outdated messages
# get size of database
let dbSize = (await driver.getDatabaseSize()).valueOr:
return err("failed to get database size: " & $error)

# database size in bytes
let totalSizeOfDB: int64 = int64(dbSize)

if totalSizeOfDB < p.sizeLimit:
return ok()

# to shread/delete messsges, get the total row/message count
let numMessages = (await driver.getMessagesCount()).valueOr:
return err("failed to get messages count: " & error)

# NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows
# if vacumming is done automatically then we aim to check DB size periodially for efficient
# retention policy implementation.

# 80% of the total messages are to be kept, delete others
let pageDeleteWindow = int(float(numMessages) * DeleteLimit)

(await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr:
return err("deleting oldest messages failed: " & error)
(await driver.decreaseDatabaseSize(p.sizeLimit)).isOkOr:
return err("decreaseDatabaseSize failed: " & $error)

return ok()
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type TimeRetentionPolicy* = ref object of RetentionPolicy
retentionTime: chronos.Duration


proc init*(T: type TimeRetentionPolicy, retentionTime=DefaultRetentionTime): T =
proc new*(T: type TimeRetentionPolicy, retentionTime=DefaultRetentionTime): T =
TimeRetentionPolicy(
retentionTime: retentionTime.seconds
)
Expand Down
Loading