Skip to content

Commit

Permalink
docs(samples): Refactoring events package (#481)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfirova committed Sep 13, 2022
1 parent cb79689 commit a02d40c
Show file tree
Hide file tree
Showing 19 changed files with 264 additions and 331 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

package events;

import com.google.cloud.ServiceOptions;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.retail.v2.BigQuerySource;
import com.google.cloud.retail.v2.ImportMetadata;
Expand All @@ -31,26 +31,30 @@
import com.google.longrunning.Operation;
import com.google.longrunning.OperationsClient;
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class ImportUserEventsBigQuery {

public static void main(String[] args) throws IOException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
String projectId = ServiceOptions.getDefaultProjectId();
String projectId = "your-project-id";
String defaultCatalog =
String.format("projects/%s/locations/global/catalogs/default_catalog", projectId);
// TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE: defaultCatalog =
// "invalid_catalog_name"
// TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE:
// defaultCatalog = "invalid_catalog_name";
String datasetId = "user_events";
String tableId = "events";
// TO CHECK ERROR HANDLING USE THE TABLE OF INVALID USER EVENTS: tableId = "events_some_invalid"
// TO CHECK ERROR HANDLING USE THE TABLE OF INVALID USER EVENTS:
// tableId = "events_some_invalid";

importUserEventsFromBigQuery(projectId, defaultCatalog, datasetId, tableId);
}

public static void importUserEventsFromBigQuery(
String projectId, String defaultCatalog, String datasetId, String tableId)
throws IOException, InterruptedException {

try {
String dataSchema = "user_event";

Expand All @@ -73,30 +77,32 @@ public static void importUserEventsFromBigQuery(

System.out.printf("Import user events from BigQuery source request: %s%n", importRequest);

// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
// Initialize client that will be used to send requests. This client only
// needs to be created once, and can be reused for multiple requests. After
// completing all of your requests, call the "close" method on the client to
// safely clean up any remaining background resources.
try (UserEventServiceClient serviceClient = UserEventServiceClient.create()) {
String operationName =
serviceClient.importUserEventsCallable().call(importRequest).getName();

System.out.printf("OperationName = %s\n", operationName);
System.out.printf("OperationName = %s%n", operationName);
OperationsClient operationsClient = serviceClient.getOperationsClient();
Operation operation = operationsClient.getOperation(operationName);

while (!operation.getDone()) {
Instant deadline = Instant.now().plusSeconds(60);

while (!operation.getDone() || Instant.now().isBefore(deadline)) {
// Keep polling the operation periodically until the import task is done.
int awaitDuration = 30000;
Thread.sleep(awaitDuration);
TimeUnit.SECONDS.sleep(30);
operation = operationsClient.getOperation(operationName);
}

if (operation.hasMetadata()) {
ImportMetadata metadata = operation.getMetadata().unpack(ImportMetadata.class);
System.out.printf(
"Number of successfully imported events: %s\n", metadata.getSuccessCount());
"Number of successfully imported events: %s%n", metadata.getSuccessCount());
System.out.printf(
"Number of failures during the importing: %s\n", metadata.getFailureCount());
"Number of failures during the importing: %s%n", metadata.getFailureCount());
}

if (operation.hasResponse()) {
Expand All @@ -107,6 +113,8 @@ public static void importUserEventsFromBigQuery(
}
} catch (BigQueryException e) {
System.out.printf("Exception message: %s", e.getMessage());
} catch (NotFoundException e) {
System.out.printf("Catalog name is not found.%n%s%n", e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
package events;

import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQueryException;
import com.google.api.gax.rpc.PermissionDeniedException;
import com.google.cloud.retail.v2.GcsSource;
import com.google.cloud.retail.v2.ImportErrorsConfig;
import com.google.cloud.retail.v2.ImportMetadata;
Expand All @@ -32,91 +31,99 @@
import com.google.cloud.retail.v2.UserEventServiceClient;
import com.google.longrunning.Operation;
import com.google.longrunning.OperationsClient;
import events.setup.EventsCreateGcsBucket;
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class ImportUserEventsGcs {

public static void main(String[] args) throws IOException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
String projectId = ServiceOptions.getDefaultProjectId();
String projectId = "your-project-id";
String defaultCatalog =
String.format("projects/%s/locations/global/catalogs/default_catalog", projectId);
// TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE: defaultCatalog =
// "invalid_catalog_name"
String gcsEventsObject = "user_events.json";
// TO CHECK ERROR HANDLING USE THE JSON WITH INVALID USER EVENT: gcsEventsObject =
// "user_events_some_invalid.json"

importUserEventsFromGcs(gcsEventsObject, defaultCatalog);
// TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE:
// defaultCatalog = "invalid_catalog_name";
String bucketName = System.getenv("EVENTS_BUCKET_NAME");
String gcsUserEventsObject = "user_events.json";
// TO CHECK ERROR HANDLING USE THE JSON WITH INVALID USER EVENT:
// gcsUserEventsObject = "user_events_some_invalid.json";

importUserEventsFromGcs(defaultCatalog, bucketName, gcsUserEventsObject);
}

public static void importUserEventsFromGcs(String gcsEventsObject, String defaultCatalog)
public static void importUserEventsFromGcs(
String defaultCatalog, String bucketName, String gcsUserEventsObject)
throws IOException, InterruptedException {
try {
String gcsBucket = String.format("gs://%s", EventsCreateGcsBucket.getBucketName());
String gcsErrorsBucket = String.format("%s/error", gcsBucket);

GcsSource gcsSource =
GcsSource.newBuilder()
.addInputUris(String.format("%s/%s", gcsBucket, gcsEventsObject))
.build();

UserEventInputConfig inputConfig =
UserEventInputConfig.newBuilder().setGcsSource(gcsSource).build();

ImportErrorsConfig errorsConfig =
ImportErrorsConfig.newBuilder().setGcsPrefix(gcsErrorsBucket).build();

ImportUserEventsRequest importRequest =
ImportUserEventsRequest.newBuilder()
.setParent(defaultCatalog)
.setInputConfig(inputConfig)
.setErrorsConfig(errorsConfig)
.build();

System.out.printf("Import user events from google cloud source request: %s%n", importRequest);

// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try (UserEventServiceClient serviceClient = UserEventServiceClient.create()) {
String operationName =
serviceClient.importUserEventsCallable().call(importRequest).getName();

System.out.printf("OperationName = %s\n", operationName);

OperationsClient operationsClient = serviceClient.getOperationsClient();
Operation operation = operationsClient.getOperation(operationName);

while (!operation.getDone()) {
// Keep polling the operation periodically until the import task is done.
int awaitDuration = 30000;
Thread.sleep(awaitDuration);
operation = operationsClient.getOperation(operationName);
}

if (operation.hasMetadata()) {
ImportMetadata metadata = operation.getMetadata().unpack(ImportMetadata.class);
System.out.printf(
"Number of successfully imported events: %s\n", metadata.getSuccessCount());
System.out.printf(
"Number of failures during the importing: %s\n", metadata.getFailureCount());
}

if (operation.hasResponse()) {
ImportUserEventsResponse response =
operation.getResponse().unpack(ImportUserEventsResponse.class);
System.out.printf("Operation result: %s%n", response);
}
} catch (InvalidArgumentException e) {
String gcsBucket = String.format("gs://%s", bucketName);
String gcsErrorsBucket = String.format("%s/error", gcsBucket);

GcsSource gcsSource =
GcsSource.newBuilder()
.addInputUris(String.format("%s/%s", gcsBucket, gcsUserEventsObject))
.build();

UserEventInputConfig inputConfig =
UserEventInputConfig.newBuilder().setGcsSource(gcsSource).build();

System.out.println("GCS source: " + gcsSource.getInputUrisList());

ImportErrorsConfig errorsConfig =
ImportErrorsConfig.newBuilder().setGcsPrefix(gcsErrorsBucket).build();

ImportUserEventsRequest importRequest =
ImportUserEventsRequest.newBuilder()
.setParent(defaultCatalog)
.setInputConfig(inputConfig)
.setErrorsConfig(errorsConfig)
.build();
System.out.printf("Import user events from google cloud source request: %s%n", importRequest);

// Initialize client that will be used to send requests. This client only
// needs to be created once, and can be reused for multiple requests. After
// completing all of your requests, call the "close" method on the client to
// safely clean up any remaining background resources.
try (UserEventServiceClient serviceClient = UserEventServiceClient.create()) {
String operationName = serviceClient.importUserEventsCallable().call(importRequest).getName();

System.out.println("The operation was started.");
System.out.printf("OperationName = %s%n", operationName);

OperationsClient operationsClient = serviceClient.getOperationsClient();
Operation operation = operationsClient.getOperation(operationName);

Instant deadline = Instant.now().plusSeconds(60);

while (!operation.getDone() || Instant.now().isBefore(deadline)) {
System.out.println("Please wait till operation is done.");
TimeUnit.SECONDS.sleep(30);
operation = operationsClient.getOperation(operationName);
}

if (operation.hasMetadata()) {
ImportMetadata metadata = operation.getMetadata().unpack(ImportMetadata.class);
System.out.printf(
"Number of successfully imported events: %s%n", metadata.getSuccessCount());
System.out.printf(
"Given GCS input path was not found. %n%s%n "
+ "Please run CreateTestResources class to create resources.",
e.getMessage());
"Number of failures during the importing: %s%n", metadata.getFailureCount());
} else {
System.out.println("Metadata is empty.");
}

if (operation.hasResponse()) {
ImportUserEventsResponse response =
operation.getResponse().unpack(ImportUserEventsResponse.class);
System.out.printf("Operation result: %s%n", response);
} else {
System.out.println("Operation result is empty.");
}
} catch (BigQueryException e) {
System.out.printf("Exception message: %s", e.getMessage());
} catch (InvalidArgumentException e) {
System.out.printf(
"%s%n'%s' file does not exist in the bucket. Please "
+ "make sure you have followed the setting up instructions.",
e.getMessage(), gcsUserEventsObject);
} catch (PermissionDeniedException e) {
System.out.println(e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package events;

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.retail.v2.ImportMetadata;
import com.google.cloud.retail.v2.ImportUserEventsRequest;
Expand All @@ -44,7 +43,7 @@ public class ImportUserEventsInline {
public static void main(String[] args)
throws IOException, ExecutionException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
String projectId = ServiceOptions.getDefaultProjectId();
String projectId = "your-project-id";
String defaultCatalog =
String.format("projects/%s/locations/global/catalogs/default_catalog", projectId);

Expand Down Expand Up @@ -88,9 +87,10 @@ public static void importUserEventsFromInlineSource(String defaultCatalog)
.build();
System.out.printf("Import user events from inline source request: %s%n", importRequest);

// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
// Initialize client that will be used to send requests. This client only
// needs to be created once, and can be reused for multiple requests. After
// completing all of your requests, call the "close" method on the client to
// safely clean up any remaining background resources.
try (UserEventServiceClient userEventServiceClient = UserEventServiceClient.create()) {
OperationFuture<ImportUserEventsResponse, ImportMetadata> importOperation =
userEventServiceClient.importUserEventsAsync(importRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static setup.SetupCleanup.writeUserEvent;

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.ServiceOptions;
import com.google.cloud.retail.v2.PurgeMetadata;
import com.google.cloud.retail.v2.PurgeUserEventsRequest;
import com.google.cloud.retail.v2.PurgeUserEventsResponse;
Expand All @@ -37,7 +36,7 @@ public class PurgeUserEvent {
public static void main(String[] args)
throws IOException, ExecutionException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
String projectId = ServiceOptions.getDefaultProjectId();
String projectId = "your-project-id";
String defaultCatalog =
String.format("projects/%s/locations/global/catalogs/default_catalog", projectId);
// visitorId generated randomly.
Expand All @@ -50,9 +49,10 @@ public static void callPurgeUserEvents(String defaultCatalog, String visitorId)
throws IOException, ExecutionException, InterruptedException {
writeUserEvent(visitorId);

// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
// Initialize client that will be used to send requests. This client only
// needs to be created once, and can be reused for multiple requests. After
// completing all of your requests, call the "close" method on the client to
// safely clean up any remaining background resources.
try (UserEventServiceClient userEventServiceClient = UserEventServiceClient.create()) {
PurgeUserEventsRequest purgeUserEventsRequest =
PurgeUserEventsRequest.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static setup.SetupCleanup.writeUserEvent;

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.ServiceOptions;
import com.google.cloud.retail.v2.RejoinUserEventsMetadata;
import com.google.cloud.retail.v2.RejoinUserEventsRequest;
import com.google.cloud.retail.v2.RejoinUserEventsRequest.UserEventRejoinScope;
Expand All @@ -39,7 +38,7 @@ public class RejoinUserEvent {
public static void main(String[] args)
throws IOException, ExecutionException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
String projectId = ServiceOptions.getDefaultProjectId();
String projectId = "your-project-id";
String defaultCatalog =
String.format("projects/%s/locations/global/catalogs/default_catalog", projectId);
// visitorId generated randomly.
Expand All @@ -52,9 +51,10 @@ public static void callRejoinUserEvents(String defaultCatalog, String visitorId)
throws IOException, ExecutionException, InterruptedException {
writeUserEvent(visitorId);

// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
// Initialize client that will be used to send requests. This client only
// needs to be created once, and can be reused for multiple requests. After
// completing all of your requests, call the "close" method on the client to
// safely clean up any remaining background resources.
try (UserEventServiceClient userEventServiceClient = UserEventServiceClient.create()) {
RejoinUserEventsRequest rejoinUserEventsRequest =
RejoinUserEventsRequest.newBuilder()
Expand Down
Loading

0 comments on commit a02d40c

Please sign in to comment.