Skip to content

Commit

Permalink
Change Activity ID behavior (#2136)
Browse files Browse the repository at this point in the history
  • Loading branch information
David-Engel authored Jun 6, 2023
1 parent 0eec249 commit 953e4a0
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 338 deletions.
57 changes: 13 additions & 44 deletions src/main/java/com/microsoft/sqlserver/jdbc/ActivityCorrelator.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,30 @@

package com.microsoft.sqlserver.jdbc;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/**
* ActivityCorrelator provides the APIs to access the ActivityId in TLS
*/
final class ActivityCorrelator {

private static Map<Long, ActivityId> activityIdTlsMap = new ConcurrentHashMap<>();

static void cleanupActivityId() {
// remove ActivityIds that belongs to this thread or no longer have an associated thread.
activityIdTlsMap.entrySet().removeIf(e -> null == e.getValue() || null == e.getValue().getThread()
|| e.getValue().getThread() == Thread.currentThread() || !e.getValue().getThread().isAlive());
}
private static ActivityId s_ActivityId;
private static Lock lockObject = new ReentrantLock();

// Get the current ActivityId in TLS
@SuppressWarnings("deprecation")
static ActivityId getCurrent() {
// get the value in TLS, not reference
Thread thread = Thread.currentThread();
if (!activityIdTlsMap.containsKey(thread.getId())) {
activityIdTlsMap.put(thread.getId(), new ActivityId(thread));
if (s_ActivityId == null) {
lockObject.lock();
if (s_ActivityId == null) {
s_ActivityId = new ActivityId();
}
lockObject.unlock();
}

return activityIdTlsMap.get(thread.getId());
return s_ActivityId;
}

// Increment the Sequence number of the ActivityId in TLS
Expand All @@ -47,15 +43,6 @@ static ActivityId getNext() {
return activityId;
}

static void setCurrentActivityIdSentFlag() {
ActivityId activityId = getCurrent();
activityId.setSentFlag();
}

static Map<Long, ActivityId> getActivityIdTlsMap() {
return activityIdTlsMap;
}

/*
* Prevent instantiation.
*/
Expand All @@ -65,19 +52,11 @@ private ActivityCorrelator() {}

class ActivityId {
private final UUID id;
private final Thread thread;
private long sequence;
private boolean isSentToServer;

ActivityId(Thread thread) {
ActivityId() {
id = UUID.randomUUID();
this.thread = thread;
sequence = 0;
isSentToServer = false;
}

Thread getThread() {
return thread;
sequence = 1;
}

UUID getId() {
Expand All @@ -95,16 +74,6 @@ void increment() {
} else {
sequence = 0;
}

isSentToServer = false;
}

void setSentFlag() {
isSentToServer = true;
}

boolean isSentToServer() {
return isSentToServer;
}

@Override
Expand Down
23 changes: 0 additions & 23 deletions src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -3415,39 +3415,16 @@ void writeMessageHeader() throws SQLServerException {
// Include ALL_Headers/MARS header in message's first packet
// Note: The PKT_BULK message does not nees this ALL_HEADERS
if ((TDS.PKT_QUERY == tdsMessageType || TDS.PKT_DTC == tdsMessageType || TDS.PKT_RPC == tdsMessageType)) {
boolean includeTraceHeader = false;
int totalHeaderLength = TDS.MESSAGE_HEADER_LENGTH;
if ((TDS.PKT_QUERY == tdsMessageType || TDS.PKT_RPC == tdsMessageType) && (con.isDenaliOrLater()
&& Util.isActivityTraceOn() && !ActivityCorrelator.getCurrent().isSentToServer())) {
includeTraceHeader = true;
totalHeaderLength += TDS.TRACE_HEADER_LENGTH;
}

writeInt(totalHeaderLength); // allHeaders.TotalLength (DWORD)
writeInt(TDS.MARS_HEADER_LENGTH); // MARS header length (DWORD)
writeShort((short) 2); // allHeaders.HeaderType(MARS header) (USHORT)
writeBytes(con.getTransactionDescriptor());
writeInt(1); // marsHeader.OutstandingRequestCount
if (includeTraceHeader) {
writeInt(TDS.TRACE_HEADER_LENGTH); // trace header length (DWORD)
writeTraceHeaderData();
ActivityCorrelator.setCurrentActivityIdSentFlag(); // set the flag to indicate this ActivityId is sent
}
}
}

void writeTraceHeaderData() throws SQLServerException {
ActivityId activityId = ActivityCorrelator.getCurrent();
final byte[] actIdByteArray = Util.asGuidByteArray(activityId.getId());
long seqNum = activityId.getSequence();
writeShort(TDS.HEADERTYPE_TRACE); // trace header type
writeBytes(actIdByteArray, 0, actIdByteArray.length); // guid part of ActivityId
writeInt((int) seqNum); // sequence number of ActivityId

if (logger.isLoggable(Level.FINER))
logger.finer("Send Trace Header - ActivityID: " + activityId.toString());
}

/**
* Convenience method to prepare the TDS channel for writing and start a new TDS message.
*
Expand Down
71 changes: 39 additions & 32 deletions src/main/java/com/microsoft/sqlserver/jdbc/SQLServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -47,7 +49,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Date;
import java.util.logging.Level;

import javax.sql.XAConnection;
Expand Down Expand Up @@ -97,6 +98,11 @@ public class SQLServerConnection implements ISQLServerConnection, java.io.Serial
*/
private static final long serialVersionUID = 1965647556064751510L;

/**
* A random netAddress for this process to send during LOGIN7
*/
private static final byte[] netAddress = getRandomNetAddress();

/** timer expiry */
long timerExpire;

Expand Down Expand Up @@ -257,6 +263,18 @@ public class SQLServerConnection implements ISQLServerConnection, java.io.Serial
**/
private static final Lock sLock = new ReentrantLock();

/**
* Generate a 6 byte random array for netAddress
*
* @return byte[]
*/
private static byte[] getRandomNetAddress() {
byte[] a = new byte[6];
Random random = new Random();
random.nextBytes(a);
return a;
}

/**
* Return an existing cached SharedTimer associated with this Connection or create a new one.
*
Expand Down Expand Up @@ -1469,7 +1487,7 @@ final boolean isSessionUnAvailable() {
final void setMaxFieldSize(int limit) throws SQLServerException {
if (maxFieldSize != limit) {
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
// If no limit on field size, set text size to max (2147483647), NOT default (0 --> 4K)
connectionCommand("SET TEXTSIZE " + ((0 == limit) ? Integer.MAX_VALUE : limit), "setMaxFieldSize");
Expand Down Expand Up @@ -1500,7 +1518,7 @@ final void initResettableValues() {
final void setMaxRows(int limit) throws SQLServerException {
if (maxRows != limit) {
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
connectionCommand("SET ROWCOUNT " + limit, "setMaxRows");
maxRows = limit;
Expand Down Expand Up @@ -3677,15 +3695,15 @@ void prelogin(String serverName, int portNumber) throws SQLServerException {
System.arraycopy(conIdByteArray, 0, preloginRequest, offset, conIdByteArray.length);
offset += conIdByteArray.length;

if (Util.isActivityTraceOn()) {
ActivityId activityId = ActivityCorrelator.getNext();
final byte[] actIdByteArray = Util.asGuidByteArray(activityId.getId());
System.arraycopy(actIdByteArray, 0, preloginRequest, offset, actIdByteArray.length);
offset += actIdByteArray.length;
long seqNum = activityId.getSequence();
Util.writeInt((int) seqNum, preloginRequest, offset);
offset += 4;
ActivityId activityId = ActivityCorrelator.getNext();
final byte[] actIdByteArray = Util.asGuidByteArray(activityId.getId());
System.arraycopy(actIdByteArray, 0, preloginRequest, offset, actIdByteArray.length);
offset += actIdByteArray.length;
long seqNum = activityId.getSequence();
Util.writeInt((int) seqNum, preloginRequest, offset);
offset += 4;

if (Util.isActivityTraceOn()) {
if (connectionlogger.isLoggable(Level.FINER)) {
connectionlogger.finer(toString() + " ActivityId " + activityId.toString());
}
Expand All @@ -3709,10 +3727,6 @@ void prelogin(String serverName, int portNumber) throws SQLServerException {
throw e;
}

if (Util.isActivityTraceOn()) {
ActivityCorrelator.setCurrentActivityIdSentFlag(); // indicate current ActivityId is sent
}

// Read the entire prelogin response
int responseLength = preloginResponse.length;
int responseBytesRead = 0;
Expand Down Expand Up @@ -4377,7 +4391,7 @@ public void setAutoCommit(boolean newAutoCommitMode) throws SQLServerException {
if (loggerExternal.isLoggable(Level.FINER)) {
loggerExternal.entering(loggingClassName, "setAutoCommit", newAutoCommitMode);
if (Util.isActivityTraceOn())
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
String commitPendingTransaction = "";
checkClosed();
Expand Down Expand Up @@ -4433,7 +4447,7 @@ public void commit() throws SQLServerException {
public void commit(boolean delayedDurability) throws SQLServerException {
loggerExternal.entering(loggingClassName, "commit");
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}

checkClosed();
Expand All @@ -4451,7 +4465,7 @@ public void commit(boolean delayedDurability) throws SQLServerException {
public void rollback() throws SQLServerException {
loggerExternal.entering(loggingClassName, "rollback");
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
checkClosed();

Expand Down Expand Up @@ -4539,10 +4553,6 @@ private void clearConnectionResources() {

// Clean-up queue etc. related to batching of prepared statement discard actions (sp_unprepare).
cleanupPreparedStatementDiscardActions();

if (Util.isActivityTraceOn()) {
ActivityCorrelator.cleanupActivityId();
}
}

/**
Expand All @@ -4563,9 +4573,7 @@ final void poolCloseEventNotify() throws SQLServerException {
connectionCommand("IF @@TRANCOUNT > 0 ROLLBACK TRAN", "close connection");
}
notifyPooledConnection(null);
if (Util.isActivityTraceOn()) {
ActivityCorrelator.cleanupActivityId();
}

if (connectionlogger.isLoggable(Level.FINER)) {
connectionlogger.finer(toString() + " Connection closed and returned to connection pool");
}
Expand Down Expand Up @@ -4612,7 +4620,7 @@ public boolean isReadOnly() throws SQLServerException {
public void setCatalog(String catalog) throws SQLServerException {
loggerExternal.entering(loggingClassName, "setCatalog", catalog);
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
checkClosed();
if (catalog != null) {
Expand All @@ -4635,7 +4643,7 @@ public void setTransactionIsolation(int level) throws SQLServerException {
if (loggerExternal.isLoggable(Level.FINER)) {
loggerExternal.entering(loggingClassName, "setTransactionIsolation", level);
if (Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
}

Expand Down Expand Up @@ -6342,7 +6350,6 @@ final boolean complete(LogonCommand logonCommand, TDSReader tdsReader) throws SQ
byte[] interfaceLibVersionBytes = {(byte) SQLJdbcVersion.BUILD, (byte) SQLJdbcVersion.PATCH,
(byte) SQLJdbcVersion.MINOR, (byte) SQLJdbcVersion.MAJOR};
byte[] databaseNameBytes = toUCS16(databaseName);
byte[] netAddress = new byte[6];
int dataLen = 0;

// TDS version 8 if strict mode
Expand Down Expand Up @@ -6849,7 +6856,7 @@ final private Savepoint setNamedSavepoint(String sName) throws SQLServerExceptio
public Savepoint setSavepoint(String sName) throws SQLServerException {
loggerExternal.entering(loggingClassName, SET_SAVE_POINT, sName);
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
checkClosed();
Savepoint pt = setNamedSavepoint(sName);
Expand All @@ -6861,7 +6868,7 @@ public Savepoint setSavepoint(String sName) throws SQLServerException {
public Savepoint setSavepoint() throws SQLServerException {
loggerExternal.entering(loggingClassName, SET_SAVE_POINT);
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
checkClosed();
Savepoint pt = setNamedSavepoint(null);
Expand All @@ -6873,7 +6880,7 @@ public Savepoint setSavepoint() throws SQLServerException {
public void rollback(Savepoint s) throws SQLServerException {
loggerExternal.entering(loggingClassName, "rollback", s);
if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
checkClosed();
if (databaseAutoCommitMode) {
Expand All @@ -6898,7 +6905,7 @@ public void setHoldability(int holdability) throws SQLServerException {
loggerExternal.entering(loggingClassName, "setHoldability", holdability);

if (loggerExternal.isLoggable(Level.FINER) && Util.isActivityTraceOn()) {
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getNext().toString());
loggerExternal.finer(toString() + ACTIVITY_ID + ActivityCorrelator.getCurrent().toString());
}
checkValidHoldability(holdability);
checkClosed();
Expand Down
Loading

0 comments on commit 953e4a0

Please sign in to comment.