Skip to content

Commit

Permalink
Fixed: More correct age checks
Browse files Browse the repository at this point in the history
  • Loading branch information
mnot committed Dec 8, 2023
1 parent 26aabe4 commit 82fcb01
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 76 deletions.
166 changes: 92 additions & 74 deletions httplint/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,29 @@
class ResponseCacheChecker:
def __init__(self, response: "HttpResponseLinter") -> None:
self._response = response
self._cc_dict: dict

self._request = cast("HttpRequestLinter", response.related)
self.notes = response.notes
self.request = cast("HttpRequestLinter", response.related)
self.age: int = None
self.store_private: bool
self.freshness_lifetime_private: int = None
self.store_shared: bool
self.freshness_lifetime_shared: int = None

def check(self) -> None:
self.age_value = response.headers.parsed.get("age", 0)
self.date_value = response.headers.parsed.get("date", None)
self.expires_value = response.headers.parsed.get("expires", None)
self.lm_value = response.headers.parsed.get("last-modified", None)
self.etag_value = response.headers.parsed.get("etag", None)
self.vary_value = response.headers.parsed.get("vary", set())
self.cc_value = response.headers.parsed.get("cache-control", [])
self.cc_dict = dict(self.cc_value)

if self._request:
self.request_time = self._request.start_time
else:
self.request_time = None
self.response_time = response.start_time

if not self.check_basic():
return
if not self.check_last_modified():
Expand All @@ -50,67 +62,65 @@ def check(self) -> None:
return
if not self.check_vary():
return
if not self.check_age():
return
if not self.check_freshness():
return

