Skip to content
This repository has been archived by the owner on Oct 3, 2023. It is now read-only.

docs(samples): Refactoring product package (Import) (new) #480

Merged
merged 29 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c50efe7
Cherry-picked "Update README"
tetiana-karasova Mar 22, 2022
f643f83
Cherry-picked "the bash scripts are added"
t-karasova Mar 31, 2022
62f947d
Cherry-picked "Updated README"
t-karasova Apr 1, 2022
aa1ce1d
Update user_environment_setup.sh
t-karasova Apr 1, 2022
5ec159a
Update user_import_data_to_catalog.sh
t-karasova Apr 1, 2022
b308d9b
Cherry-picked "Updated README"
t-karasova Apr 5, 2022
2c90e8e
Refactoring product package (Import).
sborisenkox Apr 25, 2022
f0f490d
Remove init package.
sborisenkox Apr 25, 2022
fcb2411
Update README
dfirova Jul 1, 2022
eba6357
Removed assuredBreak.
dfirova Jul 14, 2022
c215629
PR fix: removed tests for setup/cleanup.
dfirova Jul 18, 2022
f6ad617
PR fix: fixed the deadline.
dfirova Jul 20, 2022
6274346
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 27, 2022
dab57ce
PR fix: unique bucket name.
dfirova Aug 9, 2022
893e576
PR fix: unique bucket name.
dfirova Aug 9, 2022
5069936
PR fix: removed test.
dfirova Aug 9, 2022
f394e08
Merge branch 'main' into refactoring-products-import
dfirova Aug 11, 2022
24ab126
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 11, 2022
61bcab2
Merge branch 'main' into refactoring-products-import
dfirova Aug 12, 2022
e7cdbb8
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 12, 2022
fe0dcdf
Merge branch 'main' into refactoring-products-import
dfirova Aug 19, 2022
ca8ebda
Merge branch 'googleapis:main' into refactoring-products-import
dfirova Aug 25, 2022
9a7c57b
Merge branch 'main' into refactoring-products-import
dfirova Aug 25, 2022
10b8271
Merge remote-tracking branch 'origin/refactoring-products-import' int…
dfirova Aug 25, 2022
8b8b2b2
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 26, 2022
135e0a7
PR fix: changed tests to be similar to events test.
dfirova Aug 31, 2022
7da8114
Merge remote-tracking branch 'origin/refactoring-products-import' int…
dfirova Aug 31, 2022
2ddf274
PR fix: returned test to previous state.
dfirova Sep 1, 2022
9ff7076
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,30 @@
import com.google.longrunning.Operation;
import com.google.longrunning.OperationsClient;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class ImportProductsBigQueryTable {

public static void main(String[] args) throws IOException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
String projectId = ServiceOptions.getDefaultProjectId();
String branchName =
String.format(
"projects/%s/locations/global/catalogs/default_catalog/branches/0", projectId);
String datasetId = "products";
String tableId = "products";
// TO CHECK ERROR HANDLING USE THE TABLE WITH INVALID PRODUCTS:
// TABLE_ID = "products_some_invalid"
String dataSchema = "product";
// TRY THE FULL RECONCILIATION MODE HERE:
ReconciliationMode reconciliationMode = ReconciliationMode.INCREMENTAL;
// tableId = "products_some_invalid"

ImportProductsRequest importBigQueryRequest =
getImportProductsBigQueryRequest(
reconciliationMode, projectId, datasetId, tableId, dataSchema, branchName);
waitForOperationCompletion(importBigQueryRequest);
importProductsFromBigQuery(projectId, branchName, datasetId, tableId);
}

