
Allow setting BigQuery endpoint #32153

Merged 2 commits into apache:master on Aug 21, 2024

Conversation

kberezin-nshl
Contributor

This fixes #28149


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@kberezin-nshl
Contributor Author

assign set of reviewers

Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java.
R: @damondouglas for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@kberezin-nshl
Contributor Author

@Abacn @damondouglas could you please have a look?

Basically, I just followed the same pattern as set/getGcsEndpoint in GcsOptions, and this allows running pipelines against a BQ emulator.
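
For readers following along, here is a minimal sketch of what that options pattern looks like. The interface name, annotations, and description text below are illustrative only, not copied from the merged change; in Beam the real property was added to the existing org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions interface.

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.checkerframework.checker.nullness.qual.Nullable;

// Illustrative sketch of a PipelineOptions endpoint property following the
// GcsOptions set/getGcsEndpoint pattern. Names and wording are hypothetical.
public interface EndpointOptionsSketch extends PipelineOptions {

  @Description(
      "Alternative BigQuery API endpoint, e.g. the URL of a local BigQuery emulator. "
          + "When unset, the default Google endpoint is used.")
  @Nullable
  String getBigQueryEndpoint();

  void setBigQueryEndpoint(@Nullable String value);
}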


@Abacn Abacn left a comment


Thanks! The change looks good; I just left a few minor formatting comments.

CHANGES.md Outdated
@@ -68,6 +68,7 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Added an ability to set BigQuery endpoint (Java) ([#28149](https://github.com/apache/beam/issues/28149)).

Contributor

Shall we note that this is for testing purposes? "Endpoint" can be ambiguous (e.g., regional/zonal endpoints, like the Dataflow endpoint).

consider "BigQuery endpoint can be overridden via PipelineOptions, this enables BigQuery emulators (Java)"

@@ -1615,8 +1642,13 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option
.setChannelsPerCpu(2)
.build();

BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder();
String endpoint = options.getBigQueryEndpoint();
if (endpoint != null) {

Contributor

a safer guard is Strings.isNullOrEmpty

@@ -1615,8 +1642,13 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option
.setChannelsPerCpu(2)
.build();

BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder();
String endpoint = options.getBigQueryEndpoint();

Contributor

String -> @Nullable String
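
Putting the two suggestions together, the guarded version of the hunk would look roughly like this; this is a sketch of the reviewed fragment with both comments applied, not the exact merged code. Strings here is Guava's com.google.common.base.Strings (as vendored by Beam), and @Nullable is the Checker Framework annotation.

// Sketch of the reviewed hunk with both review comments applied: the endpoint is
// read as a @Nullable String and only applied when it is neither null nor empty.
BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder();
@Nullable String endpoint = options.getBigQueryEndpoint();
if (!Strings.isNullOrEmpty(endpoint)) {
  // Overrides the default endpoint only when a non-empty value was supplied.
  builder.setEndpoint(endpoint);
}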

@kberezin-nshl
Contributor Author

Thanks @Abacn, addressed your comments.

Contributor

Reminder, please take a look at this pr: @Abacn @damondouglas

@Abacn Abacn merged commit a0e4541 into apache:master Aug 21, 2024
21 checks passed
@wattache

Hello,

Thank you for this new feature. May I ask you @kberezin-nshl how you intend users to use this new possibility?

I've tried to use a test container for my pipeline, but I still get the error message "The project test-project has not enabled BigQuery.", with the URL being https://bigquery.googleapis.com/bigquery/v2/projects/test-project/datasets/***/tables/***/insertAll?prettyPrint=false

I use Testcontainers and TestPipeline:

BigQueryEmulatorContainer container = new BigQueryEmulatorContainer("ghcr.io/goccy/bigquery-emulator:0.4.3");

String projectId = container.getProjectId();

org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions pipelineOptions =
    PipelineOptionsFactory.create().as(org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions.class);
pipelineOptions.setBigQueryEndpoint(container.getEmulatorHttpEndpoint());
BigQueryServicesImpl bqService = new BigQueryServicesImpl();
bqService.getJobService(pipelineOptions);

Session session = buildExpectedSession();

WriteResult writeResult = p.apply(Create.of(session))
        .apply(ParDo.of(new ProtobufSessionToBigQueryTableRow()))
        .apply(BigQueryIO.writeTableRows()
                .withTestServices(bqService)
                .to(row -> {
                    return new TableDestination(projectId + ":***.***", null);
                }));

Thanks in advance for your help

@lukas-mi

lukas-mi commented Sep 27, 2024

I have the same question as @wattache. I've tried the following:

        PipelineOptionsFactory.register(BigQueryOptions.class);
        BigQueryOptions options = PipelineOptionsFactory.create().as(BigQueryOptions.class);

        options.setBigQueryProject(bqEmulator.getEmulatorHttpEndpoint());
        options.setBigQueryProject(bqEmulator.getProjectId());

        var pipeline = Pipeline.create(options);

        var sinkTable = "%s.%s.%s".formatted(bqEmulator.getProjectId(), "test_dataset", "test_table");

        var schema = new TableSchema()
                .setFields(List.of(new TableFieldSchema()
                        .setName("testColumn")
                        .setType("STRING")
                        .setMode("REQUIRED")));

        var row = new TableRow();
        row.set("testColumn", "testValue");
        var rows = Create.of(List.of(row));

        pipeline
                .apply(rows)
                .apply(BigQueryIO
                        .<TableRow>write()
                        .to(sinkTable)
                        .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
                            @Override
                            public TableRow apply(TableRow input) {
                                return input;
                            }
                        })
                        .withSchema(schema));

        pipeline.run().waitUntilFinish();

Still get the following error:

com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
GET https://bigquery.googleapis.com/bigquery/v2/projects/test-project/datasets/test_dataset?prettyPrint=false
{
  "code": 400,
  "errors": [
    {
      "domain": "global",
      "message": "The project test-project has not enabled BigQuery.",
      "reason": "invalid"
    }
  ],
  "message": "The project test-project has not enabled BigQuery.",
  "status": "INVALID_ARGUMENT"
}

@lukas-mi

lukas-mi commented Sep 27, 2024

I've updated the previous example:

  • previously had setBigQueryProject twice without setting setBigQueryEndpoint
  • added dataset creation
  • added the use of withCustomGcsTempLocation, otherwise an exception is thrown

      PipelineOptionsFactory.register(BigQueryOptions.class);
      BigQueryOptions options = PipelineOptionsFactory.create().as(BigQueryOptions.class);

      options.setBigQueryEndpoint(bqEmulator.getEmulatorHttpEndpoint());
      options.setBigQueryProject(bqEmulator.getProjectId());
      options.setRunner(DirectRunner.class);

      var pipeline = Pipeline.create(options);

      var sinkTable = "%s.%s.%s".formatted(bqEmulator.getProjectId(), "test_dataset", "test_table");

      var datasetId = DatasetId.of(bqEmulator.getProjectId(), "test_dataset");
      var datasetInfo = DatasetInfo.newBuilder(datasetId).build();
      bqClient.create(datasetInfo);

      var schema = new TableSchema()
              .setFields(List.of(new TableFieldSchema()
                      .setName("testColumn")
                      .setType("STRING")
                      .setMode("REQUIRED")));

      var row = new TableRow();
      row.set("testColumn", "testValue");
      var rows = Create.of(List.of(row));

      pipeline
              .apply(rows)
              .apply(BigQueryIO
                      .<TableRow>write()
                      .to(sinkTable)
                      .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
                          @Override
                          public TableRow apply(TableRow input) {
                              return input;
                          }
                      })
                      .withSchema(schema)
                      .withCustomGcsTempLocation(new ValueProvider<String>() {
                          @Override
                          public String get() {
                              return "/tmp";
                          }

                          @Override
                          public @UnknownKeyFor @NonNull @Initialized boolean isAccessible() {
                              return true;
                          }
                      })
              );

      pipeline.run().waitUntilFinish();

Now I'm getting an exception related to serialization:

unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.io.gcp.bigquery.BatchLoads$3@5b76b891, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[], fieldAccessDescriptor=*}}
java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.io.gcp.bigquery.BatchLoads$3@5b76b891, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[], fieldAccessDescriptor=*}}

@kberezin-nshl
Contributor Author

kberezin-nshl commented Sep 27, 2024

Hey, @wattache @lukas-mi

Unfortunately, there is currently a bug causing the behavior you're seeing. The fix has already been merged, so hopefully you'll be able to use this feature with the next release.

In the meantime, the following workaround is possible. First, create this class:

public class WorkaroundBQServices extends BigQueryServicesImpl {

  private final String bigQueryEndpoint;

  public WorkaroundBQServices(String bigQueryEndpoint) {
    this.bigQueryEndpoint = bigQueryEndpoint;
  }

  @Override
  public JobService getJobService(BigQueryOptions options) {
    options.setBigQueryEndpoint(bigQueryEndpoint);
    return super.getJobService(options);
  }

  @Override
  public DatasetService getDatasetService(BigQueryOptions options) {
    options.setBigQueryEndpoint(bigQueryEndpoint);
    return super.getDatasetService(options);
  }

  @Override
  public WriteStreamService getWriteStreamService(BigQueryOptions options) {
    options.setBigQueryEndpoint(bigQueryEndpoint);
    return super.getWriteStreamService(options);
  }

  @Override
  public StorageClient getStorageClient(BigQueryOptions options) throws IOException {
    options.setBigQueryEndpoint(bigQueryEndpoint);
    return super.getStorageClient(options);
  }
}

Then, in your pipeline, set it as:

BigQueryIO.<...>write()
            .withTestServices(new WorkaroundBQServices(options.getBigQueryEndpoint()))

As I said, once the new release comes out, you should be able to remove this workaround.
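
For completeness, once the fixed release is out the expected usage should reduce to setting the option directly, with no custom BigQueryServices. The sketch below is only an illustration of that expectation; it reuses the Testcontainers emulator variable from the snippets above and assumes row, table, and schema are defined as in those snippets.

// Sketch of the intended post-fix usage: only the pipeline option is set,
// no withTestServices workaround. `container`, `row`, `table`, and `schema`
// are assumed to be set up as in the earlier snippets in this thread.
BigQueryOptions options = PipelineOptionsFactory.create().as(BigQueryOptions.class);
options.setBigQueryEndpoint(container.getEmulatorHttpEndpoint());
options.setBigQueryProject(container.getProjectId());
options.setProject(container.getProjectId());

Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply(Create.of(row))
    .apply(BigQueryIO.writeTableRows()
        .to(table)
        .withSchema(schema));
pipeline.run().waitUntilFinish();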

@lukas-mi

@kberezin-nshl Thanks for the update. I have a follow-up issue with the use of the emulator, now related to credentials:

[2024-09-30 11:22:05,955] WARN  Request failed with code 400, performed 0 retries due to IOExceptions, performed 0 retries due to unsuccessful status codes, HTTP framework says request can be retried, (caller responsible for retrying): http://192.168.107.2:33491/bigquery/v2/projects/test-project/jobs?prettyPrint=false.  (org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer) (direct-runner-worker)
[2024-09-30 11:22:05,956] INFO  Failed to insert job GenericData{classInfo=[jobId, location, projectId], {jobId=beam_bq_job_LOAD_optionshelperlukasm09300922042972ed08_6acb3cf1f92f4e17bdf54c8f653d9e18_508a6d1432e4e3bd21efd7521577a82e_00001_00000-0, projectId=test-project}}, will retry: (org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl) (direct-runner-worker)
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
POST http://192.168.107.2:33491/bigquery/v2/projects/test-project/jobs?prettyPrint=false
{
  "code": 400,
  "errors": [
    {
      "location": "",
      "message": "failed to import from gcs: dialing: google: could not find default credentials. See https://cloud.google.com/docs/authentication/external/set-up-adc for more information",
      "reason": "jobInternalError",
      "debugInfo": ""
    }
  ],
  "message": "failed to import from gcs: dialing: google: could not find default credentials. See https://cloud.google.com/docs/authentication/external/set-up-adc for more information"
}

I tried setting options.setGcpCredential(NoCredentials.getInstance()) but it results in:

java.lang.IllegalStateException: OAuth2Credentials instance does not support refreshing the access token. An instance with a new access token should be used, or a derived type that supports refreshing.
java.lang.RuntimeException: java.lang.IllegalStateException: OAuth2Credentials instance does not support refreshing the access token. An instance with a new access token should be used, or a derived type that supports refreshing.
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.validate(BigQueryIO.java:3296)
	at org.apache.beam.sdk.transforms.PTransform.validate(PTransform.java:180)
	at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:673)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:581)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
	at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
	at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:477)
	at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:609)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
	at com.gowish.data.processor.pipeline.kafka2bq.product.ProductChangePipelineTest.shouldWork3(ProductChangePipelineTest.java:262)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)

@lukas-mi

Another question, maybe related to the previous one: what should withCustomGcsTempLocation be set to? Would you use some GCS mock service? If so, which one?


Successfully merging this pull request may close these issues.

[Feature Request]: Allow setting BigQuery endpoint, for example to use bigquery emulator