新聞中心
這篇文章將為大家詳細(xì)講解有關(guān)ThreadPoolExecutor的CallerRunsPolicy拒絕策略是什么,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
成都網(wǎng)絡(luò)公司-成都網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)建站十多年經(jīng)驗(yàn)成就非凡,專業(yè)從事網(wǎng)站建設(shè)、成都做網(wǎng)站,成都網(wǎng)頁設(shè)計(jì),成都網(wǎng)頁制作,軟文發(fā)稿,廣告投放平臺(tái)等。十多年來已成功提供全面的成都網(wǎng)站建設(shè)方案,打造行業(yè)特色的成都網(wǎng)站建設(shè)案例,建站熱線:18980820575,我們期待您的來電!
首先介紹一下ThreadPoolExecutor構(gòu)造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
構(gòu)造方法中的字段含義如下
corePoolSize:核心線程數(shù)量,當(dāng)有新任務(wù)在execute()方法提交時(shí),會(huì)執(zhí)行以下判斷:
如果運(yùn)行的線程少于 corePoolSize,則創(chuàng)建新線程來處理任務(wù),即使線程池中的其他線程是空閑的;
如果線程池中的線程數(shù)量大于等于 corePoolSize 且小于 maximumPoolSize,則只有當(dāng)workQueue滿時(shí)才創(chuàng)建新的線程去處理任務(wù);
如果設(shè)置的corePoolSize 和 maximumPoolSize相同,則創(chuàng)建的線程池的大小是固定的,這時(shí)如果有新任務(wù)提交,若workQueue未滿,則將請(qǐng)求放入workQueue中,等待有空閑的線程去從workQueue中取任務(wù)并處理;
如果運(yùn)行的線程數(shù)量大于等于maximumPoolSize,這時(shí)如果workQueue已經(jīng)滿了,則通過handler所指定的策略來處理任務(wù)
所以,任務(wù)提交時(shí),判斷的順序?yàn)?corePoolSize –> workQueue –> maximumPoolSize。
maximumPoolSize:最大線程數(shù)量
workQueue:等待隊(duì)列,當(dāng)任務(wù)提交時(shí),如果線程池中的線程數(shù)量大于等于corePoolSize的時(shí)候,把該任務(wù)封裝成一個(gè)Worker對(duì)象放入等待隊(duì)列;
keepAliveTime:線程池維護(hù)線程所允許的空閑時(shí)間。當(dāng)線程池中的線程數(shù)量大于corePoolSize的時(shí)候,如果這時(shí)沒有新的任務(wù)提交,核心線程外的線程不會(huì)立即銷毀,而是會(huì)等待,直到等待的時(shí)間超過了keepAliveTime;
threadFactory:它是ThreadFactory類型的變量,用來創(chuàng)建新線程。默認(rèn)使用Executors.defaultThreadFactory() 來創(chuàng)建線程。使用默認(rèn)的ThreadFactory來創(chuàng)建線程時(shí),會(huì)使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級(jí)并且是非守護(hù)線程,同時(shí)也設(shè)置了線程的名稱。
handler:它是RejectedExecutionHandler類型的變量,表示線程池的飽和策略。如果阻塞隊(duì)列滿了并且沒有空閑的線程,這時(shí)如果繼續(xù)提交任務(wù),就需要采取一種策略處理該任務(wù)。線程池提供了4種策略:
AbortPolicy:直接拋出異常,這是默認(rèn)策略;
CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
DiscardPolicy:直接丟棄任務(wù);
簡單來說,在執(zhí)行execute()方法時(shí)如果狀態(tài)一直是RUNNING時(shí),的執(zhí)行過程如下:
如果
workerCount < corePoolSize
,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù);如果
workerCount >= corePoolSize
,且線程池內(nèi)的阻塞隊(duì)列未滿,則將任務(wù)添加到該阻塞隊(duì)列中;如果
workerCount >= corePoolSize && workerCount < maximumPoolSize
,且線程池內(nèi)的阻塞隊(duì)列已滿,則創(chuàng)建并啟動(dòng)一個(gè)線程來執(zhí)行新提交的任務(wù);如果
workerCount >= maximumPoolSize
,并且線程池內(nèi)的阻塞隊(duì)列已滿, 則根據(jù)拒絕策略來處理該任務(wù), 默認(rèn)的處理方式是直接拋異常。
實(shí)驗(yàn):拒絕策略CallerRunsPolicy
測試當(dāng)拒絕策略是CallerRunsPolicy時(shí),用調(diào)用者所在的線程來執(zhí)行任務(wù),是什么現(xiàn)象。
實(shí)驗(yàn)環(huán)境
jdk 1.8
postman模擬并發(fā)
MySQL5.7
intellj
springboot 2.1.4.RELEASE
實(shí)驗(yàn)代碼
本實(shí)驗(yàn)代碼,在https://github.com/vincentduan/mavenProject 下的threadManagement目錄下。
實(shí)驗(yàn)步驟
1. 首先配置maven pom.xml文件內(nèi)容,關(guān)鍵內(nèi)容如下:
org.springframework.boot spring-boot-dependencies 2.1.4.RELEASE import pom org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-configuration-processor true mysql mysql-connector-java org.springframework.boot spring-boot-starter-jdbc com.alibaba druid 1.1.6 org.projectlombok lombok 1.18.6
2. 配置數(shù)據(jù)庫連接池:
@SpringBootConfiguration public class DBConfiguration { @Autowired private Environment environment; @Bean public DataSource createDataSource() { DruidDataSource druidDataSource = new DruidDataSource(); druidDataSource.setUrl(environment.getProperty("spring.datasource.url")); druidDataSource.setUsername(environment.getProperty("spring.datasource.username")); druidDataSource.setPassword(environment.getProperty("spring.datasource.password")); druidDataSource.setDriverClassName(environment.getProperty("spring.datasource.driver-class-name")); return druidDataSource; } }
3. 配置線程池:
@Configuration @EnableAsync @Slf4j public class ExecutorConfig { @Autowired VisiableThreadPoolTaskExecutor visiableThreadPoolTaskExecutor; @Bean public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = visiableThreadPoolTaskExecutor; //配置核心線程數(shù) executor.setCorePoolSize(5); //配置最大線程數(shù) executor.setMaxPoolSize(5); //配置隊(duì)列大小 executor.setQueueCapacity(5); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("async-service-"); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執(zhí)行初始化 executor.initialize(); return executor; } }
VisiableThreadPoolTaskExecutor類作為一個(gè)bean,并且繼承了ThreadPoolTaskExecutor;
4. 接下來創(chuàng)建一個(gè)service,為了模擬并發(fā),我們將方法執(zhí)行sleep了30秒。如下:
@Service @Slf4j public class UserThreadServiceImpl implements UserThreadService { @Autowired UserThreadDao userThreadDao; @Override @Async("asyncServiceExecutor") public void serviceTest(String username) { log.info("開啟執(zhí)行一個(gè)Service, 這個(gè)Service執(zhí)行時(shí)間為30s, threadId:{}",Thread.currentThread().getId()); userThreadDao.add(username, Integer.parseInt(Thread.currentThread().getId() +""), "started"); try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("執(zhí)行完成一個(gè)Service, threadId:{}",Thread.currentThread().getId()); userThreadDao.update(username, Integer.parseInt(Thread.currentThread().getId() +""), "ended"); } @Override public void update(String username, int threadId, String status) { userThreadDao.update(username, threadId, status); } }
5. 其中的dao很簡單,就是往數(shù)據(jù)庫插入數(shù)據(jù)和更新數(shù)據(jù)。
6. 創(chuàng)建web Controller:
@RestController @RequestMapping("/serviceTest") public class TestController { @Autowired UserThreadService userThreadService; @Autowired private VisiableThreadPoolTaskExecutor visiableThreadPoolTaskExecutor; @GetMapping("test/{username}") public Object test(@PathVariable("username") String username) { userThreadService.serviceTest(username); JSONObject jsonObject = new JSONObject(); ThreadPoolExecutor threadPoolExecutor = visiableThreadPoolTaskExecutor.getThreadPoolExecutor(); jsonObject.put("ThreadNamePrefix", visiableThreadPoolTaskExecutor.getThreadNamePrefix()); jsonObject.put("TaskCount", threadPoolExecutor.getTaskCount()); jsonObject.put("completedTaskCount", threadPoolExecutor.getCompletedTaskCount()); jsonObject.put("activeCount", threadPoolExecutor.getActiveCount()); jsonObject.put("queueSize", threadPoolExecutor.getQueue().size()); return jsonObject; } }
執(zhí)行測試
因?yàn)镃orePoolSize數(shù)量是5,MaxPoolSize也是5,因此線程池的大小是固定的。而QueueCapacity數(shù)量也是5。因此當(dāng)請(qǐng)求前5次時(shí),返回響應(yīng):
當(dāng)前線程數(shù)達(dá)到CorePoolSize時(shí),新來的請(qǐng)求就會(huì)進(jìn)入到workQueue中,如下所示:
而QueueCapacity的數(shù)量也是5,因此達(dá)到QueueCapacity的最大限制后,同時(shí)也達(dá)到了MaxPoolSize的最大限制,將會(huì)根據(jù)拒絕策略來處理該任務(wù),這里的策略是CallerRunsPolicy時(shí),用調(diào)用者所在的線程來執(zhí)行任務(wù),表現(xiàn)為當(dāng)前頁面被阻塞住了,直到當(dāng)前調(diào)用者所在的線程執(zhí)行完畢。如下所示:
從控制臺(tái)的輸出也能看出CallerRunsPolicy策略執(zhí)行線程是調(diào)用者線程:
2019-08-19 21:06:50.255 INFO 1302 --- [async-service-4] c.a.i.s.impl.UserThreadServiceImpl : 開啟執(zhí)行一個(gè)Service, 這個(gè)Service執(zhí)行時(shí)間為30s, threadId:51 2019-08-19 21:06:50.271 INFO 1302 --- [async-service-4] cn.ac.iie.dao.UserThreadDao : threadId:51, jdbcTemplate:org.springframework.jdbc.core.JdbcTemplate@5d83694c 2019-08-19 21:06:50.751 INFO 1302 --- [async-service-5] c.a.i.s.impl.UserThreadServiceImpl : 開啟執(zhí)行一個(gè)Service, 這個(gè)Service執(zhí)行時(shí)間為30s, threadId:52 2019-08-19 21:06:50.771 INFO 1302 --- [async-service-5] cn.ac.iie.dao.UserThreadDao : threadId:52, jdbcTemplate:org.springframework.jdbc.core.JdbcTemplate@5d83694c 2019-08-19 21:06:55.028 INFO 1302 --- [nio-8080-exec-1] c.a.i.s.impl.UserThreadServiceImpl : 開啟執(zhí)行一個(gè)Service, 這個(gè)Service執(zhí)行時(shí)間為30s, threadId:24 2019-08-19 21:06:55.036 INFO 1302 --- [nio-8080-exec-1] cn.ac.iie.dao.UserThreadDao : threadId:24, jdbcTemplate:org.springframework.jdbc.core.JdbcTemplate@5d83694c
其中[nio-8080-exec-1]表示就是調(diào)用者的線程。而其他線程都是[async-service-]
關(guān)于ThreadPoolExecutor的CallerRunsPolicy拒絕策略是什么就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。
網(wǎng)站名稱:ThreadPoolExecutor的CallerRunsPolicy拒絕策略是什么
文章分享:http://fisionsoft.com.cn/article/pcjihe.html