Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Activity ID behavior #2136

Merged
merged 3 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 13 additions & 44 deletions src/main/java/com/microsoft/sqlserver/jdbc/ActivityCorrelator.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,30 @@

package com.microsoft.sqlserver.jdbc;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/**
* ActivityCorrelator provides the APIs to access the ActivityId in TLS
*/
final class ActivityCorrelator {

private static Map<Long, ActivityId> activityIdTlsMap = new ConcurrentHashMap<>();

static void cleanupActivityId() {
// remove ActivityIds that belongs to this thread or no longer have an associated thread.
activityIdTlsMap.entrySet().removeIf(e -> null == e.getValue() || null == e.getValue().getThread()
|| e.getValue().getThread() == Thread.currentThread() || !e.getValue().getThread().isAlive());
}
private static ActivityId s_ActivityId;
private static Lock lockObject = new ReentrantLock();

// Get the current ActivityId in TLS
@SuppressWarnings("deprecation")
static ActivityId getCurrent() {
// get the value in TLS, not reference
Thread thread = Thread.currentThread();
if (!activityIdTlsMap.containsKey(thread.getId())) {
activityIdTlsMap.put(thread.getId(), new ActivityId(thread));
if (s_ActivityId == null) {
lockObject.lock();
if (s_ActivityId == null) {
s_ActivityId = new ActivityId();
}
lockObject.unlock();
}

return activityIdTlsMap.get(thread.getId());
return s_ActivityId;
}

// Increment the Sequence number of the ActivityId in TLS
Expand All @@ -47,15 +43,6 @@ static ActivityId getNext() {
return activityId;
}

static void setCurrentActivityIdSentFlag() {
ActivityId activityId = getCurrent();
activityId.setSentFlag();
}

static Map<Long, ActivityId> getActivityIdTlsMap() {
return activityIdTlsMap;
}

/*
* Prevent instantiation.
*/
Expand All @@ -65,19 +52,11 @@ private ActivityCorrelator() {}

class ActivityId {
private final UUID id;
private final Thread thread;
private long sequence;
private boolean isSentToServer;

ActivityId(Thread thread) {
ActivityId() {
id = UUID.randomUUID();
this.thread = thread;
sequence = 0;
isSentToServer = false;
}

Thread getThread() {
return thread;
sequence = 1;
}

UUID getId() {
Expand All @@ -95,16 +74,6 @@ void increment() {
} else {
sequence = 0;
}

isSentToServer = false;
}

void setSentFlag() {
isSentToServer = true;
}

boolean isSentToServer() {
return isSentToServer;
}

@Override
Expand Down
23 changes: 0 additions & 23 deletions src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -3406,39 +3406,16 @@ void writeMessageHeader() throws SQLServerException {
// Include ALL_Headers/MARS header in message's first packet
// Note: The PKT_BULK message does not nees this ALL_HEADERS
if ((TDS.PKT_QUERY == tdsMessageType || TDS.PKT_DTC == tdsMessageType || TDS.PKT_RPC == tdsMessageType)) {
boolean includeTraceHeader = false;
int totalHeaderLength = TDS.MESSAGE_HEADER_LENGTH;
if ((TDS.PKT_QUERY == tdsMessageType || TDS.PKT_RPC == tdsMessageType) && (con.isDenaliOrLater()
&& Util.isActivityTraceOn() && !ActivityCorrelator.getCurrent().isSentToServer())) {
includeTraceHeader = true;
totalHeaderLength += TDS.TRACE_HEADER_LENGTH;
}

writeInt(totalHeaderLength); // allHeaders.TotalLength (DWORD)
writeInt(TDS.MARS_HEADER_LENGTH); // MARS header length (DWORD)
writeShort((short) 2); // allHeaders.HeaderType(MARS header) (USHORT)
writeBytes(con.getTransactionDescriptor());
writeInt(1); // marsHeader.OutstandingRequestCount
if (includeTraceHeader) {
writeInt(TDS.TRACE_HEADER_LENGTH); // trace header length (DWORD)
writeTraceHeaderData();
ActivityCorrelator.setCurrentActivityIdSentFlag(); // set the flag to indicate this ActivityId is sent
}
}
}

void writeTraceHeaderData() throws SQLServerException {
ActivityId activityId = ActivityCorrelator.getCurrent();
final byte[] actIdByteArray = Util.asGuidByteArray(activityId.getId());
long seqNum = activityId.getSequence();
writeShort(TDS.HEADERTYPE_TRACE); // trace header type
writeBytes(actIdByteArray, 0, actIdByteArray.length); // guid part of ActivityId
writeInt((int) seqNum); // sequence number of ActivityId

if (logger.isLoggable(Level.FINER))
logger.finer("Send Trace Header - ActivityID: " + activityId.toString());
}

/**
* Convenience method to prepare the TDS channel for writing and start a new TDS message.
*
Expand Down
71 changes: 39 additions & 32 deletions src/main/java/com/microsoft/sqlserver/jdbc/SQLServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -47,7 +49,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Date;
import java.util.logging.Level;

import javax.sql.XAConnection;
Expand Down Expand Up @@ -97,6 +98,11 @@ public class SQLServerConnection implements ISQLServerConnection, java.io.Serial
*/
private static final long serialVersionUID = 1965647556064751510L;

/**
* A random netAddress for this process to send during LOGIN7
*/
private static final byte[] netAddress = getRandomNetAddress();

/** timer expiry */
long timerExpire;

Expand Down Expand Up @@ -257,6 +263,18 @@ public class SQLServerConnection implements ISQLServerConnection, java.io.Serial
**/
private static final Lock sLock = new ReentrantLock();

/**
* Generate a 6 byte random array for netAddress
*
* @return byte[]
*/
private static byte[] getRandomNetAddress() {
byte[] a = new byte[6];
Random random = new Random();
random.nextBytes(a);
return a;
}

/**
* Return an existing cached SharedTimer associated with this Connection or create a new one.
*
Expand Down Expand Up @@ -1469,7 +1487,7 @@ final boolean isSessionUnAvailable() {
final void setMaxFieldSize(int limit) throws SQLServerException {
if (maxFieldSize != limit) {
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
// If no limit on field size, set text size to max (2147483647), NOT default (0 --> 4K)
connectionCommand("SET TEXTSIZE " + ((0 == limit) ? Integer.MAX_VALUE : limit), "setMaxFieldSize");
Expand Down Expand Up @@ -1500,7 +1518,7 @@ final void initResettableValues() {
final void setMaxRows(int limit) throws SQLServerException {
if (maxRows != limit) {
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
connectionCommand("SET ROWCOUNT " + limit, "setMaxRows");
maxRows = limit;
Expand Down Expand Up @@ -3677,15 +3695,15 @@ void prelogin(String serverName, int portNumber) throws SQLServerException {
System.arraycopy(conIdByteArray, 0, preloginRequest, offset, conIdByteArray.length);
offset += conIdByteArray.length;

if (Util.isActivityTraceOn()) {
ActivityId activityId = ActivityCorrelator.getNext();
final byte[] actIdByteArray = Util.asGuidByteArray(activityId.getId());
System.arraycopy(actIdByteArray, 0, preloginRequest, offset, actIdByteArray.length);
offset += actIdByteArray.length;
long seqNum = activityId.getSequence();
Util.writeInt((int) seqNum, preloginRequest, offset);
offset += 4;
ActivityId activityId = ActivityCorrelator.getNext();
final byte[] actIdByteArray = Util.asGuidByteArray(activityId.getId());
System.arraycopy(actIdByteArray, 0, preloginRequest, offset, actIdByteArray.length);
offset += actIdByteArray.length;
long seqNum = activityId.getSequence();
Util.writeInt((int) seqNum, preloginRequest, offset);
offset += 4;

if (Util.isActivityTraceOn()) {
if (connectionlogger.isLoggable(Level.FINER)) {
connectionlogger.finer(toString() + " ActivityId " + activityId.toString());
}
Expand All @@ -3709,10 +3727,6 @@ void prelogin(String serverName, int portNumber) throws SQLServerException {
throw e;
}

if (Util.isActivityTraceOn()) {
ActivityCorrelator.setCurrentActivityIdSentFlag(); // indicate current ActivityId is sent
}

// Read the entire prelogin response
int responseLength = preloginResponse.length;
int responseBytesRead = 0;
Expand Down Expand Up @@ -4377,7 +4391,7 @@ public void setAutoCommit(boolean newAutoCommitMode) throws SQLServerException {
if (loggerExternal.isLoggable(Level.FINER)) {
loggerExternal.entering(loggingClassName, "setAutoCommit", newAutoCommitMode);
if (Util.isActivityTraceOn())
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
String commitPendingTransaction = "";
checkClosed();
Expand Down Expand Up @@ -4433,7 +4447,7 @@ public void commit() throws SQLServerException {
public void commit(boolean delayedDurability) throws SQLServerException {
loggerExternal.entering(loggingClassName, "commit");
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}

checkClosed();
Expand All @@ -4451,7 +4465,7 @@ public void commit(boolean delayedDurability) throws SQLServerException {
public void rollback() throws SQLServerException {
loggerExternal.entering(loggingClassName, "rollback");
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
checkClosed();

Expand Down Expand Up @@ -4539,10 +4553,6 @@ private void clearConnectionResources() {

// Clean-up queue etc. related to batching of prepared statement discard actions (sp_unprepare).
cleanupPreparedStatementDiscardActions();

if (Util.isActivityTraceOn()) {
ActivityCorrelator.cleanupActivityId();
}
}

/**
Expand All @@ -4563,9 +4573,7 @@ final void poolCloseEventNotify() throws SQLServerException {
connectionCommand("IF @@TRANCOUNT > 0 ROLLBACK TRAN", "close connection");
}
notifyPooledConnection(null);
if (Util.isActivityTraceOn()) {
ActivityCorrelator.cleanupActivityId();
}

if (connectionlogger.isLoggable(Level.FINER)) {
connectionlogger.finer(toString() + " Connection closed and returned to connection pool");
}
Expand Down Expand Up @@ -4612,7 +4620,7 @@ public boolean isReadOnly() throws SQLServerException {
public void setCatalog(String catalog) throws SQLServerException {
loggerExternal.entering(loggingClassName, "setCatalog", catalog);
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
checkClosed();
if (catalog != null) {
Expand All @@ -4635,7 +4643,7 @@ public void setTransactionIsolation(int level) throws SQLServerException {
if (loggerExternal.isLoggable(Level.FINER)) {
loggerExternal.entering(loggingClassName, "setTransactionIsolation", level);
if (Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
}

Expand Down Expand Up @@ -6342,7 +6350,6 @@ final boolean complete(LogonCommand logonCommand, TDSReader tdsReader) throws SQ
byte[] interfaceLibVersionBytes = {(byte) SQLJdbcVersion.BUILD, (byte) SQLJdbcVersion.PATCH,
(byte) SQLJdbcVersion.MINOR, (byte) SQLJdbcVersion.MAJOR};
byte[] databaseNameBytes = toUCS16(databaseName);
byte[] netAddress = new byte[6];
int dataLen = 0;

// TDS version 8 if strict mode
Expand Down Expand Up @@ -6849,7 +6856,7 @@ final private Savepoint setNamedSavepoint(String sName) throws SQLServerExceptio
public Savepoint setSavepoint(String sName) throws SQLServerException {
loggerExternal.entering(loggingClassName, SET_SAVE_POINT, sName);
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
checkClosed();
Savepoint pt = setNamedSavepoint(sName);
Expand All @@ -6861,7 +6868,7 @@ public Savepoint setSavepoint(String sName) throws SQLServerException {
public Savepoint setSavepoint() throws SQLServerException {
loggerExternal.entering(loggingClassName, SET_SAVE_POINT);
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
checkClosed();
Savepoint pt = setNamedSavepoint(null);
Expand All @@ -6873,7 +6880,7 @@ public Savepoint setSavepoint() throws SQLServerException {
public void rollback(Savepoint s) throws SQLServerException {
loggerExternal.entering(loggingClassName, "rollback", s);
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
checkClosed();
if (databaseAutoCommitMode) {
Expand All @@ -6898,7 +6905,7 @@ public void setHoldability(int holdability) throws SQLServerException {
loggerExternal.entering(loggingClassName, "setHoldability", holdability);

if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
checkValidHoldability(holdability);
checkClosed();
Expand Down
Loading