-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
11 changed files
with
290 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import logging | ||
import logging.config | ||
|
||
from typing import Optional | ||
|
||
from vulnerabilitylookup.default import AbstractManager, get_config | ||
from vulnerabilitylookup.feeders.pysec import PySec | ||
|
||
logging.config.dictConfig(get_config('logging')) | ||
|
||
|
||
class PySecImporter(AbstractManager): | ||
|
||
def __init__(self, loglevel: Optional[int]=None): | ||
super().__init__(loglevel) | ||
self.script_name = 'pysec_importer' | ||
self.pysec = PySec() | ||
|
||
def _to_run_forever(self): | ||
self.pysec.pysec_update() | ||
|
||
|
||
def main(): | ||
gsd = PySecImporter() | ||
gsd.run(sleep_in_sec=3600) | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Submodule pysec-advisories
added at
ae92b6
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
import json | ||
import logging | ||
import logging.config | ||
import re | ||
|
||
from datetime import datetime | ||
from pathlib import Path | ||
from typing import Set, Dict | ||
|
||
import yaml | ||
|
||
from git import Repo | ||
from redis import Redis | ||
|
||
from ..default import get_config, get_homedir | ||
from ..helpers import get_config_feeder, fromisoformat_wrapper | ||
|
||
|
||
def json_serial(obj): | ||
"""Make sure to have json-dump compatible data types""" | ||
|
||
if isinstance(obj, datetime): | ||
return obj.isoformat() | ||
|
||
|
||
class PySec(): | ||
def __init__(self): | ||
self._load_logging_config() | ||
self.logger = logging.getLogger(f'{self.__class__.__name__}') | ||
self.config = get_config_feeder('pysec') | ||
|
||
if 'level' in self.config: | ||
self.logger.setLevel(self.config['level']) | ||
else: | ||
self.logger.setLevel(get_config('generic', 'loglevel')) | ||
|
||
self.storage = Redis(host=get_config('generic', 'storage_db_hostname'), | ||
port=get_config('generic', 'storage_db_port')) | ||
|
||
root_repo = Repo(get_homedir()) | ||
root_repo.submodule('pysec-advisories').update(init=True) | ||
|
||
self.path_to_repo = get_homedir() / 'vulnerabilitylookup' / 'feeders' / 'pysec-advisories' | ||
self.pysec_git = Repo(self.path_to_repo) | ||
|
||
def _load_logging_config(self): | ||
cur_path = Path(__file__) | ||
if not (cur_path.parent / f'{cur_path.stem}_logging.json').exists(): | ||
return | ||
with (cur_path.parent / f'{cur_path.stem}_logging.json').open() as f: | ||
log_config = json.load(f) | ||
logging.config.dictConfig(log_config) | ||
|
||
def pysec_update(self) -> bool: | ||
|
||
self.pysec_git.remotes.origin.pull('main') | ||
|
||
paths_to_import: Set[Path] = set() | ||
if _last_update := self.storage.hget('last_updates', 'pysec'): | ||
_last_update_str = _last_update.decode() | ||
if _last_update_str == self.pysec_git.head.commit.hexsha: | ||
# No changes | ||
self.logger.info('No new commit.') | ||
return False | ||
for commit in self.pysec_git.iter_commits(f'{_last_update_str}...HEAD'): | ||
for line in self.pysec_git.git.show(commit.hexsha, name_only=True).split('\n'): | ||
if not line.endswith('.yaml'): | ||
continue | ||
p_path = self.path_to_repo / Path(line) | ||
if p_path.exists() and re.match(r'PYSEC-\d{4}-\d+.yaml', p_path.name): | ||
paths_to_import.add(p_path) | ||
else: | ||
# First run, get all files | ||
for p_path in self.path_to_repo.rglob('*.yaml'): | ||
if p_path.exists() and re.match(r'PYSEC-\d{4}-\d+.yaml', p_path.name): | ||
paths_to_import.add(p_path) | ||
|
||
if not paths_to_import: | ||
self.logger.info('Nothing new to import.') | ||
return False | ||
|
||
p = self.storage.pipeline() | ||
pysecids: Dict[str, float] = {} | ||
for path in paths_to_import: | ||
needs_lastmodified_from_git = False | ||
last_modified = None | ||
# Store all cves individually | ||
with path.open() as vuln_entry: | ||
vuln = yaml.safe_load(vuln_entry) | ||
if not isinstance(vuln['modified'], datetime): | ||
last_modified = fromisoformat_wrapper(vuln['modified']) | ||
else: | ||
last_modified = vuln['modified'] | ||
|
||
if not last_modified: | ||
if not needs_lastmodified_from_git: | ||
self.logger.warning(f'Unable to process {path}, please have a look yourself, good luck!') | ||
continue | ||
|
||
# NOTE old approach: there is no indication when the entry was last updated in the json, | ||
# using the last time that file was commited | ||
# It is slow as hell, but that's the best we can do. | ||
|
||
commit = next(self.pysec_git.iter_commits(max_count=1, paths=path)) | ||
last_modified = commit.committed_datetime | ||
|
||
pysecids[vuln['id']] = last_modified.timestamp() | ||
if 'aliases' in vuln: | ||
for alias in vuln['aliases']: | ||
p.sadd(f"{vuln['id']}:link", alias) | ||
p.sadd(f'{alias}:link', vuln['id']) | ||
p.set(path.stem, json.dumps(vuln, default=json_serial)) | ||
|
||
if len(pysecids) > 1000: | ||
# Avoid a massive execute on first import | ||
p.zadd('index:pysec', pysecids) # type: ignore | ||
p.zadd('index', pysecids) # type: ignore | ||
p.execute() | ||
|
||
# reset pipeline | ||
p = self.storage.pipeline() | ||
pysecids = {} | ||
|
||
if pysecids: | ||
# remaining entries | ||
p.zadd('index:pysec', pysecids) # type: ignore | ||
p.zadd('index', pysecids) # type: ignore | ||
p.execute() | ||
self.storage.hset('last_updates', mapping={'pysec': self.pysec_git.head.commit.hexsha}) | ||
self.logger.info('Import done.') | ||
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
{ | ||
"version": 1, | ||
"disable_existing_loggers": false, | ||
"formatters": { | ||
"simple": { | ||
"format": "%(asctime)s %(name)s %(levelname)s:%(message)s" | ||
} | ||
}, | ||
"handlers": { | ||
"stdout": { | ||
"class": "logging.StreamHandler", | ||
"level": "INFO", | ||
"formatter": "simple", | ||
"stream": "ext://sys.stdout" | ||
}, | ||
"file": { | ||
"class": "logging.handlers.RotatingFileHandler", | ||
"level": "WARNING", | ||
"formatter": "simple", | ||
"filename": "logs/pysec_warning.log", | ||
"mode": "a", | ||
"maxBytes": 1000000, | ||
"backupCount": 5 | ||
} | ||
}, | ||
"root": { | ||
"level": "DEBUG", | ||
"handlers": [ | ||
"stdout", | ||
"file" | ||
] | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters