Skip to content

Commit

Permalink
Add Webgo Provider (AnalogJ#1102)
Browse files Browse the repository at this point in the history
* Add Webgo Provider

* Update Tests for Webgo

* Change Formatting

* Correction for Flake8

* Update Handling for Main DNS Entry

Co-authored-by: mb <michael.bruenker@nextevolution.de>
  • Loading branch information
2 people authored and MasinAD committed Mar 29, 2022
1 parent f78cd4f commit 1c10cc5
Show file tree
Hide file tree
Showing 29 changed files with 79,318 additions and 0 deletions.
1 change: 1 addition & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ lexicon/providers/ultradns.py @abligh
lexicon/providers/valuedomain.py @wak109
lexicon/providers/vercel.py @adferrand
lexicon/providers/vultr.py @analogj
lexicon/providers/webgo.py @mod242
lexicon/providers/yandex.py @kharkevich
lexicon/providers/zilore.py @adferrand
lexicon/providers/zonomi.py @jarossi
272 changes: 272 additions & 0 deletions lexicon/providers/webgo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
"""Module provider for Webgo"""
import logging


from bs4 import BeautifulSoup # type: ignore
from requests import Session

from lexicon.exceptions import AuthenticationError
from lexicon.providers.base import Provider as BaseProvider

LOGGER = logging.getLogger(__name__)

NAMESERVER_DOMAINS = ["webgo.de"]


def provider_parser(subparser):
"""Configure provider parser for Webgo."""
subparser.description = """A provider for Webgo."""
subparser.add_argument(
"--auth-username", help="specify username for authentication"
)
subparser.add_argument(
"--auth-password", help="specify password for authentication"
)


class Provider(BaseProvider):
"""
webgo.de provider
"""

def __init__(self, config):
super(Provider, self).__init__(config)
self.domain = self.domain
self.domain_id = None
self.session = None

def _authenticate(self):
# Create the session GET the login page to retrieve a session cookie
self.session = Session()
self.session.get("https://login.webgo.de/")

# Hit the login page with authentication info to login the session
login_response = self.session.post(
"https://login.webgo.de/login",
data={
"data[User][username]": self._get_provider_option("auth_username") or "",
"data[User][password]": self._get_provider_option("auth_password") or "",
},
)

# Parse in the HTML, if the div containing the error message is found, error
html = BeautifulSoup(login_response.content, "html.parser")
if html.find("div", {"class": "loginformerror"}) is not None:
LOGGER.warning("Webgo login failed, check Username and Password")
raise AuthenticationError("Webgo login failed, check Username and Password")
return False

# Make an authenticated GET to the DNS management page
zones_response = self.session.get("https://login.webgo.de/domains")

html = BeautifulSoup(zones_response.content, "html.parser")
domain_table = html.find("table", {"class": "alltable"})
rows = domain_table.find_all('tr')
dns_link = None
for row in rows[1:]:
domain = row.findAll('td')[1].renderContents().decode()
if domain == self.domain:
dns_link = row.findAll('td')[5]
dns_link = dns_link.find("a", {"class": "domainButton fcon-sliders"}).get('href')

# If the Domain couldn't be found, error, otherwise, return the value of the tag
if dns_link is None:
LOGGER.warning("Domain %s not found in account", self.domain)
raise AuthenticationError(f"Domain {self.domain} not found in account")

self.domain_id = dns_link.rsplit("/", 1)[1]
LOGGER.debug("Webgo domain ID: %s", self.domain_id)
return True

# Create record. If record already exists with the same content, do nothing
def _create_record(self, rtype, name, content):
LOGGER.debug("Creating record for zone %s", name)
# Pull a list of records and check for ours
if name:
if name == self.domain:
LOGGER.warning("Unable to create record because your main domain %s can't be re-created", self.domain)
return False
name = self._relative_name(name)
if rtype == "CNAME" and not content.endswith("."):
content += "."
records = self._list_records(rtype=rtype, name=name, content=content)
if len(records) >= 1:
LOGGER.warning("Duplicate record %s %s %s, NOOP", rtype, name, content)
return True
data = {
"data[DnsSetting][sub]": name,
"data[DnsSetting][ttl]": "3600",
"data[DnsSetting][rr-typ]": rtype,
"data[DnsSetting][pref-mx]": "0",
"data[DnsSetting][value]": content,
"data[DnsSetting][action]": "newsub",
"data[DnsSetting][domain_id]": self.domain_id, }
ttl = self._get_lexicon_option("ttl")
if ttl:
if ttl <= 0:
data["data[DnsSetting][ttl]"] = "3600"
else:
data["data[DnsSetting][ttl]"] = str(ttl)
prio = self._get_lexicon_option("priority")
if prio:
if prio <= 0:
data["data[DnsSetting][pref-mx]"] = "10"
else:
data["data[DnsSetting][pref-mx]"] = str(prio)

self.session.post("https://login.webgo.de/dns_settings/domainDnsEditForm", data=data)
self.session.get(f"https://login.webgo.de/dnsSettings/domainDnsDo/{self.domain_id}/ok")
# Pull a list of records and check for ours
records = self._list_records(name=name)
if len(records) >= 1:
LOGGER.info("Successfully added record %s", name)
return True
LOGGER.info("Failed to add record %s", name)
return False

# List all records. Return an empty list if no records found.
# type, name and content are used to filter records.
# If possible filter during the query, otherwise filter after response is
# received.
def _list_records(self, rtype=None, name=None, content=None):
return self._list_records_internal(rtype=rtype, name=name, content=content)

def _list_records_internal(
self, rtype=None, name=None, content=None, identifier=None
):
if name:
name = self._relative_name(name)
records = []
# Make an authenticated GET to the DNS management page
edit_response = self.session.get(
f"https://login.webgo.de/dnsSettings/domainDnsEdit/{self.domain_id}",
)

# Parse the HTML response, and list the table rows for DNS records
html = BeautifulSoup(edit_response.content, "html.parser")
dns_table = html.find("table", {"class": "alltable"})
records = dns_table.findAll("tr")
# If the tag couldn't be found, error, otherwise, return the value of the tag
if records is None or not records:
LOGGER.warning("Domains not found in account")
return records
new_records = []
# Find Main Record
rec = {}
mainip = html.find("span", {"class": "mainIp"})
mainip_record = mainip.find_next("span").text
dns_link = mainip.find_next("a").get('href')
rec["name"] = self.domain
rec["ttl"] = "3600"
rec["type"] = "A"
rec["prio"] = "10"
rec["content"] = mainip_record
rec["id"] = dns_link.rsplit("/", 2)[1]
rec["option"] = "main"
new_records.append(rec)
# Find Subrecords
for dns_tr in records[1:]:
tds = dns_tr.findAll("td")
# Process HTML in the TR children to derive each object
rec = {}
rec["name"] = self._full_name(tds[0].string)
rec["ttl"] = tds[1].string
rec["type"] = tds[2].string
rec["prio"] = tds[3].string
rec["content"] = tds[4].string
dns_link = tds[5]
dns_link = dns_link.find("a", {"class": "domainButton fcon-edit"}).get('href')
rec["id"] = dns_link.rsplit("/", 2)[1]
if rec["content"].startswith('"'):
rec = self._clean_TXT_record(rec)
rec["option"] = "sub"
new_records.append(rec)
records = new_records
if identifier:
LOGGER.debug("Filtering %d records by id: %s", len(records), identifier)
records = [record for record in records if str(record["id"]) == str(identifier)]
if rtype:
LOGGER.debug("Filtering %d records by rtype: %s", len(records), rtype)
records = [record for record in records if record["type"] == rtype]
if name:
LOGGER.debug("Filtering %d records by name: %s", len(records), name)
if name.endswith("."):
name = name[:-1]
records = [record for record in records if name in record["name"]]
if content:
LOGGER.debug(
"Filtering %d records by content: %s", len(records), content.lower()
)
records = [
record
for record in records
if record["content"].lower() == content.lower()
]
LOGGER.debug("Final records (%d): %s", len(records), records)

return records

# Create or update a record.
def _update_record(self, identifier=None, rtype=None, name=None, content=None):
maindata = None
sub_update = None
if identifier is not None:
records = self._list_records_internal(identifier=identifier)
else:
records = self._list_records_internal(name=name, rtype=rtype)
for record in records:
# Check whether Main-Domain needs to be updated
if record["option"] == "main":
maindata = {
"data[DnsSetting][value]": content,
"data[DnsSetting][action]": "main",
"data[DnsSetting][domain_id]": record["id"], }
# Update every Subrecord
else:
# Delete record if it exists
# Record ID is changed after Update from main!
self._delete_record(identifier=record["id"])
self._create_record(record["type"], record["name"], content)
sub_update = True
# Check whether we need to update main
if maindata is not None:
# If we updated an Subdomain in the meantime, the ID changed and we need to refresh it for the main-domain
if sub_update is not None:
if identifier is not None:
records = self._list_records_internal(identifier=identifier)
else:
records = self._list_records_internal(name=name, rtype=rtype)
for record in records:
# Check whether Main-Domain needs to be updated
if record["option"] == "main":
maindata = {
"data[DnsSetting][value]": content,
"data[DnsSetting][action]": "main",
"data[DnsSetting][domain_id]": record["id"], }
self.session.post("https://login.webgo.de/dns_settings/domainDnsEditForm", data=maindata)
self.session.get(f"https://login.webgo.de/dnsSettings/domainDnsDo/{self.domain_id}/ok")
LOGGER.debug("Updated Main Domain %s", records[0]["name"])
return True

# Delete an existing record.
# If record does not exist, do nothing.
def _delete_record(self, identifier=None, rtype=None, name=None, content=None):
delete_record_ids = []
records = self._list_records_internal(rtype, name, content, identifier)
if "main" in [record["option"] for record in records]:
LOGGER.warning("Unable to delete records because your main domain %s can't be deleted", self.domain)
return False
delete_record_ids = [record["id"] for record in records]
LOGGER.debug("Record IDs to delete: %s", delete_record_ids)
for rec_id in delete_record_ids:
response = self.session.get(f"https://login.webgo.de/dnsSettings/domainDnsDo/{rec_id}/delete")
if response.status_code == 200:
self.session.get(f"https://login.webgo.de/dnsSettings/domainDnsDo/{self.domain_id}/ok")
else:
LOGGER.warning("Unable to delete record %s", rec_id)
return False
return True

def _request(self, action="GET", url="/", data=None, query_params=None):
# Helper _request is not used in this provider
pass
35 changes: 35 additions & 0 deletions lexicon/tests/providers/test_webgo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Integration tests for Webgo"""
import re
from unittest import TestCase

from lexicon.tests.providers.integration_tests import IntegrationTestsV2


# Hook into testing framework by inheriting unittest.TestCase and reuse
# the tests which *each and every* implementation of the interface must
# pass, by inheritance from integration_tests.IntegrationTests
class WebgoProviderTests(TestCase, IntegrationTestsV2):
"""TestCase for Webgo"""

provider_name = "webgo"
domain = "klugscheissmodus.de"

def _filter_post_data_parameters(self):
return ["data%5BUser%5D%5Busername%5D", "data%5BUser%5D%5Bpassword%5D"]

def _filter_headers(self):
return ["Authorization", "Cookie"]

def _filter_query_parameters(self):
return ["pass"]

def _filter_response(self, response):
body = response["body"]["string"].decode("utf-8")
# Filter out all Customer/Service IDs from Response
body = re.sub(r"\b(16)([0-9]{3})\b", "XXXXX", body)
# Filter out Clearname from Response
body = re.sub(r"<div class=\"welcome\">.*?<\/div>", "<div class=\"welcome\">John Doe</div>", body)
# Filter out all Domains not tested
body = re.sub(r'<\s*td[^>]*>(?!(' + re.escape(self.domain) + r'))(.[A-Za-z0-9]*\.[a-z]{2,3})<\s*/\s*td>', '<td>filtereddomain.de</td>', body)
response["body"]["string"] = body.encode("utf-8")
return response
Loading

0 comments on commit 1c10cc5

Please sign in to comment.