public static ImportProductsRequest getImportProductsBigQueryRequest(
ReconciliationMode reconciliationMode,
String projectId,
String datasetId,
String tableId,
String dataSchema,
String branchName) {
public static void importProductsFromBigQuery(
String projectId, String branchName, String datasetId, String tableId)
throws IOException, InterruptedException {
// TRY THE FULL RECONCILIATION MODE HERE:
ReconciliationMode reconciliationMode = ReconciliationMode.INCREMENTAL;
String dataSchema = "product";

BigQuerySource bigQuerySource =
BigQuerySource.newBuilder()
.setProjectId(projectId)
Expand All @@ -82,30 +76,31 @@ public static ImportProductsRequest getImportProductsBigQueryRequest(
.build();
System.out.printf("Import products from big query table request: %s%n", importRequest);

return importRequest;
}

public static void waitForOperationCompletion(ImportProductsRequest importRequest)
throws IOException, InterruptedException {
// Initialize client that will be used to send requests. This client only
// needs to be created once, and can be reused for multiple requests. After
// completing all of your requests, call the "close" method on the client to
// safely clean up any remaining background resources.
try (ProductServiceClient serviceClient = ProductServiceClient.create()) {
String operationName = serviceClient.importProductsCallable().call(importRequest).getName();
System.out.printf("OperationName = %s\n", operationName);
System.out.printf("OperationName = %s%n", operationName);

OperationsClient operationsClient = serviceClient.getOperationsClient();
Operation operation = operationsClient.getOperation(operationName);

while (!operation.getDone()) {
long assuredBreak = System.currentTimeMillis() + 60000; // 60 seconds delay
dfirova marked this conversation as resolved.
Show resolved Hide resolved

while (!operation.getDone() || System.currentTimeMillis() < assuredBreak) {
// Keep polling the operation periodically until the import task is done.
Thread.sleep(30_000);
TimeUnit.SECONDS.sleep(30);
operation = operationsClient.getOperation(operationName);
}

if (operation.hasMetadata()) {
ImportMetadata metadata = operation.getMetadata().unpack(ImportMetadata.class);
System.out.printf(
"Number of successfully imported products: %s\n", metadata.getSuccessCount());
"Number of successfully imported products: %s%n", metadata.getSuccessCount());
System.out.printf(
"Number of failures during the importing: %s\n", metadata.getFailureCount());
"Number of failures during the importing: %s%n", metadata.getFailureCount());
}

if (operation.hasResponse()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

package product;

import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.PermissionDeniedException;
import com.google.cloud.ServiceOptions;
import com.google.cloud.retail.v2.GcsSource;
import com.google.cloud.retail.v2.ImportErrorsConfig;
Expand All @@ -35,32 +37,33 @@
import com.google.longrunning.OperationsClient;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class ImportProductsGcs {

public static void main(String[] args) throws IOException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
String projectId = ServiceOptions.getDefaultProjectId();
String branchName =
String.format(
"projects/%s/locations/global/catalogs/default_catalog/branches/0", projectId);
String gcsBucket = String.format("gs://%s", System.getenv("BUCKET_NAME"));
String gcsErrorBucket = String.format("%s/errors", gcsBucket);
String bucketName = System.getenv("BUCKET_NAME");
String gcsBucket = String.format("gs://%s", bucketName);
String gscProductsObject = "products.json";
// TO CHECK ERROR HANDLING USE THE JSON WITH INVALID PRODUCT
// GCS_PRODUCTS_OBJECT = "products_some_invalid.json"
// gscProductsObject = "products_some_invalid.json"

ImportProductsRequest importGcsRequest =
getImportProductsGcsRequest(gscProductsObject, gcsBucket, gcsErrorBucket, branchName);
waitForOperationCompletion(importGcsRequest);
importProductsFromGcs(branchName, bucketName, gcsBucket, gscProductsObject);
}

public static ImportProductsRequest getImportProductsGcsRequest(
String gcsObjectName, String gcsBucket, String gcsErrorBucket, String branchName) {
public static void importProductsFromGcs(
String branchName, String bucketName, String gcsBucket, String gscProductsObject)
throws IOException, InterruptedException {
String gcsErrorBucket = String.format("%s/errors", gcsBucket);

GcsSource gcsSource =
GcsSource.newBuilder()
.addAllInputUris(
Collections.singleton(String.format("%s/%s", gcsBucket, gcsObjectName)))
Collections.singleton(String.format("%s/%s", gcsBucket, gscProductsObject)))
.build();

ProductInputConfig inputConfig =
Expand All @@ -79,39 +82,53 @@ public static ImportProductsRequest getImportProductsGcsRequest(
.setErrorsConfig(errorsConfig)
.build();

System.out.println("Import products from google cloud source request: " + importRequest);
System.out.printf("Import products from google cloud source request: %s%n", importRequest);

return importRequest;
}

public static void waitForOperationCompletion(ImportProductsRequest importRequest)
throws IOException, InterruptedException {
// Initialize client that will be used to send requests. This client only
// needs to be created once, and can be reused for multiple requests. After
// completing all of your requests, call the "close" method on the client to
// safely clean up any remaining background resources.
try (ProductServiceClient serviceClient = ProductServiceClient.create()) {
String operationName = serviceClient.importProductsCallable().call(importRequest).getName();
System.out.printf("OperationName = %s\n", operationName);

System.out.println("The operation was started.");
System.out.printf("OperationName = %s%n", operationName);

OperationsClient operationsClient = serviceClient.getOperationsClient();
Operation operation = operationsClient.getOperation(operationName);

while (!operation.getDone()) {
// Keep polling the operation periodically until the import task is done.
Thread.sleep(30_000);
long assuredBreak = System.currentTimeMillis() + 60000; // 60 seconds delay
dfirova marked this conversation as resolved.
Show resolved Hide resolved

while (!operation.getDone() || System.currentTimeMillis() < assuredBreak) {
System.out.println("Please wait till operation is done.");
TimeUnit.SECONDS.sleep(30);
operation = operationsClient.getOperation(operationName);
}

if (operation.hasMetadata()) {
ImportMetadata metadata = operation.getMetadata().unpack(ImportMetadata.class);
System.out.printf(
"Number of successfully imported products: %s\n", metadata.getSuccessCount());
"Number of successfully imported products: %s%n", metadata.getSuccessCount());
System.out.printf(
"Number of failures during the importing: %s\n", metadata.getFailureCount());
"Number of failures during the importing: %s%n", metadata.getFailureCount());
} else {
System.out.println("Metadata is empty.");
}

if (operation.hasResponse()) {
ImportProductsResponse response =
operation.getResponse().unpack(ImportProductsResponse.class);
System.out.printf("Operation result: %s%n", response);
} else {
System.out.println("Operation result is empty.");
}
} catch (InvalidArgumentException e) {
System.out.printf(
"%s%n'%s' file does not exist in the bucket. Please "
+ "make sure you have followed the setting up instructions.",
e.getMessage(), gscProductsObject);
} catch (PermissionDeniedException e) {
System.out.println(e.getMessage());
}
}
}
Expand Down
Loading