Skip to content
This repository has been archived by the owner on Oct 3, 2023. It is now read-only.

docs(samples): Refactoring events package (new) #481

Merged
merged 18 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

package events;

import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.retail.v2.BigQuerySource;
Expand All @@ -33,26 +34,28 @@
import com.google.longrunning.Operation;
import com.google.longrunning.OperationsClient;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class ImportUserEventsBigQuery {

public static void main(String[] args) throws IOException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
String projectId = ServiceOptions.getDefaultProjectId();
dfirova marked this conversation as resolved.
Show resolved Hide resolved
String defaultCatalog =
String.format("projects/%s/locations/global/catalogs/default_catalog", projectId);
// TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE: defaultCatalog =
// "invalid_catalog_name"
// TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE:
// defaultCatalog = "invalid_catalog_name"
dfirova marked this conversation as resolved.
Show resolved Hide resolved
String datasetId = "user_events";
String tableId = "events";
// TO CHECK ERROR HANDLING USE THE TABLE OF INVALID USER EVENTS: tableId = "events_some_invalid"
// TO CHECK ERROR HANDLING USE THE TABLE OF INVALID USER EVENTS:
// tableId = "events_some_invalid"
dfirova marked this conversation as resolved.
Show resolved Hide resolved

importUserEventsFromBigQuery(projectId, defaultCatalog, datasetId, tableId);
}

