This repository has been archived by the owner on Sep 29, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
securitykey.py
104 lines (90 loc) · 3.33 KB
/
securitykey.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# -*- coding: utf-8 -*-
"""
This module provides onetime keys. Such a Securitykey can only be used once to authenticate an action like
edit an entry. Unless specified otherwise, keys are bound to a session. This prevents such actions from beeing
executed without explicit user consent so an attacker can't send special crafted links (like /user/delete/xxx)
to a authenticated user as these links would lack a valid securityKey.
Its also possible to store data along with a securityKey and specify a lifeTime.
"""
from datetime import datetime, timedelta
from server.utils import generateRandomString
from server.session import current as currentSession
from server import db, conf
from server.tasks import PeriodicTask, callDeferred
securityKeyKindName = "viur-securitykeys"
def create( duration=None, **kwargs ):
"""
Creates a new onetime Securitykey for the current session
If duration is not set, this key is valid only for the current session.
Otherwise, the key and its data is serialized and saved inside the datastore
for up to duration-seconds
:param duration: Make this key valid for a fixed timeframe (and independend of the current session)
:type duration: int or None
:returns: The new onetime key
"""
key = generateRandomString()
if duration is None:
sessionDependend = True
duration = 30*60 # 30 Mins from now
else:
sessionDependend = False
duration = int( duration )
dbObj = db.Entity(securityKeyKindName, name=key )
for k, v in kwargs.items():
dbObj[ k ] = v
dbObj["until"] = datetime.now()+timedelta( seconds=duration )
if sessionDependend:
dbObj["session"] = currentSession.getSessionKey()
else:
dbObj["session"] = None
dbObj.set_unindexed_properties( [x for x in dbObj.keys() if not x=="until" ] )
db.Put( dbObj )
return( key )
def validate( key, acceptSessionKey=False ):
"""
Validates a onetime securitykey
:type key: str
:param key: The key to validate
:type acceptSessionKey: Bool
:param acceptSessionKey: If True, we also accept the session's skey
:returns: False if the key was not valid for whatever reasons, the data (given during createSecurityKey) as dictionary or True if the dict is empty.
"""
if acceptSessionKey:
if key==currentSession.getSessionSecurityKey():
return( True )
try:
dbObj = db.Get( db.Key.from_path( securityKeyKindName, key ) )
except:
return( False )
if dbObj:
if "session" in dbObj and dbObj["session"] is not None:
if dbObj["session"] != currentSession.getSessionKey():
return( False )
db.Delete( dbObj.key() )
if dbObj[ "until" ] < datetime.now(): #This key has expired
return( False )
res ={}
for k in dbObj.keys():
res[ k ] = dbObj[ k ]
del res["session"]
del res["until"]
if not res:
return( True )
return( res )
return( False )
@PeriodicTask(60*4)
def startClearSKeys():
"""
Removes old (expired) skeys
"""
doClearSKeys( (datetime.now()-timedelta(seconds=300)).strftime("%d.%m.%Y %H:%M:%S"), None )
@callDeferred
def doClearSKeys( timeStamp, cursor ):
gotAtLeastOne = False
query = db.Query( securityKeyKindName ).filter( "until <", datetime.strptime(timeStamp,"%d.%m.%Y %H:%M:%S") )
for oldKey in query.run(100, keysOnly=True):
gotAtLeastOne = True
db.Delete( oldKey )
newCursor = query.getCursor()
if gotAtLeastOne and newCursor and newCursor.urlsafe()!=cursor:
doClearSKeys( timeStamp, newCursor.urlsafe() )