Skip to content

Commit

Permalink
Improving Log Agent Performance by Switching to Coroutine and Monito…
Browse files Browse the repository at this point in the history
…ring Active Log #739 (#740)

* The file module needs to add a few performance-related unit tests. #731

* Improving Log Agent Performance by Switching to Coroutine and Monitoring Active Log #739
  • Loading branch information
goodjava authored Sep 26, 2023
1 parent 17b69b4 commit f7f4939
Show file tree
Hide file tree
Showing 20 changed files with 914 additions and 2 deletions.
20 changes: 20 additions & 0 deletions jcommon/file/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,24 @@

</dependencies>


<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<compilerArgs>
<arg>--add-modules=jdk.incubator.concurrent</arg>
<arg>--enable-preview</arg>
</compilerArgs>
<compilerVersion>20</compilerVersion>
<source>20</source>
<target>20</target>
</configuration>
</plugin>
</plugins>
</build>


</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ public interface ILogFile {

void initLogFile(String file, ReadListener listener, long pointer, long lineNumber);


}
274 changes: 274 additions & 0 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
package com.xiaomi.mone.file;

import com.google.common.collect.Lists;
import com.xiaomi.mone.file.common.FileInfo;
import com.xiaomi.mone.file.common.FileInfoCache;
import com.xiaomi.mone.file.common.FileUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.concurrent.TimeUnit;

/**
* @author goodjava@qq.com
*/
@Slf4j
public class LogFile2 implements ILogFile {

@Getter
private String file;

@Getter
private Object fileKey;

@Getter
private MoneRandomAccessFile raf;

@Setter
private ReadListener listener;

@Setter
private volatile boolean stop;

@Setter
private volatile boolean reOpen;

@Setter
private volatile boolean reFresh;

@Getter
private int beforePointerHashCode;

@Setter
private long pointer;

//行号
private long lineNumber;

//每次读取时文件的最大偏移量
private long maxPointer;

private String md5;

private static final int LINE_MAX_LENGTH = 50000;

public LogFile2() {

}

public LogFile2(String file, ReadListener listener) {
this.file = file;
File f = new File(this.file);
this.fileKey = FileUtils.fileKey(f);
this.md5 = md5(file);
this.listener = listener;
this.pointer = readPointer();
}

public LogFile2(String file) {
this.file = file;
File f = new File(this.file);
this.fileKey = FileUtils.fileKey(f);
this.md5 = md5(file);
this.pointer = readPointer();
}


public LogFile2(String file, ReadListener listener, long pointer, long lineNumber) {
this.file = file;
this.md5 = md5(file);
this.listener = listener;
this.pointer = pointer;
this.lineNumber = lineNumber;
}

private void open() {
try {
//4kb
this.raf = new MoneRandomAccessFile(file, "r", 1024 * 4);
reOpen = false;
reFresh = false;
} catch (FileNotFoundException e) {
log.error("open file FileNotFoundException", e);
} catch (IOException e) {
log.error("open file IOException", e);
}
}

public void readLine() throws IOException {
while (true) {
open();
//兼容文件切换时,缓存的pointer
try {
log.info("open file:{},pointer:{}", file, this.pointer);
if (pointer > raf.length()) {
pointer = 0;
lineNumber = 0;
}
} catch (Exception e) {
log.error("file.length() IOException, file:{}", this.file, e);
}
raf.seek(pointer);

while (true) {
listener.setPointer(this);
if (this.pointer == -1) {
pointer = 0;
this.lineNumber = 0;
log.info("empty break");
break;
}
String line = raf.getNextLine();
if (null != line && lineNumber == 0 && pointer == 0) {
String hashLine = line.length() > 100 ? line.substring(0, 100) : line;
beforePointerHashCode = hashLine.hashCode();
}
//大行文件先临时截断
line = lineCutOff(line);

if (reFresh) {
break;
}

if (reOpen) {
pointer = 0;
lineNumber = 0;
break;
}

if (stop) {
break;
}

//文件内容被切割,重头开始采集内容
if (contentHasCutting(line)) {
reOpen = true;
pointer = 0;
lineNumber = 0;
log.warn("file:{} content have been cut, goto reOpen file", file);
break;
}

if (listener.isBreak(line)) {
stop = true;
break;
}

if (listener.isContinue(line)) {
continue;
}


try {
pointer = raf.getFilePointer();
maxPointer = raf.length();
} catch (IOException e) {
log.error("file.length() IOException, file:{}", this.file, e);
}

ReadResult readResult = new ReadResult();
readResult.setLines(Lists.newArrayList(line));
readResult.setPointer(pointer);
readResult.setFileMaxPointer(maxPointer);
readResult.setLineNumber(++lineNumber);
ReadEvent event = new ReadEvent(readResult);

listener.onEvent(event);
}
raf.close();
if (stop) {
log.info("stop:{}", this.file);
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).build());
break;
}
}
}

@Override
public void initLogFile(String file, ReadListener listener, long pointer, long lineNumber) {
this.file = file;
this.md5 = md5(file);
this.listener = listener;
this.pointer = pointer;
this.lineNumber = lineNumber;
}

private String lineCutOff(String line) {
if (null != line) {
//todo 大行文件先临时截断
if (line.length() > LINE_MAX_LENGTH) {
line = line.substring(0, LINE_MAX_LENGTH);
}
}

return line;
}

private boolean contentHasCutting(String line) throws IOException {
if (null != line) {
return false;
}

long currentFileMaxPointer;
try {
currentFileMaxPointer = raf.length();
if (currentFileMaxPointer == 0L) {
raf.getFD().sync();
TimeUnit.MILLISECONDS.sleep(30);
currentFileMaxPointer = raf.length();
}
} catch (IOException e) {
log.error("get fileMaxPointer IOException", e);
return false;
} catch (InterruptedException e) {
log.error("get fileMaxPointer InterruptedException", e);
return false;
}

return false;
}


public void shutdown() {
try {
this.stop = true;
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).build());
} catch (Throwable ex) {
log.error(ex.getMessage());
}
}


public long readPointer() {
try {
FileInfo fi = FileInfoCache.ins().get(this.fileKey.toString());
if (null != fi) {
return fi.getPointer();
}
} catch (Throwable e) {
log.error(e.getMessage());
}
return 0;
}


@SneakyThrows
public String md5(String msg) {
MessageDigest md = MessageDigest.getInstance("MD5");
md.update(msg.getBytes());
byte[] digest = md.digest();
StringBuilder sb = new StringBuilder(2 * digest.length);
for (byte b : digest) {
sb.append(String.format("%02x", b & 0xff));
}
return sb.toString().toUpperCase();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@
*/
public interface ReadListener {


void onEvent(ReadEvent event);

boolean isContinue(String line);

default boolean isBreak(String line) {
return false;
}

default void setPointer(Object obj) {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.xiaomi.mone.file.common;

import lombok.Builder;
import lombok.Data;

/**
* @author goodjava@qq.com
* @date 2023/9/26 11:51
*/
@Data
@Builder
public class FileInfo {

private long pointer;

}
Loading

0 comments on commit f7f4939

Please sign in to comment.