Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update webhook products in batch #587

Merged
merged 5 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions marketplace/services/vtex/generic_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ def __init__(
self,
api_credentials: APICredentials,
catalog: Catalog,
skus_ids: list,
skus_ids: list[str] = None,
webhook: Optional[dict] = None,
sellers_ids: list[str] = None,
product_feed: Optional[ProductFeed] = None,
sellers_skus: list[str] = None,
):
"""
Service for processing product updates via VTEX webhooks.
Expand All @@ -186,6 +187,7 @@ def __init__(
self.app = self.catalog.app
self.webhook = webhook
self.sellers_ids = sellers_ids if sellers_ids else []
self.sellers_skus = sellers_skus if sellers_skus else []
self.product_manager = ProductFacebookManager()

def webhook_product_insert(self):
Expand Down Expand Up @@ -235,26 +237,18 @@ def process_batch_sync(self):
)

# Fetch product data
products_dto = pvt_service.update_webhook_product_info(
all_success = pvt_service.update_batch_webhook(
domain=self.api_credentials.domain,
skus_ids=self.skus_ids,
seller_ids=self.sellers_ids,
sellers_skus=self.sellers_skus,
catalog=self.catalog,
)
if not products_dto:
return None

# Save product data for batch sync
all_success = self.product_manager.save_batch_product_data(
products_dto, self.catalog
)

if not all_success:
raise Exception(
f"Error saving batch products in database for Catalog: {self.catalog.facebook_catalog_id}"
)

return products_dto
return all_success

def _get_sellers_ids(self, service):
seller_id = extract_sellers_ids(self.webhook)
Expand Down
17 changes: 17 additions & 0 deletions marketplace/services/vtex/private/products/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,23 @@ def update_webhook_product_info(

return updated_products_dto

def update_batch_webhook(
    self, domain: str, sellers_skus: list, catalog: Catalog
) -> List[FacebookProductDTO]:
    """
    Process a webhook batch of seller#sku identifiers for the given catalog.

    Reads the catalog's VTEX app configuration, builds the configured
    business rules, and delegates the per-item processing to the webhook
    data processor.
    """
    app_config = catalog.vtex_app.config
    # Rules configured for this catalog's VTEX app.
    business_rules = self._load_rules(app_config.get("rules", []))
    store_domain = app_config.get("store_domain")

    processed = self.webhook_data_processor.process_sellers_skus_batch(
        service=self,
        domain=domain,
        store_domain=store_domain,
        rules=business_rules,
        catalog=catalog,
        seller_sku_pairs=sellers_skus,
    )
    return processed

def get_product_specification(self, product_id, domain):
return self.client.get_product_specification(product_id, domain)

Expand Down
264 changes: 217 additions & 47 deletions marketplace/services/vtex/utils/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ def process_product_data(
upload_on_sync=False,
) -> List[FacebookProductDTO]:
"""
Processes product data and saves batches to the database if upload_on_sync is True.
Process a batch of SKU IDs with optional active sellers using threads if the batch size is large
"""
# Initialize configuration
self.queue = Queue()
self.results = []
self.active_sellers = active_sellers
Expand All @@ -123,6 +124,7 @@ def process_product_data(
self.rules = rules
self.update_product = update_product
self.invalid_products_count = 0
self.valid_products_count = 0
self.catalog = catalog
self.sku_validator = SKUValidator(service, domain, MockZeroShotClient())
self.upload_on_sync = upload_on_sync
Expand All @@ -131,75 +133,114 @@ def process_product_data(
self.use_sku_sellers = self.catalog.vtex_app.config.get(
"use_sku_sellers", False
)
self.use_sync_v2 = self.vtex_app.config.get("use_sync_v2", False)

# Preparing the tqdm progress bar
print("Initiated process of product treatment:")
print("Initiated process of product treatment.")
self.progress_bar = tqdm(total=len(skus_ids), desc="[✓:0, ✗:0]", ncols=0)

# Initializing the queue with SKUs
for sku_id in skus_ids:
self.queue.put(sku_id)

if self.use_threads:
# Using threads for processing
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
futures = [
executor.submit(self.worker) for _ in range(self.max_workers)
]

# Waiting for all the workers to finish
for future in futures:
try:
future.result()
except Exception as e:
print(f"Error in thread execution: {str(e)}")
else:
# Processing without using threads
while not self.queue.empty():
self.worker()

self.progress_bar.close()
try:
# Process items in queue
if self.use_threads:
self._process_queue_with_threads()
else:
self._process_queue_without_threads()
finally:
# Close the progress bar after processing, even in case of errors
self.progress_bar.close()

# Upload remaining items in the buffer
if self.upload_on_sync and self.results:
print(f"Uploading the last {len(self.results)} items to the database.")
self._save_batch_to_database()

print(f"Processing completed. Total valid products: {len(self.results)}")
print(
f"Processing completed. Total valid products: {self.valid_products_count}"
)
return self.results

def _process_queue_with_threads(self):
    """Drain the work queue concurrently using a pool of worker threads."""
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=self.max_workers
    ) as pool:
        # One long-lived worker task per pool slot; each worker loops
        # over the queue until it is empty.
        pending = [pool.submit(self.worker) for _ in range(self.max_workers)]
        for task in pending:
            # Surface (but do not propagate) any exception a worker raised.
            try:
                task.result()
            except Exception as e:
                print(f"Error in thread execution: {str(e)}")

def _process_queue_without_threads(self):
"""Helper method to process queue items without threads."""
while not self.queue.empty():
self.worker()

def worker(self):
"""
Processes items from the queue. For v2 (use_sync_v2), processes `seller_id` and `sku_id`.
For v1, processes only `sku_id`.
"""
while not self.queue.empty():
sku_id = self.queue.get()
try:
result = self.process_single_sku(sku_id)
except Exception as e:
print(f"Error processing SKU {sku_id}: {str(e)}")
result = []

with self.progress_lock:
if result:
self.results.extend(result)
self.progress_bar.set_description(
f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]"
# Extract item from the queue
item = self.queue.get()

# Determine processing logic based on use_sync_v2
if self.use_sync_v2:
seller_id, sku_id = self._parse_seller_sku(item)
processing_result = self.process_seller_sku(
seller_id=seller_id, sku_id=sku_id
)

# Save batch to the database when reaching batch_size
if self.upload_on_sync and len(self.results) >= self.batch_size:
with self.save_lock: # Ensure that only one thread executes
print(
f"Batch size of {self.batch_size} reached. Saving to the database."
)
self._save_batch_to_database()
else:
processing_result = self.process_single_sku(item)

# Handle the result and update the state
self._handle_processing_result(processing_result)

except Exception as e:
print(f"Error processing item: {item}. Details: {str(e)}")
with self.progress_lock:
self.invalid_products_count += 1
self.progress_bar.set_description(
f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]"
)
self.progress_bar.update(1)

self.progress_bar.update(1)
def _parse_seller_sku(self, seller_sku):
"""
Parses a seller#sku string into seller_id and sku_id.
"""
try:
seller_id, sku_id = seller_sku.split("#")
return seller_id, sku_id
except ValueError:
raise ValueError(f"Invalid format for seller_sku: {seller_sku}")

def _handle_processing_result(self, processing_result):
    """
    Record the outcome of processing one queue item.

    A truthy ``processing_result`` (a list of product DTOs) counts as one
    valid item: its DTOs are appended to the results buffer and, when the
    buffer reaches ``self.batch_size`` with ``upload_on_sync`` enabled, the
    buffer is flushed to the database. A falsy result increments the
    invalid counter. In both cases the progress bar advances by one.

    NOTE(review): ``self.batch_size``, ``self.progress_lock`` and
    ``self.save_lock`` are not initialized in the visible batch setup
    methods — presumably set in ``__init__``; verify.
    """
    # All state mutation happens under progress_lock so concurrent
    # workers cannot interleave counter/buffer updates.
    with self.progress_lock:
        if processing_result:
            # One valid *item* (the result list may hold several DTOs).
            self.valid_products_count += 1
            self.results.extend(processing_result)
            self.progress_bar.set_description(
                f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]"
            )
            if self.upload_on_sync and len(self.results) >= self.batch_size:
                # NOTE(review): save_lock is acquired while progress_lock
                # is still held, so the DB flush blocks all other workers
                # — confirm this serialization is intended.
                with self.save_lock:
                    print(
                        f"Batch size of {self.batch_size} reached. Saving to the database."
                    )
                    self._save_batch_to_database()
        else:
            self.invalid_products_count += 1
            self.progress_bar.set_description(
                f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]"
            )
        # Advance the bar exactly once per processed item, valid or not.
        self.progress_bar.update(1)

def process_single_sku(self, sku_id):
"""
Expand Down Expand Up @@ -341,3 +382,132 @@ def _save_batch_to_database(self):
)
except Exception as e:
print(f"Error while saving batch to the database: {e}")

