Skip to content

Commit

Permalink
Add Sink class, implement sink methods and tests (#1063)
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard authored Jun 21, 2016
1 parent 0edaca8 commit add9489
Show file tree
Hide file tree
Showing 9 changed files with 1,390 additions and 4 deletions.
8 changes: 7 additions & 1 deletion gcloud-java-logging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>0.12.0</version>
<version>0.14.0</version>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
Expand All @@ -48,6 +48,12 @@
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.4</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,123 @@

package com.google.cloud.logging;

import com.google.cloud.AsyncPage;
import com.google.cloud.Page;
import com.google.cloud.Service;

import java.util.Map;
import java.util.concurrent.Future;

public interface Logging extends AutoCloseable, Service<LoggingOptions> {

/**
* Class for specifying options for listing sinks, monitored resources and monitored resource
* descriptors.
*/
final class ListOption extends Option {

private static final long serialVersionUID = -6857294816115909271L;

enum OptionType implements Option.OptionType {
PAGE_SIZE, PAGE_TOKEN;

@SuppressWarnings("unchecked")
<T> T get(Map<Option.OptionType, ?> options) {
return (T) options.get(this);
}
}

private ListOption(OptionType option, Object value) {
super(option, value);
}

/**
* Returns an option to specify the maximum number of resources returned per page.
*/
public static ListOption pageSize(int pageSize) {
return new ListOption(OptionType.PAGE_SIZE, pageSize);
}

/**
* Returns an option to specify the page token from which to start listing resources.
*/
public static ListOption pageToken(String pageToken) {
return new ListOption(OptionType.PAGE_TOKEN, pageToken);
}
}

/**
* Creates a new sink.
*
* @return the created sink
* @throws LoggingException upon failure
*/
Sink create(SinkInfo sink);

/**
* Sends a request for creating a sink. This method returns a {@code Future} object to consume the
* result. {@link Future#get()} returns the created sink or {@code null} if not found.
*/
Future<Sink> createAsync(SinkInfo sink);

/**
* Updates a sink or creates one if it does not exist.
*
* @return the created sink
* @throws LoggingException upon failure
*/
Sink update(SinkInfo sink);

/**
* Sends a request for updating a sink (or creating it, if it does not exist). This method returns
* a {@code Future} object to consume the result. {@link Future#get()} returns the
* updated/created sink or {@code null} if not found.
*/
Future<Sink> updateAsync(SinkInfo sink);

/**
* Returns the requested sink or {@code null} if not found.
*
* @throws LoggingException upon failure
*/
Sink getSink(String sink);

/**
* Sends a request for getting a sink. This method returns a {@code Future} object to consume the
* result. {@link Future#get()} returns the requested sink or {@code null} if not found.
*
* @throws LoggingException upon failure
*/
Future<Sink> getSinkAsync(String sink);

/**
* Lists the sinks. This method returns a {@link Page} object that can be used to consume
* paginated results. Use {@link ListOption} to specify the page size or the page token from which
* to start listing sinks.
*
* @throws LoggingException upon failure
*/
Page<Sink> listSinks(ListOption... options);

/**
* Sends a request for listing sinks. This method returns a {@code Future} object to consume
* the result. {@link Future#get()} returns an {@link AsyncPage} object that can be used to
* asynchronously handle paginated results. Use {@link ListOption} to specify the page size or the
* page token from which to start listing sinks.
*/
Future<AsyncPage<Sink>> listSinksAsync(ListOption... options);

/**
* Deletes the requested sink.
*
* @return {@code true} if the sink was deleted, {@code false} if it was not found
*/
boolean deleteSink(String sink);

/**
* Sends a request for deleting a sink. This method returns a {@code Future} object to consume the
* result. {@link Future#get()} returns {@code true} if the sink was deleted, {@code false} if it
* was not found.
*/
Future<Boolean> deleteSinkAsync(String sink);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.logging;

import static com.google.api.client.util.Preconditions.checkArgument;
import static com.google.cloud.logging.Logging.ListOption.OptionType.PAGE_SIZE;
import static com.google.cloud.logging.Logging.ListOption.OptionType.PAGE_TOKEN;
import static com.google.common.util.concurrent.Futures.lazyTransform;

import com.google.cloud.AsyncPage;
import com.google.cloud.AsyncPageImpl;
import com.google.cloud.BaseService;
import com.google.cloud.Page;
import com.google.cloud.PageImpl;
import com.google.cloud.logging.spi.LoggingRpc;
import com.google.cloud.logging.spi.v2.ConfigServiceV2Api;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.logging.v2.CreateSinkRequest;
import com.google.logging.v2.DeleteSinkRequest;
import com.google.logging.v2.GetSinkRequest;
import com.google.logging.v2.ListSinksRequest;
import com.google.logging.v2.ListSinksResponse;
import com.google.logging.v2.UpdateSinkRequest;
import com.google.protobuf.Empty;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class LoggingImpl extends BaseService<LoggingOptions> implements Logging {

private final LoggingRpc rpc;
private boolean closed;

private static final Function<Empty, Boolean> EMPTY_TO_BOOLEAN_FUNCTION =
new Function<Empty, Boolean>() {
@Override
public Boolean apply(Empty input) {
return input != null;
}
};

LoggingImpl(LoggingOptions options) {
super(options);
rpc = options.rpc();
}

private static <V> V get(Future<V> future) {
try {
return Uninterruptibles.getUninterruptibly(future);
} catch (ExecutionException ex) {
throw Throwables.propagate(ex.getCause());
}
}

private abstract static class BasePageFetcher<T> implements AsyncPageImpl.NextPageFetcher<T> {

private static final long serialVersionUID = 5095123855547444030L;

private final LoggingOptions serviceOptions;
private final Map<Option.OptionType, ?> requestOptions;

private BasePageFetcher(LoggingOptions serviceOptions, String cursor,
Map<Option.OptionType, ?> requestOptions) {
this.serviceOptions = serviceOptions;
this.requestOptions =
PageImpl.nextRequestOptions(PAGE_TOKEN, cursor, requestOptions);
}

LoggingOptions serviceOptions() {
return serviceOptions;
}

Map<Option.OptionType, ?> requestOptions() {
return requestOptions;
}
}

private static class SinkPageFetcher extends BasePageFetcher<Sink> {

private static final long serialVersionUID = 4879364260060886875L;

SinkPageFetcher(LoggingOptions serviceOptions, String cursor,
Map<Option.OptionType, ?> requestOptions) {
super(serviceOptions, cursor, requestOptions);
}

@Override
public Future<AsyncPage<Sink>> nextPage() {
return listSinksAsync(serviceOptions(), requestOptions());
}
}

@Override
public Sink create(SinkInfo sink) {
return get(createAsync(sink));
}

@Override
public Future<Sink> createAsync(SinkInfo sink) {
CreateSinkRequest request = CreateSinkRequest.newBuilder()
.setProjectName(ConfigServiceV2Api.formatProjectName(options().projectId()))
.setSink(sink.toPb(options().projectId()))
.build();
return lazyTransform(rpc.create(request), Sink.fromPbFunction(this));
}

@Override
public Sink update(SinkInfo sink) {
return get(updateAsync(sink));
}

@Override
public Future<Sink> updateAsync(SinkInfo sink) {
UpdateSinkRequest request = UpdateSinkRequest.newBuilder()
.setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink.name()))
.setSink(sink.toPb(options().projectId()))
.build();
return lazyTransform(rpc.update(request), Sink.fromPbFunction(this));
}

@Override
public Sink getSink(String sink) {
return get(getSinkAsync(sink));
}

@Override
public Future<Sink> getSinkAsync(String sink) {
GetSinkRequest request = GetSinkRequest.newBuilder()
.setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink))
.build();
return lazyTransform(rpc.get(request), Sink.fromPbFunction(this));
}

