Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9809][Audit] SDK supports both singleton and non-singleton usage #9812

Merged
merged 5 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,332 +17,27 @@

package org.apache.inlong.audit;

import org.apache.inlong.audit.protocol.AuditApi;
import org.apache.inlong.audit.send.SenderManager;
import org.apache.inlong.audit.util.AuditConfig;
import org.apache.inlong.audit.util.Config;
import org.apache.inlong.audit.util.StatInfo;

import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_REQUEST;

/**
* Audit operator, which is singleton.
* Creating objects through deserialization is not supported.
*/
public class AuditOperator implements Serializable {

private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LoggerFactory.getLogger(AuditOperator.class);
private static final String FIELD_SEPARATORS = ":";
private static final String DEFAULT_AUDIT_TAG = "-1";
private static final long DEFAULT_AUDIT_VERSION = -1;
private static final int BATCH_NUM = 100;
private static final AuditOperator AUDIT_OPERATOR = new AuditOperator();
private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
private static final int PERIOD = 1000 * 60;
private final ConcurrentHashMap<String, StatInfo> countMap = new ConcurrentHashMap<>();
private final HashMap<String, StatInfo> threadCountMap = new HashMap<>();
private final ConcurrentHashMap<String, StatInfo> deleteCountMap = new ConcurrentHashMap<>();
private final List<String> deleteKeyList = new ArrayList<>();
private final Config config = new Config();
private int packageId = 1;
private int dataId = 0;
private boolean initialized = false;
private SenderManager manager;

private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private AuditConfig auditConfig = null;
private SocketAddressListLoader loader = null;
public final class AuditOperator extends AuditReporterImpl {

/**
* Not support create from outer.
*/
private AuditOperator() {

}

/**
* Get AuditOperator instance.
*/
public static AuditOperator getInstance() {
return AUDIT_OPERATOR;
}

/**
* init
*/
private void init() {
if (initialized) {
return;
}
config.init();
timeoutExecutor.scheduleWithFixedDelay(new Runnable() {

@Override
public void run() {
try {
loadIpPortList();
send();
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}

}, PERIOD, PERIOD, TimeUnit.MILLISECONDS);
if (auditConfig == null) {
auditConfig = new AuditConfig();
}
this.manager = new SenderManager(auditConfig);
}
private static class AuditOperatorHandler {

private void loadIpPortList() {
if (loader == null) {
return;
}
try {
List<String> ipPortList = loader.loadSocketAddressList();
if (ipPortList != null && ipPortList.size() > 0) {
HashSet<String> ipPortSet = new HashSet<>();
ipPortSet.addAll(ipPortList);
this.setAuditProxy(ipPortSet);
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
private static final AuditOperator INSTANCE = new AuditOperator();
}

/**
* set loader
*
* @param loader the loader to set
*/
public void setLoader(SocketAddressListLoader loader) {
this.loader = loader;
}

/**
* setLoaderClass
*
* @param loaderClassName
*/
public void setLoaderClass(String loaderClassName) {
if (StringUtils.isEmpty(loaderClassName)) {
return;
}
try {
Class<?> loaderClass = ClassUtils.getClass(loaderClassName);
Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
if (loaderObject instanceof SocketAddressListLoader) {
SocketAddressListLoader loader = (SocketAddressListLoader) loaderObject;
this.loader = loader;
LOGGER.info("audit IpPortListLoader loaderClass:{}", loaderClassName);
}
} catch (Throwable t) {
LOGGER.error("Fail to init IpPortListLoader,loaderClass:{},error:{}",
loaderClassName, t.getMessage(), t);
}
}

/**
* Set AuditProxy from the ip
*/
public void setAuditProxy(HashSet<String> ipPortList) {
try {
GLOBAL_LOCK.lockInterruptibly();
if (!initialized) {
init();
initialized = true;
}
this.manager.setAuditProxy(ipPortList);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage());
} finally {
GLOBAL_LOCK.unlock();
}
}

/**
* set audit config
*/
public void setAuditConfig(AuditConfig config) {
auditConfig = config;
manager.setAuditConfig(config);
}

/**
* Add audit data
*/
public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size) {
add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID, logTime, count, size);
}

public void add(int auditID, String auditTag, String inlongGroupID, String inlongStreamID, Long logTime,
long count, long size) {
long delayTime = System.currentTimeMillis() - logTime;
add(auditID, auditTag, inlongGroupID, inlongStreamID, logTime, count, size,
delayTime * count, DEFAULT_AUDIT_VERSION);
}

public void add(int auditID, String inlongGroupID, String inlongStreamID, Long logTime, long count, long size,
long delayTime) {
add(auditID, DEFAULT_AUDIT_TAG, inlongGroupID, inlongStreamID, logTime, count, size,
delayTime, DEFAULT_AUDIT_VERSION);
}

public void add(int auditID, String auditTag, String inlongGroupID, String inlongStreamID, Long logTime,
long count, long size, long delayTime, long auditVersion) {
StringJoiner keyJoiner = new StringJoiner(FIELD_SEPARATORS);
keyJoiner.add(String.valueOf(logTime / PERIOD));
keyJoiner.add(inlongGroupID);
keyJoiner.add(inlongStreamID);
keyJoiner.add(String.valueOf(auditID));
keyJoiner.add(auditTag);
keyJoiner.add(String.valueOf(auditVersion));
addByKey(keyJoiner.toString(), count, size, delayTime);
}

/**
* Add audit info by key.
*/
private void addByKey(String key, long count, long size, long delayTime) {
if (countMap.get(key) == null) {
countMap.put(key, new StatInfo(0L, 0L, 0L));
}
countMap.get(key).count.addAndGet(count);
countMap.get(key).size.addAndGet(size);
countMap.get(key).delay.addAndGet(delayTime);
}

/**
* Send audit data
*/
public synchronized void send() {
manager.clearBuffer();
resetStat();
// Retrieve statistics from the list of objects without statistics to be eliminated
for (Map.Entry<String, StatInfo> entry : this.deleteCountMap.entrySet()) {
this.sumThreadGroup(entry.getKey(), entry.getValue());
}
this.deleteCountMap.clear();
for (Map.Entry<String, StatInfo> entry : countMap.entrySet()) {
String key = entry.getKey();
StatInfo value = entry.getValue();
// If there is no data, enter the list to be eliminated
if (value.count.get() == 0) {
this.deleteKeyList.add(key);
continue;
}
this.sumThreadGroup(key, value);
}

// Clean up obsolete statistical data objects
for (String key : this.deleteKeyList) {
StatInfo value = this.countMap.remove(key);
this.deleteCountMap.put(key, value);
}
this.deleteKeyList.clear();

long sdkTime = Calendar.getInstance().getTimeInMillis();
AuditApi.AuditMessageHeader msgHeader = AuditApi.AuditMessageHeader.newBuilder()
.setIp(config.getLocalIP()).setDockerId(config.getDockerId())
.setThreadId(String.valueOf(Thread.currentThread().getId()))
.setSdkTs(sdkTime).setPacketId(packageId)
.build();
AuditApi.AuditRequest.Builder requestBuild = AuditApi.AuditRequest.newBuilder();
requestBuild.setMsgHeader(msgHeader).setRequestId(manager.nextRequestId());

// process the stat info for all threads
for (Map.Entry<String, StatInfo> entry : threadCountMap.entrySet()) {
// Entry key order: logTime inlongGroupID inlongStreamID auditID auditTag auditVersion
String[] keyArray = entry.getKey().split(FIELD_SEPARATORS);
long logTime = Long.parseLong(keyArray[0]) * PERIOD;
String inlongGroupID = keyArray[1];
String inlongStreamID = keyArray[2];
String auditID = keyArray[3];
String auditTag = keyArray[4];
long auditVersion = Long.parseLong(keyArray[5]);
StatInfo value = entry.getValue();
AuditApi.AuditMessageBody msgBody = AuditApi.AuditMessageBody.newBuilder()
.setLogTs(logTime)
.setInlongGroupId(inlongGroupID)
.setInlongStreamId(inlongStreamID)
.setAuditId(auditID)
.setAuditTag(auditTag)
.setCount(value.count.get())
.setSize(value.size.get())
.setDelay(value.delay.get())
.setAuditVersion(auditVersion)
.build();
requestBuild.addMsgBody(msgBody);

if (dataId++ >= BATCH_NUM) {
dataId = 0;
packageId++;
sendByBaseCommand(requestBuild.build());
requestBuild.clearMsgBody();
}
}
if (requestBuild.getMsgBodyCount() > 0) {
sendByBaseCommand(requestBuild.build());
requestBuild.clearMsgBody();
}
threadCountMap.clear();

LOGGER.info("finish report audit data");
}

/**
* Send base command
*/
private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
AuditApi.BaseCommand.Builder baseCommand = AuditApi.BaseCommand.newBuilder();
baseCommand.setType(AUDIT_REQUEST).setAuditRequest(auditRequest).build();
manager.send(baseCommand.build(), auditRequest);
}

/**
* Summary
*/
private void sumThreadGroup(String key, StatInfo statInfo) {
long count = statInfo.count.getAndSet(0);
if (0 == count) {
return;
}
if (threadCountMap.get(key) == null) {
threadCountMap.put(key, new StatInfo(0, 0, 0));
}

long size = statInfo.size.getAndSet(0);
long delay = statInfo.delay.getAndSet(0);

threadCountMap.get(key).count.addAndGet(count);
threadCountMap.get(key).size.addAndGet(size);
threadCountMap.get(key).delay.addAndGet(delay);
}

/**
* Reset statistics
* Get AuditOperator instance.
*/
private void resetStat() {
dataId = 0;
packageId = 1;
public static AuditOperator getInstance() {
return AuditOperatorHandler.INSTANCE;
}
}
Loading
Loading