def process_sellers_skus_batch(
    self,
    service,
    domain,
    store_domain,
    rules,
    catalog,
    seller_sku_pairs,
    upload_on_sync=True,
):
    """
    Process a batch of 'seller#sku' pairs, using threads when the batch is
    large (more than 10 items).

    Args:
        service: VTEX product service used for validation and cart simulation.
        domain: VTEX account domain.
        store_domain: storefront domain read from the app config by callers.
        rules: business rules applied to each product DTO.
        catalog: Catalog whose VTEX app configuration drives processing.
        seller_sku_pairs: iterable of 'seller#sku' strings to process.
        upload_on_sync: when True, flush buffered results to the database
            in batches during processing.

    Returns:
        True when the results buffer is empty after processing a non-empty
        batch; otherwise the leftover ``self.results`` list.
        NOTE(review): the return type is inconsistent (bool vs list) even
        though the caller annotates it as List[FacebookProductDTO] — and an
        empty buffer is also what remains when every item *failed*
        validation, so "All items processed successfully" may be misleading.
        Verify the intended contract with the caller.
    """
    # Initialize configuration
    self.queue = Queue()
    self.results = []
    self.invalid_products_count = 0
    self.valid_products_count = 0
    self.service = service
    self.domain = domain
    self.store_domain = store_domain
    self.rules = rules
    self.catalog = catalog
    self.vtex_app = self.catalog.vtex_app
    self.upload_on_sync = upload_on_sync
    self.use_sync_v2 = self.vtex_app.config.get("use_sync_v2", False)
    self.sku_validator = SKUValidator(service, domain, MockZeroShotClient())
    self.sent_to_db_count = 0  # Tracks the number of items sent to the database.
    # Always treat items as updates (inactive/unavailable SKUs still sync).
    self.update_product = True

    # Populate the queue
    initial_batch_count = len(seller_sku_pairs)
    print("Initiated process of product treatment.")
    self.progress_bar = tqdm(total=initial_batch_count, desc="[✓:0, ✗:0]", ncols=0)
    for seller_sku in seller_sku_pairs:
        self.queue.put(seller_sku)

    # Determine whether to use threads
    use_threads = len(seller_sku_pairs) > 10

    try:
        # Process items in queue
        if use_threads:
            self._process_queue_with_threads()
        else:
            self._process_queue_without_threads()
    finally:
        # Close the progress bar after processing, even in case of errors
        self.progress_bar.close()

    # Save remaining items in buffer
    # NOTE(review): the success check below relies on _save_batch_to_database
    # emptying self.results — confirm it clears the buffer.
    if self.results:
        print(f"Uploading the last {len(self.results)} items to the database.")
        self._save_batch_to_database()

    # Final log and return
    print(
        f"Processing completed. Total valid products: {self.valid_products_count}"
    )
    if initial_batch_count > 0 and len(self.results) == 0:
        print("All items processed successfully.")
        return True

    print("Some errors occurred during processing.")
    return self.results

