-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
--------- Co-authored-by: Vlad Stan <stan.v.vlad@gmail.com>
- Loading branch information
Showing
18 changed files
with
2,328 additions
and
2,605 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,168 +1,87 @@ | ||
from typing import List, Optional, Union | ||
from time import time | ||
from typing import Optional, Union | ||
|
||
from lnbits.db import Database | ||
from lnbits.helpers import urlsafe_short_hash | ||
from loguru import logger | ||
|
||
from .models import CreateTposData, LNURLCharge, TPoS, TPoSClean | ||
from .models import CreateTposData, LnurlCharge, Tpos, TposClean | ||
|
||
db = Database("ext_tpos") | ||
|
||
|
||
async def get_current_timestamp(): | ||
# Get current DB timestamp | ||
timestamp_query = f"SELECT {db.timestamp_now}" | ||
if db.type in {"POSTGRES", "COCKROACH"}: | ||
timestamp_query = f"SELECT EXTRACT(EPOCH FROM {db.timestamp_now})" | ||
current_timestamp = (await db.fetchone(timestamp_query))[0] | ||
return int(current_timestamp) | ||
|
||
|
||
async def create_tpos(wallet_id: str, data: CreateTposData) -> TPoS: | ||
async def create_tpos(data: CreateTposData) -> Tpos: | ||
tpos_id = urlsafe_short_hash() | ||
await db.execute( | ||
""" | ||
INSERT INTO tpos.pos | ||
( | ||
id, wallet, name, currency, tip_options, tip_wallet, withdrawlimit, | ||
withdrawpin, withdrawamt, withdrawtime, withdrawbtwn, withdrawtimeopt, | ||
withdrawpindisabled, withdrawpremium | ||
) | ||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | ||
""", | ||
( | ||
tpos_id, | ||
wallet_id, | ||
data.name, | ||
data.currency, | ||
data.tip_options, | ||
data.tip_wallet, | ||
data.withdrawlimit, | ||
data.withdrawpin, | ||
0, | ||
0, | ||
data.withdrawbtwn, | ||
data.withdrawtimeopt, | ||
data.withdrawpindisabled, | ||
data.withdrawpremium, | ||
), | ||
) | ||
tpos = await get_tpos(tpos_id) | ||
assert tpos, "Newly created tpos couldn't be retrieved" | ||
tpos = Tpos(id=tpos_id, **data.dict()) | ||
await db.insert("tpos.pos", tpos) | ||
return tpos | ||
|
||
|
||
async def get_tpos(tpos_id: str) -> Optional[TPoS]: | ||
row = await db.fetchone("SELECT * FROM tpos.pos WHERE id = ?", (tpos_id,)) | ||
return TPoS(**row) if row else None | ||
|
||
async def get_tpos(tpos_id: str) -> Optional[Tpos]: | ||
return await db.fetchone( | ||
"SELECT * FROM tpos.pos WHERE id = :id", {"id": tpos_id}, Tpos | ||
) | ||
|
||
async def start_lnurlcharge(tpos_id: str): | ||
tpos = await get_tpos(tpos_id) | ||
assert tpos, f"TPoS with {tpos_id} not found!" | ||
|
||
now = await get_current_timestamp() | ||
withdraw_time_seconds = ( | ||
tpos.withdrawbtwn * 60 if tpos.withdrawtimeopt != "secs" else tpos.withdrawbtwn | ||
async def start_lnurlcharge(tpos: Tpos) -> LnurlCharge: | ||
now = int(time()) | ||
seconds = ( | ||
tpos.withdraw_between * 60 | ||
if tpos.withdraw_time_option != "secs" | ||
else tpos.withdraw_between | ||
) | ||
last_withdraw = tpos.withdraw_time - now | ||
assert ( | ||
now - tpos.withdrawtime > withdraw_time_seconds | ||
last_withdraw < seconds | ||
), f""" | ||
Last withdraw was made too recently, please try again in | ||
{int(withdraw_time_seconds - (now - tpos.withdrawtime))} secs | ||
Last withdraw was made too recently, please try again in | ||
{int(seconds - (last_withdraw))} secs | ||
""" | ||
|
||
token = urlsafe_short_hash() | ||
await db.execute( | ||
""" | ||
INSERT INTO tpos.withdraws (id, tpos_id) | ||
VALUES (?, ?) | ||
VALUES (:id, :tpos_id) | ||
""", | ||
(token, tpos_id), | ||
{"id": token, "tpos_id": tpos.id}, | ||
) | ||
lnurlcharge = await get_lnurlcharge(token) | ||
assert lnurlcharge, "Newly created lnurlcharge couldn't be retrieved" | ||
return lnurlcharge | ||
|
||
|
||
async def get_lnurlcharge(lnurlcharge_id: str) -> Optional[LNURLCharge]: | ||
row = await db.fetchone( | ||
"SELECT * FROM tpos.withdraws WHERE id = ?", (lnurlcharge_id,) | ||
async def get_lnurlcharge(lnurlcharge_id: str) -> Optional[LnurlCharge]: | ||
return await db.fetchone( | ||
"SELECT * FROM tpos.withdraws WHERE id = :id", | ||
{"id": lnurlcharge_id}, | ||
LnurlCharge, | ||
) | ||
return LNURLCharge(**row) if row else None | ||
|
||
|
||
async def update_lnurlcharge(data: LNURLCharge) -> LNURLCharge: | ||
# Construct the SET clause for the SQL query | ||
set_clause = ", ".join([f"{field[0]} = ?" for field in data.dict().items()]) | ||
|
||
# Get the values for the SET clause | ||
set_values = list(data.dict().values()) | ||
set_values.append(data.id) # Add the ID for the WHERE clause | ||
|
||
# Execute the UPDATE statement | ||
await db.execute( | ||
f"UPDATE tpos.withdraws SET {set_clause} WHERE id = ?", tuple(set_values) | ||
) | ||
|
||
lnurlcharge = await get_lnurlcharge(data.id) | ||
assert lnurlcharge, "Withdraw couldn't be retrieved" | ||
|
||
return lnurlcharge | ||
|
||
|
||
async def get_clean_tpos(tpos_id: str) -> Optional[TPoSClean]: | ||
row = await db.fetchone("SELECT * FROM tpos.pos WHERE id = ?", (tpos_id,)) | ||
return TPoSClean(**row) if row else None | ||
async def update_lnurlcharge(charge: LnurlCharge) -> LnurlCharge: | ||
await db.update("tpos.withdraws", charge) | ||
return charge | ||
|
||
|
||
async def update_tpos_withdraw(data: TPoS, tpos_id: str) -> TPoS: | ||
# Calculate the time between withdrawals in seconds | ||
now = await get_current_timestamp() | ||
time_elapsed = now - data.withdrawtime | ||
withdraw_time_seconds = ( | ||
data.withdrawbtwn * 60 if data.withdrawtimeopt != "secs" else data.withdrawbtwn | ||
async def get_clean_tpos(tpos_id: str) -> Optional[TposClean]: | ||
return await db.fetchone( | ||
"SELECT * FROM tpos.pos WHERE id = :id", {"id": tpos_id}, TposClean | ||
) | ||
|
||
logger.debug(f"Time between: {time_elapsed} seconds") | ||
|
||
# Check if the time between withdrawals is less than withdrawbtwn | ||
assert ( | ||
time_elapsed > withdraw_time_seconds | ||
), f""" | ||
Last withdraw was made too recently, please try again in | ||
{int(withdraw_time_seconds - (time_elapsed))} secs" | ||
""" | ||
|
||
# Update the withdraw time in the database | ||
await db.execute( | ||
"UPDATE tpos.pos SET withdrawtime = ? WHERE id = ?", (now, tpos_id) | ||
) | ||
|
||
tpos = await get_tpos(tpos_id) | ||
assert tpos, "Newly updated tpos couldn't be retrieved" | ||
async def update_tpos(tpos: Tpos) -> Tpos: | ||
await db.update("tpos.pos", tpos) | ||
return tpos | ||
|
||
|
||
async def update_tpos(tpos_id: str, **kwargs) -> TPoS: | ||
q = ", ".join([f"{field[0]} = ?" for field in kwargs.items()]) | ||
await db.execute( | ||
f"UPDATE tpos.pos SET {q} WHERE id = ?", (*kwargs.values(), tpos_id) | ||
) | ||
tpos = await get_tpos(tpos_id) | ||
assert tpos, "Newly updated tpos couldn't be retrieved" | ||
return tpos | ||
|
||
|
||
async def get_tposs(wallet_ids: Union[str, List[str]]) -> List[TPoS]: | ||
async def get_tposs(wallet_ids: Union[str, list[str]]) -> list[Tpos]: | ||
if isinstance(wallet_ids, str): | ||
wallet_ids = [wallet_ids] | ||
|
||
q = ",".join(["?"] * len(wallet_ids)) | ||
rows = await db.fetchall( | ||
f"SELECT * FROM tpos.pos WHERE wallet IN ({q})", (*wallet_ids,) | ||
q = ",".join([f"'{wallet_id}'" for wallet_id in wallet_ids]) | ||
return await db.fetchall( | ||
f"SELECT * FROM tpos.pos WHERE wallet IN ({q})", model=Tpos | ||
) | ||
return [TPoS(**row) for row in rows] | ||
|
||
|
||
async def delete_tpos(tpos_id: str) -> None: | ||
await db.execute("DELETE FROM tpos.pos WHERE id = ?", (tpos_id,)) | ||
await db.execute("DELETE FROM tpos.pos WHERE id = :id", {"id": tpos_id}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.