Skip to content

Commit

Permalink
♻️ (thread) 使用组合而不是继承的方式使用 ThreadContextComponent
Browse files Browse the repository at this point in the history
  • Loading branch information
Hccake committed Mar 9, 2024
1 parent 021182a commit b567786
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void put(T t) {
this.queue.put(t);
}
catch (InterruptedException e) {
currentThread().interrupt();
Thread.currentThread().interrupt();
}
catch (Exception e) {
this.log.error("{} put Object error, param: {}", this.getClass().toString(), t, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
import java.util.concurrent.TimeUnit;

import org.ballcat.common.lock.JavaReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author lingting 2023-04-22 10:39
*/
@SuppressWarnings("java:S1066")
public abstract class AbstractDynamicTimer<T> extends AbstractThreadContextComponent {
public abstract class AbstractDynamicTimer<T> extends Thread {

protected final Logger log = LoggerFactory.getLogger(getClass());

private final JavaReentrantLock lock = new JavaReentrantLock();

Expand All @@ -48,6 +52,13 @@ protected int defaultCapacity() {
*/
protected abstract long sleepTime(T t);

protected void init() {
}

public boolean isRun() {
return !isInterrupted() && isAlive();
}

public void put(T t) {
if (t == null) {
return;
Expand All @@ -63,7 +74,7 @@ public void put(T t) {
interrupt();
}
catch (Exception e) {
this.log.error("{} put error, param: {}", this.getClass().toString(), t, e);
this.log.error("{} put error, param: {}", this.getClass(), t, e);
}
}

Expand Down Expand Up @@ -117,11 +128,11 @@ protected T pool() {
protected abstract void process(T t);

protected void error(Exception e) {
this.log.error("类: {}; 线程: {}; 运行异常! ", getSimpleName(), getId(), e);
this.log.error("类: {}; 线程: {}; 运行异常! ", getClass().getName(), getId(), e);
}

protected void shutdown() {
this.log.warn("类: {}; 线程: {}; 被中断! 剩余数据: {}", getSimpleName(), getId(), this.queue.size());
this.log.warn("类: {}; 线程: {}; 被中断! 剩余数据: {}", getClass().getName(), getId(), this.queue.size());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/**
* 顶级队列线程类
*
* @author lingting 2021/3/2 15:07
*/
public abstract class AbstractQueueThread<E> extends AbstractThreadContextComponent {
public abstract class AbstractQueueThread<E> extends Thread {

protected final Logger log = LoggerFactory.getLogger(getClass());

/**
* 默认缓存数据数量
Expand Down Expand Up @@ -67,6 +71,13 @@ public long getPollTimeout() {
return POLL_TIMEOUT_MS;
}

public boolean isRun() {
return !isInterrupted() && isAlive();
}

protected void init() {
}

/**
* 往队列插入数据
* @param e 数据
Expand Down Expand Up @@ -131,9 +142,9 @@ public void run() {
error(e, list);
}
// Throwable 异常直接结束. 这里捕获用来保留信息. 方便排查问题
catch (Throwable t) {
this.log.error("线程队列运行异常!", t);
throw t;
catch (Throwable throwable) {
this.log.error("线程队列运行异常!", throwable);
throw throwable;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author lingting 2022/6/27 20:26
*/
public abstract class AbstractTimer extends AbstractThreadContextComponent {
public abstract class AbstractTimer extends Thread {

protected final Logger log = LoggerFactory.getLogger(getClass());

/**
* 获取超时时间, 单位: 毫秒
Expand All @@ -35,6 +40,13 @@ public long getTimeout() {
*/
protected abstract void process();

protected void init() {
}

public boolean isRun() {
return !isInterrupted() && isAlive();
}

/**
* 线程被中断触发.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,27 @@
/**
* @author lingting 2023-04-22 10:40
*/
public abstract class AbstractThreadContextComponent extends Thread implements ContextComponent {
public abstract class ThreadContextComponent implements ContextComponent {

protected final Logger log = org.slf4j.LoggerFactory.getLogger(getClass());

protected void init() {
private final Thread thread;

}

public boolean isRun() {
return !isInterrupted() && isAlive();
protected ThreadContextComponent(Thread thread) {
this.thread = thread;
}

@Override
public void onApplicationStart() {
setName(getClass().getSimpleName());
if (!isAlive()) {
start();
if (!this.thread.isAlive()) {
this.thread.start();
}
}

@Override
public void onApplicationStop() {
this.log.warn("{} 线程: {}; 开始关闭!", getClass().getSimpleName(), getId());
interrupt();
}

public String getSimpleName() {
return getClass().getSimpleName();
this.log.warn("{} 线程: {}; 开始关闭!", getClass().getSimpleName(), this.thread.getId());
this.thread.interrupt();
}

@Override
public abstract void run();

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ class AbstractDynamicTimerTest {
@BeforeEach
void before() {
this.timer = new DynamicTimer();
this.timer.onApplicationStart();
this.timer.start();
}

@AfterEach
void after() {
this.timer.onApplicationStop();
this.timer.interrupt();
}

@Test
Expand Down

0 comments on commit b567786

Please sign in to comment.