
java.lang.RuntimeException: ManagedChannel allocation site #643

Closed
hktv-mkchoi opened this issue Oct 29, 2020 · 1 comment
Assignees
Labels
api: bigquerystorage Issues related to the googleapis/java-bigquerystorage API.

Comments

hktv-mkchoi commented Oct 29, 2020

Environment details

  1. API: BigQuery Storage
  2. OS type and version:
  3. Java version: jdk-11.0.7
  4. bigquerystorage version(s): 1.5.6
  5. JVM opts: -Xmx2048m -Xms2048m -server -XX:+UseG1GC -verbose:gc -Xlog:gc:stdout -XX:+UseStringDeduplication

Steps to reproduce

We use JMeter to load-test our application, and the error below appears randomly.


Code example

	public BigQueryReadClient getBigQueryReadClient() throws Exception
	{
		final BigQueryReadSettings bigQueryReadSettings = BigQueryReadSettings.newBuilder()
				.setCredentialsProvider(
						FixedCredentialsProvider.create(credentials)
				)
//				.setTransportChannelProvider(channelProvider)
//				.setExecutorProvider(executorProvider)
				.build();

		final BigQueryReadClient readClient = BigQueryReadClient.create(bigQueryReadSettings);

		return readClient;
	}

        @NotNull
	private Flux<T> getTableRecords(long sysSkip, TableId destinationTable, Long sysTakeLong) throws Exception
	{
		final BigQueryReadClient readClient = bigQueryConfig.getBigQueryReadClient();

		final String table =
				String.format(
						"projects/%s/datasets/%s/tables/%s",
						bigQueryConfig.getProjectId(), destinationTable.getDataset(), destinationTable.getTable());
		final String parent = String.format("projects/%s", bigQueryConfig.getProjectId());

		// Prepare the read options
		final ReadSession.TableReadOptions tableReadOptions = api.getFields().keySet().stream()
				.reduce(
						ReadSession.TableReadOptions.newBuilder(),
						ReadSession.TableReadOptions.Builder::addSelectedFields,
						(b1, b2) -> b1
				)
				.setRowRestriction(sysTakeLong == null ?
						"row_number > " + sysSkip :
						"row_number BETWEEN " + sysSkip + " AND " + (sysSkip + sysTakeLong))
				.build();
		final ReadSession.Builder readSessionBuilder =
				ReadSession.newBuilder()
						.setTable(table)
						.setDataFormat(DataFormat.ARROW)
						.setReadOptions(tableReadOptions);
		final CreateReadSessionRequest createReadSessionRequest = CreateReadSessionRequest.newBuilder()
				.setParent(parent)
				.setReadSession(readSessionBuilder)
				.setMaxStreamCount(1)
				.build();

		// Init the read session
		final ReadSession readSession = readClient.createReadSession(createReadSessionRequest);
		if (readSession.getStreamsCount() == 0)
		{
			return Flux.fromIterable(new ArrayList<>());
		}

		// Get the streams
		final ReadStream readStream = readSession.getStreams(0);
		final ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder()
				.setReadStream(readStream.getName())
				.build();
		final ServerStream<ReadRowsResponse> readRowsResponseStream = readClient
				.readRowsCallable()
				.call(readRowsRequest);

		// Allocate the limit
		final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

		// Prepare the wrapper vectors
		final ArrowSchema arrowSchema = readSession.getArrowSchema();
		final Schema schema = MessageSerializer.deserializeSchema(
				new ReadChannel(new ByteArrayReadableSeekableByteChannel(arrowSchema.getSerializedSchema().toByteArray()))
		);
		final List<FieldVector> vectors = new ArrayList<>();
		for (Field field : schema.getFields())
		{
			vectors.add(field.createVector(allocator));
		}

		final VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(vectors);
		final VectorLoader vectorLoader = new VectorLoader(vectorSchemaRoot);

		final Iterable<ReadRowsResponse> iterable = new Iterable<ReadRowsResponse>()
		{
			Iterator<ReadRowsResponse> iterator = readRowsResponseStream.iterator();

			@NotNull
			@Override
			public Iterator<ReadRowsResponse> iterator()
			{
				return iterator;
			}
		};

		return Flux
				.fromIterable(iterable)
				.flatMapIterable(Unchecked.function(readRowsResponse -> {
					final ArrowRecordBatch batch = readRowsResponse.getArrowRecordBatch();
					final org.apache.arrow.vector.ipc.message.ArrowRecordBatch arrowRecordBatch = MessageSerializer
							.deserializeRecordBatch(
									new ReadChannel(
											new ByteArrayReadableSeekableByteChannel(
													batch.getSerializedRecordBatch().toByteArray())),
									allocator);
					vectorLoader.load(arrowRecordBatch);

					// Release buffers from batch (they are still held in the vectors in root).
					arrowRecordBatch.close();

					final List<FieldVector> sortedFieldVectors = vectorSchemaRoot.getFieldVectors()
							.stream()
							.sorted(Comparator.comparingLong((fv) -> api.getFieldNames().indexOf(fv.getName())))
							.collect(Collectors.toList());
					final int rowCount = vectorSchemaRoot.getRowCount();

					final ArrayList<T> records = new ArrayList<>(1024);
					final ArrayList<Object> row = new ArrayList<>(api.getFieldNames().size());
					for (int i = 0; i < rowCount; i++)
					{
						final ArrayList<Object> rowArrayList = api.getRowArrayList(row, sortedFieldVectors, i);
						final T record = api.fromList(rowArrayList);

						records.add(record);
					}

					// Release buffers from vectors in root.
					vectorSchemaRoot.clear();

					return records;
				}))
				.doOnCancel(() -> {
					logger.info("Cancel.");
				})
				.doFinally(Unchecked.consumer((signalType) -> {
					logger.info("signalType=[{}], iterator=[{}].", signalType, iterable.iterator());

					vectorSchemaRoot.close();
					allocator.close();

					readClient.shutdown();
					final boolean awaitTermination = readClient.awaitTermination(1, TimeUnit.MINUTES);
					logger.info(
							"signalType=[{}], iterator=[{}], awaitTermination=[{}].",
							signalType, iterable.iterator(), awaitTermination
					);
					readClient.close();
				}));
	}
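One detail worth noting in the code above: the anonymous `Iterable` caches a single iterator and hands the same instance back from every `iterator()` call, so it is single-use. Once the gRPC response stream has been drained, the `iterable.iterator()` logged in `doFinally` is already exhausted. A minimal stdlib sketch of that behavior (class and method names here are illustrative, not from the issue):

```java
import java.util.Iterator;
import java.util.List;

class SingleUseIterableDemo {
    // Wraps one Iterator in an Iterable, mirroring the issue's anonymous class:
    // every iterator() call returns the same underlying iterator.
    static <T> Iterable<T> once(Iterator<T> it) {
        return () -> it;
    }

    // Counts the remaining elements by iterating the Iterable once.
    static int count(Iterable<?> iterable) {
        int n = 0;
        for (Object ignored : iterable) n++;
        return n;
    }

    public static void main(String[] args) {
        Iterable<Integer> shared = once(List.of(1, 2, 3).iterator());
        System.out.println(count(shared)); // 3 on the first pass
        System.out.println(count(shared)); // 0 afterwards: already exhausted
    }
}
```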

Stack trace

2020-10-29 11:19:42.763 ERROR 57248 --- [     elastic-20] i.g.i.ManagedChannelOrphanWrapper        : *~*~*~ Channel ManagedChannelImpl{logId=97, target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.

java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
	at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:596)
	at io.grpc.ForwardingChannelBuilder.build(ForwardingChannelBuilder.java:255)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:314)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1600(InstantiatingGrpcChannelProvider.java:71)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:210)
	at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:217)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:200)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
	at com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStub.create(EnhancedBigQueryReadStub.java:89)
	at com.google.cloud.bigquery.storage.v1.BigQueryReadClient.<init>(BigQueryReadClient.java:129)
	at com.google.cloud.bigquery.storage.v1.BigQueryReadClient.create(BigQueryReadClient.java:110)
	at hk.com.mycomp.BigQueryConfig.getBigQueryReadClient(BigQueryConfig.java:96)
	at hk.com.mycomp.ApiAdapter.getTableRecords(ApiAdapter.java:200)
	at hk.com.mycomp.ApiAdapter.lambda$getItems$7(ApiAdapter.java:191)
	at org.jooq.lambda.Unchecked.lambda$function$21(Unchecked.java:878)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:378)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:249)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
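gRPC raises this warning when a `ManagedChannel` is garbage-collected before `shutdown()` was called on it, so any code path that returns from `getTableRecords` without closing `readClient` leaks one channel. As a library-free illustration (the `CountingResource` class below is a hypothetical stand-in for an `AutoCloseable` client such as `BigQueryReadClient`), try-with-resources closes the resource on every exit path, including early returns:

```java
class TryWithResourcesDemo {
    // Hypothetical stand-in for an AutoCloseable client; tracks open instances.
    static class CountingResource implements AutoCloseable {
        static int openCount = 0;
        CountingResource() { openCount++; }
        @Override public void close() { openCount--; }
    }

    // Try-with-resources guarantees close() on every exit path.
    static String readOrEmpty(boolean empty) {
        try (CountingResource client = new CountingResource()) {
            if (empty) {
                return "no streams"; // early return: close() still runs
            }
            return "rows";
        }
    }

    public static void main(String[] args) {
        readOrEmpty(true);
        readOrEmpty(false);
        System.out.println(CountingResource.openCount); // 0: nothing leaked
    }
}
```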


Thanks!

product-auto-label bot added the api: bigquerystorage label Oct 29, 2020
hktv-mkchoi (Author) commented:

I have found the culprit. It was caused by my own careless mistake: I didn't close the readClient when the table is empty (readSession.getStreamsCount() == 0).

if (readSession.getStreamsCount() == 0)
{
    return Flux.fromIterable(new ArrayList<>());
}

The correct way to fix this is to close the readClient properly.

return Flux
        .fromIterable(new ArrayList<T>())
        .doFinally(Unchecked.consumer((signalType) -> {
            logger.info("signalType=[{}].", signalType);

            readClient.shutdown();
            final boolean awaitTermination = readClient.awaitTermination(1, TimeUnit.MINUTES);
            if (!awaitTermination)
            {
                readClient.shutdownNow();
            }

            logger.info("signalType=[{}], awaitTermination=[{}].", signalType, awaitTermination);

            readClient.close();
        }));
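For reference, the `shutdown()` / `awaitTermination()` / `shutdownNow()` sequence in the fix follows the same lifecycle as the JDK's `ExecutorService`, which is also what the gRPC warning message asks for. A self-contained sketch of the pattern using only the standard library (no BigQuery dependencies):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class GracefulShutdownDemo {
    // Orderly shutdown: stop accepting work, wait, then force-stop if needed.
    static boolean shutdownGracefully(ExecutorService pool) throws InterruptedException {
        pool.shutdown();                                        // no new tasks accepted
        boolean terminated = pool.awaitTermination(1, TimeUnit.MINUTES);
        if (!terminated) {
            pool.shutdownNow();                                 // cancel pending tasks
        }
        return terminated;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> {});                                  // trivial task, finishes fast
        System.out.println(shutdownGracefully(pool));           // true: terminated cleanly
        System.out.println(pool.isTerminated());                // true
    }
}
```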

shubhwip pushed a commit to shubhwip/java-bigquerystorage that referenced this issue Oct 7, 2023
…oogleapis#643)
