From a79ae50585d00ca6dd84f5f746e675a510415c15 Mon Sep 17 00:00:00 2001
From: elitonzky
Date: Wed, 11 Dec 2024 17:35:48 -0300
Subject: [PATCH 1/4] feat: save updated products on redis queue

---
 marketplace/services/vtex/generic_service.py |   8 +-
 .../services/vtex/utils/data_processor.py    |   2 +-
 marketplace/wpp_products/tasks.py            | 147 ++++++++++++++----
 marketplace/wpp_products/utils.py            |  45 ++++++
 4 files changed, 167 insertions(+), 35 deletions(-)

diff --git a/marketplace/services/vtex/generic_service.py b/marketplace/services/vtex/generic_service.py
index 2c9ea29e..8a7e3489 100644
--- a/marketplace/services/vtex/generic_service.py
+++ b/marketplace/services/vtex/generic_service.py
@@ -170,8 +170,10 @@ def __init__(
         api_credentials: APICredentials,
         catalog: Catalog,
         skus_ids: list,
-        webhook: dict,
+        webhook: Optional[dict] = None,
+        sellers_ids: list[str] = None,
         product_feed: Optional[ProductFeed] = None,
+
     ):
         """
         Service for processing product updates via VTEX webhooks.
@@ -183,6 +185,7 @@ def __init__(
         self.product_feed = product_feed
         self.app = self.catalog.app
         self.webhook = webhook
+        self.sellers_ids = sellers_ids if sellers_ids else []
         self.product_manager = ProductFacebookManager()

     def webhook_product_insert(self):
@@ -230,13 +233,12 @@ def process_batch_sync(self):
         pvt_service = self.get_private_service(
             self.api_credentials.app_key, self.api_credentials.app_token
         )
-        seller_ids = self._get_sellers_ids(pvt_service)

         # Fetch product data
         products_dto = pvt_service.update_webhook_product_info(
             domain=self.api_credentials.domain,
             skus_ids=self.skus_ids,
-            seller_ids=seller_ids,
+            seller_ids=self.sellers_ids,
             catalog=self.catalog,
         )
         if not products_dto:
diff --git a/marketplace/services/vtex/utils/data_processor.py b/marketplace/services/vtex/utils/data_processor.py
index 2090fe8c..3edd6e05 100644
--- a/marketplace/services/vtex/utils/data_processor.py
+++ b/marketplace/services/vtex/utils/data_processor.py
@@ -227,7 +227,7 @@ def process_single_sku(self, sku_id):

         # Define the sellers to be synchronized
         sellers_to_sync = []
-        if self.use_sku_sellers:
+        if self.use_sku_sellers and not self.update_product:
             sku_sellers = product_details.get("SkuSellers")
             for seller in sku_sellers:
                 seller_id = seller.get("SellerId")
diff --git a/marketplace/wpp_products/tasks.py b/marketplace/wpp_products/tasks.py
index 790acf72..8fff860a 100644
--- a/marketplace/wpp_products/tasks.py
+++ b/marketplace/wpp_products/tasks.py
@@ -4,6 +4,7 @@

 from celery import shared_task

+from django_redis import get_redis_connection
 from django.db import reset_queries, close_old_connections
 from django.db.models import Exists, OuterRef
 from django.core.cache import cache
@@ -34,6 +35,7 @@
 from marketplace.wpp_products.utils import (
     ProductBatchUploader,
     ProductUploader,
+    RedisQueue,
     SellerSyncUtils,
     UploadManager,
     ProductSyncMetaPolices,
@@ -425,10 +427,24 @@ def send_sync(app_uuid: str, webhook: dict):
     celery_queue = app.config.get("celery_queue_name", "product_synchronization")

     if use_sync_v2:
-        logger.info(f"App {app_uuid} uses Sync v2. Forwarding to batch update task.")
+        logger.info(f"App {app_uuid} uses Sync v2. Enqueuing for batch update.")
+
+        # Extract seller_id from webhook
+        seller_id = _extract_sellers_ids(webhook)
+        if not seller_id:
+            raise ValueError(f"Seller ID not found in webhook. App: {str(app.uuid)}")
+
+        # Enqueue the seller and SKU via the task_enqueue_webhook task
         celery_app.send_task(
-            "task_update_batch_products",
-            kwargs={"app_uuid": app_uuid, "webhook": webhook},
+            "task_enqueue_webhook",
+            kwargs={"app_uuid": app_uuid, "seller": seller_id, "sku_id": sku_id},
+            queue=celery_queue,
+            ignore_result=True,
+        )
+        # Dequeue
+        celery_app.send_task(
+            "task_dequeue_webhooks",
+            kwargs={"app_uuid": app_uuid, "celery_queue": celery_queue},
             queue=celery_queue,
             ignore_result=True,
         )
@@ -544,39 +560,34 @@ def task_sync_product_policies():


 @celery_app.task(name="task_update_batch_products")
-def task_update_batch_products(**kwargs):
+def task_update_batch_products(app_uuid: str, seller: str, sku_id: str):
     """
-    Processes product updates for a VTEX app based on a webhook.
+    Processes product updates for a VTEX app based on a seller and SKU.
     """
     start_time = datetime.now()

     vtex_base_service = VtexServiceBase()

-    app_uuid = kwargs.get("app_uuid")
-    webhook = kwargs.get("webhook")
-
-    sku_id = webhook.get("IdSku")
-    seller_an = webhook.get("An")
-    seller_chain = webhook.get("SellerChain")
-
     try:
         logger.info(
-            f"Processing product update for App UUID: {app_uuid}, SKU_ID: {sku_id}. "
-            f"'An': {seller_an}, 'SellerChain': {seller_chain}."
+            f"Processing product update for App UUID: {app_uuid}, SKU_ID: {sku_id}, Seller: {seller}."
         )

+        # Fetch app configuration from cache or database
         cache_key = f"app_cache_{app_uuid}"
         vtex_app = cache.get(cache_key)
         if not vtex_app:
             vtex_app = App.objects.get(uuid=app_uuid, configured=True, code="vtex")
             cache.set(cache_key, vtex_app, timeout=300)

-        # Ensure the app is configured for synchronization
+        # Ensure synchronization is enabled for the app
         if not vtex_app.config.get("initial_sync_completed", False):
             logger.info(f"Initial sync not completed for App: {app_uuid}. Task ending.")
             return

+        # Get VTEX credentials
         api_credentials = vtex_base_service.get_vtex_credentials_or_raise(vtex_app)

+        # Fetch catalog
         catalog = vtex_app.vtex_catalogs.first()
         if not catalog:
             logger.info(f"No catalog found for VTEX app: {vtex_app.uuid}")
@@ -587,35 +598,109 @@ def task_update_batch_products(**kwargs):
             api_credentials=api_credentials,
             catalog=catalog,
             skus_ids=[sku_id],
-            webhook=webhook,
+            sellers_ids=[seller],
         )

-        # Process products
+        # Process product updates in batch
         products = vtex_update_service.process_batch_sync()
         if products is None:
             logger.info(
-                f"No products to process for VTEX app: {app_uuid}. Task ending."
+                f"No products to process for App: {app_uuid}, SKU: {sku_id}. Task ending."
             )
             return

-        # Log webhook
+        # Log webhook after successful processing
         close_old_connections()
-        WebhookLog.objects.create(sku_id=sku_id, data=webhook, vtex_app=vtex_app)
+        WebhookLog.objects.create(
+            sku_id=sku_id, data={"IdSku": sku_id, "An": seller}, vtex_app=vtex_app
+        )

     except Exception as e:
         logger.error(
-            f"An error occurred during the updating Webhook VTEX products for App: {app_uuid}, {e}"
+            f"An error occurred during the processing of SKU: {sku_id} for App: {app_uuid}. Error: {e}"
         )

-    # Log task duration
-    end_time = datetime.now()
-    duration = (end_time - start_time).total_seconds()
-    minutes, seconds = divmod(duration, 60)
+    finally:
+        # Log task duration
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        minutes, seconds = divmod(duration, 60)

-    logger.info(
-        f"Finished processing update for SKU: {sku_id}, App: {app_uuid}. "
-        f"Task completed in {int(minutes)} minutes and {int(seconds)} seconds."
-    )
+        logger.info(
+            f"Finished processing update for SKU: {sku_id}, App: {app_uuid}. "
+            f"Task completed in {int(minutes)} minutes and {int(seconds)} seconds."
+        )

-    # Start upload task
-    UploadManager.check_and_start_upload(app_uuid)
+        # Start upload task
+        UploadManager.check_and_start_upload(app_uuid)
+
+
+@celery_app.task(name="task_enqueue_webhook")
+def task_enqueue_webhook(app_uuid: str, seller: str, sku_id: str):
+    """
+    Enqueues the seller and SKU in Redis for batch processing.
+    """
+    try:
+        queue = RedisQueue(f"webhook_queue:{app_uuid}")
+        value = f"{seller}#{sku_id}"
+
+        # Add to the queue only if it doesn't already exist
+        queue.insert(value)
+
+        print(
+            f"Webhook enqueued for App: {app_uuid}, Item: {value}, Total enqueued: {queue.length()}"
+        )
+    except Exception as e:
+        logger.error(f"Failed to enqueue webhook for App: {app_uuid}, {e}")
+
+
+@celery_app.task(name="task_dequeue_webhooks")
+def task_dequeue_webhooks(app_uuid: str, celery_queue: str):
+    """
+    Dequeues webhooks from Redis and dispatches them to `task_update_batch_products`.
+    """
+    queue_key = f"webhook_queue:{app_uuid}"
+    queue = RedisQueue(queue_key)
+    lock_key = f"lock:{queue_key}"
+    redis = queue.redis
+
+    lock_ttl_seconds = 60 * 5  # Lock expires in 5 minutes
+
+    # Attempt to acquire the lock
+    if not redis.set(lock_key, "locked", nx=True, ex=lock_ttl_seconds):
+        logger.info(f"Task already running for App: {app_uuid}. Skipping dequeue.")
+        return
+
+    try:
+        print(
+            f"Starting dequeue process for App: {app_uuid}. Total items: {queue.length()}"
+        )
+        import time
+
+        while True:
+            # Renew the lock to ensure we don't lose it while processing
+            redis.expire(lock_key, lock_ttl_seconds)
+
+            item = queue.remove()  # Fetch the next item from the queue
+            if not item:
+                break  # Exit when the queue is empty
+
+            seller, sku_id = item.split("#")
+            celery_app.send_task(
+                "task_update_batch_products",
+                kwargs={
+                    "app_uuid": app_uuid,
+                    "seller": seller,
+                    "sku_id": sku_id,
+                },
+                queue=celery_queue,
+                ignore_result=True,
+            )
+            print({"app_uuid": app_uuid, "seller": seller, "sku_id": sku_id})
+            logger.info(f"Dispatched task for Seller: {seller}, SKU: {sku_id}")
+
+    except Exception as e:
+        logger.error(f"Error during dequeue process for App: {app_uuid}, {e}")
+    finally:
+        # Release the lock at the end of the task
+        redis.delete(lock_key)
diff --git a/marketplace/wpp_products/utils.py b/marketplace/wpp_products/utils.py
index 885c02b0..3b113df0 100644
--- a/marketplace/wpp_products/utils.py
+++ b/marketplace/wpp_products/utils.py
@@ -1,6 +1,7 @@
 import io
 import logging
 import json
+import time

 from typing import List, Dict, Any

@@ -547,3 +548,47 @@ def log_sent_products(self, product_ids: List[str]):
                 sku_id=sku_id, vtex_app=self.catalog.vtex_app
             )
         print(f"Logged {len(product_ids)} products as sent.")
+
+
+class RedisQueue:
+    def __init__(self, queue_key):
+        self.queue_key = queue_key
+        self.redis = get_redis_connection()
+
+    def insert(self, value):
+        """Add an item to the ZSET queue with a timestamp score."""
+        # Check if the item already exists
+        if self.redis.zscore(self.queue_key, value) is not None:
+            print(value, "already exists")
+            return False  # Skip insertion if it exists
+
+        # Add the item with the current timestamp as the score
+        score = time.time()
+        self.redis.zadd(self.queue_key, {value: score})
+        self.redis.expire(self.queue_key, 3600 * 24)  # TTL of 24 hours
+        return True
+
+    def remove(self):
+        """Remove and return the first item from the queue (FIFO)."""
+        items = self.redis.zrange(
+            self.queue_key, 0, 0, withscores=False
+        )  # Get the first item
+        if not items:
+            return None
+        self.redis.zrem(self.queue_key, items[0])  # Remove the first item
+        return items[0].decode("utf-8")
+
+    def order(self):
+        """List all items in the queue in order."""
+        items = self.redis.zrange(self.queue_key, 0, -1, withscores=False)
+        return [item.decode("utf-8") for item in items]
+
+    def length(self):
+        """Returns the total number of items in the queue."""
+        return self.redis.zcard(self.queue_key)
+
+    def get_batch(self, batch_size):
+        items = self.redis.zrange(self.queue_key, 0, batch_size - 1, withscores=False)
+        if items:
+            self.redis.zrem(self.queue_key, *items)
+        return [item.decode("utf-8") for item in items]
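[Review note] RedisQueue is a deduplicating FIFO built on a sorted set: the score is the arrival timestamp, so `zrange` returns items oldest-first, and `insert` drops values that are already queued without rewriting their score, which preserves their original position. A minimal usage sketch, assuming a configured django-redis connection and a hypothetical key:

    queue = RedisQueue("webhook_queue:example-app-uuid")
    queue.insert("seller_a#101")   # True: enqueued
    queue.insert("seller_a#101")   # False: duplicate, position preserved
    queue.insert("seller_b#202")   # True
    queue.get_batch(10)            # ["seller_a#101", "seller_b#202"], destructive read

One caveat: members enqueued with identical `time.time()` scores are ordered lexicographically by Redis, so FIFO ordering is only approximate for items that arrive in the same instant.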
From bcf57d544430554ff96ffe0560f2351c0eefd1a1 Mon Sep 17 00:00:00 2001
From: elitonzky
Date: Wed, 11 Dec 2024 19:46:53 -0300
Subject: [PATCH 2/4] feat: update webhook products in batch

---
 marketplace/services/vtex/generic_service.py  |  19 +-
 .../services/vtex/private/products/service.py |  17 ++
 .../services/vtex/utils/data_processor.py     | 262 ++++++++++++++----
 marketplace/wpp_products/tasks.py             |  84 ++++--
 4 files changed, 300 insertions(+), 82 deletions(-)

diff --git a/marketplace/services/vtex/generic_service.py b/marketplace/services/vtex/generic_service.py
index 8a7e3489..bb409455 100644
--- a/marketplace/services/vtex/generic_service.py
+++ b/marketplace/services/vtex/generic_service.py
@@ -169,11 +169,11 @@ def __init__(
         self,
         api_credentials: APICredentials,
         catalog: Catalog,
-        skus_ids: list,
+        skus_ids: list[str] = None,
         webhook: Optional[dict] = None,
         sellers_ids: list[str] = None,
         product_feed: Optional[ProductFeed] = None,
-
+        sellers_skus: list[str] = None,
     ):
         """
         Service for processing product updates via VTEX webhooks.
@@ -186,6 +186,7 @@ def __init__(
         self.app = self.catalog.app
         self.webhook = webhook
         self.sellers_ids = sellers_ids if sellers_ids else []
+        self.sellers_skus = sellers_skus if sellers_skus else []
         self.product_manager = ProductFacebookManager()

     def webhook_product_insert(self):
@@ -235,26 +236,18 @@ def process_batch_sync(self):
         )

         # Fetch product data
-        products_dto = pvt_service.update_webhook_product_info(
+        all_success = pvt_service.update_batch_webhook(
             domain=self.api_credentials.domain,
-            skus_ids=self.skus_ids,
-            seller_ids=self.sellers_ids,
+            sellers_skus=self.sellers_skus,
             catalog=self.catalog,
         )
-        if not products_dto:
-            return None
-
-        # Save product data for batch sync
-        all_success = self.product_manager.save_batch_product_data(
-            products_dto, self.catalog
-        )

         if not all_success:
             raise Exception(
                 f"Error saving batch products in database for Catalog: {self.catalog.facebook_catalog_id}"
             )

-        return products_dto
+        return all_success

     def _get_sellers_ids(self, service):
         seller_id = extract_sellers_ids(self.webhook)
diff --git a/marketplace/services/vtex/private/products/service.py b/marketplace/services/vtex/private/products/service.py
index 48ec87a7..7d72ee66 100644
--- a/marketplace/services/vtex/private/products/service.py
+++ b/marketplace/services/vtex/private/products/service.py
@@ -171,6 +171,23 @@ def update_webhook_product_info(

         return updated_products_dto

+    def update_batch_webhook(
+        self, domain: str, sellers_skus: list, catalog: Catalog
+    ) -> List[FacebookProductDTO]:
+        config = catalog.vtex_app.config
+        rules = self._load_rules(config.get("rules", []))
+        store_domain = config.get("store_domain")
+        updated_products_dto = self.webhook_data_processor.process_sellers_skus_batch(
+            service=self,
+            domain=domain,
+            store_domain=store_domain,
+            rules=rules,
+            catalog=catalog,
+            seller_sku_pairs=sellers_skus,
+        )
+
+        return updated_products_dto
+
     def get_product_specification(self, product_id, domain):
         return self.client.get_product_specification(product_id, domain)
+ """ + # Initialize configuration + self.queue = Queue() + self.results = [] + self.invalid_products_count = 0 + self.valid_products_count = 0 + self.service = service + self.domain = domain + self.store_domain = store_domain + self.rules = rules + self.catalog = catalog + self.vtex_app = self.catalog.vtex_app + self.upload_on_sync = upload_on_sync + self.use_sync_v2 = self.vtex_app.config.get("use_sync_v2", False) + self.sku_validator = SKUValidator(service, domain, MockZeroShotClient()) + self.sent_to_db_count = 0 # Tracks the number of items sent to the database. + self.update_product = True + + # Populate the queue + initial_batch_count = len(seller_sku_pairs) + print("Initiated process of product treatment.") + self.progress_bar = tqdm(total=initial_batch_count, desc="[✓:0, ✗:0]", ncols=0) + for seller_sku in seller_sku_pairs: + self.queue.put(seller_sku) + + # Determine whether to use threads + use_threads = len(seller_sku_pairs) > 2 + + try: + # Process items in queue + if use_threads: + self._process_queue_with_threads() + else: + self._process_queue_without_threads() + finally: + # Close the progress bar after processing, even in case of errors + self.progress_bar.close() + + # Save remaining items in buffer + if self.results: + print(f"Uploading the last {len(self.results)} items to the database.") + self._save_batch_to_database() + + # Final log and return + print( + f"Processing completed. Total valid products: {self.valid_products_count}" + ) + if initial_batch_count > 0 and len(self.results) == 0: + print("All items processed successfully.") + return True + + print("Some errors occurred during processing.") + return self.results + + def process_seller_sku(self, seller_id, sku_id): + facebook_products = [] + try: + product_details = self.sku_validator.validate_product_details( + sku_id, self.catalog + ) + if not product_details: + return facebook_products + except CustomAPIException as e: + if e.status_code == 404: + print(f"SKU {sku_id} not found. Skipping...") + elif e.status_code == 500: + print(f"SKU {sku_id} returned status: {e.status_code}. Skipping...") + + print(f"An error {e} occurred on get_product_details. SKU: {sku_id}") + return [] + + is_active = product_details.get("IsActive") + if not is_active and not self.update_product: + return facebook_products + + if not seller_id: + print(f"No seller to sync for SKU {sku_id}. 
Skipping...") + return facebook_products + + # Perform the simulation for seller + try: + availability_result = self.service.simulate_cart_for_seller( + sku_id, seller_id, self.domain + ) + except CustomAPIException as e: + print( + f"Failed to simulate cart for SKU {sku_id} with seller {seller_id}: {e}" + ) + return facebook_products + + # Process simulation results + + if not availability_result["is_available"] and not self.update_product: + return facebook_products + + product_dto = DataProcessor.extract_fields( + self.store_domain, product_details, availability_result + ) + if not self._validate_product_dto(product_dto): + return facebook_products + + params = { + "seller_id": seller_id, + "service": self.service, + "domain": self.domain, + } + all_rules_applied = True + for rule in self.rules: + if not rule.apply(product_dto, **params): + all_rules_applied = False + break + + if all_rules_applied: + facebook_products.append(product_dto) + + return facebook_products diff --git a/marketplace/wpp_products/tasks.py b/marketplace/wpp_products/tasks.py index 8fff860a..c380ea92 100644 --- a/marketplace/wpp_products/tasks.py +++ b/marketplace/wpp_products/tasks.py @@ -30,8 +30,6 @@ from marketplace.services.vtex.generic_service import APICredentials from marketplace.applications.models import App -from django_redis import get_redis_connection - from marketplace.wpp_products.utils import ( ProductBatchUploader, ProductUploader, @@ -655,9 +653,9 @@ def task_enqueue_webhook(app_uuid: str, seller: str, sku_id: str): @celery_app.task(name="task_dequeue_webhooks") -def task_dequeue_webhooks(app_uuid: str, celery_queue: str): +def task_dequeue_webhooks(app_uuid: str, celery_queue: str, batch_size: int = 5000): """ - Dequeues webhooks from Redis and dispatches them to `task_update_batch_products`. + Dequeues webhooks from Redis and dispatches them in batches. """ queue_key = f"webhook_queue:{app_uuid}" queue = RedisQueue(queue_key) @@ -672,35 +670,77 @@ def task_dequeue_webhooks(app_uuid: str, celery_queue: str): return try: - print( + logger.info( f"Starting dequeue process for App: {app_uuid}. Total items: {queue.length()}" ) - import time - while True: - # Renew the lock to ensure we don't lose it while processing - redis.expire(lock_key, lock_ttl_seconds) + while queue.length() > 0: + redis.expire(lock_key, lock_ttl_seconds) # Renew lock - item = queue.remove() # Fetch the next item from the queue - if not item: - break # Exit when the queue is empty + # Get batch of items + batch = queue.get_batch(batch_size) + if not batch: + break - seller, sku_id = item.split("#") celery_app.send_task( - "task_update_batch_products", - kwargs={ - "app_uuid": app_uuid, - "seller": seller, - "sku_id": sku_id, - }, + "task_update_webhook_batch_products", + kwargs={"app_uuid": app_uuid, "batch": batch}, queue=celery_queue, ignore_result=True, ) - print({"app_uuid": app_uuid, "seller": seller, "sku_id": sku_id}) - logger.info(f"Dispatched task for Seller: {seller}, SKU: {sku_id}") + logger.info(f"Dispatched batch of {len(batch)} items for App: {app_uuid}.") except Exception as e: logger.error(f"Error during dequeue process for App: {app_uuid}, {e}") finally: - # Release the lock at the end of the task redis.delete(lock_key) + + +@celery_app.task(name="task_update_batch_products") +def task_update_webhook_batch_products(app_uuid: str, batch: list): + """ + Processes product updates in batches for a VTEX app. 
+ """ + start_time = datetime.now() + vtex_base_service = VtexServiceBase() + + try: + logger.info(f"Processing batch of {len(batch)} items for App: {app_uuid}.") + + # Fetch app configuration + cache_key = f"app_cache_{app_uuid}" + vtex_app = cache.get(cache_key) + if not vtex_app: + vtex_app = App.objects.get(uuid=app_uuid, configured=True, code="vtex") + cache.set(cache_key, vtex_app, timeout=300) + + if not vtex_app.config.get("initial_sync_completed", False): + logger.info(f"Initial sync not completed for App: {app_uuid}. Task ending.") + return + + # Get VTEX credentials + api_credentials = vtex_base_service.get_vtex_credentials_or_raise(vtex_app) + catalog = vtex_app.vtex_catalogs.first() + if not catalog: + logger.info(f"No catalog found for VTEX app: {vtex_app.uuid}") + return + + # Initialize ProductUpdateService + vtex_update_service = ProductUpdateService( + api_credentials=api_credentials, catalog=catalog, sellers_skus=batch + ) + + success = vtex_update_service.process_batch_sync() + if not success: + logger.info(f"Fail to process batch for App: {app_uuid}.") + return + + except Exception as e: + logger.error(f"Error during batch processing for App: {app_uuid}, {e}") + + finally: + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + logger.info( + f"Finished processing batch for App: {app_uuid}. Duration: {duration:.2f} seconds." + ) From 8cc6ce34ae90ab6707372f22d655ff6278351ff5 Mon Sep 17 00:00:00 2001 From: elitonzky Date: Thu, 12 Dec 2024 11:45:14 -0300 Subject: [PATCH 3/4] fix: adjust task name and data_processor method --- .../services/vtex/utils/data_processor.py | 4 +- marketplace/wpp_products/tasks.py | 48 +------------------ 2 files changed, 4 insertions(+), 48 deletions(-) diff --git a/marketplace/services/vtex/utils/data_processor.py b/marketplace/services/vtex/utils/data_processor.py index ecbcd4da..7165bdff 100644 --- a/marketplace/services/vtex/utils/data_processor.py +++ b/marketplace/services/vtex/utils/data_processor.py @@ -124,6 +124,7 @@ def process_product_data( self.rules = rules self.update_product = update_product self.invalid_products_count = 0 + self.valid_products_count = 0 self.catalog = catalog self.sku_validator = SKUValidator(service, domain, MockZeroShotClient()) self.upload_on_sync = upload_on_sync @@ -132,6 +133,7 @@ def process_product_data( self.use_sku_sellers = self.catalog.vtex_app.config.get( "use_sku_sellers", False ) + self.use_sync_v2 = self.vtex_app.config.get("use_sync_v2", False) print("Initiated process of product treatment.") self.progress_bar = tqdm(total=len(skus_ids), desc="[✓:0, ✗:0]", ncols=0) @@ -419,7 +421,7 @@ def process_sellers_skus_batch( self.queue.put(seller_sku) # Determine whether to use threads - use_threads = len(seller_sku_pairs) > 2 + use_threads = len(seller_sku_pairs) > 10 try: # Process items in queue diff --git a/marketplace/wpp_products/tasks.py b/marketplace/wpp_products/tasks.py index c380ea92..beba9ccb 100644 --- a/marketplace/wpp_products/tasks.py +++ b/marketplace/wpp_products/tasks.py @@ -275,52 +275,6 @@ def task_update_vtex_products(**kwargs): print("=" * 40) -@celery_app.task(name="task_forward_vtex_webhook") -def task_forward_vtex_webhook(**kwargs): # TODO: removes this method not in use - """ - Forwards the VTEX webhook to the appropriate task based on the app configuration. 
- """ - app_uuid = kwargs.get("app_uuid") - webhook = kwargs.get("webhook") - - try: - app = App.objects.get(uuid=app_uuid, configured=True, code="vtex") - except App.DoesNotExist: - logger.info(f"No VTEX App configured with the provided UUID: {app_uuid}") - return - - # Check if the initial sync has been completed - if not app.config.get("initial_sync_completed", False): - logger.info(f"Initial sync not completed for App: {app_uuid}. Task skipped.") - return - - # Get the SKU ID from the webhook - sku_id = webhook.get("IdSku") - if not sku_id: - raise ValueError(f"SKU ID is missing in the webhook for App: {app_uuid}") - - # Check if the app uses the new batch sync - use_sync_v2 = app.config.get("use_sync_v2", False) - celery_queue = app.config.get("celery_queue_name", "product_synchronization") - - if use_sync_v2: - logger.info(f"App {app_uuid} uses Sync v2. Forwarding to batch update task.") - celery_app.send_task( - "task_update_batch_products", - kwargs={"app_vtex_uuid": app_uuid}, - queue=celery_queue, - ignore_result=True, - ) - else: - logger.info(f"App {app_uuid} uses legacy sync. Forwarding to update task.") - celery_app.send_task( - "task_update_vtex_products", - kwargs={"app_uuid": app_uuid, "webhook": webhook}, - queue=celery_queue, - ignore_result=True, - ) - - @celery_app.task(name="task_upload_vtex_products") def task_upload_vtex_products(**kwargs): app_vtex_uuid = kwargs.get("app_vtex_uuid") @@ -696,7 +650,7 @@ def task_dequeue_webhooks(app_uuid: str, celery_queue: str, batch_size: int = 50 redis.delete(lock_key) -@celery_app.task(name="task_update_batch_products") +@celery_app.task(name="task_update_webhook_batch_products") def task_update_webhook_batch_products(app_uuid: str, batch: list): """ Processes product updates in batches for a VTEX app. 
From 7bb9366b25728a6f2a243d32bfd47dcf979a1206 Mon Sep 17 00:00:00 2001 From: elitonzky Date: Thu, 12 Dec 2024 15:20:21 -0300 Subject: [PATCH 4/4] feat: add time.sleep on dequeue and add more log infos --- marketplace/wpp_products/models.py | 9 +++------ marketplace/wpp_products/tasks.py | 11 ++++++++++- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/marketplace/wpp_products/models.py b/marketplace/wpp_products/models.py index 51c69671..d1b857e6 100644 --- a/marketplace/wpp_products/models.py +++ b/marketplace/wpp_products/models.py @@ -162,10 +162,11 @@ def remove_duplicates(cls, catalog: Catalog) -> None: .filter(count__gt=1) ) - for duplicate in duplicates: + if duplicates: print( - f"Found duplicate entries for facebook_product_id: {duplicate['facebook_product_id']}" + f"Found duplicate entries for catalog : {catalog.name}, Deleted all but the most recent records" ) + for duplicate in duplicates: # Get all records for the duplicate facebook_product_id, ordered by most recent first duplicate_records = cls.objects.filter( facebook_product_id=duplicate["facebook_product_id"], @@ -181,10 +182,6 @@ def remove_duplicates(cls, catalog: Catalog) -> None: catalog=catalog, ).exclude(id=most_recent_record.id).delete() - print( - f"Deleted all but the most recent record with ID {most_recent_record.id}" - ) - @classmethod def get_latest_products( cls, catalog: Catalog, status: str = "pending", batch_size: Optional[int] = None diff --git a/marketplace/wpp_products/tasks.py b/marketplace/wpp_products/tasks.py index beba9ccb..35941ef6 100644 --- a/marketplace/wpp_products/tasks.py +++ b/marketplace/wpp_products/tasks.py @@ -1,5 +1,7 @@ import logging +import time + from datetime import datetime, timedelta from celery import shared_task @@ -634,6 +636,7 @@ def task_dequeue_webhooks(app_uuid: str, celery_queue: str, batch_size: int = 50 # Get batch of items batch = queue.get_batch(batch_size) if not batch: + print(f"No items to process for App: {app_uuid}. Stopping dequeue.") break celery_app.send_task( @@ -643,10 +646,16 @@ def task_dequeue_webhooks(app_uuid: str, celery_queue: str, batch_size: int = 50 ignore_result=True, ) logger.info(f"Dispatched batch of {len(batch)} items for App: {app_uuid}.") - + print( + f"Wait for 5 seconds before the next batch processing for app : {app_uuid}" + ) + time.sleep(5) except Exception as e: logger.error(f"Error during dequeue process for App: {app_uuid}, {e}") finally: + print( + f"Dequeue process completed for App: {app_uuid}. Removing lock key: {lock_key}" + ) redis.delete(lock_key)