Skip to content

Commit

Permalink
Update logging and fix comment typo
Browse files Browse the repository at this point in the history
  • Loading branch information
andybradshaw committed Nov 26, 2024
1 parent 246e44a commit 5c2a0e4
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
19 changes: 12 additions & 7 deletions src/java/org/apache/cassandra/service/MigrationManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import com.palantir.logsafe.SafeArg;
import com.palantir.tracing.CloseableTracer;

import java.lang.management.ManagementFactory;
Expand Down Expand Up @@ -165,9 +167,9 @@ public void run()
return;
}
logger.debug("submitting migration task for endpoint {}, endpoint schema version {}, and our schema version {}",
endpoint,
currentVersion,
Schema.instance.getVersion());
SafeArg.of("endpoint", endpoint),
SafeArg.of("endpointVersion", currentVersion),
SafeArg.of("schemaVersion", Schema.instance.getVersion()));
submitMigrationTask(endpoint, currentVersion);
}
};
Expand All @@ -178,7 +180,10 @@ public void run()

static BiFunction<UUID, Set<InetAddress>, Set<InetAddress>> removeEndpointFromSchemaPulls(InetAddress endpoint) {
return (v, s) -> {
logger.debug("Removing endpoint from scheduled schema pulls {}: {} ({})", endpoint, v, s);
logger.debug("Removing endpoint from scheduled schema pulls",
SafeArg.of("endpoint", endpoint),
SafeArg.of("schemaVersion", v),
SafeArg.of("scheduledPulls", s));
s.remove(endpoint);
if (!s.isEmpty()) {
return s;
Expand Down Expand Up @@ -211,7 +216,7 @@ private static Future<?> submitMigrationTask(InetAddress endpoint, UUID theirVer
public static boolean shouldPullSchemaFrom(InetAddress endpoint)
{
/*
* Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema)
* Don't request schema from nodes with a differnt or unknown major version (may have incompatible schema)
* Don't request schema from fat clients
*/
return MessagingService.instance().knowsVersion(endpoint)
Expand All @@ -222,15 +227,15 @@ public static boolean shouldPullSchemaFrom(InetAddress endpoint)
public static boolean shouldPullSchemaFrom(InetAddress endpoint, UUID theirVersion)
{
/*
* Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema)
* Don't request schema from nodes with a differnt or unknown major version (may have incompatible schema)
* Don't request schema from fat clients
* Don't request schema from bootstrapping nodes (?)
* Don't request schema if we have scheduled a pull request for that schema version
*/
Set<InetAddress> currentlyScheduledRequests = scheduledSchemaPulls.getOrDefault(theirVersion, Collections.emptySet());
boolean noScheduledRequests = currentlyScheduledRequests.size() < MAX_SCHEDULED_SCHEMA_PULL_REQUESTS
&& !currentlyScheduledRequests.contains(endpoint);
logger.debug("Evaluating schema pull criteria: currently scheduled requests for version {}: {}", theirVersion, currentlyScheduledRequests);
logger.debug("Evaluating schema pull criteria", SafeArg.of("schemaVersion", theirVersion), SafeArg.of("scheduledPulls", currentlyScheduledRequests));
return MessagingService.instance().knowsVersion(endpoint)
&& MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version
&& !Gossiper.instance.isGossipOnlyMember(endpoint)
Expand Down
15 changes: 9 additions & 6 deletions src/java/org/apache/cassandra/service/MigrationTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palantir.logsafe.SafeArg;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
Expand Down Expand Up @@ -83,15 +84,13 @@ public void response(MessageIn<Collection<Mutation>> message)
{
try
{
// TODO: testing always failed merges
//LegacySchemaTables.mergeSchema(message.payload);
logger.debug("Processing response to schema pull from endpoint", SafeArg.of("endpoint", endpoint));
LegacySchemaTables.mergeSchema(message.payload);
}
/*
catch (IOException e)
{
logger.error("IOException merging remote schema", e);
}
*/
catch (ConfigurationException e)
{
logger.error("Configuration exception merging remote schema", e);
Expand All @@ -100,7 +99,9 @@ public void response(MessageIn<Collection<Mutation>> message)
{
// always attempt to clean up our outstanding schema pull request if created with a version
version.ifPresent(v -> {
logger.debug("Successfully processed response to schema pull, removing endpoint from scheduled schema pulls {}: {}", endpoint, v);
logger.debug("Successfully processed response to schema pull",
SafeArg.of("endpoint", endpoint),
SafeArg.of("schemaVersion", v));
MigrationManager.scheduledSchemaPulls.computeIfPresent(v, MigrationManager.removeEndpointFromSchemaPulls(endpoint));
});
}
Expand All @@ -111,7 +112,9 @@ public void onFailure(InetAddress from)
{
// always attempt to clean up our outstanding schema pull request if created with a version
version.ifPresent(v -> {
logger.debug("Timed out waiting for response to schema pull, removing endpoint from scheduled schema pulls {}: {}", endpoint, v);
logger.debug("Timed out waiting for response to schema pull",
SafeArg.of("endpoint", endpoint),
SafeArg.of("schemaVersion", v));
MigrationManager.scheduledSchemaPulls.computeIfPresent(v, MigrationManager.removeEndpointFromSchemaPulls(endpoint));
});
}
Expand Down

0 comments on commit 5c2a0e4

Please sign in to comment.