Skip to content

Commit

Permalink
Introduce the AsyncSecurityPolicy class. (#10622)
Browse files Browse the repository at this point in the history
This is the async variant of SecurityPolicy, allowing callers to implement security checks based on slow calls that aren't meant to block the gRPC thread.

BinderTransportSecurity.checkAuthorization **STILL** blocks while attempting to resolve the ListenableFuture<Status> it gets from the policy object. That will still be adressed in a follow-up.

Relate issue: #10566
  • Loading branch information
mateusazis authored Oct 26, 2023
1 parent cd810c5 commit b6947de
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import androidx.test.core.app.ApplicationProvider;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Empty;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -155,7 +158,7 @@ public void testAllowedCall() throws Exception {
}

@Test
public void testServerDisllowsCalls() throws Exception {
public void testServerDisallowsCalls() throws Exception {
createChannel(
ServerSecurityPolicy.newBuilder()
.servicePolicy("foo", policy((uid) -> false))
Expand Down Expand Up @@ -197,6 +200,25 @@ public void testPerServicePolicy() throws Exception {
}
}

@Test
public void testPerServicePolicyAsync() throws Exception {
createChannel(
ServerSecurityPolicy.newBuilder()
.servicePolicy("foo", asyncPolicy((uid) -> Futures.immediateFuture(true)))
.servicePolicy("bar", asyncPolicy((uid) -> Futures.immediateFuture(false)))
.build(),
SecurityPolicies.internalOnly());

assertThat(methods).isNotEmpty();
for (MethodDescriptor<Empty, Empty> method : methods.values()) {
if (method.getServiceName().equals("bar")) {
assertCallFailure(method, Status.PERMISSION_DENIED);
} else {
assertCallSuccess(method);
}
}
}

@Test
public void testSecurityInterceptorIsClosestToTransport() throws Exception {
createChannel(
Expand Down Expand Up @@ -227,6 +249,20 @@ public Status checkAuthorization(int uid) {
};
}

private static AsyncSecurityPolicy asyncPolicy(
Function<Integer, ListenableFuture<Boolean>> func) {
return new AsyncSecurityPolicy() {
@Override
public ListenableFuture<Status> checkAuthorizationAsync(int uid) {
return Futures
.transform(
func.apply(uid),
allowed -> allowed ? Status.OK : Status.PERMISSION_DENIED,
MoreExecutors.directExecutor());
}
};
}

private final class CountingServerInterceptor implements ServerInterceptor {
int numInterceptedCalls;

Expand Down
71 changes: 71 additions & 0 deletions binder/src/main/java/io/grpc/binder/AsyncSecurityPolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.binder;

import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.ExperimentalApi;
import io.grpc.Status;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import javax.annotation.CheckReturnValue;

/**
* Decides whether a given Android UID is authorized to access some resource.
*
* <p>This class provides the asynchronous version of {@link SecurityPolicy}, allowing
* implementations of authorization logic that involves slow or asynchronous calls without
* necessarily blocking the calling thread.
*
* @see SecurityPolicy
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10566")
@CheckReturnValue
public abstract class AsyncSecurityPolicy extends SecurityPolicy {

/**
* @deprecated Prefer {@link #checkAuthorizationAsync(int)} for async or slow calls or subclass
* {@link SecurityPolicy} directly for quick, synchronous implementations.
*/
@Override
@Deprecated
public final Status checkAuthorization(int uid) {
try {
return checkAuthorizationAsync(uid).get();
} catch (ExecutionException e) {
return Status.fromThrowable(e);
} catch (CancellationException e) {
return Status.CANCELLED.withCause(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // re-set the current thread's interruption state
return Status.CANCELLED.withCause(e);
}
}

/**
* Decides whether the given Android UID is authorized. (Validity is implementation dependent).
*
* <p>As long as any given UID has active processes, this method should return the same value for
* that UID. In order words, policy changes which occur while a transport instance is active, will
* have no effect on that transport instance.
*
* @param uid The Android UID to authenticate.
* @return A {@link ListenableFuture} for a gRPC {@link Status} object, with OK indicating
* authorized.
*/
abstract ListenableFuture<Status> checkAuthorizationAsync(int uid);
}
4 changes: 4 additions & 0 deletions binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public Status checkAuthorizationForService(int uid, String serviceName) {
@CheckReturnValue
ListenableFuture<Status> checkAuthorizationForServiceAsync(int uid, String serviceName) {
SecurityPolicy securityPolicy = perServicePolicies.getOrDefault(serviceName, defaultPolicy);
if (securityPolicy instanceof AsyncSecurityPolicy) {
return ((AsyncSecurityPolicy) securityPolicy).checkAuthorizationAsync(uid);
}

try {
Status status = securityPolicy.checkAuthorization(uid);
return Futures.immediateFuture(status);
Expand Down
153 changes: 153 additions & 0 deletions binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,22 @@

import static com.google.common.truth.Truth.assertThat;

import static org.junit.Assert.fail;
import android.os.Process;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Status;
import io.grpc.StatusException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.robolectric.RobolectricTestRunner;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;

@RunWith(RobolectricTestRunner.class)
public final class ServerSecurityPolicyTest {
Expand Down Expand Up @@ -81,6 +91,86 @@ public void testPerService() {
.isEqualTo(Status.OK.getCode());
}

@Test
public void testPerServiceAsync() {
policy =
ServerSecurityPolicy.newBuilder()
.servicePolicy(SERVICE2, asyncPolicy(uid -> {
// Add some extra future transformation to confirm that a chain
// of futures gets properly handled.
ListenableFuture<Void> dependency = Futures.immediateVoidFuture();
return Futures
.transform(dependency, unused -> Status.OK, MoreExecutors.directExecutor());
}))
.build();

assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE1).getCode())
.isEqualTo(Status.OK.getCode());
assertThat(policy.checkAuthorizationForService(OTHER_UID, SERVICE1).getCode())
.isEqualTo(Status.PERMISSION_DENIED.getCode());
assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE2).getCode())
.isEqualTo(Status.OK.getCode());
assertThat(policy.checkAuthorizationForService(OTHER_UID, SERVICE2).getCode())
.isEqualTo(Status.OK.getCode());
}

@Test
public void testPerService_throwingExceptionAsynchronously_propagatesStatusFromException() {
policy =
ServerSecurityPolicy.newBuilder()
.servicePolicy(SERVICE1, asyncPolicy(uid ->
Futures
.immediateFailedFuture(
new StatusException(Status.fromCode(Status.Code.ALREADY_EXISTS)))
))
.build();

assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE1).getCode())
.isEqualTo(Status.ALREADY_EXISTS.getCode());
}

@Test
public void testPerServiceAsync_cancelledFuture_propagatesStatus() {
policy =
ServerSecurityPolicy.newBuilder()
.servicePolicy(SERVICE1, asyncPolicy(unused -> Futures.immediateCancelledFuture()))
.build();

assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE1).getCode())
.isEqualTo(Status.CANCELLED.getCode());
}

@Test
public void testPerServiceAsync_interrupted_cancelledStatus() {
ListeningExecutorService listeningExecutorService =
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
CountDownLatch unsatisfiedLatch = new CountDownLatch(1);
ListenableFuture<Status> toBeInterruptedFuture = listeningExecutorService.submit(() -> {
unsatisfiedLatch.await(); // waits forever
return null;
});

CyclicBarrier barrier = new CyclicBarrier(2);
Thread testThread = Thread.currentThread();
new Thread(() -> {
awaitOrFail(barrier);
testThread.interrupt();
}).start();

policy =
ServerSecurityPolicy.newBuilder()
.servicePolicy(SERVICE1, asyncPolicy(unused -> {
awaitOrFail(barrier);
return toBeInterruptedFuture;
}))
.build();

assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE1).getCode())
.isEqualTo(Status.CANCELLED.getCode());
assertThat(Thread.currentThread().isInterrupted()).isTrue();
listeningExecutorService.shutdownNow();
}

