动态修改线程池

starlin 6,918 2021-07-02

在平时工作中,用到的最多的应该是线程池了,那么,当自定义线程池的时候 corePoolSize、maximumPoolSize、workQueue(队列长度)如何动态化调整了

动态更新的原理

更新corePoolSize的源码

public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

在运行期间线程池使用此方法设置corePoolSize之后,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。
对于当前值小于当前工作线程数的情况,说明有多余的worker线程,此时会向当前idle的worker线程发起中断请求以实现回收,多余的worker在下次idel的时候也会被回收;
对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务

![](https://raw.githubusercontent.com/smartlin/pic/main/_posts/java%E5%B9%B6%E5%8F%91/%E5%8A%A8%E6%80%81%E4%BF%AE%E6%94%B9%E7%BA%BF%E7%A8%8B%E6%B1%A0.md/5084111706496.png =678x)

更新maximumPoolSize的源码

public void setMaximumPoolSize(int maximumPoolSize) {
    //校验参数合理性
    if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    //覆盖原值
    this.maximumPoolSize = maximumPoolSize;
    //工作线程是否是大于最大线程数,若大于,则对空闲线程发起中断请求
    if (workerCountOf(ctl.get()) > maximumPoolSize)
        interruptIdleWorkers();
}

如何更新队列长度

在ThreadPoolExecutor源码中没有设置队列长度的的方法,如下图:
![](https://raw.githubusercontent.com/smartlin/pic/main/_posts/java%E5%B9%B6%E5%8F%91/%E5%8A%A8%E6%80%81%E4%BF%AE%E6%94%B9%E7%BA%BF%E7%A8%8B%E6%B1%A0.md/5530597505588.png =652x)
那么问题来来,该如何去更新队列长度

先来看看LinkedBlockingQueue源码中的,发现是被final修饰的,所以没有提供set方法,如下:

private final int capacity;

于是乎我参考了大神的做法,把LinkedBlockingQueue源码复制出来,将capacity前面的final修饰去掉,重新定一个名为ResizeLinkedBlockIngQueue的类
在创建线程池的时候,把里面的LinkedBlockingQueue替换为ResizeLinkedBlockIngQueue,如下:

private ThreadPoolExecutor buildThreadPoolExecutorResizeBlockQueue() {
    return new ThreadPoolExecutor(2, 5, 60, TimeUnit.MILLISECONDS, new ResizeLinkedBlockIngQueue<>(10));
}

Tips:这里会有一个问题,如果将LinkedBlockingQueue换成其他的阻塞队列,则需要将相应阻塞队列中的capacity前面的final修饰去掉,也就是要自定义多个阻塞队列

来看看效果,测试代码如下:

@org.junit.Test
public void getThreadPoolInfounUseLinkedBlockingQueue() throws InterruptedException {
    ThreadPoolExecutor executor = buildThreadPoolExecutorResizeBlockQueue();
    ThreadPoolUtil.threadPoolStatus(executor, "before");
    for (int i = 0; i < 15; i++) {
        executor.submit(() ->{
            ThreadPoolUtil.threadPoolStatus(executor, "create-thread-starlin");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    executor.setCorePoolSize(10);
    executor.setMaximumPoolSize(10);
    ResizeLinkedBlockIngQueue queue = (ResizeLinkedBlockIngQueue) executor.getQueue();
    queue.setCapacity(100);
    ThreadPoolUtil.threadPoolStatus(executor, "after");
    Thread.currentThread().join();
}

运行结果,如下图所示:
![](https://raw.githubusercontent.com/smartlin/pic/main/_posts/java%E5%B9%B6%E5%8F%91/%E5%8A%A8%E6%80%81%E4%BF%AE%E6%94%B9%E7%BA%BF%E7%A8%8B%E6%B1%A0.md/3621315809743.png =1264x)

从上图可以很明显的看出队列长度、核心线程数、最大线程数都已经改变了

利用动态配置中心实现方式(思路版)

因为我之前是用Apollo来实现,主要思路是通过的Apollo的监听方式来实现,如果监听到配置发生来改变后刷新线程池的配置,重新set对应的值即可(仅仅配置了是无法生效的)
下面拿一个监听日志级别的的listen类(原来监听线程池的代码没找到,这里懒得写了),展示如下:

@Service
public class ApolloChangeListener {

    private static final Logger logger = LoggerFactory.getLogger(ApolloChangeListener.class);

    private static final String LOGGER_TAG = "logging.level.";

    @ApolloConfig
    private Config config;

    @Resource
    private LoggingSystem loggingSystem;

    @ApolloConfigChangeListener
    private void someChangeHandler(ConfigChangeEvent changeEvent) {
        for (String key : changeEvent.changedKeys()) {
            ConfigChange change = changeEvent.getChange(key);
            logger.info("Found change - 【{}】", change.toString());
        }

        Set<String> keyNames = config.getPropertyNames();
        for (String key : keyNames) {
            if (StringUtils.isEmpty(key)) {
                continue;
            }
            if (!key.startsWith(LOGGER_TAG)) {
                continue;
            }
            String loggerName = key.replace(LOGGER_TAG, "");
            String strLevel = config.getProperty(key, "info");
            LogLevel level = LogLevel.valueOf(strLevel.toUpperCase());
            loggingSystem.setLogLevel(loggerName, level);
            logger.info("{}:{}", key, strLevel);
        }
    }
}

因为我对Nacos没有那么熟悉,在官方文档中也能找到监听器的API,点击跳转,实现方式和Apollo原理一样

以上,end,感谢阅读!!!


# java并发