Skip to content

Commit

Permalink
feat: update webhook products in batch (#587)
Browse files Browse the repository at this point in the history
* feat: save updates products on redis queue

* feat: update webhook products in batch

* fix: adjust task name and data_processor method

* feat: add time.sleep on dequeue and add more log infos
  • Loading branch information
elitonzky authored Dec 12, 2024
1 parent 9a7c510 commit 2edded9
Show file tree
Hide file tree
Showing 5 changed files with 315 additions and 131 deletions.
18 changes: 6 additions & 12 deletions marketplace/services/vtex/generic_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ def __init__(
self,
api_credentials: APICredentials,
catalog: Catalog,
skus_ids: list,
skus_ids: list[str] = None,
webhook: Optional[dict] = None,
sellers_ids: list[str] = None,
product_feed: Optional[ProductFeed] = None,
sellers_skus: list[str] = None,
):
"""
Service for processing product updates via VTEX webhooks.
Expand All @@ -186,6 +187,7 @@ def __init__(
self.app = self.catalog.app
self.webhook = webhook
self.sellers_ids = sellers_ids if sellers_ids else []
self.sellers_skus = sellers_skus if sellers_skus else []
self.product_manager = ProductFacebookManager()

def webhook_product_insert(self):
Expand Down Expand Up @@ -235,26 +237,18 @@ def process_batch_sync(self):
)

# Fetch product data
products_dto = pvt_service.update_webhook_product_info(
all_success = pvt_service.update_batch_webhook(
domain=self.api_credentials.domain,
skus_ids=self.skus_ids,
seller_ids=self.sellers_ids,
sellers_skus=self.sellers_skus,
catalog=self.catalog,
)
if not products_dto:
return None

# Save product data for batch sync
all_success = self.product_manager.save_batch_product_data(
products_dto, self.catalog
)

if not all_success:
raise Exception(
f"Error saving batch products in database for Catalog: {self.catalog.facebook_catalog_id}"
)

return products_dto
return all_success

def _get_sellers_ids(self, service):
seller_id = extract_sellers_ids(self.webhook)
Expand Down
17 changes: 17 additions & 0 deletions marketplace/services/vtex/private/products/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,23 @@ def update_webhook_product_info(

return updated_products_dto

def update_batch_webhook(
self, domain: str, sellers_skus: list, catalog: Catalog
) -> List[FacebookProductDTO]:
config = catalog.vtex_app.config
rules = self._load_rules(config.get("rules", []))
store_domain = config.get("store_domain")
updated_products_dto = self.webhook_data_processor.process_sellers_skus_batch(
service=self,
domain=domain,
store_domain=store_domain,
rules=rules,
catalog=catalog,
seller_sku_pairs=sellers_skus,
)

return updated_products_dto

def get_product_specification(self, product_id, domain):
return self.client.get_product_specification(product_id, domain)

Expand Down
264 changes: 217 additions & 47 deletions marketplace/services/vtex/utils/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ def process_product_data(
upload_on_sync=False,
) -> List[FacebookProductDTO]:
"""
Processes product data and saves batches to the database if upload_on_sync is True.
Process a batch of SKU IDs with optional active sellers using threads if the batch size is large
"""
# Initialize configuration
self.queue = Queue()
self.results = []
self.active_sellers = active_sellers
Expand All @@ -123,6 +124,7 @@ def process_product_data(
self.rules = rules
self.update_product = update_product
self.invalid_products_count = 0
self.valid_products_count = 0
self.catalog = catalog
self.sku_validator = SKUValidator(service, domain, MockZeroShotClient())
self.upload_on_sync = upload_on_sync
Expand All @@ -131,75 +133,114 @@ def process_product_data(
self.use_sku_sellers = self.catalog.vtex_app.config.get(
"use_sku_sellers", False
)
self.use_sync_v2 = self.vtex_app.config.get("use_sync_v2", False)

# Preparing the tqdm progress bar
print("Initiated process of product treatment:")
print("Initiated process of product treatment.")
self.progress_bar = tqdm(total=len(skus_ids), desc="[✓:0, ✗:0]", ncols=0)

# Initializing the queue with SKUs
for sku_id in skus_ids:
self.queue.put(sku_id)

if self.use_threads:
# Using threads for processing
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
futures = [
executor.submit(self.worker) for _ in range(self.max_workers)
]

# Waiting for all the workers to finish
for future in futures:
try:
future.result()
except Exception as e:
print(f"Error in thread execution: {str(e)}")
else:
# Processing without using threads
while not self.queue.empty():
self.worker()

self.progress_bar.close()
try:
# Process items in queue
if self.use_threads:
self._process_queue_with_threads()
else:
self._process_queue_without_threads()
finally:
# Close the progress bar after processing, even in case of errors
self.progress_bar.close()

# Upload remaining items in the buffer
if self.upload_on_sync and self.results:
print(f"Uploading the last {len(self.results)} items to the database.")
self._save_batch_to_database()

print(f"Processing completed. Total valid products: {len(self.results)}")
print(
f"Processing completed. Total valid products: {self.valid_products_count}"
)
return self.results

def _process_queue_with_threads(self):
"""Helper method to process queue items with threads."""
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
futures = [executor.submit(self.worker) for _ in range(self.max_workers)]
for future in futures:
try:
future.result()
except Exception as e:
print(f"Error in thread execution: {str(e)}")

def _process_queue_without_threads(self):
"""Helper method to process queue items without threads."""
while not self.queue.empty():
self.worker()

def worker(self):
"""
Processes items from the queue. For v2 (use_sync_v2), processes `seller_id` and `sku_id`.
For v1, processes only `sku_id`.
"""
while not self.queue.empty():
sku_id = self.queue.get()
try:
result = self.process_single_sku(sku_id)
except Exception as e:
print(f"Error processing SKU {sku_id}: {str(e)}")
result = []

with self.progress_lock:
if result:
self.results.extend(result)
self.progress_bar.set_description(
f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]"
# Extract item from the queue
item = self.queue.get()

# Determine processing logic based on use_sync_v2
if self.use_sync_v2:
seller_id, sku_id = self._parse_seller_sku(item)
processing_result = self.process_seller_sku(
seller_id=seller_id, sku_id=sku_id
)

# Save batch to the database when reaching batch_size
if self.upload_on_sync and len(self.results) >= self.batch_size:
with self.save_lock: # Ensure that only one thread executes
print(
f"Batch size of {self.batch_size} reached. Saving to the database."
)
self._save_batch_to_database()
else:
processing_result = self.process_single_sku(item)

# Handle the result and update the state
self._handle_processing_result(processing_result)

except Exception as e:
print(f"Error processing item: {item}. Details: {str(e)}")
with self.progress_lock:
self.invalid_products_count += 1
self.progress_bar.set_description(
f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]"
)
self.progress_bar.update(1)

self.progress_bar.update(1)
def _parse_seller_sku(self, seller_sku):
"""
Parses a seller#sku string into seller_id and sku_id.
"""
try:
seller_id, sku_id = seller_sku.split("#")
return seller_id, sku_id
except ValueError:
raise ValueError(f"Invalid format for seller_sku: {seller_sku}")

def _handle_processing_result(self, processing_result):
"""
Handles the processing result: updates results, progress, and performs batch saving if necessary.
"""
with self.progress_lock:
if processing_result:
self.valid_products_count += 1
self.results.extend(processing_result)
self.progress_bar.set_description(
f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]"
)
if self.upload_on_sync and len(self.results) >= self.batch_size:
with self.save_lock:
print(
f"Batch size of {self.batch_size} reached. Saving to the database."
)
self._save_batch_to_database()
else:
self.invalid_products_count += 1
self.progress_bar.set_description(
f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]"
)
self.progress_bar.update(1)

def process_single_sku(self, sku_id):
"""
Expand Down Expand Up @@ -341,3 +382,132 @@ def _save_batch_to_database(self):
)
except Exception as e:
print(f"Error while saving batch to the database: {e}")

