HDDS-11543. Track OzoneClient object leaks via LeakDetector framework. #7285

Merged (6 commits) on Oct 9, 2024. Diff shows changes from 4 commits.
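
The change wires OzoneClient into the existing HDDS LeakDetector framework: each client registers itself when constructed (via OzoneClientFactory.track) and de-registers in close(); if a client is garbage-collected without having been closed, the detector's callback logs a leak warning through HddsUtils.reportLeak. The sketch below restates that pattern in one place as an illustration only; MyResource is a hypothetical stand-in for OzoneClient, while LeakDetector, UncheckedAutoCloseable and the HddsUtils helpers are the ones used in the diff.

import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.utils.LeakDetector;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical resource class used only to illustrate the tracking pattern.
public class MyResource implements AutoCloseable {
  private static final Logger LOG = LoggerFactory.getLogger(MyResource.class);
  private static final LeakDetector LEAK_DETECTOR = new LeakDetector("MyResource");

  // Registered at construction time; the callback fires only if this instance
  // becomes unreachable without close() having been called.
  private final UncheckedAutoCloseable leakTracker = track(this);

  private static UncheckedAutoCloseable track(AutoCloseable object) {
    final Class<?> clazz = object.getClass();
    final StackTraceElement[] stackTrace = HddsUtils.getStackTrace(LOG);
    return LEAK_DETECTOR.track(object,
        () -> HddsUtils.reportLeak(clazz,
            HddsUtils.formatStackTrace(stackTrace, 4), LOG));
  }

  @Override
  public void close() {
    // De-registering tells the detector this instance was closed properly.
    leakTracker.close();
  }
}

OzoneClient follows the same shape in the diff: the tracker is created as a field initialized from OzoneClientFactory.track(this), and close() closes it in a finally block after closing the underlying proxy.
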
@@ -18,6 +18,7 @@

package org.apache.hadoop.hdds;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;

import jakarta.annotation.Nonnull;
@@ -112,6 +113,8 @@ public final class HddsUtils {

private static final int NO_PORT = -1;

private static boolean ignoreReportingLeak = false;

private HddsUtils() {
}

@@ -878,4 +881,31 @@ public static HddsProtos.UUID toProtobuf(UUID uuid) {
? Thread.currentThread().getStackTrace()
: null;
}

@VisibleForTesting
public static void setIgnoreReportingLeak(boolean ignoreReportingLeak) {
HddsUtils.ignoreReportingLeak = ignoreReportingLeak;
}

/**
* Logs a warning reporting that an instance of the given class was not
* closed properly. If {@link HddsUtils#ignoreReportingLeak} is set to true,
* it instead logs that the warning was ignored; this flag exists only for
* testing purposes.
*/
public static void reportLeak(Class<?> clazz, String stackTrace, Logger log) {
String warning = String.format("%s is not closed properly", clazz.getSimpleName());
if (stackTrace != null && log.isDebugEnabled()) {
String debugMessage = String.format("%nStackTrace for unclosed instance: %s",
stackTrace);
warning = warning.concat(debugMessage);
}
if (!ignoreReportingLeak) {
log.warn(warning);
} else {
String ignoreMessage =
String.format("Ignoring warning : %s is not closed correctly",
clazz.getSimpleName());
log.warn(ignoreMessage);
}
}
}
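
For reference, a minimal hypothetical snippet (not part of the diff) showing the two messages reportLeak can emit; ReportLeakDemo is invented for illustration, and the message texts come from the method above. The default warning matches the existing "not closed properly" grep in _mvn_unit_report.sh, while the ignored variant is what the new testClientLeakDetector waits for.

import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReportLeakDemo {
  private static final Logger LOG = LoggerFactory.getLogger(ReportLeakDemo.class);

  public static void main(String[] args) {
    // Default path: warns "OzoneClient is not closed properly", which the
    // CI grep in _mvn_unit_report.sh counts as a leak.
    HddsUtils.reportLeak(OzoneClient.class, null, LOG);

    // Test-only path: warns "Ignoring warning : OzoneClient is not closed correctly",
    // which testClientLeakDetector detects via LogCapturer.
    HddsUtils.setIgnoreReportingLeak(true);
    HddsUtils.reportLeak(OzoneClient.class, null, LOG);
    HddsUtils.setIgnoreReportingLeak(false);
  }
}
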
@@ -55,12 +55,7 @@ static UncheckedAutoCloseable track(AutoCloseable object) {

static void reportLeak(Class<?> clazz, String stackTrace) {
ManagedRocksObjectMetrics.INSTANCE.increaseLeakObject();
String warning = String.format("%s is not closed properly", clazz.getSimpleName());
if (stackTrace != null && LOG.isDebugEnabled()) {
String debugMessage = String.format("%nStackTrace for unclosed instance: %s", stackTrace);
warning = warning.concat(debugMessage);
}
LOG.warn(warning);
HddsUtils.reportLeak(clazz, stackTrace, LOG);
}

private static @Nullable StackTraceElement[] getStackTrace() {
@@ -26,6 +26,7 @@
import java.io.IOException;

import com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.UncheckedAutoCloseable;

/**
* OzoneClient connects to Ozone Cluster and
@@ -76,6 +77,7 @@ public class OzoneClient implements Closeable {
private final ClientProtocol proxy;
private final ObjectStore objectStore;
private ConfigurationSource conf;
private final UncheckedAutoCloseable leakTracker = OzoneClientFactory.track(this);

/**
* Creates a new OzoneClient object, generally constructed
@@ -119,7 +121,11 @@ public ConfigurationSource getConfiguration() {
*/
@Override
public void close() throws IOException {
proxy.close();
try {
proxy.close();
} finally {
leakTracker.close();
}
}

/**
@@ -23,9 +23,11 @@
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.LeakDetector;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -34,13 +36,17 @@
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.token.Token;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
import org.apache.ratis.util.UncheckedAutoCloseable;

import com.google.common.base.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;

/**
* Factory class to create OzoneClients.
*/
@@ -54,6 +60,21 @@ public final class OzoneClientFactory {
*/
private OzoneClientFactory() { }

private static final LeakDetector OZONE_CLIENT_LEAK_DETECTOR =
new LeakDetector("OzoneClientObject");

public static UncheckedAutoCloseable track(AutoCloseable object) {
final Class<?> clazz = object.getClass();
final StackTraceElement[] stackTrace = HddsUtils.getStackTrace(LOG);
return OZONE_CLIENT_LEAK_DETECTOR.track(object,
() -> HddsUtils.reportLeak(clazz,
HddsUtils.formatStackTrace(stackTrace, 4), LOG));
}

public static Logger getLogger() {
return LOG;
}


/**
* Constructs and return an OzoneClient with default configuration.
2 changes: 1 addition & 1 deletion hadoop-ozone/dev-support/checks/_mvn_unit_report.sh
@@ -37,7 +37,7 @@ find "." -not -path '*/iteration*' -name 'TEST*.xml' -print0 \
if [[ "${CHECK:-unit}" == "integration" ]]; then
find hadoop-ozone/integration-test -not -path '*/iteration*' -name '*-output.txt' -print0 \
| xargs -n1 -0 "grep" -l -E "not closed properly|was not shutdown properly" \
| awk -F/ '{sub("-output.txt",""); print $NF}' \
| awk -F/ '{sub("-output.txt",""); print $NF ": Memory Leak detected in test, Failing."}' \
>> "${tempfile}"
fi

@@ -1831,12 +1831,14 @@ public void testLoopInLinkBuckets() throws Exception {
String rootPath = String.format("%s://%s.%s/",
OzoneConsts.OZONE_URI_SCHEME, linkBucket1Name, linksVolume);

try {
FileSystem.get(URI.create(rootPath), cluster.getConf());
fail("Should throw Exception due to loop in Link Buckets");
try (FileSystem fileSystem = FileSystem.get(URI.create(rootPath),
cluster.getConf())) {
fail("Should throw Exception due to loop in Link Buckets" +
" while initialising fs with URI " + fileSystem.getUri());
} catch (OMException oe) {
// Expected exception
assertEquals(OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS, oe.getResult());
assertEquals(OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS,
oe.getResult());
} finally {
volume.deleteBucket(linkBucket1Name);
volume.deleteBucket(linkBucket2Name);
@@ -1854,13 +1856,17 @@
String rootPath2 = String.format("%s://%s.%s/",
OzoneConsts.OZONE_URI_SCHEME, danglingLinkBucketName, linksVolume);

FileSystem fileSystem = null;
try {
FileSystem.get(URI.create(rootPath2), cluster.getConf());
fileSystem = FileSystem.get(URI.create(rootPath2), cluster.getConf());
} catch (OMException oe) {
// Expected exception
fail("Should not throw Exception and show orphan buckets");
} finally {
volume.deleteBucket(danglingLinkBucketName);
if (fileSystem != null) {
fileSystem.close();
}
}
}

@@ -2230,7 +2236,17 @@ void testFileSystemWithObjectStoreLayout() throws IOException {
OzoneConfiguration config = new OzoneConfiguration(fs.getConf());
config.set(FS_DEFAULT_NAME_KEY, obsRootPath);

IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> FileSystem.get(config));
IllegalArgumentException e =
assertThrows(IllegalArgumentException.class, () -> {
FileSystem fileSystem = null;
try {
fileSystem = FileSystem.get(config);
} finally {
if (fileSystem != null) {
fileSystem.close();
}
}
});
assertThat(e.getMessage()).contains("OBJECT_STORE, which does not support file system semantics");
}
}
@@ -65,15 +65,17 @@ public static void listStatusIteratorOnPageSize(OzoneConfiguration conf,
URI uri = FileSystem.getDefaultUri(config);
config.setBoolean(
String.format("fs.%s.impl.disable.cache", uri.getScheme()), true);
FileSystem subject = FileSystem.get(uri, config);
Path dir = new Path(Objects.requireNonNull(rootPath), "listStatusIterator");
try {
Set<String> paths = new TreeSet<>();
for (int dirCount : dirCounts) {
listStatusIterator(subject, dir, paths, dirCount);
try (FileSystem subject = FileSystem.get(uri, config)) {
Path dir = new Path(Objects.requireNonNull(rootPath),
"listStatusIterator");
try {
Set<String> paths = new TreeSet<>();
for (int dirCount : dirCounts) {
listStatusIterator(subject, dir, paths, dirCount);
}
} finally {
subject.delete(dir, true);
}
} finally {
subject.delete(dir, true);
}
}

@@ -122,8 +122,16 @@ void teardown() throws IOException {
void fileSystemWithUnsupportedDefaultBucketLayout(String layout) {
OzoneConfiguration conf = configWithDefaultBucketLayout(layout);

OMException e = assertThrows(OMException.class,
() -> FileSystem.newInstance(conf));
OMException e = assertThrows(OMException.class, () -> {
FileSystem fileSystem = null;
try {
fileSystem = FileSystem.newInstance(conf);
} finally {
if (fileSystem != null) {
fileSystem.close();
}
}
});
assertThat(e.getMessage())
.contains(ERROR_MAP.get(layout));
}
@@ -32,6 +32,7 @@
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.net.StaticMapping;

import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.om.KeyManagerImpl;
import org.apache.hadoop.ozone.om.OmTestManagers;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -74,6 +75,8 @@ public class TestOMSortDatanodes {
"edge1", "/rack1"
);

private static OzoneClient ozoneClient;

@BeforeAll
public static void setup() throws Exception {
config = new OzoneConfiguration();
@@ -109,11 +112,15 @@ public static void setup() throws Exception {
= new OmTestManagers(config, scm.getBlockProtocolServer(),
mockScmContainerClient);
om = omTestManagers.getOzoneManager();
ozoneClient = omTestManagers.getRpcClient();
keyManager = (KeyManagerImpl)omTestManagers.getKeyManager();
}

@AfterAll
public static void cleanup() throws Exception {
if (ozoneClient != null) {
ozoneClient.close();
}
if (scm != null) {
scm.stop();
scm.join();
@@ -32,12 +32,14 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -51,6 +53,7 @@
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig.EcCodec;
@@ -221,6 +224,7 @@ abstract class OzoneRpcClientTests extends OzoneTestBase {
private static OzoneAcl inheritedGroupAcl = new OzoneAcl(GROUP,
remoteGroupName, ACCESS, READ);
private static MessageDigest eTagProvider;
private static Set<OzoneClient> ozoneClients = new HashSet<>();

@BeforeAll
public static void initialize() throws NoSuchAlgorithmException {
@@ -250,6 +254,7 @@ static void startCluster(OzoneConfiguration conf, MiniOzoneCluster.Builder build
.build();
cluster.waitForClusterToBeReady();
ozClient = OzoneClientFactory.getRpcClient(conf);
ozoneClients.add(ozClient);
store = ozClient.getObjectStore();
storageContainerLocationClient =
cluster.getStorageContainerLocationClient();
@@ -260,9 +265,12 @@ static void startCluster(OzoneConfiguration conf, MiniOzoneCluster.Builder build
* Close OzoneClient and shutdown MiniOzoneCluster.
*/
static void shutdownCluster() throws IOException {
if (ozClient != null) {
ozClient.close();
for (OzoneClient ozoneClient : ozoneClients) {
if (ozoneClient != null) {
ozoneClient.close();
}
}
ozoneClients.clear();

if (storageContainerLocationClient != null) {
storageContainerLocationClient.close();
@@ -274,6 +282,7 @@ static void shutdownCluster() throws IOException {
}

private static void setOzClient(OzoneClient ozClient) {
ozoneClients.add(ozClient);
OzoneRpcClientTests.ozClient = ozClient;
}

@@ -3140,6 +3149,34 @@ void testMultipartUploadOverride(ReplicationConfig replication)
doMultipartUpload(bucket, keyName, (byte)97, replication);

}

@Test
public void testClientLeakDetector() throws Exception {
HddsUtils.setIgnoreReportingLeak(true);
OzoneClient client = OzoneClientFactory.getRpcClient(cluster.getConf());
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String keyName = UUID.randomUUID().toString();
GenericTestUtils.LogCapturer ozoneClientFactoryLogCapturer =
GenericTestUtils.LogCapturer.captureLogs(
OzoneClientFactory.getLogger());

client.getObjectStore().createVolume(volumeName);
OzoneVolume volume = client.getObjectStore().getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
byte[] data = new byte[10];
Arrays.fill(data, (byte) 1);
try (OzoneOutputStream out = bucket.createKey(keyName, 10,
ReplicationConfig.fromTypeAndFactor(RATIS, ONE), new HashMap<>())) {
out.write(data);
}
client = null;
System.gc();
GenericTestUtils.waitFor(() -> ozoneClientFactoryLogCapturer.getOutput()
.contains("is not closed correctly"), 100, 2000);
HddsUtils.setIgnoreReportingLeak(false);
}
@Test
public void testMultipartUploadOwner() throws Exception {
// Save the old user, and switch to the old user after test