def process_seller_sku(self, seller_id, sku_id):
    """
    Build the Facebook product DTO(s) for a single (seller, SKU) pair.

    Returns a list containing the product DTO when the SKU validates, the
    seller cart simulation succeeds, and every business rule accepts it;
    otherwise an empty list (the SKU is skipped).
    """
    # Validate the SKU and fetch its product details.
    try:
        product_details = self.sku_validator.validate_product_details(
            sku_id, self.catalog
        )
    except CustomAPIException as e:
        if e.status_code == 404:
            print(f"SKU {sku_id} not found. Skipping...")
        elif e.status_code == 500:
            print(f"SKU {sku_id} returned status: {e.status_code}. Skipping...")
        print(f"An error {e} occurred on get_product_details. SKU: {sku_id}")
        return []

    if not product_details:
        return []

    # Inactive SKUs are only kept when updating existing products.
    if not product_details.get("IsActive") and not self.update_product:
        return []

    if not seller_id:
        print(f"No seller to sync for SKU {sku_id}. Skipping...")
        return []

    # Simulate a cart for this seller to determine availability/pricing.
    try:
        availability = self.service.simulate_cart_for_seller(
            sku_id, seller_id, self.domain
        )
    except CustomAPIException as e:
        print(
            f"Failed to simulate cart for SKU {sku_id} with seller {seller_id}: {e}"
        )
        return []

    # Unavailable products are only kept when updating existing products.
    if not availability["is_available"] and not self.update_product:
        return []

    dto = DataProcessor.extract_fields(
        self.store_domain, product_details, availability
    )
    if not self._validate_product_dto(dto):
        return []

    rule_kwargs = {
        "seller_id": seller_id,
        "service": self.service,
        "domain": self.domain,
    }
    # Every rule must accept the DTO (rules may mutate it in place);
    # all() short-circuits on the first rejection, like the original loop.
    if all(rule.apply(dto, **rule_kwargs) for rule in self.rules):
        return [dto]
    return []
Loading
Loading