Skip to content

Commit

Permalink
chore: Linting (#205)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgmacias95 authored Dec 2, 2024
1 parent 75f2e90 commit a76560f
Show file tree
Hide file tree
Showing 16 changed files with 401 additions and 410 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/testing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest pylint aiohttp pytest_httpserver pytest_asyncio flask
pip install pytest pylint aiohttp aiofiles pytest_httpserver pytest_asyncio flask
- name: Lint
run: |
pylint --rcfile pylintrc vt/ tests examples/
Expand Down
36 changes: 21 additions & 15 deletions examples/hunting_notifications_to_network_infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,27 @@ async def get_network_infrastructure(self):
contacted_domains = relationships["contacted_domains"]["data"]
contacted_ips = relationships["contacted_ips"]["data"]
contacted_urls = relationships["contacted_urls"]["data"]
await self.queue.put({
"contacted_addresses": contacted_domains,
"type": "domains",
"file": file_hash,
})
await self.queue.put({
"contacted_addresses": contacted_ips,
"type": "ips",
"file": file_hash,
})
await self.queue.put({
"contacted_addresses": contacted_urls,
"type": "urls",
"file": file_hash,
})
await self.queue.put(
{
"contacted_addresses": contacted_domains,
"type": "domains",
"file": file_hash,
}
)
await self.queue.put(
{
"contacted_addresses": contacted_ips,
"type": "ips",
"file": file_hash,
}
)
await self.queue.put(
{
"contacted_addresses": contacted_urls,
"type": "urls",
"file": file_hash,
}
)
self.networking_infrastructure[file_hash]["domains"] = contacted_domains
self.networking_infrastructure[file_hash]["ips"] = contacted_ips
self.networking_infrastructure[file_hash]["urls"] = contacted_urls
Expand Down
36 changes: 21 additions & 15 deletions examples/intelligence_search_to_network_infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,27 @@ async def get_network(self):
contacted_urls = relationships["contacted_urls"]["data"]
contacted_ips = relationships["contacted_ips"]["data"]

await self.queue.put({
"contacted_addresses": contacted_domains,
"type": "domains",
"file": checksum,
})
await self.queue.put({
"contacted_addresses": contacted_ips,
"type": "ips",
"file": checksum,
})
await self.queue.put({
"contacted_addresses": contacted_urls,
"type": "urls",
"file": checksum,
})
await self.queue.put(
{
"contacted_addresses": contacted_domains,
"type": "domains",
"file": checksum,
}
)
await self.queue.put(
{
"contacted_addresses": contacted_ips,
"type": "ips",
"file": checksum,
}
)
await self.queue.put(
{
"contacted_addresses": contacted_urls,
"type": "urls",
"file": checksum,
}
)

self.networking_infrastructure[checksum]["domains"] = contacted_domains
self.networking_infrastructure[checksum]["ips"] = contacted_ips
Expand Down
9 changes: 5 additions & 4 deletions examples/livehunt_network_watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
RULESET_LINK = "https://www.virustotal.com/yara-editor/livehunt/"

EMPTY_DOMAIN_LIST_MSG = (
"* Empty domain list, use --add-domain domain.tld or bulk operations to"
" register them"
"* Empty domain list, use --add-domain domain.tld or bulk operations to"
" register them"
)


Expand Down Expand Up @@ -247,8 +247,9 @@ async def main():
return

rulesets = await get_rulesets()
if (not rulesets and
not (args.add_domain or args.bulk_append or args.bulk_replace)):
if not rulesets and not (
args.add_domain or args.bulk_append or args.bulk_replace
):
print(EMPTY_DOMAIN_LIST_MSG)
sys.exit(1)

Expand Down
127 changes: 59 additions & 68 deletions examples/private_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,77 +13,68 @@

console = Console()


async def scan_file_private(
api_key: str,
file_path: Path,
wait: bool = False
api_key: str, file_path: Path, wait: bool = False
) -> None:
"""
Scan a file privately on VirusTotal.
Args:
api_key: VirusTotal API key
file_path: Path to file to scan
wait: Wait for scan completion
"""
async with vt.Client(api_key) as client:
try:
with Progress() as progress:
task = progress.add_task(
"Scanning file...",
total=None if wait else 1
)

analysis = await client.scan_file_private_async(
str(file_path),
wait_for_completion=wait
)

progress.update(task, advance=1)

console.print("\n[green]Scan submitted successfully[/green]")
console.print(f"Analysis ID: {analysis.id}")

if wait:
console.print(f"\nScan Status: {analysis.status}")
if hasattr(analysis, 'stats'):
console.print("Detection Stats:")
for k, v in analysis.stats.items():
console.print(f" {k}: {v}")

except vt.error.APIError as e:
console.print(f"[red]API Error: {e}[/red]")
except Exception as e:
console.print(f"[red]Error: {e}[/red]")
"""
Scan a file privately on VirusTotal.
Args:
api_key: VirusTotal API key
file_path: Path to file to scan
wait: Wait for scan completion
"""
async with vt.Client(api_key) as client:
try:
with Progress() as progress:
task = progress.add_task("Scanning file...", total=None if wait else 1)

analysis = await client.scan_file_private_async(
str(file_path), wait_for_completion=wait
)

progress.update(task, advance=1)

console.print("\n[green]Scan submitted successfully[/green]")
console.print(f"Analysis ID: {analysis.id}")

if wait:
console.print(f"\nScan Status: {analysis.status}")
if hasattr(analysis, "stats"):
console.print("Detection Stats:")
for k, v in analysis.stats.items():
console.print(f" {k}: {v}")

except vt.error.APIError as e:
console.print(f"[red]API Error: {e}[/red]")
except Exception as e: # pylint: disable=broad-exception-caught
console.print(f"[red]Error: {e}[/red]")


def main():
parser = argparse.ArgumentParser(
description="Scan file privately using VirusTotal API"
)
parser.add_argument("--apikey", help="VirusTotal API key")
parser.add_argument("--file_path", help="Path to file to scan")
parser.add_argument(
"--wait",
action="store_true",
help="Wait for scan completion"
)

args = parser.parse_args()
file_path = Path(args.file_path)

if not file_path.exists():
console.print(f"[red]Error: File {file_path} not found[/red]")
sys.exit(1)

if not file_path.is_file():
console.print(f"[red]Error: {file_path} is not a file[/red]")
sys.exit(1)

asyncio.run(scan_file_private(
args.apikey,
file_path,
args.wait
))
parser = argparse.ArgumentParser(
description="Scan file privately using VirusTotal API"
)
parser.add_argument("--apikey", help="VirusTotal API key")
parser.add_argument("--file_path", help="Path to file to scan")
parser.add_argument(
"--wait", action="store_true", help="Wait for scan completion"
)

args = parser.parse_args()
file_path = Path(args.file_path)

if not file_path.exists():
console.print(f"[red]Error: File {file_path} not found[/red]")
sys.exit(1)

if not file_path.is_file():
console.print(f"[red]Error: {file_path} is not a file[/red]")
sys.exit(1)

asyncio.run(scan_file_private(args.apikey, file_path, args.wait))


if __name__ == "__main__":
main()
main()
24 changes: 14 additions & 10 deletions examples/retrohunt_to_network_infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,23 @@ async def get_network_infrastructure(self, file_obj):
contacted_domains = relationships["contacted_domains"]["data"]
contacted_ips = relationships["contacted_ips"]["data"]
contacted_urls = relationships["contacted_urls"]["data"]
await self.networking_queue.put({
"contacted_addresses": contacted_domains,
"type": "domains",
"file": file_hash,
})
await self.networking_queue.put(
{
"contacted_addresses": contacted_domains,
"type": "domains",
"file": file_hash,
}
)
await self.networking_queue.put(
{"contacted_addresses": contacted_ips, "type": "ips", "file": file_hash}
)
await self.networking_queue.put({
"contacted_addresses": contacted_urls,
"type": "urls",
"file": file_hash,
})
await self.networking_queue.put(
{
"contacted_addresses": contacted_urls,
"type": "urls",
"file": file_hash,
}
)
self.networking_infrastructure[file_hash]["domains"] = contacted_domains
self.networking_infrastructure[file_hash]["ips"] = contacted_ips
self.networking_infrastructure[file_hash]["urls"] = contacted_urls
Expand Down
1 change: 1 addition & 0 deletions pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ disable=abstract-method,
wrong-import-order,
xrange-builtin,
zip-builtin-not-iterating,
too-many-positional-arguments,


[REPORTS]
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
python_requires=">=3.7.0",
install_requires=[
"aiohttp==3.8.6 ; python_version=='3.7'",
"aiohttp ; python_version>'3.7'"
"aiohttp ; python_version>'3.7'",
"aiofiles"
],
setup_requires=["pytest-runner"],
extras_require={
Expand Down
Loading

0 comments on commit a76560f

Please sign in to comment.