public static void importUserEventsFromBigQuery(
String projectId, String defaultCatalog, String datasetId, String tableId)
throws IOException, InterruptedException {

try {
String dataSchema = "user_event";

Expand All @@ -75,30 +78,32 @@ public static void importUserEventsFromBigQuery(

System.out.printf("Import user events from BigQuery source request: %s%n", importRequest);

// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
// Initialize client that will be used to send requests. This client only
// needs to be created once, and can be reused for multiple requests. After
// completing all of your requests, call the "close" method on the client to
// safely clean up any remaining background resources.
try (UserEventServiceClient serviceClient = UserEventServiceClient.create()) {
String operationName =
serviceClient.importUserEventsCallable().call(importRequest).getName();

System.out.printf("OperationName = %s\n", operationName);
System.out.printf("OperationName = %s%n", operationName);
OperationsClient operationsClient = serviceClient.getOperationsClient();
Operation operation = operationsClient.getOperation(operationName);

while (!operation.getDone()) {
long assuredBreak = System.currentTimeMillis() + 60000; // 60 seconds delay
dfirova marked this conversation as resolved.
Show resolved Hide resolved

while (!operation.getDone() || System.currentTimeMillis() < assuredBreak) {
// Keep polling the operation periodically until the import task is done.
int awaitDuration = 30000;
Thread.sleep(awaitDuration);
TimeUnit.SECONDS.sleep(30);
operation = operationsClient.getOperation(operationName);
}

if (operation.hasMetadata()) {
ImportMetadata metadata = operation.getMetadata().unpack(ImportMetadata.class);
System.out.printf(
"Number of successfully imported events: %s\n", metadata.getSuccessCount());
"Number of successfully imported events: %s%n", metadata.getSuccessCount());
System.out.printf(
"Number of failures during the importing: %s\n", metadata.getFailureCount());
"Number of failures during the importing: %s%n", metadata.getFailureCount());
}

if (operation.hasResponse()) {
Expand All @@ -109,6 +114,8 @@ public static void importUserEventsFromBigQuery(
}
} catch (BigQueryException e) {
System.out.printf("Exception message: %s", e.getMessage());
} catch (NotFoundException e) {
System.out.printf("Catalog name is not found.%n%s%n", e.getMessage());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
package events;

import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.PermissionDeniedException;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.retail.v2.GcsSource;
import com.google.cloud.retail.v2.ImportErrorsConfig;
import com.google.cloud.retail.v2.ImportMetadata;
Expand All @@ -34,91 +34,95 @@
import com.google.cloud.retail.v2.UserEventServiceClient;
import com.google.longrunning.Operation;
import com.google.longrunning.OperationsClient;
import events.setup.EventsCreateGcsBucket;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class ImportUserEventsGcs {

public static void main(String[] args) throws IOException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
dfirova marked this conversation as resolved.
Show resolved Hide resolved
String projectId = ServiceOptions.getDefaultProjectId();
String defaultCatalog =
String.format("projects/%s/locations/global/catalogs/default_catalog", projectId);
// TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE: defaultCatalog =
// "invalid_catalog_name"
String gcsEventsObject = "user_events.json";
// TO CHECK ERROR HANDLING USE THE JSON WITH INVALID USER EVENT: gcsEventsObject =
// "user_events_some_invalid.json"
String bucketName = System.getenv("EVENTS_BUCKET_NAME");
String gcsUserEventsObject = "user_events.json";
// TO CHECK ERROR HANDLING USE THE JSON WITH INVALID USER EVENT:
// gcsEventsObject = "user_events_some_invalid.json"
dfirova marked this conversation as resolved.
Show resolved Hide resolved

importUserEventsFromGcs(gcsEventsObject, defaultCatalog);
importUserEventsFromGcs(defaultCatalog, bucketName, gcsUserEventsObject);
}

public static void importUserEventsFromGcs(String gcsEventsObject, String defaultCatalog)
public static void importUserEventsFromGcs(
String defaultCatalog, String bucketName, String gcsUserEventsObject)
throws IOException, InterruptedException {
try {
String gcsBucket = String.format("gs://%s", EventsCreateGcsBucket.getBucketName());
String gcsErrorsBucket = String.format("%s/error", gcsBucket);

GcsSource gcsSource =
GcsSource.newBuilder()
.addInputUris(String.format("%s/%s", gcsBucket, gcsEventsObject))
.build();

UserEventInputConfig inputConfig =
UserEventInputConfig.newBuilder().setGcsSource(gcsSource).build();

ImportErrorsConfig errorsConfig =
ImportErrorsConfig.newBuilder().setGcsPrefix(gcsErrorsBucket).build();

ImportUserEventsRequest importRequest =
ImportUserEventsRequest.newBuilder()
.setParent(defaultCatalog)
.setInputConfig(inputConfig)
.setErrorsConfig(errorsConfig)
.build();

System.out.printf("Import user events from google cloud source request: %s%n", importRequest);

// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try (UserEventServiceClient serviceClient = UserEventServiceClient.create()) {
String operationName =
serviceClient.importUserEventsCallable().call(importRequest).getName();

System.out.printf("OperationName = %s\n", operationName);

OperationsClient operationsClient = serviceClient.getOperationsClient();
Operation operation = operationsClient.getOperation(operationName);

while (!operation.getDone()) {
// Keep polling the operation periodically until the import task is done.
int awaitDuration = 30000;
Thread.sleep(awaitDuration);
operation = operationsClient.getOperation(operationName);
}

if (operation.hasMetadata()) {
ImportMetadata metadata = operation.getMetadata().unpack(ImportMetadata.class);
System.out.printf(
"Number of successfully imported events: %s\n", metadata.getSuccessCount());
System.out.printf(
"Number of failures during the importing: %s\n", metadata.getFailureCount());
}

if (operation.hasResponse()) {
ImportUserEventsResponse response =
operation.getResponse().unpack(ImportUserEventsResponse.class);
System.out.printf("Operation result: %s%n", response);
}
} catch (InvalidArgumentException e) {
String gcsBucket = String.format("gs://%s", bucketName);
String gcsErrorsBucket = String.format("%s/error", gcsBucket);

GcsSource gcsSource =
GcsSource.newBuilder()
.addInputUris(String.format("%s/%s", gcsBucket, gcsUserEventsObject))
.build();

UserEventInputConfig inputConfig =
UserEventInputConfig.newBuilder().setGcsSource(gcsSource).build();

System.out.println("GRS source: " + gcsSource.getInputUrisList());
dfirova marked this conversation as resolved.
Show resolved Hide resolved

ImportErrorsConfig errorsConfig =
ImportErrorsConfig.newBuilder().setGcsPrefix(gcsErrorsBucket).build();

ImportUserEventsRequest importRequest =
ImportUserEventsRequest.newBuilder()
.setParent(defaultCatalog)
.setInputConfig(inputConfig)
.setErrorsConfig(errorsConfig)
.build();
System.out.printf("Import user events from google cloud source request: %s%n", importRequest);

// Initialize client that will be used to send requests. This client only
// needs to be created once, and can be reused for multiple requests. After
// completing all of your requests, call the "close" method on the client to
// safely clean up any remaining background resources.
try (UserEventServiceClient serviceClient = UserEventServiceClient.create()) {
String operationName = serviceClient.importUserEventsCallable().call(importRequest).getName();

System.out.println("The operation was started.");
System.out.printf("OperationName = %s%n", operationName);

OperationsClient operationsClient = serviceClient.getOperationsClient();
Operation operation = operationsClient.getOperation(operationName);

long assuredBreak = System.currentTimeMillis() + 60000; // 60 seconds delay

while (!operation.getDone() || System.currentTimeMillis() < assuredBreak) {
dfirova marked this conversation as resolved.
Show resolved Hide resolved
System.out.println("Please wait till operation is done.");
TimeUnit.SECONDS.sleep(30);
operation = operationsClient.getOperation(operationName);
}

if (operation.hasMetadata()) {
ImportMetadata metadata = operation.getMetadata().unpack(ImportMetadata.class);
System.out.printf(
"Given GCS input path was not found. %n%s%n "
+ "Please run CreateTestResources class to create resources.",
e.getMessage());
"Number of successfully imported events: %s%n", metadata.getSuccessCount());
System.out.printf(
"Number of failures during the importing: %s%n", metadata.getFailureCount());
} else {
System.out.println("Metadata is empty.");
}

if (operation.hasResponse()) {
ImportUserEventsResponse response =
operation.getResponse().unpack(ImportUserEventsResponse.class);
System.out.printf("Operation result: %s%n", response);
} else {
System.out.println("Operation result is empty.");
}
} catch (BigQueryException e) {
System.out.printf("Exception message: %s", e.getMessage());
} catch (InvalidArgumentException e) {
System.out.printf(
"%s%n'%s' file does not exist in the bucket. Please "
+ "make sure you have followed the setting up instructions.",
e.getMessage(), gcsUserEventsObject);
} catch (PermissionDeniedException e) {
System.out.println(e.getMessage());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class ImportUserEventsInline {

public static void main(String[] args)
throws IOException, ExecutionException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
String projectId = ServiceOptions.getDefaultProjectId();
String defaultCatalog =
String.format("projects/%s/locations/global/catalogs/default_catalog", projectId);
Expand Down Expand Up @@ -90,9 +89,10 @@ public static void importUserEventsFromInlineSource(String defaultCatalog)
.build();
System.out.printf("Import user events from inline source request: %s%n", importRequest);

// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
// Initialize client that will be used to send requests. This client only
// needs to be created once, and can be reused for multiple requests. After
// completing all of your requests, call the "close" method on the client to
// safely clean up any remaining background resources.
try (UserEventServiceClient userEventServiceClient = UserEventServiceClient.create()) {
OperationFuture<ImportUserEventsResponse, ImportMetadata> importOperation =
userEventServiceClient.importUserEventsAsync(importRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class PurgeUserEvent {

public static void main(String[] args)
throws IOException, ExecutionException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
String projectId = ServiceOptions.getDefaultProjectId();
String defaultCatalog =
String.format("projects/%s/locations/global/catalogs/default_catalog", projectId);
Expand All @@ -52,9 +51,10 @@ public static void callPurgeUserEvents(String defaultCatalog, String visitorId)
throws IOException, ExecutionException, InterruptedException {
writeUserEvent(visitorId);

// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
// Initialize client that will be used to send requests. This client only
// needs to be created once, and can be reused for multiple requests. After
// completing all of your requests, call the "close" method on the client to
// safely clean up any remaining background resources.
try (UserEventServiceClient userEventServiceClient = UserEventServiceClient.create()) {
PurgeUserEventsRequest purgeUserEventsRequest =
PurgeUserEventsRequest.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class RejoinUserEvent {

public static void main(String[] args)
throws IOException, ExecutionException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
String projectId = ServiceOptions.getDefaultProjectId();
String defaultCatalog =
String.format("projects/%s/locations/global/catalogs/default_catalog", projectId);
Expand All @@ -54,9 +53,10 @@ public static void callRejoinUserEvents(String defaultCatalog, String visitorId)
throws IOException, ExecutionException, InterruptedException {
writeUserEvent(visitorId);

// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
// Initialize client that will be used to send requests. This client only
// needs to be created once, and can be reused for multiple requests. After
// completing all of your requests, call the "close" method on the client to
// safely clean up any remaining background resources.
try (UserEventServiceClient userEventServiceClient = UserEventServiceClient.create()) {
RejoinUserEventsRequest rejoinUserEventsRequest =
RejoinUserEventsRequest.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class WriteUserEvent {

public static void main(String[] args)
throws IOException, ExecutionException, InterruptedException {
// TODO(developer): Replace these variables before running the sample.
String projectId = ServiceOptions.getDefaultProjectId();
String defaultCatalog =
String.format("projects/%s/locations/global/catalogs/default_catalog", projectId);
Expand All @@ -50,9 +49,10 @@ public static void main(String[] args)

public static void writeUserEvent(String defaultCatalog, String visitorId)
throws IOException, ExecutionException, InterruptedException {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
// Initialize client that will be used to send requests. This client only
// needs to be created once, and can be reused for multiple requests. After
// completing all of your requests, call the "close" method on the client to
// safely clean up any remaining background resources.
try (UserEventServiceClient userEventServiceClient = UserEventServiceClient.create()) {
Timestamp timestamp =
Timestamp.newBuilder().setSeconds(Instant.now().getEpochSecond()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,15 @@

public class EventsCreateBigQueryTable {

public static void main(String... args) throws IOException {
public static void main(String[] args) throws IOException {
String dataset = "user_events";
String validEventsTable = "events";
String invalidEventsTable = "events_some_invalid";
String eventsSchemaFilePath = "src/main/resources/events_schema.json";
String validEventsSourceFile =
String.format("gs://%s/user_events.json", EventsCreateGcsBucket.getBucketName());
String.format("gs://%s/user_events.json", System.getenv("EVENTS_BUCKET_NAME"));
String invalidEventsSourceFile =
String.format(
"gs://%s/user_events_some_invalid.json", EventsCreateGcsBucket.getBucketName());
String.format("gs://%s/user_events_some_invalid.json", System.getenv("EVENTS_BUCKET_NAME"));

BufferedReader bufferedReader = new BufferedReader(new FileReader(eventsSchemaFilePath));
String jsonToString = bufferedReader.lines().collect(Collectors.joining());
Expand Down
Loading