JUC编程java多线程并发详细总结
1、什么是JUC
JUC即Java.util.concurrent包,這是一個處理線程的工具包,JDK 1.5開始出現的。
2、進程與線程
一個進程可以包含多個線程,至少包含一個
java默認有兩個線程(main、 gc)
對于Java而言,創建線程的方式:進程Thread、實現Runnable接口、實現Callback接口
java并不能直接開啟線程,它使調了底層c++的方法。
并發:多個線程操作一個資源,一個CUP
并行:多個線程同時執行,多個CPU
3、線程的狀態
由源碼
NEW :新建RUNNABLE, 就緒BLOCKED, 阻塞WAITING, 等待TIMED_WAITING, 超時等待TERMINATED; 終止4、wait與sleep
wait來自Object
sleep 來自Thread
wait會釋放鎖,sleep不會釋放鎖
5、Lock鎖
線程就是一個單獨的資源,沒有其他附屬操作。
傳統的synchronized
package cn.butcher;public class Test02 {public static void main(String[] args) {Ticket ticket = new Ticket();new Thread(() -> {for (int i = 0; i < 50; i++) {ticket.sale();}}).start();new Thread(() -> {for (int i = 0; i < 50; i++) {ticket.sale();}}).start();new Thread(() -> {for (int i = 0; i < 50; i++) {ticket.sale();}}).start();} }class Ticket{int num = 50;public synchronized void sale(){if (num>0){System.out.println(Thread.currentThread().getName()+"賣出了:1張票,剩余:"+(num--));}} }使用Lock鎖
package cn.butcher;import java.util.concurrent.locks.ReentrantLock;public class Test03 {public static void main(String[] args) {Ticket1 ticket = new Ticket1();new Thread(() -> {for (int i = 0; i < 50; i++) {ticket.sale();}}).start();new Thread(() -> {for (int i = 0; i < 50; i++) {ticket.sale();}}).start();new Thread(() -> {for (int i = 0; i < 50; i++) {ticket.sale();}}).start();} }class Ticket1{ReentrantLock lock = new ReentrantLock(true);int num = 50;public void sale(){lock.lock();try {if (num>0){System.out.println(Thread.currentThread().getName()+"賣出了:1張票,剩余:"+(num--));}}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}} }區別:
- synchronized是關鍵字lock是java類
- synchronized不可以獲取鎖的狀態,lock可以
- synchronized會自動釋放鎖,lock必須手動釋放鎖,不然會死鎖
- synchronized 鎖定的資源,線程會阻塞,直到上一個線程使用完,lock不一定
- synchronized不可以中斷,是公平的,lock可以公平可以非公平(根據構造方法決定)
- 適合鎖少量同步代碼,lock鎖大量同步
6、生產者與消費者
線程之間的通信。等待,通知
6.1 synchronized 版
package cn.butcher;public class ProductAndSale {public static void main(String[] args) {Data data = new Data();new Thread(() ->{try {for (int i = 0; i < 100; i++) {data.add();}} catch (InterruptedException e) {e.printStackTrace();}},"線程1").start();new Thread(() ->{try {for (int i = 0; i < 100; i++) {data.reduce();}} catch (InterruptedException e) {e.printStackTrace();}},"線程2").start();} }class Data{int num;synchronized void add() throws InterruptedException {if (num<=0){num++;System.out.println(Thread.currentThread().getName()+"生產了"+num+"個產品");this.notifyAll();}this.wait();}synchronized void reduce() throws InterruptedException {if (num>0){num--;System.out.println(Thread.currentThread().getName()+"消費了"+num+"個產品");this.notifyAll();}this.wait();} }
但是如果線程數量多了,也會出現多產的現象。虛假喚醒
因為我們用了if,將它換成while就能解決這個問題,官方文檔。
6.2 Lock版:
package cn.butcher;import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;public class ProductAndSaleLock {public static void main(String[] args) {Data2 data = new Data2();new Thread(() -> {try {for (int i = 0; i < 100; i++) {data.add();}} catch (InterruptedException e) {e.printStackTrace();}}, "線程1").start();new Thread(() -> {try {for (int i = 0; i < 100; i++) {data.reduce();}} catch (InterruptedException e) {e.printStackTrace();}}, "線程2").start();} }class Data2 {int num;Lock lock = new ReentrantLock(true);Condition condition = null;public Data2() {this.condition = lock.newCondition();}void add() throws InterruptedException {lock.lock();try {while (num <= 0) {num++;System.out.println(Thread.currentThread().getName() + "生產了" + num + "個產品");condition.signalAll();}condition.await();}finally {lock.unlock();}}void reduce() throws InterruptedException {lock.lock();try {while (num > 0) {num--;System.out.println(Thread.currentThread().getName() + "消費了" + num + "個產品");condition.signalAll();}condition.await();}finally {lock.unlock();}} }7、線程順序執行
在之前,我們控制線程的順序執行是一件非常困難的事情,我們不知道如何去喚醒特定的線程,但是使用了Lock以后,這一切變得簡單起來。
Lock中有Condition 條件,其中有兩個方法await()和single()對應我們之前wait()和notify()
Condition在一個Lock里面可以有多個實例!
我們可以給不同的頁面放置不同Condition管理,當線程使用到這個資源的時候,如果當前條件符合,那么就讓它執行,如果不符合,就讓它等待
package cn.butcher;import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;public class SequentialExecution {public static void main(String[] args) {Printer printer = new Printer();new Thread(() ->{for (int i = 0; i < 10; i++) {printer.printA();}}).start();new Thread(() ->{for (int i = 0; i < 10; i++) {printer.printB();}}).start();new Thread(() ->{for (int i = 0; i < 10; i++) {printer.printC();}}).start();}}class Printer{Lock lock = new ReentrantLock();Condition condition1 = lock.newCondition();Condition condition2 = lock.newCondition();Condition condition3 = lock.newCondition();String msg = "A";void printA(){lock.lock();try {while (!msg.equals("A")){condition1.await();}System.out.println(Thread.currentThread().getName()+"=>AAAAAA");msg = "B";condition2.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}void printB(){lock.lock();try {while (!msg.equals("B")){condition2.await();}System.out.println(Thread.currentThread().getName()+"=>BBBBBB");msg = "C";condition3.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}void printC(){lock.lock();try {while (!msg.equals("C")){condition3.await();}System.out.println(Thread.currentThread().getName()+"=>CCCCCC");msg = "A";condition1.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}} }8、關于鎖
9、集合不安全
我們都知道集合有線程安全的,有非線程安全的,例如Vector是線程安全的,ArrayList是非線程安全的,HashTable是線程安全的,HashMap是非線程安全的等,這些線程安全的集合底層基本上都是有synchronized關鍵字修飾的,而這種方式的同步,就必然會影響效率。
比較有趣的是:
是的,java很早就想到了線程安全的重要性,但為了提高效率后面還是出現了ArrayList。
既然說它不安全,那就測試一下。
9.1 測試ArrayList與Vector
public class CollectionNotThreadSafe {public static void main(String[] args) {List<String> arrayList = new ArrayList<>();for (int i = 0; i < 10; i++) {new Thread(() ->{arrayList.add(String.valueOf(UUID.randomUUID()));//ConcurrentModificationException// 并發修改異常System.out.println(arrayList);}).start();}} }果然:
出現了并發修改異常。
如果使用Vector呢?
一切正常。
9.2 將ArrayList轉為線程安全
如果我們非要使用ArrayList如何將它轉成線程安全的呢?java想得非常周到,它給我們提供的集合工具有這么一個方法Collections.synchronizedList(),也就是下面的代碼:
public class CollectionNotThreadSafe {public static void main(String[] args) {List<String> arrayList = Collections.synchronizedList(new ArrayList<>());for (int i = 0; i < 10; i++) {new Thread(() ->{arrayList.add(String.valueOf(UUID.randomUUID()));System.out.println(arrayList);}).start();}} }9.3 JUC中的集合之CopyOnWriteArrayList
JUC中也提供了很多我們常用的集合類,這些集合都是線程安全的。
如:
Concurrent:并發的
- ConcurrentHashMap<K,V>
- ConcurrentLinkedDeque
- CopyOnWriteArraySet
- …
對于CopyOnWriteArrayList,CopyOnWrite直譯為寫入時復制,這是計算機程序設計領域的一種優化策略,某個調用者試圖修改資源的內容時,系統會復制一份專用副本給該調用者,而其他調用者所見到的最初的資源仍然保持不變。
package cn.butcher;import java.util.*; import java.util.concurrent.CopyOnWriteArrayList;public class CollectionNotThreadSafe {public static void main(String[] args) {List<String> copy = new CopyOnWriteArrayList<>();for (int i = 0; i < 10; i++) {new Thread(() ->{copy.add(String.valueOf(UUID.randomUUID()));System.out.println(copy);}).start();}} }這里與Collections.synchronizedList()有什么區別?
通過查看源代碼我們發現:
Collections.synchronizedList()傳入一個List返回了一個由synchronized修飾的List包裝類,這個包裝類時使用了大量的synchronized關鍵字。
而CopyOnWriteArrayList卻不是,查看源碼:
看到了熟悉的ReentrantLock鎖。
transient關鍵字的作用是需要實現Serilizable接口,將不需要序列化的屬性前添加關鍵字transient,序列化對象的時候,這個屬性就不會序列化。
volatile是Java提供的一種輕量級的同步機制。相比于synchronized(synchronized通常稱為重量級鎖),volatile更輕量級,因為它不會引起線程上下文的切換和調度。但是volatile 變量的同步性較差(有時它更簡單并且開銷更低),而且其使用也更容易出錯。
讀的多用CopyOnWriteArrayList,寫多用synchronized。為啥?
其實看源碼時候我們發現了,CopyOnWriteArrayList是每次修改復制了一次數組,這是很耗資源的,而synchronized就沒那么復雜了。但是synchronized會將資源鎖,當一點線程擁有該資源的時候,其他線程只能觀望。。自然讀的效率就沒有那么高了。
10、Callable
Callable接口類似于Runnable ,因為它們都是為其實例可能由另一個線程執行的類設計的。 然而,Runnable不返回結果,也不能拋出被檢查的異常。
但是問題是創建線程要不繼承Thread,要么實現Runnable接口,并沒有Callable,如何實現呢?
Runnable所有已知實現類:
AsyncBoxView.ChildState , ForkJoinWorkerThread , FutureTask , RenderableImageProducer , SwingWorker , Thread , TimerTask
其中的FutureTask 可以傳入Callable接口
如此,我們就可以通過FutureTask將Callable與Thread給勾搭上了。
11、輔助類
11.1 CountDownLatch減法計數器
允許一個或多個線程等待直到在其他線程中執行的一組操作完成的同步輔助。
比如現在模擬一個場景,有10個線程去上廁所,我們要求所有線程都上完廁所才能關門。
我們如果這樣實現:
這顯然不符合我們的需求,現在使用CountDownLatch改造:
public class CountDownLatchTest {public static void main(String[] args) {// 通過構造傳入10作為基數CountDownLatch count = new CountDownLatch(10);for (int i = 0; i < 10; i++) {new Thread(() ->{System.out.println(Thread.currentThread().getName()+"出來了");// 每執行一次,計數器減1count.countDown();},String.valueOf(i)).start();}try {// 計數器沒有歸零,就不往下執行count.await();System.out.println("WC關門了~");} catch (InterruptedException e) {e.printStackTrace();}} }11.2 CyclicBarrier加法計數器
允許一組線程全部等待彼此達到共同屏障點的同步輔助。 循環阻塞在涉及固定大小的線程方的程序中很有用,這些線程必須偶爾等待彼此。 屏障被稱為循環 ,因為它可以在等待的線程被釋放之后重新使用。
加法計數器就是,設定一個初始值,當到達一定的數量時執行某段代碼。
例如拼車,我們需要滿10個人才能發車:
package cn.butcher;import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier;public class CyclicBarrierTest {public static void main(String[] args) {CyclicBarrier barrier = new CyclicBarrier(10,()->{System.out.println("人數已滿拼車成功");});for (int i = 0; i < 10; i++) {new Thread(() ->{System.out.println(Thread.currentThread().getName()+"上車");try {barrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}},String.valueOf(i)).start();}} }11.3 Semaphore 信號量(停車位)
一個計數信號量。 在概念上,信號量維持一組許可證。 如果有必要,每個acquire()都會阻塞,直到許可證可用,然后才能使用它。 每個release()添加許可證,潛在地釋放阻塞獲取方。 但是,沒有使用實際的許可證對象; Semaphore只保留可用數量的計數,并相應地執行。
package cn.butcher;import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;public class SemaphoreTest {public static void main(String[] args) {Semaphore semaphore = new Semaphore(3);// 有許可證才能進入執行代碼,否則只能阻塞等待for (int i = 0; i < 10; i++) {// 有10輛車要停車但是停車場一次只能停3輛車new Thread(() ->{try {semaphore.acquire();// 獲取許可證,如果滿了,會等待知道有空閑的許可證System.out.println(Thread.currentThread().getName()+"進入停車");TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()+"離開了");semaphore.release(); // 釋放許可證} catch (InterruptedException e) {e.printStackTrace();}}).start();}} }12、讀寫鎖 ReadWriteLock
ReadWriteLock維護一對關聯的locks ,一個用于只讀操作,一個用于寫入。讀鎖可以由多個線程同時進行,寫鎖只能由一個線程訪問。
也就是我們說的讀鎖(共享鎖),寫鎖(獨占鎖)
所有已知實現類: ReentrantReadWriteLock
我們使用Map模擬我們的數據庫,然后分別各自啟動5個線程對數據庫進行讀寫操作。
看看我們的實現:
public class ReadWriteLockTest {public static void main(String[] args) {MyReadWriteLock db = new MyReadWriteLock();for (int i = 0; i < 5; i++) {final int temp = i;new Thread(() ->{db.write(temp+"",temp+":value");}).start();}for (int i = 0; i < 5; i++) {final int temp = i;new Thread(() ->{System.out.println(db.read(Thread.currentThread().getName()+"讀取成功:"+temp));}).start();}} }class MyReadWriteLock{Map<String, Object> db = new HashMap<>();void write(String key,Object value){System.out.println(Thread.currentThread().getName()+":開始寫入"+key);this.db.put(key,value);System.out.println(Thread.currentThread().getName()+":"+key+"寫入成功");}Object read(String key){System.out.println(Thread.currentThread().getName()+":開始讀取"+key);return this.db.get(key);} }結果:
使用ReadWriteLock改良后:
13、阻塞隊列
BlockingQueue方法有四種形式,具有不同的操作方式:
13.1 拋出異常
static void test01(){BlockingQueue blockingQueue = new LinkedBlockingQueue(3);System.out.println(blockingQueue.add("a"));System.out.println(blockingQueue.add("b"));System.out.println(blockingQueue.add("c"));// System.out.println(blockingQueue.add("d"));// java.lang.IllegalStateException: Queue fullSystem.out.println("==========================");System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());// System.out.println(blockingQueue.remove());// java.util.NoSuchElementException }13.2 返回boolean
static void test02(){BlockingQueue blockingQueue = new LinkedBlockingQueue(3);System.out.println(blockingQueue.offer("a"));System.out.println(blockingQueue.offer("b"));System.out.println(blockingQueue.offer("c"));// System.out.println(blockingQueue.offer("c"));// falseSystem.out.println("===========================");System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());//System.out.println(blockingQueue.poll());// null}13.3 一直阻塞
static void test03() throws InterruptedException {BlockingQueue blockingQueue = new LinkedBlockingQueue(3);blockingQueue.put("a");blockingQueue.put("b");blockingQueue.put("c");//blockingQueue.put("c");// 一直阻塞System.out.println("======================");System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());// System.out.println(blockingQueue.take());// 一直阻塞 }13.4 超時阻塞
static void test04() throws InterruptedException {BlockingQueue blockingQueue = new LinkedBlockingQueue(3);System.out.println(blockingQueue.offer("a", 2, TimeUnit.SECONDS));System.out.println(blockingQueue.offer("b", 2, TimeUnit.SECONDS));System.out.println(blockingQueue.offer("c", 2, TimeUnit.SECONDS));//System.out.println(blockingQueue.offer("d", 2, TimeUnit.SECONDS));// 兩秒后退出System.out.println("==============================");System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));//System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));//兩秒后退出 }13.5 同步隊列 SynchronousQueue
同步隊列不存放數據,一次只有一個,只有里面有值的時候才能take()取出,否則就阻塞直到有值,同樣的要存入必須先確保隊列里面是空的。
static void test05(){BlockingQueue blockingQueue = new SynchronousQueue();new Thread(()->{try {System.out.println(Thread.currentThread().getName()+"放入a");blockingQueue.put("a");// 先打印再放,線程的速度比打印快,先放的話下面取的形成就能拿到了,會很詭異System.out.println(Thread.currentThread().getName()+"放入b");blockingQueue.put("b");System.out.println(Thread.currentThread().getName()+"放入c");blockingQueue.put("c");} catch (InterruptedException e) {e.printStackTrace();}}).start();new Thread(()->{try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()+"取出"+blockingQueue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()+"取出"+blockingQueue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()+"取出"+blockingQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}).start(); }14、線程池
池的作用:節省系統開銷,創建和消費資源開銷非常大
如:常量池、數據庫連接池、內存池。。。
14.1 創建線程三個方法
通過Executors工具類創建線程池
都使用下面的循環來測試
for (int i = 0; i < 10; i++) {final int j = i;// 使用線程池創建線程執行代碼executor.execute(()->{System.out.println(Thread.currentThread().getName()+"執行"+j);});}14.2 自定義線程池(7大參數)
在上面,我們使用工具類去創建線程,進入源碼我們發現,它內部也是使用的ThreadPoolExecutor創建的線程池。但是,這樣做是有弊端的,阿里巴巴開發手冊上面說明了:
OOM為out of memory的簡稱,稱之為內存溢出。所以我們需要自定義線程池。
通過源碼我們可以看到,自定義線程有7大參數,分別是:
以上參數的意思是,默認創建兩個核心線程,不管有沒有人用,都創建。最大5個線程全開,阻塞隊列里可以放5個人,如果處理的人數超過7個人(核心兩個,阻塞5個),就增加線程去處理,如果超過了最大的處理能力10個,拒接策略就生效。
14.3 四大拒絕策略
拒絕策略是實現了RejectedExecutionHandler接口的類,目前有四個實現類
14.4 最大線程如何定義
開頭的時候說了,線程有并行和并發,并行的效率是最高的,我們將線程池的最大數量設置為服務器支持的最大線程數量,可以保證線程的效率最高!線程數量超過了CPU的核心數,就沒那么快了。
IO操作是非常耗時的,如果有10個大型任務,我們最好分配10個線程去執行,為了不阻塞,我們一般會在這個基礎上在加一倍,保證在執行著10個大型任務的時候,其他任務不阻塞,當然具體多分配幾個,依然視情況而定。
總之線程不是越多越好,當超過CPU支持的并行數量,線程的執行效率就會下降,線程越多效率越低。
15、Fork/Join 分支合并,將一個任務分解讓多個線程執行
Java 7開始引入了一種新的Fork/Join線程池,它可以執行一種特殊的任務:把一個大任務拆成多個小任務并行執行。
如何使用?
使用ForkJoinPool線程池創建
ForkJoinPool forkJoinPool = new ForkJoinPool();submit有返回值,其中可以傳入四種類型的參數:
這里我們需要ForkJoinTask
所以我們需要創建一個類去繼承ForkJoinTask的子類
9次運行結果比較
暴力結果:2205000001050000000 耗時:788 ForkJoin結果:2205000001050000000 耗時:814 Stream結果:2205000001050000000 耗時:382暴力結果:2205000001050000000 耗時:794 ForkJoin結果:2205000001050000000 耗時:906 Stream結果:2205000001050000000 耗時:374暴力結果:2205000001050000000 耗時:849 ForkJoin結果:2205000001050000000 耗時:704 Stream結果:2205000001050000000 耗時:369暴力結果:2205000001050000000 耗時:825 ForkJoin結果:2205000001050000000 耗時:470 Stream結果:2205000001050000000 耗時:441暴力結果:2205000001050000000 耗時:832 ForkJoin結果:2205000001050000000 耗時:845 Stream結果:2205000001050000000 耗時:390暴力結果:2205000001050000000 耗時:786 ForkJoin結果:2205000001050000000 耗時:523 Stream結果:2205000001050000000 耗時:409暴力結果:2205000001050000000 耗時:913 ForkJoin結果:2205000001050000000 耗時:505 Stream結果:2205000001050000000 耗時:392暴力結果:2205000001050000000 耗時:834 ForkJoin結果:2205000001050000000 耗時:670 Stream結果:2205000001050000000 耗時:388暴力結果:2205000001050000000 耗時:894 ForkJoin結果:2205000001050000000 耗時:962 Stream結果:2205000001050000000 耗時:358ForkJoin確實不穩定哈,但是如果任務拆分更細一些呢?
class AddPlus extends RecursiveTask<Long> {Long start;Long end;public AddPlus(Long start, Long end) {this.start = start;this.end = end;}@Overrideprotected Long compute() {long sum = 0L;if ((end - start) < 10_0000L) {for (long i = start; i <= end; i++) {sum += i;}return sum;} else {long splice = (end + start) / 4;// 拆分任務AddPlus addPlus1 = new AddPlus(start, splice);addPlus1.fork(); // 將這個addPlus1任務壓入線程隊列AddPlus addPlus2 = new AddPlus(splice + 1, splice*2);addPlus2.fork(); // 將這個addPlus2任務壓入線程隊列AddPlus addPlus3 = new AddPlus(splice*2+1, splice*3);addPlus3.fork(); // 將這個addPlus3任務壓入線程隊列AddPlus addPlus4 = new AddPlus(splice*3+1, end);addPlus4.fork(); // 將這個addPlus4任務壓入線程隊列return addPlus1.join() + addPlus2.join() + addPlus3.join()+ addPlus4.join();}} }奇跡發生了:
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.StackOverflowErrorat java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)at cn.butcher.ForkJoinTest.test02(ForkJoinTest.java:85)at cn.butcher.ForkJoinTest.main(ForkJoinTest.java:15) Caused by: java.lang.StackOverflowErrorat sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)at java.lang.reflect.Constructor.newInstance(Constructor.java:423)at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)at java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1005)這個問題還沒得解決掉,有人知道嗎?
總結
以上是生活随笔為你收集整理的JUC编程java多线程并发详细总结的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ERJ | 马来西亚三城室内环境微生物/
- 下一篇: vue:hadoop@1.0.0 dev