@Test
public void testPerServiceNoDefault() {
policy =
Expand Down Expand Up @@ -109,6 +199,49 @@ SERVICE2, policy((uid) -> uid == OTHER_UID ? Status.OK : Status.PERMISSION_DENIE
.isEqualTo(Status.PERMISSION_DENIED.getCode());
}

@Test
public void testPerServiceNoDefaultAsync() {
policy =
ServerSecurityPolicy.newBuilder()
.servicePolicy(
SERVICE1,
asyncPolicy((uid) -> Futures.immediateFuture(Status.INTERNAL)))
.servicePolicy(
SERVICE2, asyncPolicy((uid) -> {
// Add some extra future transformation to confirm that a chain
// of futures gets properly handled.
ListenableFuture<Boolean> anotherUidFuture =
Futures.immediateFuture(uid == OTHER_UID);
return Futures
.transform(
anotherUidFuture,
anotherUid ->
anotherUid
? Status.OK
: Status.PERMISSION_DENIED,
MoreExecutors.directExecutor());
}))
.build();

// Uses the specified policy for service1.
assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE1).getCode())
.isEqualTo(Status.INTERNAL.getCode());
assertThat(policy.checkAuthorizationForService(OTHER_UID, SERVICE1).getCode())
.isEqualTo(Status.INTERNAL.getCode());

// Uses the specified policy for service2.
assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE2).getCode())
.isEqualTo(Status.PERMISSION_DENIED.getCode());
assertThat(policy.checkAuthorizationForService(OTHER_UID, SERVICE2).getCode())
.isEqualTo(Status.OK.getCode());

// Falls back to the default.
assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE3).getCode())
.isEqualTo(Status.OK.getCode());
assertThat(policy.checkAuthorizationForService(OTHER_UID, SERVICE3).getCode())
.isEqualTo(Status.PERMISSION_DENIED.getCode());
}

private static SecurityPolicy policy(Function<Integer, Status> func) {
return new SecurityPolicy() {
@Override
Expand All @@ -117,4 +250,24 @@ public Status checkAuthorization(int uid) {
}
};
}

private static AsyncSecurityPolicy asyncPolicy(Function<Integer, ListenableFuture<Status>> func) {
return new AsyncSecurityPolicy() {
@Override
public ListenableFuture<Status> checkAuthorizationAsync(int uid) {
return func.apply(uid);
}
};
}

private static void awaitOrFail(CyclicBarrier barrier) {
try {
barrier.await();
} catch (BrokenBarrierException e) {
fail(e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail(e.getMessage());
}
}
}

0 comments on commit b6947de

Please sign in to comment.