def process_sellers_skus_batch(
self,
service,
domain,
store_domain,
rules,
catalog,
seller_sku_pairs,
upload_on_sync=True,
):
"""
Process a batch of seller and SKU pairs using threads if the batch size is large.
"""
# Initialize configuration
self.queue = Queue()
self.results = []
self.invalid_products_count = 0
self.valid_products_count = 0
self.service = service
self.domain = domain
self.store_domain = store_domain
self.rules = rules
self.catalog = catalog
self.vtex_app = self.catalog.vtex_app
self.upload_on_sync = upload_on_sync
self.use_sync_v2 = self.vtex_app.config.get("use_sync_v2", False)
self.sku_validator = SKUValidator(service, domain, MockZeroShotClient())
self.sent_to_db_count = 0 # Tracks the number of items sent to the database.
self.update_product = True

# Populate the queue
initial_batch_count = len(seller_sku_pairs)
print("Initiated process of product treatment.")
self.progress_bar = tqdm(total=initial_batch_count, desc="[✓:0, ✗:0]", ncols=0)
for seller_sku in seller_sku_pairs:
self.queue.put(seller_sku)

# Determine whether to use threads
use_threads = len(seller_sku_pairs) > 10

try:
# Process items in queue
if use_threads:
self._process_queue_with_threads()
else:
self._process_queue_without_threads()
finally:
# Close the progress bar after processing, even in case of errors
self.progress_bar.close()

# Save remaining items in buffer
if self.results:
print(f"Uploading the last {len(self.results)} items to the database.")
self._save_batch_to_database()

# Final log and return
print(
f"Processing completed. Total valid products: {self.valid_products_count}"
)
if initial_batch_count > 0 and len(self.results) == 0:
print("All items processed successfully.")
return True

print("Some errors occurred during processing.")
return self.results

def process_seller_sku(self, seller_id, sku_id):
facebook_products = []
try:
product_details = self.sku_validator.validate_product_details(
sku_id, self.catalog
)
if not product_details:
return facebook_products
except CustomAPIException as e:
if e.status_code == 404:
print(f"SKU {sku_id} not found. Skipping...")
elif e.status_code == 500:
print(f"SKU {sku_id} returned status: {e.status_code}. Skipping...")

print(f"An error {e} occurred on get_product_details. SKU: {sku_id}")
return []

is_active = product_details.get("IsActive")
if not is_active and not self.update_product:
return facebook_products

if not seller_id:
print(f"No seller to sync for SKU {sku_id}. Skipping...")
return facebook_products

# Perform the simulation for seller
try:
availability_result = self.service.simulate_cart_for_seller(
sku_id, seller_id, self.domain
)
except CustomAPIException as e:
print(
f"Failed to simulate cart for SKU {sku_id} with seller {seller_id}: {e}"
)
return facebook_products

# Process simulation results

if not availability_result["is_available"] and not self.update_product:
return facebook_products

product_dto = DataProcessor.extract_fields(
self.store_domain, product_details, availability_result
)
if not self._validate_product_dto(product_dto):
return facebook_products

params = {
"seller_id": seller_id,
"service": self.service,
"domain": self.domain,
}
all_rules_applied = True
for rule in self.rules:
if not rule.apply(product_dto, **params):
all_rules_applied = False
break

if all_rules_applied:
facebook_products.append(product_dto)

return facebook_products
Loading

0 comments on commit 2edded9

Please sign in to comment.