Skip to content

Commit

Permalink
fix: extended Postgres code to support retention policy + refactoring (
Browse files Browse the repository at this point in the history
…#2244)

* updated Postgres retention policy code + refactoring

* Update waku/waku_archive/driver/postgres_driver/postgres_driver.nim

Co-authored-by: Simon-Pierre Vivier <simvivier@status.im>

* updated code review changes

* data unit fixed, processing everything in bytes now

---------

Co-authored-by: Simon-Pierre Vivier <simvivier@status.im>
  • Loading branch information
ABresting and SionoiS authored Nov 24, 2023
1 parent 110de90 commit a1ed517
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 30 deletions.
10 changes: 4 additions & 6 deletions tests/waku_archive/test_retention_policy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ suite "Waku Archive - Retention policy":
test "size retention policy - windowed message deletion":
## Given
let
# in megabytes
sizeLimit:float = 0.05
# in bytes
sizeLimit:int64 = 52428
excess = 325

let driver = newTestArchiveDriver()
Expand Down Expand Up @@ -92,9 +92,7 @@ suite "Waku Archive - Retention policy":

## Then
# calculate the current database size
let pageSize = (waitFor driver.getPagesSize()).tryGet()
let pageCount = (waitFor driver.getPagesCount()).tryGet()
let sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0)
let sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet())

# NOTE: since vacuuming is done manually, this needs to be revisited if vacuuming is done automatically

Expand All @@ -105,7 +103,7 @@ suite "Waku Archive - Retention policy":
require (sizeDB >= sizeLimit)
require (waitFor retentionPolicy.execute(driver)).isOk()

# get the number or rows from DB
# get the number of rows from database
let rowCountAfterDeletion = (waitFor driver.getMessagesCount()).tryGet()

check:
Expand Down
14 changes: 14 additions & 0 deletions waku/common/databases/db_sqlite.nim
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,20 @@ proc getPageCount*(db: SqliteDatabase): DatabaseResult[int64] =

return ok(count)

proc getDatabaseSize*(db: SqliteDatabase): DatabaseResult[int64] =
  ## Returns the database size in bytes, computed as page size * page count.
  ##
  ## Errors from `getPageSize` / `getPageCount` are propagated through `?`;
  ## a reported page size of zero is also turned into an error.

  # page size in bytes; never reassigned, hence `let` rather than `var`
  let pageSize: int64 = ?db.getPageSize()

  if pageSize == 0:
    return err("failed to get page size ")

  # number of pages currently in the database
  let pageCount = ?db.getPageCount()

  return ok(pageSize * pageCount)

proc gatherSqlitePageStats*(db: SqliteDatabase):
DatabaseResult[(int64, int64, int64)] =
let
Expand Down
3 changes: 3 additions & 0 deletions waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ method getPagesCount*(driver: ArchiveDriver):
method getPagesSize*(driver: ArchiveDriver):
Future[ArchiveDriverResult[int64]] {.base, async.} = discard

method getDatabaseSize*(
    driver: ArchiveDriver
): Future[ArchiveDriverResult[int64]] {.base, async.} =
  ## Base declaration of the database-size query; the body does nothing here.
  ## NOTE(review): concrete drivers presumably override this - confirm.
  discard

method performVacuum*(driver: ArchiveDriver):
Future[ArchiveDriverResult[void]] {.base, async.} = discard

