Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement helixAccountService with version history and blobs #1234

Merged
merged 12 commits into from
Aug 16, 2019
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.account;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.HelixPropertyStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* An AccountMetadataStore is a storage to keep and retrieve {@link Account} metadata. This is the base
*/
abstract class AccountMetadataStore {
private static final Logger logger = LoggerFactory.getLogger(AccountMetadataStore.class);

protected final AccountServiceMetrics accountServiceMetrics;
protected final LocalBackup backup;
protected final String znRecordPath;
private final HelixPropertyStore<ZNRecord> helixStore;
private final ReentrantLock lock = new ReentrantLock();
justinlin-linkedin marked this conversation as resolved.
Show resolved Hide resolved

/** Create a new {@link AccountMetadataStore} instance for the subclasses.
* @param accountServiceMetrics The {@link AccountServiceMetrics}
* @param backup The {@link LocalBackup} to manage the backup files.
* @param helixStore The {@link HelixPropertyStore} to retrieve and update the {@link ZNRecord}.
* @param znRecordPath The {@link ZNRecord} path.
*/
AccountMetadataStore(AccountServiceMetrics accountServiceMetrics, LocalBackup backup,
lightningrob marked this conversation as resolved.
Show resolved Hide resolved
HelixPropertyStore<ZNRecord> helixStore, String znRecordPath) {
this.accountServiceMetrics = accountServiceMetrics;
this.backup = backup;
this.helixStore = helixStore;
this.znRecordPath = znRecordPath;
}

/**
* ZKUpdater extends the {@link DataUpdater} with another method to permform some clean up logic after
* an update.
*/
interface ZKUpdater extends DataUpdater<ZNRecord> {

/**
* Called after {@link HelixPropertyStore} update the {@code znRecordPath} with the {@link ZKUpdater}.
* @param isUpdateSucceeded The result of the {@link HelixPropertyStore#update}.
*/
void afterUpdate(boolean isUpdateSucceeded);
}

/**
* Fetch the {@link Account} metadata from the given ZNRecord. It should return null when there is no {@link Account}
* ever created before. Subclass can assume the {@link ZNRecord} passed in this function is not null;
* @param record The {@link ZNRecord} fetched from {@code znRecordPath}.
* @return {@link Account} metadata in a map.
*/
abstract Map<String, String> fetchAccountMetadataFromZNRecord(ZNRecord record);

/**
* Create new {@link ZKUpdater} that will be used to update the accounts.
* @param accounts The {@link Account} collection to update.
* @return the Specific {@link ZKUpdater} to update accounts with {@link HelixPropertyStore}.
*/
abstract ZKUpdater createNewZKUpdater(Collection<Account> accounts);

/**
* fetchAccountMetadata would fetch the latest full set of {@link Account} metadata from the store. It returns null
* when there is no {@link Account} created.
* @return {@link Account} metadata in a map.
*/
Map<String, String> fetchAccountMetadata() {
long startTimeMs = System.currentTimeMillis();
logger.trace("Start reading ZNRecord from path={}", znRecordPath);
ZNRecord znRecord = helixStore.get(znRecordPath, null, AccessOption.PERSISTENT);
logger.trace("Fetched ZNRecord from path={}, took time={} ms", znRecordPath, startTimeMs);
if (znRecord == null) {
logger.debug("The ZNRecord to read does not exist on path={}", znRecordPath);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: I think here can be warn

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but it's not a warning message. ZNRecord can be null here. It's one of the legit state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not warn, I would suggest info because such info is supposed to be printed out (debug level is turned off by default). I feel like ZNRecord should exist in almost all cases, if not, it's actually a warning that either the ZNode is deleted or znRecordPath is incorrect.

return null;
}
return fetchAccountMetadataFromZNRecord(znRecord);
}

/**
* updateAccounts updates the latest full set of {@link Account} metadata and save it in the storage.
* @param accounts The {@link Account} collection to update. It will not be null or empty.
* @return false when there is any error.
*/
boolean updateAccounts(Collection<Account> accounts) {
ZKUpdater zkUpdater = createNewZKUpdater(accounts);
boolean hasSucceeded = helixStore.update(znRecordPath, zkUpdater, AccessOption.PERSISTENT);
zkUpdater.afterUpdate(hasSucceeded);
return hasSucceeded;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class AccountServiceMetrics {
public final Histogram updateAccountTimeInMs;
public final Histogram fetchRemoteAccountTimeInMs;
public final Histogram accountUpdateConsumerTimeInMs;
public final Histogram accountUpdateToAmbryTimeInMs;
public final Histogram accountFetchFromAmbryTimeInMs;

// Counter
public final Counter unrecognizedMessageErrorCount;
Expand All @@ -39,6 +41,9 @@ public class AccountServiceMetrics {
public final Counter backupErrorCount;
public final Counter nullNotifierCount;
public final Counter accountUpdatesCapturedByScheduledUpdaterCount;
public final Counter accountUpdatesToAmbryServerErrorCount;
public final Counter accountDeletesToAmbryServerErrorCount;
public final Counter accountFetchFromAmbryServerErrorCount;

public AccountServiceMetrics(MetricRegistry metricRegistry) {
// Histogram
Expand All @@ -49,6 +54,10 @@ public AccountServiceMetrics(MetricRegistry metricRegistry) {
metricRegistry.histogram(MetricRegistry.name(HelixAccountService.class, "FetchRemoteAccountTimeInMs"));
accountUpdateConsumerTimeInMs =
metricRegistry.histogram(MetricRegistry.name(HelixAccountService.class, "AccountUpdateConsumerTimeInMs"));
accountUpdateToAmbryTimeInMs =
metricRegistry.histogram(MetricRegistry.name(HelixAccountService.class, "AccountUpdateToAmbryTimeInMs"));
accountFetchFromAmbryTimeInMs =
metricRegistry.histogram(MetricRegistry.name(HelixAccountService.class, "AccountFetchFromAmbryTimeInMs"));

// Counter
unrecognizedMessageErrorCount =
Expand All @@ -65,5 +74,11 @@ public AccountServiceMetrics(MetricRegistry metricRegistry) {
nullNotifierCount = metricRegistry.counter(MetricRegistry.name(HelixAccountService.class, "NullNotifierCount"));
accountUpdatesCapturedByScheduledUpdaterCount = metricRegistry.counter(
MetricRegistry.name(HelixAccountService.class, "AccountUpdatesCapturedByScheduledUpdaterCount"));
accountUpdatesToAmbryServerErrorCount =
metricRegistry.counter(MetricRegistry.name(HelixAccountService.class, "AccountUpdatesToAmbryServerErrorCount"));
accountDeletesToAmbryServerErrorCount =
metricRegistry.counter(MetricRegistry.name(HelixAccountService.class, "AccountDeletesToAmbryServerErrorCount"));
accountFetchFromAmbryServerErrorCount =
metricRegistry.counter(MetricRegistry.name(HelixAccountService.class, "AccountFetchFromAmbryServerErrorCount"));
}
}
Loading