def check_basic(self) -> bool:
# Who can store this?
if self.request and self.request.method not in CACHEABLE_METHODS:
if self._request and self._request.method not in CACHEABLE_METHODS:
self.store_shared = self.store_private = False
self.request.notes.add(
"method", METHOD_UNCACHEABLE, method=self.request.method
self._request.notes.add(
"method", METHOD_UNCACHEABLE, method=self._request.method
)
return False

return True

def check_last_modified(self) -> bool:
date_hdr = self._response.headers.parsed.get("date", None)
lm_hdr = self._response.headers.parsed.get("last-modified", None)
if lm_hdr:
serv_date = date_hdr or self._response.start_time
if self.lm_value:
serv_date = self.date_value or self.response_time
if not serv_date:
return True # we don't know
if lm_hdr > serv_date:
if self.lm_value > serv_date:
self.notes.add("header-last-modified", LM_FUTURE)
else:
self.notes.add(
"header-last-modified",
LM_PRESENT,
last_modified_string=relative_time(lm_hdr, serv_date),
last_modified_string=relative_time(self.lm_value, serv_date),
)
return True

def check_cache_control(self) -> bool:
cc_set = self._response.headers.parsed.get("cache-control", [])
cc_list = [k for (k, v) in cc_set]
self._cc_dict = dict(cc_set)
cc_list = [k for (k, v) in self.cc_value]

for cc in self._cc_dict:
for cc in self.cc_dict:
if cc.lower() in KNOWN_CC and cc != cc.lower():
self.notes.add(
"header-cache-control", CC_MISCAP, cc_lower=cc.lower(), cc=cc
)
if cc in KNOWN_CC and cc_list.count(cc) > 1:
self.notes.add("header-cache-control", CC_DUP, cc=cc)

if "no-store" in self._cc_dict:
if "no-store" in self.cc_dict:
self.store_shared = self.store_private = False
self.notes.add("header-cache-control", NO_STORE)
return False

if "private" in self._cc_dict:
if "private" in self.cc_dict:
self.store_shared = False
self.store_private = True
self.notes.add("header-cache-control", PRIVATE_CC)

if "public" in self._cc_dict:
if "public" in self.cc_dict:
self.notes.add("header-cache-control", PRIVATE_PUBLIC_CONFLICT)

elif self.request and "authorization" in [
k.lower() for k, v in self.request.headers.text
elif self._request and "authorization" in [
k.lower() for k, v in self._request.headers.text
]:
if "public" in self._cc_dict:
if "public" in self.cc_dict:
self.store_shared = True
self.store_private = True
self.notes.add("header-cache-control", PUBLIC_AUTH)
Expand All @@ -122,27 +132,25 @@ def check_cache_control(self) -> bool:
self.store_shared = self.store_private = True
self.notes.add("header-cache-control", STORABLE)

if "public" in self._cc_dict:
if "public" in self.cc_dict:
self.notes.add("header-cache-control", PUBLIC_UNNECESSARY)

# no-cache?
if "no-cache" in self._cc_dict:
etag_hdr = self._response.headers.parsed.get("etag", None)
lm_hdr = self._response.headers.parsed.get("last-modified", None)
if lm_hdr is None and etag_hdr is None:
if "no-cache" in self.cc_dict:
if self.lm_value is None and self.etag_value is None:
self.notes.add("header-cache-control", NO_CACHE_NO_VALIDATOR)
else:
self.notes.add("header-cache-control", NO_CACHE)

# pre-check / post-check
if "pre-check" in self._cc_dict or "post-check" in self._cc_dict:
if "pre-check" not in self._cc_dict or "post-check" not in self._cc_dict:
if "pre-check" in self.cc_dict or "post-check" in self.cc_dict:
if "pre-check" not in self.cc_dict or "post-check" not in self.cc_dict:
self.notes.add("header-cache-control", CHECK_SINGLE)
else:
pre_check = post_check = None
try:
pre_check = int(self._cc_dict["pre-check"])
post_check = int(self._cc_dict["post-check"])
pre_check = int(self.cc_dict["pre-check"])
post_check = int(self.cc_dict["post-check"])
except ValueError:
self.notes.add("header-cache-control", CHECK_NOT_INTEGER)
if pre_check is not None and post_check is not None:
Expand All @@ -163,48 +171,54 @@ def check_cache_control(self) -> bool:
return True

def check_vary(self) -> bool:
vary_hdr = self._response.headers.parsed.get("vary", set())
if "*" in vary_hdr:
if "*" in self.vary_value:
self.notes.add("header-vary", VARY_ASTERISK)
return False
if len(vary_hdr) > 3:
self.notes.add("header-vary", VARY_COMPLEX, vary_count=f_num(len(vary_hdr)))
if len(self.vary_value) > 3:
self.notes.add(
"header-vary", VARY_COMPLEX, vary_count=f_num(len(self.vary_value))
)
else:
if "user-agent" in vary_hdr:
if "user-agent" in self.vary_value:
self.notes.add("header-vary", VARY_USER_AGENT)
if "host" in vary_hdr:
if "host" in self.vary_value:
self.notes.add("header-vary", VARY_HOST)
return True

def check_freshness(self) -> bool:
expires_hdr_present = "expires" in [
n.lower() for (n, v) in self._response.headers.text
]
expires_hdr = self._response.headers.parsed.get("expires", None)
lm_hdr = self._response.headers.parsed.get("last-modified", None)
age_hdr = self._response.headers.parsed.get("age", None)
date_hdr = self._response.headers.parsed.get("date", None)

self.age = age_hdr or 0
age_str = relative_time(self.age, 0, 0)
if self._response.start_time and date_hdr and date_hdr > 0:
apparent_age = max(0, int(self._response.start_time) - date_hdr)
def check_age(self) -> bool:
if self.response_time and self.date_value and self.date_value > 0:
apparent_age = max(0, int(self.response_time) - self.date_value)
else:
apparent_age = 0
current_age = max(apparent_age, self.age)
current_age_str = relative_time(current_age, 0, 0)

if self.request_time and self.response_time:
response_delay = self.response_time - self.request_time
corrected_age_value = self.age_value + response_delay
else:
corrected_age_value = self.age_value

corrected_initial_age = max(apparent_age, corrected_age_value)
self.age = corrected_initial_age

if self.age >= 1:
age_str = relative_time(self.age, 0, 0)
self.notes.add("header-age header-date", CURRENT_AGE, age=age_str)
return True

if not date_hdr:
def check_freshness(self) -> bool:
expires_hdr_present = "expires" in [
n.lower() for (n, v) in self._response.headers.text
]

if not self.date_value:
self.notes.add("", DATE_CLOCKLESS)
if expires_hdr or lm_hdr:
if self.expires_value or self.lm_value:
self.notes.add(
"header-expires header-last-modified", DATE_CLOCKLESS_BAD_HDR
)
elif self._response.start_time:
skew = date_hdr - int(self._response.start_time) + (self.age)
if self.age > MAX_CLOCK_SKEW > (current_age - skew):
elif self.response_time:
skew = self.date_value - int(self.response_time) + (self.age_value)
if self.age_value > MAX_CLOCK_SKEW > (self.age_value - skew):
self.notes.add("header-date header-age", AGE_PENALTY)
elif abs(skew) > MAX_CLOCK_SKEW:
self.notes.add(
Expand All @@ -220,36 +234,38 @@ def check_freshness(self) -> bool:
has_explicit_freshness = False
has_cc_freshness = False
freshness_hdrs = ["header-date"]
if "max-age" in self._cc_dict:
self.freshness_lifetime_private = self._cc_dict["max-age"]
self.freshness_lifetime_shared = self._cc_dict["max-age"]
if "max-age" in self.cc_dict:
self.freshness_lifetime_private = self.cc_dict["max-age"]
self.freshness_lifetime_shared = self.cc_dict["max-age"]
freshness_hdrs.append("header-cache-control")
has_explicit_freshness = True
has_cc_freshness = True
if "s-maxage" in self._cc_dict:
self.freshness_lifetime_shared = self._cc_dict["s-maxage"]
if "s-maxage" in self.cc_dict:
self.freshness_lifetime_shared = self.cc_dict["s-maxage"]
freshness_hdrs.append("header-cache-control")
has_explicit_freshness = True
has_cc_freshness = True
if expires_hdr_present and self._response.start_time:
if expires_hdr_present and self.response_time:
# An invalid Expires header means it's automatically stale
has_explicit_freshness = True
freshness_hdrs.append("header-expires")
expires_lifetime = self.freshness_lifetime_shared = (expires_hdr or 0) - (
date_hdr or int(self._response.start_time)
)
expires_lifetime = self.freshness_lifetime_shared = (
self.expires_value or 0
) - (self.date_value or int(self.response_time))
if not self.freshness_lifetime_private:
self.freshness_lifetime_private = expires_lifetime
if not self.freshness_lifetime_shared:
self.freshness_lifetime_shared = expires_lifetime

freshness_left = self.freshness_lifetime_private - current_age
freshness_left = self.freshness_lifetime_private - self.age
freshness_left_str = relative_time(abs(int(freshness_left)), 0, 0)
freshness_lifetime_str = relative_time(
int(self.freshness_lifetime_private), 0, 0
)

fresh = freshness_left > 0
current_age_str = relative_time(self.age, 0, 0)

if has_explicit_freshness:
if fresh:
self.notes.add(
Expand Down Expand Up @@ -281,12 +297,12 @@ def check_freshness(self) -> bool:
else:
self.notes.add("", FRESHNESS_NONE)

if "must-revalidate" in self._cc_dict:
if "must-revalidate" in self.cc_dict:
if fresh:
self.notes.add("header-cache-control", FRESH_MUST_REVALIDATE)
elif has_explicit_freshness:
self.notes.add("header-cache-control", STALE_MUST_REVALIDATE)
elif "proxy-revalidate" in self._cc_dict or "s-maxage" in self._cc_dict:
elif "proxy-revalidate" in self.cc_dict or "s-maxage" in self.cc_dict:
if fresh:
self.notes.add("header-cache-control", FRESH_PROXY_REVALIDATE)
elif has_explicit_freshness:
Expand Down Expand Up @@ -505,9 +521,9 @@ class CURRENT_AGE(Note):
level = levels.INFO
_summary = "%(message)s has been cached for %(age)s."
_text = """\
The `Age` header indicates the age of the response; i.e., how long it has been cached since it was
generated. HTTP takes this as well as any apparent clock skew into account in computing how old the
response already is."""
HTTP defines an algorithm that uses the `Age` header, `Date` header, and local time observations to
determine the age of a response. As a result, caches might use a value other than that in the `Age`
header to determine how old the response is -- and therefore how much longer it can be used."""


class FRESHNESS_FRESH(Note):
Expand Down Expand Up @@ -547,7 +563,9 @@ class FRESHNESS_STALE_ALREADY(Note):
class FRESHNESS_HEURISTIC(Note):
category = categories.CACHING
level = levels.WARN
_summary = "%(message)s allows caches to assign their own freshness lifetimes to it."
_summary = (
"%(message)s allows caches to assign their own freshness lifetimes to it."
)
_text = """\
When responses with certain status codes don't have explicit freshness information (like a `
Cache-Control: max-age` directive, or `Expires` header), caches are allowed to estimate how fresh
Expand Down
4 changes: 2 additions & 2 deletions httplint/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def __init__(self, **kw: Unpack[HttpMessageParams]) -> None:
self.status_code: int = None
self.status_phrase: str = ""
self.is_head_response = False
self.caching = ResponseCacheChecker(self)
self.caching: ResponseCacheChecker

def process_response_topline(
self, version: bytes, status_code: bytes, status_phrase: bytes = None
Expand All @@ -209,7 +209,7 @@ def can_have_content(self) -> bool:
return True

def post_checks(self) -> None:
self.caching.check()
self.caching = ResponseCacheChecker(self)


class CL_CORRECT(Note):
Expand Down

0 comments on commit 82fcb01

Please sign in to comment.