From 2edded9ba5e83f1c25e8902c5cbd9368fcaea24e Mon Sep 17 00:00:00 2001
From: Eliton Jorge
Date: Thu, 12 Dec 2024 17:04:15 -0300
Subject: [PATCH] feat: update webhook products in batch (#587)

* feat: save product updates on a Redis queue

* feat: update webhook products in batch

* fix: adjust task name and data_processor method

* feat: add time.sleep on dequeue and add more log info

---
 marketplace/services/vtex/generic_service.py  |  18 +-
 .../services/vtex/private/products/service.py |  17 ++
 .../services/vtex/utils/data_processor.py     | 264 ++++++++++++++----
 marketplace/wpp_products/models.py            |   9 +-
 marketplace/wpp_products/tasks.py             | 138 ++++-----
 5 files changed, 315 insertions(+), 131 deletions(-)

diff --git a/marketplace/services/vtex/generic_service.py b/marketplace/services/vtex/generic_service.py
index 9108825a..d119a5ba 100644
--- a/marketplace/services/vtex/generic_service.py
+++ b/marketplace/services/vtex/generic_service.py
@@ -170,10 +170,11 @@ def __init__(
         self,
         api_credentials: APICredentials,
         catalog: Catalog,
-        skus_ids: list,
+        skus_ids: list[str] = None,
         webhook: Optional[dict] = None,
         sellers_ids: list[str] = None,
         product_feed: Optional[ProductFeed] = None,
+        sellers_skus: list[str] = None,
     ):
         """
         Service for processing product updates via VTEX webhooks.
@@ -186,6 +187,7 @@ def __init__(
         self.app = self.catalog.app
         self.webhook = webhook
         self.sellers_ids = sellers_ids if sellers_ids else []
+        self.sellers_skus = sellers_skus if sellers_skus else []
         self.product_manager = ProductFacebookManager()
 
     def webhook_product_insert(self):
@@ -235,26 +237,18 @@ def process_batch_sync(self):
         )
 
         # Fetch product data
-        products_dto = pvt_service.update_webhook_product_info(
+        all_success = pvt_service.update_batch_webhook(
             domain=self.api_credentials.domain,
-            skus_ids=self.skus_ids,
-            seller_ids=self.sellers_ids,
+            sellers_skus=self.sellers_skus,
             catalog=self.catalog,
         )
-        if not products_dto:
-            return None
-
-        # Save product data for batch sync
-        all_success = self.product_manager.save_batch_product_data(
-            products_dto, self.catalog
-        )
         if not all_success:
             raise Exception(
                 f"Error saving batch products in database for Catalog: {self.catalog.facebook_catalog_id}"
             )
 
-        return products_dto
+        return all_success
 
     def _get_sellers_ids(self, service):
         seller_id = extract_sellers_ids(self.webhook)
diff --git a/marketplace/services/vtex/private/products/service.py b/marketplace/services/vtex/private/products/service.py
index 48ec87a7..7d72ee66 100644
--- a/marketplace/services/vtex/private/products/service.py
+++ b/marketplace/services/vtex/private/products/service.py
@@ -171,6 +171,23 @@ def update_webhook_product_info(
 
         return updated_products_dto
 
+    def update_batch_webhook(
+        self, domain: str, sellers_skus: list, catalog: Catalog
+    ) -> List[FacebookProductDTO]:
+        config = catalog.vtex_app.config
+        rules = self._load_rules(config.get("rules", []))
+        store_domain = config.get("store_domain")
+        updated_products_dto = self.webhook_data_processor.process_sellers_skus_batch(
+            service=self,
+            domain=domain,
+            store_domain=store_domain,
+            rules=rules,
+            catalog=catalog,
+            seller_sku_pairs=sellers_skus,
+        )
+
+        return updated_products_dto
+
     def get_product_specification(self, product_id, domain):
         return self.client.get_product_specification(product_id, domain)
 
diff --git a/marketplace/services/vtex/utils/data_processor.py b/marketplace/services/vtex/utils/data_processor.py
index 3edd6e05..7165bdff 100644
--- a/marketplace/services/vtex/utils/data_processor.py
+++ b/marketplace/services/vtex/utils/data_processor.py
@@ -112,8 +112,9 @@ def process_product_data(
         upload_on_sync=False,
     ) -> List[FacebookProductDTO]:
         """
-        Processes product data and saves batches to the database if upload_on_sync is True.
+        Process a batch of SKU IDs (optionally scoped to active sellers), using threads for large batches; saves batches to the database when upload_on_sync is True.
         """
+        # Initialize configuration
         self.queue = Queue()
         self.results = []
         self.active_sellers = active_sellers
@@ -123,6 +124,7 @@ def process_product_data(
         self.rules = rules
         self.update_product = update_product
         self.invalid_products_count = 0
+        self.valid_products_count = 0
         self.catalog = catalog
         self.sku_validator = SKUValidator(service, domain, MockZeroShotClient())
         self.upload_on_sync = upload_on_sync
@@ -131,75 +133,114 @@ def process_product_data(
         self.use_sku_sellers = self.catalog.vtex_app.config.get(
             "use_sku_sellers", False
         )
+        self.use_sync_v2 = self.catalog.vtex_app.config.get("use_sync_v2", False)
 
-        # Preparing the tqdm progress bar
-        print("Initiated process of product treatment:")
+        print("Initiated process of product treatment.")
         self.progress_bar = tqdm(total=len(skus_ids), desc="[✓:0, ✗:0]", ncols=0)
-
-        # Initializing the queue with SKUs
         for sku_id in skus_ids:
             self.queue.put(sku_id)
 
-        if self.use_threads:
-            # Using threads for processing
-            with concurrent.futures.ThreadPoolExecutor(
-                max_workers=self.max_workers
-            ) as executor:
-                futures = [
-                    executor.submit(self.worker) for _ in range(self.max_workers)
-                ]
-
-                # Waiting for all the workers to finish
-                for future in futures:
-                    try:
-                        future.result()
-                    except Exception as e:
-                        print(f"Error in thread execution: {str(e)}")
-        else:
-            # Processing without using threads
-            while not self.queue.empty():
-                self.worker()
-
-        self.progress_bar.close()
+        try:
+            # Process items in queue
+            if self.use_threads:
+                self._process_queue_with_threads()
+            else:
+                self._process_queue_without_threads()
+        finally:
+            # Close the progress bar after processing, even in case of errors
+            self.progress_bar.close()
 
         # Upload remaining items in the buffer
         if self.upload_on_sync and self.results:
             print(f"Uploading the last {len(self.results)} items to the database.")
             self._save_batch_to_database()
 
-        print(f"Processing completed. Total valid products: {len(self.results)}")
+        print(
+            f"Processing completed. Total valid products: {self.valid_products_count}"
+        )
         return self.results
 
+    def _process_queue_with_threads(self):
+        """Helper method to process queue items with threads."""
+        with concurrent.futures.ThreadPoolExecutor(
+            max_workers=self.max_workers
+        ) as executor:
+            futures = [executor.submit(self.worker) for _ in range(self.max_workers)]
+            for future in futures:
+                try:
+                    future.result()
+                except Exception as e:
+                    print(f"Error in thread execution: {str(e)}")
+
+    def _process_queue_without_threads(self):
+        """Helper method to process queue items without threads."""
+        while not self.queue.empty():
+            self.worker()
+
     def worker(self):
+ """ while not self.queue.empty(): - sku_id = self.queue.get() try: - result = self.process_single_sku(sku_id) - except Exception as e: - print(f"Error processing SKU {sku_id}: {str(e)}") - result = [] - - with self.progress_lock: - if result: - self.results.extend(result) - self.progress_bar.set_description( - f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]" + # Extract item from the queue + item = self.queue.get() + + # Determine processing logic based on use_sync_v2 + if self.use_sync_v2: + seller_id, sku_id = self._parse_seller_sku(item) + processing_result = self.process_seller_sku( + seller_id=seller_id, sku_id=sku_id ) - - # Save batch to the database when reaching batch_size - if self.upload_on_sync and len(self.results) >= self.batch_size: - with self.save_lock: # Ensure that only one thread executes - print( - f"Batch size of {self.batch_size} reached. Saving to the database." - ) - self._save_batch_to_database() else: + processing_result = self.process_single_sku(item) + + # Handle the result and update the state + self._handle_processing_result(processing_result) + + except Exception as e: + print(f"Error processing item: {item}. Details: {str(e)}") + with self.progress_lock: self.invalid_products_count += 1 self.progress_bar.set_description( f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]" ) + self.progress_bar.update(1) - self.progress_bar.update(1) + def _parse_seller_sku(self, seller_sku): + """ + Parses a seller#sku string into seller_id and sku_id. + """ + try: + seller_id, sku_id = seller_sku.split("#") + return seller_id, sku_id + except ValueError: + raise ValueError(f"Invalid format for seller_sku: {seller_sku}") + + def _handle_processing_result(self, processing_result): + """ + Handles the processing result: updates results, progress, and performs batch saving if necessary. + """ + with self.progress_lock: + if processing_result: + self.valid_products_count += 1 + self.results.extend(processing_result) + self.progress_bar.set_description( + f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]" + ) + if self.upload_on_sync and len(self.results) >= self.batch_size: + with self.save_lock: + print( + f"Batch size of {self.batch_size} reached. Saving to the database." + ) + self._save_batch_to_database() + else: + self.invalid_products_count += 1 + self.progress_bar.set_description( + f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]" + ) + self.progress_bar.update(1) def process_single_sku(self, sku_id): """ @@ -341,3 +382,132 @@ def _save_batch_to_database(self): ) except Exception as e: print(f"Error while saving batch to the database: {e}") + + def process_sellers_skus_batch( + self, + service, + domain, + store_domain, + rules, + catalog, + seller_sku_pairs, + upload_on_sync=True, + ): + """ + Process a batch of seller and SKU pairs using threads if the batch size is large. 
+ """ + # Initialize configuration + self.queue = Queue() + self.results = [] + self.invalid_products_count = 0 + self.valid_products_count = 0 + self.service = service + self.domain = domain + self.store_domain = store_domain + self.rules = rules + self.catalog = catalog + self.vtex_app = self.catalog.vtex_app + self.upload_on_sync = upload_on_sync + self.use_sync_v2 = self.vtex_app.config.get("use_sync_v2", False) + self.sku_validator = SKUValidator(service, domain, MockZeroShotClient()) + self.sent_to_db_count = 0 # Tracks the number of items sent to the database. + self.update_product = True + + # Populate the queue + initial_batch_count = len(seller_sku_pairs) + print("Initiated process of product treatment.") + self.progress_bar = tqdm(total=initial_batch_count, desc="[✓:0, ✗:0]", ncols=0) + for seller_sku in seller_sku_pairs: + self.queue.put(seller_sku) + + # Determine whether to use threads + use_threads = len(seller_sku_pairs) > 10 + + try: + # Process items in queue + if use_threads: + self._process_queue_with_threads() + else: + self._process_queue_without_threads() + finally: + # Close the progress bar after processing, even in case of errors + self.progress_bar.close() + + # Save remaining items in buffer + if self.results: + print(f"Uploading the last {len(self.results)} items to the database.") + self._save_batch_to_database() + + # Final log and return + print( + f"Processing completed. Total valid products: {self.valid_products_count}" + ) + if initial_batch_count > 0 and len(self.results) == 0: + print("All items processed successfully.") + return True + + print("Some errors occurred during processing.") + return self.results + + def process_seller_sku(self, seller_id, sku_id): + facebook_products = [] + try: + product_details = self.sku_validator.validate_product_details( + sku_id, self.catalog + ) + if not product_details: + return facebook_products + except CustomAPIException as e: + if e.status_code == 404: + print(f"SKU {sku_id} not found. Skipping...") + elif e.status_code == 500: + print(f"SKU {sku_id} returned status: {e.status_code}. Skipping...") + + print(f"An error {e} occurred on get_product_details. SKU: {sku_id}") + return [] + + is_active = product_details.get("IsActive") + if not is_active and not self.update_product: + return facebook_products + + if not seller_id: + print(f"No seller to sync for SKU {sku_id}. 
Skipping...") + return facebook_products + + # Perform the simulation for seller + try: + availability_result = self.service.simulate_cart_for_seller( + sku_id, seller_id, self.domain + ) + except CustomAPIException as e: + print( + f"Failed to simulate cart for SKU {sku_id} with seller {seller_id}: {e}" + ) + return facebook_products + + # Process simulation results + + if not availability_result["is_available"] and not self.update_product: + return facebook_products + + product_dto = DataProcessor.extract_fields( + self.store_domain, product_details, availability_result + ) + if not self._validate_product_dto(product_dto): + return facebook_products + + params = { + "seller_id": seller_id, + "service": self.service, + "domain": self.domain, + } + all_rules_applied = True + for rule in self.rules: + if not rule.apply(product_dto, **params): + all_rules_applied = False + break + + if all_rules_applied: + facebook_products.append(product_dto) + + return facebook_products diff --git a/marketplace/wpp_products/models.py b/marketplace/wpp_products/models.py index 51c69671..d1b857e6 100644 --- a/marketplace/wpp_products/models.py +++ b/marketplace/wpp_products/models.py @@ -162,10 +162,11 @@ def remove_duplicates(cls, catalog: Catalog) -> None: .filter(count__gt=1) ) - for duplicate in duplicates: + if duplicates: print( - f"Found duplicate entries for facebook_product_id: {duplicate['facebook_product_id']}" + f"Found duplicate entries for catalog : {catalog.name}, Deleted all but the most recent records" ) + for duplicate in duplicates: # Get all records for the duplicate facebook_product_id, ordered by most recent first duplicate_records = cls.objects.filter( facebook_product_id=duplicate["facebook_product_id"], @@ -181,10 +182,6 @@ def remove_duplicates(cls, catalog: Catalog) -> None: catalog=catalog, ).exclude(id=most_recent_record.id).delete() - print( - f"Deleted all but the most recent record with ID {most_recent_record.id}" - ) - @classmethod def get_latest_products( cls, catalog: Catalog, status: str = "pending", batch_size: Optional[int] = None diff --git a/marketplace/wpp_products/tasks.py b/marketplace/wpp_products/tasks.py index 64db4fa8..35941ef6 100644 --- a/marketplace/wpp_products/tasks.py +++ b/marketplace/wpp_products/tasks.py @@ -1,5 +1,7 @@ import logging +import time + from datetime import datetime, timedelta from celery import shared_task @@ -275,52 +277,6 @@ def task_update_vtex_products(**kwargs): print("=" * 40) -@celery_app.task(name="task_forward_vtex_webhook") -def task_forward_vtex_webhook(**kwargs): # TODO: removes this method not in use - """ - Forwards the VTEX webhook to the appropriate task based on the app configuration. - """ - app_uuid = kwargs.get("app_uuid") - webhook = kwargs.get("webhook") - - try: - app = App.objects.get(uuid=app_uuid, configured=True, code="vtex") - except App.DoesNotExist: - logger.info(f"No VTEX App configured with the provided UUID: {app_uuid}") - return - - # Check if the initial sync has been completed - if not app.config.get("initial_sync_completed", False): - logger.info(f"Initial sync not completed for App: {app_uuid}. 
Task skipped.") - return - - # Get the SKU ID from the webhook - sku_id = webhook.get("IdSku") - if not sku_id: - raise ValueError(f"SKU ID is missing in the webhook for App: {app_uuid}") - - # Check if the app uses the new batch sync - use_sync_v2 = app.config.get("use_sync_v2", False) - celery_queue = app.config.get("celery_queue_name", "product_synchronization") - - if use_sync_v2: - logger.info(f"App {app_uuid} uses Sync v2. Forwarding to batch update task.") - celery_app.send_task( - "task_update_batch_products", - kwargs={"app_vtex_uuid": app_uuid}, - queue=celery_queue, - ignore_result=True, - ) - else: - logger.info(f"App {app_uuid} uses legacy sync. Forwarding to update task.") - celery_app.send_task( - "task_update_vtex_products", - kwargs={"app_uuid": app_uuid, "webhook": webhook}, - queue=celery_queue, - ignore_result=True, - ) - - @celery_app.task(name="task_upload_vtex_products") def task_upload_vtex_products(**kwargs): app_vtex_uuid = kwargs.get("app_vtex_uuid") @@ -653,9 +609,9 @@ def task_enqueue_webhook(app_uuid: str, seller: str, sku_id: str): @celery_app.task(name="task_dequeue_webhooks") -def task_dequeue_webhooks(app_uuid: str, celery_queue: str): +def task_dequeue_webhooks(app_uuid: str, celery_queue: str, batch_size: int = 5000): """ - Dequeues webhooks from Redis and dispatches them to `task_update_batch_products`. + Dequeues webhooks from Redis and dispatches them in batches. """ queue_key = f"webhook_queue:{app_uuid}" queue = RedisQueue(queue_key) @@ -670,34 +626,84 @@ def task_dequeue_webhooks(app_uuid: str, celery_queue: str): return try: - print( + logger.info( f"Starting dequeue process for App: {app_uuid}. Total items: {queue.length()}" ) - while True: - # Renew the lock to ensure we don't lose it while processing - redis.expire(lock_key, lock_ttl_seconds) + while queue.length() > 0: + redis.expire(lock_key, lock_ttl_seconds) # Renew lock - item = queue.remove() # Fetch the next item from the queue - if not item: - break # Exit when the queue is empty + # Get batch of items + batch = queue.get_batch(batch_size) + if not batch: + print(f"No items to process for App: {app_uuid}. Stopping dequeue.") + break - seller, sku_id = item.split("#") celery_app.send_task( - "task_update_batch_products", - kwargs={ - "app_uuid": app_uuid, - "seller": seller, - "sku_id": sku_id, - }, + "task_update_webhook_batch_products", + kwargs={"app_uuid": app_uuid, "batch": batch}, queue=celery_queue, ignore_result=True, ) - print({"app_uuid": app_uuid, "seller": seller, "sku_id": sku_id}) - logger.info(f"Dispatched task for Seller: {seller}, SKU: {sku_id}") - + logger.info(f"Dispatched batch of {len(batch)} items for App: {app_uuid}.") + print( + f"Wait for 5 seconds before the next batch processing for app : {app_uuid}" + ) + time.sleep(5) except Exception as e: logger.error(f"Error during dequeue process for App: {app_uuid}, {e}") finally: - # Release the lock at the end of the task + print( + f"Dequeue process completed for App: {app_uuid}. Removing lock key: {lock_key}" + ) redis.delete(lock_key) + + +@celery_app.task(name="task_update_webhook_batch_products") +def task_update_webhook_batch_products(app_uuid: str, batch: list): + """ + Processes product updates in batches for a VTEX app. 
+ """ + start_time = datetime.now() + vtex_base_service = VtexServiceBase() + + try: + logger.info(f"Processing batch of {len(batch)} items for App: {app_uuid}.") + + # Fetch app configuration + cache_key = f"app_cache_{app_uuid}" + vtex_app = cache.get(cache_key) + if not vtex_app: + vtex_app = App.objects.get(uuid=app_uuid, configured=True, code="vtex") + cache.set(cache_key, vtex_app, timeout=300) + + if not vtex_app.config.get("initial_sync_completed", False): + logger.info(f"Initial sync not completed for App: {app_uuid}. Task ending.") + return + + # Get VTEX credentials + api_credentials = vtex_base_service.get_vtex_credentials_or_raise(vtex_app) + catalog = vtex_app.vtex_catalogs.first() + if not catalog: + logger.info(f"No catalog found for VTEX app: {vtex_app.uuid}") + return + + # Initialize ProductUpdateService + vtex_update_service = ProductUpdateService( + api_credentials=api_credentials, catalog=catalog, sellers_skus=batch + ) + + success = vtex_update_service.process_batch_sync() + if not success: + logger.info(f"Fail to process batch for App: {app_uuid}.") + return + + except Exception as e: + logger.error(f"Error during batch processing for App: {app_uuid}, {e}") + + finally: + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + logger.info( + f"Finished processing batch for App: {app_uuid}. Duration: {duration:.2f} seconds." + )