javascript
Spring Boot 配置线程池使用多线程插入数据
??點擊上方?好好學java?,選擇?星標?公眾號
重磅資訊、干貨,第一時間送達 今日推薦:牛人 20000 字的 Spring Cloud 總結,太硬核了~個人原創+1博客:點擊前往,查看更多 來源:https://segmentfault.com/a/1190000016345113 作者:Half前言
最近在工作中需要將一大批數據導入到數據庫中,因為種種原因這些數據不能使用同步數據的方式來進行復制,而是提供了一批文本,文本里面有很多行url地址,需要的字段都包含在這些url中。最開始是使用的正常的普通方式去寫入,但是量太大了,所以就嘗試使用多線程來寫入。下面我們就來介紹一下怎么使用多線程進行導入。
1.文本格式
格式就是類似于這種格式的url,當然這里只是舉個例子,大概有300多個文本,每個文本里面有大概25000條url,而每條url要插入兩個表,這個量還是有點大的,單線程跑的非常慢。
-
https://www.test.com/?type=1&code=123456&goodsId=321
2.springboot配置線程池
我們需要創建一個ExecutorConfig類來設置線程池的各種配置。
@Configuration @EnableAsync public class ExecutorConfig {private static Logger logger = LogManager.getLogger(ExecutorConfig.class.getName());@Beanpublic Executor asyncServiceExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//配置核心線程數executor.setCorePoolSize(5);//配置最大線程數executor.setMaxPoolSize(10);//配置隊列大小executor.setQueueCapacity(400);//配置線程池中的線程的名稱前綴executor.setThreadNamePrefix("thread-");// rejection-policy:當pool已經達到max size的時候,如何處理新任務// CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//執行初始化executor.initialize();return executor;} }3.創建異步任務接口
我們需要創建一個接口,再這個接口里面聲明了我們需要調用的異步方法
public interface AsyncService {/*** 執行異步任務*/void writeTxt(); }4.創建異步實現類
再創建一個異步類實現上面的異步接口,重寫接口里面的方法,最重要的是我們需要在方法上加@Async("asyncServiceExecutor")注解,它是剛剛我們在線程池配置類的里的那個配制方法的名字,加上這個后每次執行這個方法都會開啟一個線程放入線程池中。我下面這個方法是開啟多線程遍歷文件夾中的文件然后為每個文件都復制一個副本出來。
@Service public class AsyncServiceImpl implements AsyncService {private static Logger logger = LogManager.getLogger(AsyncServiceImpl.class.getName());@Async("asyncServiceExecutor")public void writeTxt(String fileName){logger.info("線程-" + Thread.currentThread().getId() + "在執行寫入");try {File file = new File(fileName);List<String> lines = FileUtils.readLines(file);File copyFile = new File(fileName + "_copy.txt");lines.stream().forEach(string->{try {FileUtils.writeStringToFile(copyFile,string,"utf8",true);FileUtils.writeStringToFile(copyFile,"\r\n","utf8",true);} catch (IOException e) {logger.info(e.getMessage());}});}catch (Exception e) {logger.info(e.getMessage());}} } @RunWith(SpringRunner.class) @SpringBootTest public class BootApplicationTests {@Autowired private AsyncService asyncService;@Test public void write() {File file = new File("F://ac_code_1//test.txt");try {FileUtils.writeStringToFile(file, "ceshi", "utf8");FileUtils.writeStringToFile(file, "\r\n", "utf8");FileUtils.writeStringToFile(file, "ceshi2", "utf8");} catch (IOException e) {e.printStackTrace();} }5.修改為阻塞式
上面的步驟已經基本實現了多線程的操作,但是當我真的開始導入數據的時候又發現一個問題,就是每次運行后才剛開始導入就自動停止了,原因是我在Junit中運行了代碼后它雖然開始導入了,但是因為數據很多時間很長,而Juint跑完主線程的邏輯后就把整個JVM都關掉了,所以導入了一點點就停止了,上面的測試方法之所以沒問題是因為幾個文件的復制速度很快,在主線程跑完之前就跑完了,所以看上去沒問題。最開始我用了一個最笨的方法,直接在主線程最后調用Thread.sleep()方法,雖然有效果但是這也太low了,而且你也沒法判斷到底數據導完沒有。所以我又換了一個方式。
6.使用countDownLatch阻塞主線程
CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程執行完后再執行。它可以使主線程一直等到所有的子線程執行完之后再執行。我們修改下代碼,創建一個CountDownLatch實例,大小是所有運行線程的數量,然后在異步類的方法中的finally里面對它進行減1,在主線程最后調用await()方法,這樣就能確保所有的子線程運行完后主線程才會繼續執行。
@RunWith(SpringRunner.class) @SpringBootTest public class BootApplicationTests {private final CountDownLatch countDownLatch = new CountDownLatch(10);@Autowiredprivate AsyncService asyncService;@Testpublic void mainWait() {try {for (int i = 0; i < 10; i++) {asyncService.mainWait(countDownLatch);}countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}} } @Service public class AsyncServiceImpl implements AsyncService {private static Logger logger = LogManager.getLogger(AsyncServiceImpl.class.getName());@Override@Async("asyncServiceExecutor")public void mainWait(CountDownLatch countDownLatch) {try {System.out.println("線程" + Thread.currentThread().getId() + "開始執行");for (int i=1;i<1000000000;i++){Integer integer = new Integer(i);int l = integer.intValue();for (int x=1;x<10;x++){Integer integerx = new Integer(x);int j = integerx.intValue();}}System.out.println("線程" + Thread.currentThread().getId() + "執行結束");} catch (Exception e) {e.printStackTrace();} finally {countDownLatch.countDown();}} }7.導入代碼
雖然上面的多線程是重點,不過還是把導入數據的代碼展示出來給大家參考一下,當然這是簡化版,真實的要比這個多了很多判斷,不過那都是基于業務需求做的判斷。
@RunWith(value = SpringRunner.class) @SpringBootTest public class ApplicationTests {private static Log logger = LogFactory.getLog(ApplicationTests.class);private final CountDownLatch countDownLatch;@AutowiredAsyncService asyncService;@Testpublic void writeCode() {try {File file = new File("F:\\ac_code_1");File[] files = file.listFiles();//計數器數量就等于文件數量,因為每個文件會開一個線程countDownLatch = new CountDownLatch(files.length);Arrays.stream(files).forEach(file1 -> {File child = new File(file1.getAbsolutePath());String fileName = child.getAbsolutePath();logger.info(asyncService.writeCode(fileName,countDownLatch));});countDownLatch.await();catch (InterruptedException e) {e.printStackTrace();}} } @Service public class AsyncServiceImpl implements AsyncService {private static Log logger = LogFactory.getLog(AsyncServiceImpl.class);@AutowiredIExampleService exampleService;@Override@Async("asyncServiceExecutor")public String writeCode(String fileName,CountDownLatch countDownLatch) {logger.info("線程-" + Thread.currentThread().getId() + "在導入-" + fileName);try {File file = new File(fileName);List<String> list = FileUtils.readLines(file);for (String string : list) {String[] parmas = string.split(",");ExampleVo vo = new ExampleVo();vo.setParam1(parmas[0]);vo.setParam1(parmas[1]);vo.setParam1(parmas[2]);exampleService.save(vo);}return "導入完成-" + fileName;}catch (Exception e){e.printStackTrace();return null;}finally {//導入完后減1countDownLatch.countDown();}} }總結
到這里就已經講完了多線程插入數據的方法,目前這個方法還很簡陋。因為是每個文件都開一個線程性能消耗比較大,而且如果線程池的線程配置太多了,頻繁切換反而會變得很慢,大家如果有更好的辦法都可以留言討論。
總結
以上是生活随笔為你收集整理的Spring Boot 配置线程池使用多线程插入数据的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 手把手 docker 从零搭建 jenk
- 下一篇: 2020 年 4 月全国程序员工资新鲜出