Skip to content

Commit

Permalink
update with more test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
lenkan committed Dec 10, 2024
1 parent 04fc50c commit fe75188
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
4 changes: 4 additions & 0 deletions src/keria/app/agenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ def __init__(self, agency: Agency, username: str | None = None, password: str |
self.agency = agency

def authenticate(self, req: falcon.Request):
# Username AND Password is not set, so no need to authenticate
if self.username is None and self.password is None:
return

Expand All @@ -898,6 +899,9 @@ def authenticate(self, req: falcon.Request):
try:
username, password = b64decode(token).decode('utf-8').split(':')

if username is None or password is None:
raise falcon.HTTPUnauthorized(title="Unauthorized")

if username == self.username and password == self.password:
return

Expand Down
46 changes: 40 additions & 6 deletions tests/app/test_agenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ def test_protected_boot_ends(helpers):
app = falcon.App()
client = testing.TestClient(app)

username = "test"
password = "test"
username = "user"
password = "secret"

bootEnd = agenting.BootEnd(agency, username=username, password=password)
app.add_route("/boot", bootEnd)
Expand All @@ -314,19 +314,53 @@ def test_protected_boot_ends(helpers):
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": "Something test"})
assert rep.status_code == 401

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": "Basic test:test"})
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": "Basic user:secret"})
assert rep.status_code == 401

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'test:foobar').decode('utf-8')}"} )
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'test:secret').decode('utf-8')}"} )
assert rep.status_code == 401

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'foobar:test').decode('utf-8')}"} )
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'user').decode('utf-8')}"} )
assert rep.status_code == 401

authorization = f"Basic {b64encode(b'test:test').decode('utf-8')}"
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'user:test').decode('utf-8')}"} )
assert rep.status_code == 401

authorization = f"Basic {b64encode(b'user:secret').decode('utf-8')}"
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": authorization})
assert rep.status_code == 202

def test_misconfigured_protected_boot_ends(helpers):
agency = agenting.Agency(name="agency", bran=None, temp=True)
doist = doing.Doist(limit=1.0, tock=0.03125, real=True)
doist.enter(doers=[agency])

serder, sigers = helpers.controller()
assert serder.pre == helpers.controllerAID

app = falcon.App()
client = testing.TestClient(app)

# No password set, should return 401
bootEnd = agenting.BootEnd(agency, username="user", password=None)
app.add_route("/boot", bootEnd)

body = dict(
icp=serder.ked,
sig=sigers[0].qb64,
salty=dict(
stem='signify:aid', pidx=0, tier='low', sxlt='OBXYZ',
icodes=[MtrDex.Ed25519_Seed], ncodes=[MtrDex.Ed25519_Seed]
)
)

authorization = f"Basic {b64encode(b'user').decode('utf-8')}"
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": authorization})
assert rep.status_code == 401

authorization = f"Basic {b64encode(b'user:secret').decode('utf-8')}"
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": authorization})
assert rep.status_code == 401

def test_witnesser(helpers):
salt = b'0123456789abcdef'
Expand Down

0 comments on commit fe75188

Please sign in to comment.