# wooly_scraper.py
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
from database.product import Product
from helpers import read_meta_data
class WoolyScraper:
    """Look up configured yarn products on wollplatz.de and stage them in a DB session.

    The scraper starts at the shop's brand listing, follows the link of every
    requested brand, pages through that brand's product list until the
    requested product name is found, then reads the product detail page and
    adds the resulting ``Product`` to the session.
    """

    # Brand listing and product listing pages use the same tile markup.
    _LIST_ITEM_SELECTOR = ".productlistholder"
    _LIST_LINK_SELECTOR = ".productlist-imgholder"
    _NEXT_BUTTON_SELECTOR = "li[class=paging-volgende]"
    _PAGE_PARAM = "?page="
    # Safety net against a listing that never stops offering a "next" button.
    _MAX_PAGES = 1000

    def __init__(self, driver, session, path):
        """Store the collaborators and load the scrape configuration.

        Args:
            driver: Browser driver; must provide ``get``, ``execute_script``,
                ``page_source`` and ``close``.
            session: Database session; must provide ``add``, ``commit`` and
                ``close``.
            path: Location passed to ``read_meta_data``. The result is used
                as a mapping of brand title -> iterable of product names
                (NOTE(review): schema inferred from how ``start_parsing``
                reads it; confirm against ``helpers.read_meta_data``).
        """
        # Assign both closable resources first so cleanup still works when
        # loading the meta data raises.
        self.driver = driver
        self.session = session
        self.input_data = read_meta_data(path)  # Meta data about what to scrape
        self.url = "https://www.wollplatz.de/wolle/herstellers"

    def close(self):
        """Close the driver and the session, tolerating a half-built instance."""
        driver = getattr(self, "driver", None)
        session = getattr(self, "session", None)
        try:
            if driver is not None:
                driver.close()
        finally:
            # Release the session even if closing the browser failed.
            if session is not None:
                session.close()

    def __del__(self):
        self.close()

    def get_page(self, url):
        """Load ``url`` in the driver, scroll to the bottom and return the parsed page."""
        self.driver.get(url)
        self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        return BeautifulSoup(self.driver.page_source, "lxml")

    def _iter_listing_links(self, soup):
        """Yield ``(title, href)`` for every listing tile that carries both attributes."""
        for item in soup.select(self._LIST_ITEM_SELECTOR):
            link = item.select_one(self._LIST_LINK_SELECTOR)
            if link is None:
                continue
            title = link.get("title")
            href = link.get("href")
            if title and href:
                yield title, href

    @staticmethod
    def _select_text(soup, selector):
        """Return the text of the first match for ``selector``, or ``""`` if none."""
        node = soup.select_one(selector)
        return node.text if node is not None else ""

    def start_parsing(self):
        """Scrape every configured brand/product pair and commit the session once.

        Brands or products that cannot be found are reported on stdout and
        skipped. ``self.url`` is updated to whichever page was fetched last.
        """
        soup = self.get_page(self.url)
        # The brand listing does not change between queries; parse it once.
        brands = list(self._iter_listing_links(soup))

        for query_brand_title, query_names in self.input_data.items():
            # First search for the brand
            brand_urls = [url for title, url in brands if title == query_brand_title]
            if not brand_urls:
                print("Brand not found :", query_brand_title)
                continue

            for brand_url in brand_urls:
                for query_name in query_names:
                    self.url = brand_url + self._PAGE_PARAM + str(1)
                    # Second search for product in the brand's products page
                    product_url = self.parse_products_page(self.get_page(self.url), query_name)
                    if product_url:
                        self.url = product_url
                        # Third get the product details
                        product = self.parse_element_page(self.get_page(self.url))
                        # Finally add it to the database
                        self.session.add(product)
                    else:
                        print("Product not found :", query_brand_title, query_name)

        self.session.commit()

    def parse_products_page(self, response, query_name):
        """Find the detail URL of ``query_name`` in a paginated product listing.

        Args:
            response: Parsed listing page that corresponds to ``self.url``.
            query_name: Product name to match. A tile's name is its title
                with the first space-separated word (the brand) removed.

        Returns:
            The ``href`` of the first matching tile, or ``None`` when no page
            of the listing contains the product.

        Side effects:
            Follows the "next" button by rewriting the ``?page=N`` suffix of
            ``self.url`` and fetching the page via ``get_page``. If
            ``self.url`` has no numeric ``?page=`` suffix, the current page
            is treated as page 1.
        """
        base_url, marker, page_text = self.url.rpartition(self._PAGE_PARAM)
        if marker and page_text.isdigit():
            # Parse the whole number, not just the last digit, so that
            # listings with ten or more pages are walked correctly.
            page = int(page_text)
        else:
            base_url, page = self.url, 1

        for _ in range(self._MAX_PAGES):
            for title, href in self._iter_listing_links(response):
                # Drop the brand name from the product name.
                _brand, _, product_name = title.partition(" ")
                if product_name == query_name:
                    return href

            # In case that the element that we are searching for is not found
            if response.select_one(self._NEXT_BUTTON_SELECTOR) is None:
                return None
            page += 1
            self.url = base_url + self._PAGE_PARAM + str(page)  # Go to the next page
            response = self.get_page(self.url)
        return None

    def parse_element_page(self, response):
        """Build a ``Product`` from a parsed product detail page.

        Fields whose element is missing are stored as ``""``. The page title
        is split at the first space into brand and product name.
        """
        PRODUCT_TITLE_SELECTOR = "#pageheadertitle"
        PRODUCT_PRICE_SELECTOR = "#ContentPlaceHolder1_upPricePanel > span.product-price > span.product-price-amount"
        PRODUCT_COMPOSITION_SELECTOR = "#pdetailTableSpecs > table > tbody > tr:nth-child(4) > td:nth-child(2)"
        PRODUCT_NEEDLE_SIZE_SELECTOR = "#pdetailTableSpecs > table > tbody > tr:nth-child(5) > td:nth-child(2)"

        product_title = self._select_text(response, PRODUCT_TITLE_SELECTOR)
        # partition never raises, unlike split(...)[1] on a title without a space.
        product_brand_name, _, product_name = product_title.partition(" ")

        return Product(name=product_name,
                       brand=product_brand_name,
                       price=self._select_text(response, PRODUCT_PRICE_SELECTOR),
                       composition=self._select_text(response, PRODUCT_COMPOSITION_SELECTOR),
                       needle_size=self._select_text(response, PRODUCT_NEEDLE_SIZE_SELECTOR),
                       deliver_time=None)  # None, since there is no such field on the website