关于dubbo 自定义线程池的问题
初识dubbo
一、什么是dubbo?
Dubbo是阿里巴巴开源的基于 Java 的高性能 RPC(一种远程调用) 分布式服务框架(SOA),致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案,其实就是一种远程服务调用的分布式框架
二、为什么要用dubbo
在互联网的发展,网站应用的规模不断扩大,常规的垂直应用架构已无法应对,分布式服务架构以及流动计算架构势在必行,亟需一个治理系统确保架构有条不紊的演进,所以就出现了dubbo
单一应用框架:当网站流量很小时,只需一个应用,将所有功能都部署在一起。
垂直应用框架:当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,将应用拆成互不相干的几个应用,以提升效率。
分布式服务架构:当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,使前端应用能更快速的响应多变的市场需求。此时,用于提高业务复用及整合的分布式服务框架(RPC)是关键。
流动计算架构:当服务越来越多,容量的评估,小服务资源的浪费等问题逐渐显现,此时需增加一个调度中心基于访问压力实时管理集群容量,提高集群利用率。此时,用于提高机器利用率的资源调度和治理中心(SOA)是关键。
前言
在日常开发中,线程池几乎涉及到了所有的开发框架,或者一些中间件,像我们熟悉的JDK线程池,druid连接线程池等等,线程池的使用,大大降低了人工维护线程的成本,而且提升了线程资源在使用中的效率;
dubbo线程池
dubbo也不例外,默认情况下,当我们在说到dubbo线程池的时候,通常是指服务提供者一端的线程池,其常用配置参数如下:
spring.dubbo.protocol.threads = 2000 spring.dubbo.protocol.threadpool = cached spring.dubbo.protocol.dispatcher = message
对应到dubbo的配置文件中如下:
<dubbo:protocol name="dubbo" threadpool="cached" dispatcher="message" threads="50" port="20880"/>
dubbo线程池说明
dubbo在使用的时候,都是通过创建真实的业务线程来进行操作的。已知的线程池模型主要有2个,固定大小线程池和带缓存的线程池;
- fix线程池,即固定大小线程池。也是dubbo默认的使用方式,默认情况下,不做任何配置,线程池最大的线程个数为200个,并且是没有任何等待队列的。所以这种线程池,在极端情况下,可能会存在问题,比如某个应用执行某个大批量操作时,可能因为线程堵塞造成其他应用无法调用的情况;
- cached线程池,非固定大小线程池,当线程不足的时候,会自动创建新线程。这种类型的线程池的问题在于,如果有较高的TPS过来的时候,如果请求的dubbo接口比较耗时,未能及时响应,则会连续不断的创建新线程,则对系统的CPU以及负载都是巨大的压力,甚至可能造成系统宕机的风险; dubbo 自定义线程池
在真实的使用过程中,大多数开发人员是忽略这个配置的,也就是说通常情况下默认是使用fix模式的,如果对于那种TPS比较高,或者dubbo接口中执行的核心业务逻辑比较耗时,并且系统要应对的并发也是居高不下的场景下,fix模式最终因为线程数创建不足而产生错误;
在这种情况下,出了错误之后,通常来说也是无感知的,怎么能快速定位因线程创建不足而导致的问题呢?这就需要一种机制,能够监控dubbo线程池,对运行过程中线程池的状况进行监控,当核心指标达到告警阈值时,及时给出预警,通知开发或运维人员快速做出响应和调整;
这就需要自定义线程池来解决这个问题
自定义线程池代码实现步骤
1、自定义一个maven模块并添加核心依赖
<dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo</artifactId> <version>2.7.0</version> </dependency>
2、自定义线程池类
自定义一个类,继承FixedThreadPool 类,并实现Runnable接口,即该类本身作为一个线程;
import org.apache.dubbo.common.URL; import org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.*; public class WatchingPool extends FixedThreadPool implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(WatchingPool.class); // 线程池预警值【可以根据实际情况动态调整大小】 private static final double ALARM_PERCENT = 0.70; private final Map<URL, ThreadPoolExecutor> theadPoolMap = new ConcurrentHashMap<>(); public WatchingPool() { // 创建一个定时任务,每3秒执行一次【可以根据实际情况动态调整参数】 Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this, 1, 3, TimeUnit.SECONDS); } @Override public Executor getExecutor(URL url) { // 重写父类getExecutor, 如果executor是ThreadPoolExecutor,则放入theadPoolMap中 Executor executor = super.getExecutor(url); if (executor instanceof ThreadPoolExecutor) { theadPoolMap.put(url, (ThreadPoolExecutor) executor); } return executor; public void run() { for (Map.Entry<URL, ThreadPoolExecutor> entry : theadPoolMap.entrySet()) { URL url = entry.getKey(); ThreadPoolExecutor threadPoolExecutor = entry.getValue(); // 获取正在活动的线程数 int activeCount = threadPoolExecutor.getActiveCount(); // 获取总的线程数 (继承的FixedThreadPool , 所以这里获取核心的线程数就是总的线程数) int corePoolSize = threadPoolExecutor.getCorePoolSize(); double percent = activeCount / (corePoolSize * 1.0); LOGGER.info("线程池状态:{}/{},: {}%", activeCount, corePoolSize, percent*100); if (percent > ALARM_PERCENT) { LOGGER.error("超出警戒线 : host:{}, 当前使用量 {}%, URL:{}", url.getHost(), percent*100, url); } }
该类的逻辑,主要是设置了一个最大的阈值,当超出这个阈值时候,打印出相关的信息,实际应用中,可以集成发短信或邮件进行告警;
3、将上面的类配置到META-INF目录中
4、在生产端工程中引入上面的模块
<dependency> <groupId>com.congge</groupId> <artifactId>common-pool</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
5、在生产端配置文件中,指定自定义线程池
<dubbo:protocol name="dubbo" threadpool="watchingPool" threads="50" port="20880"/>
6、改造消费端的启动类,使用1000个线程不断调用接口
import com.congge.service.HelloService; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import java.io.IOException; public class ConsumerMain { public static void main(String[] args) throws Exception { ApplicationContext ac = new ClassPathXmlApplicationContext("spring-consumer.xml"); HelloService service = (HelloService) ac.getBean("helloService"); while (true){ for(int i=0;i<1000;i++){ Thread.sleep(6); new Thread( () ->{ String hello = service.hello("Hello Provider"); System.out.println(hello); }).start(); } } } }
为了看到效果,我们将配置文件这里的线程数调整到了50个,下面启动生产端和消费端的代码,观察控制台输出日志(先启动生产端,再启动消费端)
生产端已就绪
启动消费端,不断发起了调用
这时再次回到生产端控制台,通过输出日志可以看到整个线程池经历了线程数不断往上增长的过程,直到最后达到了警戒线;