Expand Down
9 changes: 9 additions & 0 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,15 @@ proc getInt(s: PostgresDriver,

return ok(retInt)

method getDatabaseSize*(s: PostgresDriver):
                        Future[ArchiveDriverResult[int64]] {.async.} =
  ## Asks Postgres for `pg_database_size(current_database())` and returns the
  ## value as an int64. A failed query is reported as an error prefixed with
  ## "error in getDatabaseSize: ".
  const sizeQuery = "SELECT pg_database_size(current_database())"

  let queryRes = await s.getInt(sizeQuery)
  if queryRes.isErr():
    return err("error in getDatabaseSize: " & queryRes.error)

  return ok(int64(queryRes.get()))

method getMessagesCount*(s: PostgresDriver):
Future[ArchiveDriverResult[int64]] {.async.} =

Expand Down
4 changes: 4 additions & 0 deletions waku/waku_archive/driver/queue_driver/queue_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ method getPagesSize*(driver: QueueDriver):
Future[ArchiveDriverResult[int64]] {.async} =
return ok(int64(driver.len()))

method getDatabaseSize*(driver: QueueDriver):
                        Future[ArchiveDriverResult[int64]] {.async.} =
  ## Queue-driver implementation of the `getDatabaseSize` driver method.
  ##
  ## The method was previously spelled `getDatabasesSize`, which did not match
  ## the base method name and therefore never took part in dynamic dispatch.
  ##
  ## NOTE(review): the value returned is `driver.len()`, which looks like an
  ## entry count rather than a byte size - confirm this is intended.
  return ok(int64(driver.len()))

template getDatabasesSize*(driver: QueueDriver): untyped {.deprecated.} =
  ## Backward-compatible alias for the former, misspelled method name;
  ## forwards to `getDatabaseSize`.
  driver.getDatabaseSize()

method performVacuum*(driver: QueueDriver):
Future[ArchiveDriverResult[void]] {.async.} =
return err("interface method not implemented")
Expand Down
4 changes: 4 additions & 0 deletions waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ method getPagesSize*(s: SqliteDriver):
Future[ArchiveDriverResult[int64]] {.async.} =
return s.db.getPageSize()

method getDatabaseSize*(s: SqliteDriver):
                        Future[ArchiveDriverResult[int64]] {.async.} =
  ## Forwards the request to `getDatabaseSize` on the wrapped handle `s.db`
  ## and returns its result unchanged.
  let sizeRes = s.db.getDatabaseSize()
  return sizeRes

method performVacuum*(s: SqliteDriver):
Future[ArchiveDriverResult[void]] {.async.} =
return s.db.performSqliteVacuum()
Expand Down
15 changes: 9 additions & 6 deletions waku/waku_archive/retention_policy/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,27 @@ proc new*(T: type RetentionPolicy,
var retentionSize: string
retentionSize = policyArgs

# captures the size unit such as Gb or Mb
# captures the size unit such as GB or MB
let sizeUnit = retentionSize.substr(retentionSize.len-2)
# captures the string type number data of the size provided
let sizeQuantityStr = retentionSize.substr(0,retentionSize.len-3)
# to hold the numeric value data of size
var sizeQuantity: float
var inptSizeQuantity: float
var sizeQuantity: int64

if sizeUnit in ["gb", "Gb", "GB", "gB"]:
# parse the actual value into integer type var
try:
sizeQuantity = parseFloat(sizeQuantityStr)
inptSizeQuantity = parseFloat(sizeQuantityStr)
except ValueError:
return err("invalid size retention policy argument: " & getCurrentExceptionMsg())
# Gb data is converted into Mb for uniform processing
sizeQuantity = sizeQuantity * 1024
# GB data is converted into bytes for uniform processing
sizeQuantity = int64(inptSizeQuantity * 1024.0 * 1024.0 * 1024.0)
elif sizeUnit in ["mb", "Mb", "MB", "mB"]:
try:
sizeQuantity = parseFloat(sizeQuantityStr)
inptSizeQuantity = parseFloat(sizeQuantityStr)
# MB data is converted into bytes for uniform processing
sizeQuantity = int64(inptSizeQuantity * 1024.0 * 1024.0)
except ValueError:
return err("invalid size retention policy argument")
else:
Expand Down
27 changes: 9 additions & 18 deletions waku/waku_archive/retention_policy/retention_policy_size.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@ import
logScope:
topics = "waku archive retention_policy"

# default size is 30 Gb
const DefaultRetentionSize*: float = 30_720
# default size is 30 GiB or 32212254720.0 in bytes
const DefaultRetentionSize*: int64 = 32212254720

# to remove 20% of the outdated data from database
const DeleteLimit = 0.80

type
# SizeRetentionPolicy implements auto delete as follows:
# - sizeLimit is the size in megabytes (Mbs) the database can grow upto
# - sizeLimit is the size in bytes the database can grow up to
# to reduce the size of the databases, remove the rows/number-of-messages
# DeleteLimit is the total number of messages to delete beyond this limit
# when the database size crosses the sizeLimit, then only a fraction of messages are kept,
# rest of the outdated message are deleted using deleteOldestMessagesNotWithinLimit(),
# after the deletion process, the fragmented space is reclaimed using the Vacuum process.
SizeRetentionPolicy* = ref object of RetentionPolicy
sizeLimit: float
sizeLimit: int64

proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
SizeRetentionPolicy(
Expand All @@ -42,21 +42,12 @@ method execute*(p: SizeRetentionPolicy,
driver: ArchiveDriver):
Future[RetentionPolicyResult[void]] {.async.} =
## when db size overshoots the database limit, shred 20% of outdated messages
# get size of database
let dbSize = (await driver.getDatabaseSize()).valueOr:
return err("failed to get database size: " & $error)

# get page size of database
let pageSizeRes = await driver.getPagesSize()
let pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024)

if pageSize == 0:
return err("failed to get Page size: " & pageSizeRes.error)

# to get the size of the database, pageCount and PageSize is required
# get page count in "messages" database
let pageCount = (await driver.getPagesCount()).valueOr:
return err("failed to get Pages count: " & $error)

# database size in megabytes (Mb)
let totalSizeOfDB: float = float(pageSize * pageCount)/1024.0
# database size in bytes
let totalSizeOfDB: int64 = int64(dbSize)

if totalSizeOfDB < p.sizeLimit:
return ok()
Expand Down

0 comments on commit a1ed517

Please sign in to comment.