Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a cli tool to perform some account metadata related operations #1268

Merged
merged 5 commits into from
Oct 1, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ abstract class AccountMetadataStore {
protected final AccountServiceMetrics accountServiceMetrics;
protected final BackupFileManager backupFileManager;
protected final String znRecordPath;
private final HelixPropertyStore<ZNRecord> helixStore;
protected final HelixPropertyStore<ZNRecord> helixStore;

/** Create a new {@link AccountMetadataStore} instance for the subclasses.
* @param accountServiceMetrics The {@link AccountServiceMetrics}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ public void setupRouter(final Router router) throws IllegalStateException {
}
}

/**
* Return the {@link Notifier}.
* @return The {@link Notifier}
*/
Notifier<String> getNotifier() {
return notifier;
}

/**
* A synchronized function to fetch account metadata from {@link AccountMetadataStore} and update the in memory cache.
* @param isCalledFromListener True is this function is invoked in the {@link TopicListener}.
Expand Down Expand Up @@ -273,7 +281,17 @@ public boolean removeAccountUpdateConsumer(Consumer<Collection<Account>> account
*/
@Override
public boolean updateAccounts(Collection<Account> accounts) {
checkOpen();
return updateAccountsWithAccountMetadataStore(accounts, accountMetadataStore);
}

/**
* Helper function to update {@link Account} metadata.
* @param accounts The {@link Account} metadata to update.
* @param accountMetadataStore The {@link AccountMetadataStore}.
* @return True when the update operation succeeds.
*/
boolean updateAccountsWithAccountMetadataStore(Collection<Account> accounts, AccountMetadataStore accountMetadataStore) {
checkOpen();
Objects.requireNonNull(accounts, "accounts cannot be null");
if (accounts.isEmpty()) {
logger.debug("Empty account collection to update.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.zookeeper.data.Stat;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
Expand Down Expand Up @@ -142,6 +144,69 @@ Map<String, String> readAccountMetadataFromBlobID(String blobID) {
return null;
}

/**
* Get the list of {@link BlobIDAndVersion} from {@link ZNRecord}. This function returns null when there
* is any error.
* @return list of {@link BlobIDAndVersion}.
*/
private List<BlobIDAndVersion> fetchAllBlobIDAndVersions() {
if (router.get() == null) {
logger.error("Router is not yet initialized");
return null;
}
long startTimeMs = System.currentTimeMillis();
logger.trace("Start reading ZNRecord from path={}", znRecordPath);
Stat stat = new Stat();
ZNRecord znRecord = helixStore.get(znRecordPath, stat, AccessOption.PERSISTENT);
logger.trace("Fetched ZNRecord from path={}, took time={} ms", znRecordPath,
System.currentTimeMillis() - startTimeMs);
if (znRecord == null) {
logger.info("The ZNRecord to read does not exist on path={}", znRecordPath);
return null;
}

List<String> blobIDAndVersionsJson = znRecord.getListField(ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
if (blobIDAndVersionsJson == null || blobIDAndVersionsJson.size() == 0) {
logger.info("ZNRecord={} to read on path={} does not have a simple list with key={}", znRecord,
ACCOUNT_METADATA_BLOB_IDS_PATH, ACCOUNT_METADATA_BLOB_IDS_LIST_KEY);
return null;
} else {
return blobIDAndVersionsJson.stream().map(BlobIDAndVersion::fromJson).collect(Collectors.toList());
}
}

/**
* Returns all the versions of {@link Account} metadata as a list. It will return null when the versions are empty.
* @return All the versions of {@link Account} metadata.
*/
public List<Integer> getAllVersions() {
List<BlobIDAndVersion> blobIDAndVersions = fetchAllBlobIDAndVersions() ;
if (blobIDAndVersions == null) {
return null;
} else {
return blobIDAndVersions.stream().map(BlobIDAndVersion::getVersion).collect(Collectors.toList());
}
}

/**
* Return a map from account id to {@link Account} metadata at given version. It returns null when there is any error.
* @param version The version of {@link Account} metadata to return.
* @return A map from account id to {@link Account} metadata in json format.
* @throws IllegalArgumentException When the version is not valid.
*/
public Map<String, String> fetchAccountMetadataAtVersion(int version) throws IllegalArgumentException {
List<BlobIDAndVersion> blobIDAndVersions = fetchAllBlobIDAndVersions();
if (blobIDAndVersions == null) {
return null;
}
for (BlobIDAndVersion blobIDAndVersion: blobIDAndVersions) {
if (blobIDAndVersion.getVersion() == version) {
return readAccountMetadataFromBlobID(blobIDAndVersion.getBlobID());
}
}
throw new IllegalArgumentException("Version " + version + " doesn't exist");
}

@Override
AccountMetadataStore.ZKUpdater createNewZKUpdater(Collection<Account> accounts) {
Objects.requireNonNull(router.get(), "Router is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public void testUpdateAndFetch() throws Exception {
// generate an new account and test update and fetch on this account
AccountTestUtils.generateRefAccounts(idToRefAccountMap, idtoRefContainerMap, accountIDSet, 1, 1);
assertUpdateAndFetch(store, idToRefAccountMap, idToRefAccountMap, 1, 1);
Map<Short, Account> accountMapFirstVersion = new HashMap<>(idToRefAccountMap);

// generate another new account and test update and fetch on this account
Map<Short, Account> anotherIdToRefAccountMap = new HashMap<>();
Expand All @@ -133,6 +134,18 @@ public void testUpdateAndFetch() throws Exception {
}
// the version should be 2 now
assertUpdateAndFetch(store, idToRefAccountMap, anotherIdToRefAccountMap, 2, 2);

// Make sure we can get all the versions out
List<Integer> versions = store.getAllVersions();
assertNotNull(versions);
Collections.sort(versions);
assertEquals(versions.size(), 2);
assertEquals((int) versions.get(0), 1);
assertEquals((int) versions.get(1), 2);

// Make sure we can still get the first version out
Map<String, String> accountMap = store.fetchAccountMetadataAtVersion(1);
assertAccountsEqual(accountMap, accountMapFirstVersion);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,40 @@
*/
public class HelixPropertyStoreConfig {
public static final String HELIX_PROPERTY_STORE_PREFIX = "helix.property.store.";
public static final String HELIX_ZK_CLIENT_CONNECTION_TIMEOUT_MS =
HELIX_PROPERTY_STORE_PREFIX + "zk.client.connection.timeout.ms";
public static final String HELIX_ZK_CLIENT_SESSION_TIMEOUT_MS =
HELIX_PROPERTY_STORE_PREFIX + "zk.client.session.timeout.ms";
public static final String HELIX_ROOT_PATH = HELIX_PROPERTY_STORE_PREFIX + "root.path";

/**
* Time in ms to time out a connection to a ZooKeeper server.
*/
@Config(HELIX_PROPERTY_STORE_PREFIX + "zk.client.connection.timeout.ms")
@Config(HELIX_ZK_CLIENT_CONNECTION_TIMEOUT_MS)
@Default("20 * 1000")
public final int zkClientConnectionTimeoutMs;

/**
* Time in ms defines disconnection tolerance by a session. I.e., if reconnected within this time, it will
* be considered as the same session.
*/
@Config(HELIX_PROPERTY_STORE_PREFIX + "zk.client.session.timeout.ms")
@Config(HELIX_ZK_CLIENT_SESSION_TIMEOUT_MS)
@Default("20 * 1000")
public final int zkClientSessionTimeoutMs;

/**
* The root path of helix property store in the ZooKeeper. Must start with {@code /}, and must not end with {@code /}.
* It is recommended to make root path in the form of {@code /ambry/<clustername>/helixPropertyStore}
*/
@Config(HELIX_PROPERTY_STORE_PREFIX + "root.path")
@Config(HELIX_ROOT_PATH)
@Default("/ambry/defaultCluster/helixPropertyStore")
public final String rootPath;

public HelixPropertyStoreConfig(VerifiableProperties verifiableProperties) {
zkClientConnectionTimeoutMs =
verifiableProperties.getIntInRange(HELIX_PROPERTY_STORE_PREFIX + "zk.client.connection.timeout.ms", 20 * 1000,
1, Integer.MAX_VALUE);
verifiableProperties.getIntInRange(HELIX_ZK_CLIENT_CONNECTION_TIMEOUT_MS, 20 * 1000, 1, Integer.MAX_VALUE);
zkClientSessionTimeoutMs =
verifiableProperties.getIntInRange(HELIX_PROPERTY_STORE_PREFIX + "zk.client.session.timeout.ms", 20 * 1000, 1,
Integer.MAX_VALUE);
rootPath = verifiableProperties.getString(HELIX_PROPERTY_STORE_PREFIX + "root.path",
"/ambry/defaultCluster/helixPropertyStore");
verifiableProperties.getIntInRange(HELIX_ZK_CLIENT_SESSION_TIMEOUT_MS, 20 * 1000, 1, Integer.MAX_VALUE);
rootPath = verifiableProperties.getString(HELIX_ROOT_PATH, "/ambry/defaultCluster/helixPropertyStore");
}
}
Loading