Skip to content

Commit

Permalink
Throw error that entries are not found on watching if want (#653)
Browse files Browse the repository at this point in the history
### Motivation
- #532
- To-Do of #610

### Modifications
- Put value of `notify-entry-not-found` on `Prefer` request header to get error that entries are not found on watching file/repository
- Propagate error that entries are not found into `Watcher#initialValueFuture`

### Result
- You can get error on watching file/repository if the entry doesn't exist.
  • Loading branch information
di-seo authored Dec 9, 2021
1 parent 88ce886 commit 82fb33c
Show file tree
Hide file tree
Showing 12 changed files with 451 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,9 @@ public CompletableFuture<PushResult> push(String projectName, String repositoryN
public CompletableFuture<Revision> watchRepository(String projectName, String repositoryName,
Revision lastKnownRevision,
String pathPattern,
long timeoutMillis) {
long timeoutMillis,
boolean errorOnEntryNotFound) {
// Legacy client does not support 'errorOnEntryNotFound'
final CompletableFuture<WatchRepositoryResult> future = run(callback -> {
validateProjectAndRepositoryName(projectName, repositoryName);
requireNonNull(lastKnownRevision, "lastKnownRevision");
Expand All @@ -491,8 +493,8 @@ public CompletableFuture<Revision> watchRepository(String projectName, String re
@Override
public <T> CompletableFuture<Entry<T>> watchFile(String projectName, String repositoryName,
Revision lastKnownRevision, Query<T> query,
long timeoutMillis) {

long timeoutMillis, boolean errorOnEntryNotFound) {
// Legacy client does not support 'errorOnEntryNotFound'
final CompletableFuture<WatchFileResult> future = run(callback -> {
validateProjectAndRepositoryName(projectName, repositoryName);
requireNonNull(lastKnownRevision, "lastKnownRevision");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ public CompletableFuture<PushResult> push(String projectName, String repositoryN
@Override
public CompletableFuture<Revision> watchRepository(String projectName, String repositoryName,
Revision lastKnownRevision, String pathPattern,
long timeoutMillis) {
long timeoutMillis, boolean errorOnEntryNotFound) {
try {
validateProjectAndRepositoryName(projectName, repositoryName);
requireNonNull(lastKnownRevision, "lastKnownRevision");
Expand All @@ -815,8 +815,8 @@ public CompletableFuture<Revision> watchRepository(String projectName, String re
}
path.append(encodePathPattern(pathPattern));

return watch(lastKnownRevision, timeoutMillis, path.toString(), QueryType.IDENTITY,
ArmeriaCentralDogma::watchRepository);
return watch(lastKnownRevision, timeoutMillis, errorOnEntryNotFound, path.toString(),
QueryType.IDENTITY, ArmeriaCentralDogma::watchRepository);
} catch (Exception e) {
return exceptionallyCompletedFuture(e);
}
Expand All @@ -838,7 +838,7 @@ private static Revision watchRepository(AggregatedHttpResponse res, QueryType un
@Override
public <T> CompletableFuture<Entry<T>> watchFile(String projectName, String repositoryName,
Revision lastKnownRevision, Query<T> query,
long timeoutMillis) {
long timeoutMillis, boolean errorOnEntryNotFound) {
try {
validateProjectAndRepositoryName(projectName, repositoryName);
requireNonNull(lastKnownRevision, "lastKnownRevision");
Expand All @@ -856,7 +856,7 @@ public <T> CompletableFuture<Entry<T>> watchFile(String projectName, String repo
path.setLength(path.length() - 1);
}

return watch(lastKnownRevision, timeoutMillis, path.toString(), query.type(),
return watch(lastKnownRevision, timeoutMillis, errorOnEntryNotFound, path.toString(), query.type(),
ArmeriaCentralDogma::watchFile);
} catch (Exception e) {
return exceptionallyCompletedFuture(e);
Expand All @@ -878,11 +878,15 @@ private static <T> Entry<T> watchFile(AggregatedHttpResponse res, QueryType quer
}

private <T> CompletableFuture<T> watch(Revision lastKnownRevision, long timeoutMillis,
boolean errorOnEntryNotFound,
String path, QueryType queryType,
BiFunction<AggregatedHttpResponse, QueryType, T> func) {
final RequestHeadersBuilder builder = headersBuilder(HttpMethod.GET, path);
builder.set(HttpHeaderNames.IF_NONE_MATCH, lastKnownRevision.text())
.set(HttpHeaderNames.PREFER, "wait=" + LongMath.saturatedAdd(timeoutMillis, 999) / 1000L);
.set(HttpHeaderNames.PREFER,
// It is good to extract private method when this logic becomes heavier.
"wait=" + LongMath.saturatedAdd(timeoutMillis, 999) / 1000L +
", notify-entry-not-found=" + errorOnEntryNotFound);

try (SafeCloseable ignored = Clients.withContextCustomizer(ctx -> {
final long responseTimeoutMillis = ctx.responseTimeoutMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,60 +143,44 @@ public final CompletableFuture<PushResult> push(
author, summary, detail, markup, changes);
}

@Override
public final CompletableFuture<Revision> watchRepository(
String projectName, String repositoryName, Revision lastKnownRevision, String pathPattern) {
return CentralDogma.super.watchRepository(projectName, repositoryName, lastKnownRevision, pathPattern);
}

@Override
public final <T> CompletableFuture<Entry<T>> watchFile(
String projectName, String repositoryName, Revision lastKnownRevision, Query<T> query) {
return CentralDogma.super.watchFile(projectName, repositoryName, lastKnownRevision, query);
}

@Override
public final <T> Watcher<T> fileWatcher(String projectName, String repositoryName, Query<T> query) {
return CentralDogma.super.fileWatcher(projectName, repositoryName, query);
}

@Override
public <T, U> Watcher<U> fileWatcher(
String projectName, String repositoryName, Query<T> query,
Function<? super T, ? extends U> function) {
return fileWatcher(projectName, repositoryName, query, function, blockingTaskExecutor);
return fileWatcher(projectName, repositoryName, query, function,
blockingTaskExecutor,
WatchConstants.DEFAULT_WATCH_TIMEOUT_MILLIS,
WatchConstants.DEFAULT_WATCH_ERROR_ON_ENTRY_NOT_FOUND);
}

@Override
public <T, U> Watcher<U> fileWatcher(String projectName, String repositoryName, Query<T> query,
Function<? super T, ? extends U> function, Executor executor) {
Function<? super T, ? extends U> function, Executor executor,
long timeoutMillis, boolean errorOnEntryNotFound) {
final FileWatcher<U> watcher =
new FileWatcher<>(this, blockingTaskExecutor, executor, projectName, repositoryName, query,
function);
function, timeoutMillis, errorOnEntryNotFound);
watcher.start();
return watcher;
}

@Override
public final Watcher<Revision> repositoryWatcher(
String projectName, String repositoryName, String pathPattern) {
return CentralDogma.super.repositoryWatcher(projectName, repositoryName, pathPattern);
}

@Override
public <T> Watcher<T> repositoryWatcher(
String projectName, String repositoryName, String pathPattern,
Function<Revision, ? extends T> function) {
return repositoryWatcher(projectName, repositoryName, pathPattern, function, blockingTaskExecutor);
return repositoryWatcher(projectName, repositoryName, pathPattern, function,
blockingTaskExecutor,
WatchConstants.DEFAULT_WATCH_TIMEOUT_MILLIS,
WatchConstants.DEFAULT_WATCH_ERROR_ON_ENTRY_NOT_FOUND);
}

@Override
public <T> Watcher<T> repositoryWatcher(String projectName, String repositoryName, String pathPattern,
Function<Revision, ? extends T> function, Executor executor) {

Function<Revision, ? extends T> function, Executor executor,
long timeoutMillis, boolean errorOnEntryNotFound) {
final RepositoryWatcher<T> watcher =
new RepositoryWatcher<>(this, blockingTaskExecutor, executor,
projectName, repositoryName, pathPattern, function);
new RepositoryWatcher<>(this, blockingTaskExecutor, executor, projectName, repositoryName,
pathPattern, function, timeoutMillis, errorOnEntryNotFound);
watcher.start();
return watcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.Commit;
import com.linecorp.centraldogma.common.Entry;
import com.linecorp.centraldogma.common.EntryNotFoundException;
import com.linecorp.centraldogma.common.EntryType;
import com.linecorp.centraldogma.common.Markup;
import com.linecorp.centraldogma.common.MergeQuery;
Expand Down Expand Up @@ -452,10 +453,35 @@ default CompletableFuture<Revision> watchRepository(String projectName, String r
* @return the latest known {@link Revision} which contains the changes for the matched files.
* {@code null} if the files were not changed for {@code timeoutMillis} milliseconds
* since the invocation of this method.
*
* @deprecated Use {@link CentralDogma#watchRepository(String, String, Revision, String, long, boolean)}.
*/
@Deprecated
default CompletableFuture<Revision> watchRepository(String projectName, String repositoryName,
Revision lastKnownRevision, String pathPattern,
long timeoutMillis) {
return watchRepository(projectName, repositoryName, lastKnownRevision, pathPattern,
timeoutMillis, WatchConstants.DEFAULT_WATCH_ERROR_ON_ENTRY_NOT_FOUND);
}

/**
* Waits for the files matched by the specified {@code pathPattern} to be changed since the specified
* {@code lastKnownRevision}. If the files don't exist and {@code errorOnEntryNotFound} is {@code true},
* the returned {@link CompletableFuture} will be completed exceptionally with
* {@link EntryNotFoundException}. If no changes were made within the specified {@code timeoutMillis},
* the returned {@link CompletableFuture} will be completed with {@code null}.
* It is recommended to specify the largest {@code timeoutMillis} allowed by the server.
*
* <p>Note: Legacy client does not support {@code errorOnEntryNotFound}</p>
*
* @return the latest known {@link Revision} which contains the changes for the matched files.
* {@code null} if the files were not changed for {@code timeoutMillis} milliseconds
* since the invocation of this method. {@link EntryNotFoundException} is raised if the
* target does not exist.
*/
CompletableFuture<Revision> watchRepository(String projectName, String repositoryName,
Revision lastKnownRevision, String pathPattern,
long timeoutMillis);
long timeoutMillis, boolean errorOnEntryNotFound);

/**
* Waits for the file matched by the specified {@link Query} to be changed since the specified
Expand All @@ -481,10 +507,33 @@ default <T> CompletableFuture<Entry<T>> watchFile(String projectName, String rep
* @return the {@link Entry} which contains the latest known {@link Query} result.
* {@code null} if the file was not changed for {@code timeoutMillis} milliseconds
* since the invocation of this method.
*
* @deprecated Use {@link CentralDogma#watchFile(String, String, Revision, Query, long, boolean)}.
*/
@Deprecated
default <T> CompletableFuture<Entry<T>> watchFile(String projectName, String repositoryName,
Revision lastKnownRevision, Query<T> query,
long timeoutMillis) {
return watchFile(projectName, repositoryName, lastKnownRevision, query,
timeoutMillis, WatchConstants.DEFAULT_WATCH_ERROR_ON_ENTRY_NOT_FOUND);
}

/**
* Waits for the file matched by the specified {@link Query} to be changed since the specified
* {@code lastKnownRevision}. If the file does not exist and {@code errorOnEntryNotFound} is {@code true},
* the returned {@link CompletableFuture} will be completed exceptionally with
* {@link EntryNotFoundException}. If no changes were made within the specified {@code timeoutMillis},
* the returned {@link CompletableFuture} will be completed with {@code null}.
* It is recommended to specify the largest {@code timeoutMillis} allowed by the server.
*
* @return the {@link Entry} which contains the latest known {@link Query} result.
* {@code null} if the file was not changed for {@code timeoutMillis} milliseconds
* since the invocation of this method. {@link EntryNotFoundException} is raised if the
* target does not exist.
*/
<T> CompletableFuture<Entry<T>> watchFile(String projectName, String repositoryName,
Revision lastKnownRevision, Query<T> query,
long timeoutMillis);
long timeoutMillis, boolean errorOnEntryNotFound);

/**
* Returns a {@link Watcher} which notifies its listeners when the result of the
Expand Down Expand Up @@ -532,11 +581,37 @@ <T, U> Watcher<U> fileWatcher(String projectName, String repositoryName,
* assert myValue instanceof MyType;
* ...
* });}</pre>
*/
default <T, U> Watcher<U> fileWatcher(String projectName, String repositoryName, Query<T> query,
Function<? super T, ? extends U> function, Executor executor) {
return fileWatcher(projectName, repositoryName, query, function, executor,
WatchConstants.DEFAULT_WATCH_TIMEOUT_MILLIS,
WatchConstants.DEFAULT_WATCH_ERROR_ON_ENTRY_NOT_FOUND);
}

/**
* Returns a {@link Watcher} which notifies its listeners after applying the specified
* {@link Function} when the result of the given {@link Query} becomes available or changes. e.g:
* <pre>{@code
* Watcher<MyType> watcher = client.fileWatcher(
* "foo", "bar", Query.ofJson("/baz.json"),
* content -> new ObjectMapper().treeToValue(content, MyType.class), executor,
* 60000, true);
*
* @param executor the {@link Executor} that executes the {@link Function}
* watcher.initialValueFuture().thenAccept(result -> watcher.watch((revision, myValue) -> {
* assert myValue instanceof MyType;
* ...
* }));}</pre>
*
* @param executor the {@link Executor} that executes the {@link Function}.
* @param errorOnEntryNotFound the {@code errorOnEntryNotFound} that is a option on watching.
* if is {@code true} and the file doesn't exist, {@link Watcher#initialValueFuture()} will be
* completed with {@link EntryNotFoundException}.
* <p>Note: Legacy client does not support {@code errorOnEntryNotFound}</p>
*/
<T, U> Watcher<U> fileWatcher(String projectName, String repositoryName,
Query<T> query, Function<? super T, ? extends U> function, Executor executor);
<T, U> Watcher<U> fileWatcher(String projectName, String repositoryName, Query<T> query,
Function<? super T, ? extends U> function, Executor executor,
long timeoutMillis, boolean errorOnEntryNotFound);

/**
* Returns a {@link Watcher} which notifies its listeners when the specified repository has a new commit
Expand Down Expand Up @@ -589,6 +664,36 @@ <T> Watcher<T> repositoryWatcher(String projectName, String repositoryName, Stri
*
* @param executor the {@link Executor} that executes the {@link Function}
*/
default <T> Watcher<T> repositoryWatcher(String projectName, String repositoryName, String pathPattern,
Function<Revision, ? extends T> function, Executor executor) {
return repositoryWatcher(projectName, repositoryName, pathPattern, function, executor,
WatchConstants.DEFAULT_WATCH_TIMEOUT_MILLIS,
WatchConstants.DEFAULT_WATCH_ERROR_ON_ENTRY_NOT_FOUND);
}

/**
* Returns a {@link Watcher} which notifies its listeners when the specified repository has a new commit
* that contains the changes for the files matched by the given {@code pathPattern}. e.g:
* <pre>{@code
* Watcher<MyType> watcher = client.repositoryWatcher(
* "foo", "bar", "/*.json",
* revision -> client.getFiles("foo", "bar", revision, "/*.json").join(), executor,
* 60000, true);
*
* watcher.initialValueFuture().thenAccept(result -> watcher.watch((revision, contents) -> {
* ...
* }));}</pre>
* Note that you may get {@link RevisionNotFoundException} during the {@code getFiles()} call and
* may have to retry in the above example due to
* <a href="https://github.com/line/centraldogma/issues/40">a known issue</a>.
*
* @param executor the {@link Executor} that executes the {@link Function}.
* @param errorOnEntryNotFound the {@code errorOnEntryNotFound} that is a option on watching.
* if is {@code true} and the files don't exist, {@link Watcher#initialValueFuture()} will be
* completed with {@link EntryNotFoundException}.
* <p>Note: Legacy client does not support {@code errorOnEntryNotFound}</p>
*/
<T> Watcher<T> repositoryWatcher(String projectName, String repositoryName, String pathPattern,
Function<Revision, ? extends T> function, Executor executor);
Function<Revision, ? extends T> function, Executor executor,
long timeoutMillis, boolean errorOnEntryNotFound);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
final class WatchConstants {

static final long DEFAULT_WATCH_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
static final boolean DEFAULT_WATCH_ERROR_ON_ENTRY_NOT_FOUND = false;
static final int RECOMMENDED_AWAIT_TIMEOUT_SECONDS = 20;

private WatchConstants() {}
Expand Down
Loading

0 comments on commit 82fb33c

Please sign in to comment.