private static ListSinksRequest listSinksRequest(LoggingOptions serviceOptions,
Map<Option.OptionType, ?> options) {
ListSinksRequest.Builder builder = ListSinksRequest.newBuilder();
builder.setProjectName(ConfigServiceV2Api.formatProjectName(serviceOptions.projectId()));
Integer pageSize = PAGE_SIZE.get(options);
String pageToken = PAGE_TOKEN.get(options);
if (pageSize != null) {
builder.setPageSize(pageSize);
}
if (pageToken != null) {
builder.setPageToken(pageToken);
}
return builder.build();
}

private static Future<AsyncPage<Sink>> listSinksAsync(final LoggingOptions serviceOptions,
final Map<Option.OptionType, ?> options) {
final ListSinksRequest request = listSinksRequest(serviceOptions, options);
Future<ListSinksResponse> list = serviceOptions.rpc().list(request);
return lazyTransform(list, new Function<ListSinksResponse, AsyncPage<Sink>>() {
@Override
public AsyncPage<Sink> apply(ListSinksResponse listSinksResponse) {
List<Sink> sinks = listSinksResponse.getSinksList() == null ? ImmutableList.<Sink>of()
: Lists.transform(listSinksResponse.getSinksList(),
Sink.fromPbFunction(serviceOptions.service()));
String cursor = listSinksResponse.getNextPageToken().equals("") ? null
: listSinksResponse.getNextPageToken();
return new AsyncPageImpl<>(
new SinkPageFetcher(serviceOptions, cursor, options), cursor, sinks);
}
});
}

@Override
public Page<Sink> listSinks(ListOption... options) {
return get(listSinksAsync(options));
}

@Override
public Future<AsyncPage<Sink>> listSinksAsync(ListOption... options) {
return listSinksAsync(options(), optionMap(options));
}

@Override
public boolean deleteSink(String sink) {
return get(deleteSinkAsync(sink));
}

@Override
public Future<Boolean> deleteSinkAsync(String sink) {
DeleteSinkRequest request = DeleteSinkRequest.newBuilder()
.setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink))
.build();
return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
}

@Override
public void close() throws Exception {
if (closed) {
return;
}
closed = true;
rpc.close();
}

static <T extends Option.OptionType> Map<Option.OptionType, ?> optionMap(Option... options) {
Map<Option.OptionType, Object> optionMap = Maps.newHashMap();
for (Option option : options) {
Object prev = optionMap.put(option.optionType(), option.value());
checkArgument(prev == null, "Duplicate option %s", option);
}
return optionMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ public static class DefaultLoggingFactory implements LoggingFactory {

@Override
public Logging create(LoggingOptions options) {
// todo(mziccard) uncomment once LoggingImpl is implemented
// return new LoggingImpl(options);
return null;
return new LoggingImpl(options);
}
}

Expand Down
Loading

0 comments on commit add9489

Please sign in to comment.