java如何阻塞和同步_同步与异步,阻塞与非阻塞
今天早上關注了這個問題,剛抽出時間大概整理下,以下僅是個人理解:
一定要多看幾遍代碼并結合文字理解下
引0、從I/O說起
這些概念之所以容易令人迷惑,在于很多人對I/O就沒有清晰準確的理解,后面的理解自然不可能正確。我想用一個具體的例子來說明一下I/O。
設想自己是一個進程,就叫小進吧。小進需要接收一個輸入,我們不管這個輸入是從網絡套接字來,還是鍵盤,鼠標來,輸入的來源可以千千萬萬。但是,都必須由內核來幫小進完成,為啥內核這么霸道?因為計算機上運行的可不只是咱小進一個進程,還有很多進程。這些進程兄弟也可能需要從這些輸入設備接收輸入,沒有內核居中協調,豈不是亂套。
從小進的角度看,內核幫助它完成輸入,其實包括三個步驟:內核替小進接收好數據,這些數據暫時存在內核的內存空間
內核將數據從自己的內存空間復制到小進的內存空間
告訴小進,輸入數據來了,趕快讀吧
這三步看似挺簡單,其實在具體實現時,有很多地方需要考慮:小進如何告訴內核自己要接收一個輸入?
內核接到小進的請求,替小進接收好數據這段時間, 小進咋辦?
內核在將數據復制到小進的內存空間這段時間,小進咋辦?
到底什么時候告訴小進數據準備好了,是在內核接收好數據之后就告訴小進,還是在將數據復制到小進的內存空間之后再告訴他?
內核以什么樣的方式告訴小進,數據準備好了?
1、阻塞式I/O模型
對上面5個問題,最簡單的解決方案就是阻塞式I/O模型,它的過程是這樣的:
小進:內核內核,我要接收一個鍵盤輸入,快點幫我完成!
內核:好咧!biubiu!一個阻塞丟給小進,小進頓時石化,就像被孫悟空點了定一樣。
就這樣,小進在石化中,時間一點點流逝。終于,內核收到了數據。
內核:數據終于來了,我要開干了!duang duang duang,先把數據存在自己的內核空間,然后又復制到小進的用戶空間。
內核:biubiu!一個解除阻塞丟給小進,小進瞬間復活,小進的記憶還是停留在讓內核幫他接收輸入時。
小進:哇!內核真靠譜,數據已經有了!干活去!
我們可以看到,小進發出接收輸入的請求給內核開始,就處于阻塞狀態,直到內核將數據復制到小進的用戶空間,小進才解除阻塞。
2、非阻塞式I/O
小進發現,阻塞式I/O中,自己總要被阻塞好久,好不爽啊,于是小進改用了非阻塞式I/O,其過程是這樣的:
小進:內核內核,我要接收一個輸入,趕緊幫我看看,數據到了沒有,先說好,不要阻塞我。
內核:查看了一下自己的內核空間,沒有發現數據,于是迅速告訴小進,沒有呢!并繼續幫小進等著數據。
如此這樣,小進不斷地問內核,終于,過了一段時間,小進再一次詢問時,內核往自己的空間中一查,呦!數據來了,不勝其煩的內核迅速告訴小進,數據好了!
小進:快給我!
內核:biu!一個阻塞丟給小進,悲催的小進還是石化了!
內核趕緊將自己空間的輸入數據復制到小進的用戶空間,復制好后。
內核:biu!一個非阻塞丟給小進,小進立馬復活
小進:哇!數據來了,啥也不說,干活!
我們看到,所謂的非阻塞I/O,其實在內核將數據從內核空間復制到小進的用戶空間時,小進還是被阻塞的。
3、信號驅動式I/O
非阻塞I/O中,小進不停地問內核,數據好了沒有啊,內核感覺太煩了,于是想出一個好辦法。
內核告訴小進,本內核升級了,如果想要我替你接收輸入,請先注冊一個信號處理函數,等數據準備好時,我會發信號給你。于是,現在的流程是這樣的:
小進:注冊信號處理函數,告訴內核,自己要接收一個輸入,然后繼續干活!
內核:收到函數,開始執行數據接收
接收完成時,給小進發送信號,信號處理函數收到信號,開始向內核發送讀數據請求
內核:biu!阻塞了小進,并把數據從內核空間復制到小進的用戶空間。
內核:biu!解除了阻塞
小進:哇!數據來了!啥也不說,干活去!
4、異步I/O
上面的三種I/O解決方案中,小進都被阻塞了,只不過是阻塞時間長短不一樣,第一種方案中小進被阻塞的時間長一些,在內核接收數據以及將數據復制到小進的用戶空間時,都被阻塞。
第二、第三種方案中,只在內核將數據從內核空間復制到小進的用戶空間時,小進才被阻塞。
我們現在說的異步I/O,目的就是讓小進絕對不被阻塞。其過程是這樣的:
小進:內核內核,我要接收一個輸入,弄好了告訴我。同時將一個信號和信號處理函數告訴內核,然后繼續干自己的活了。
內核:得了您嘞,您先忙。
一直到內核接收到數據并將數據從內核空間復制到小進的用戶空間后,內核才給小進發送信號。小進在信號處理函數中可以直接處理數據。
踐
1、阻塞式I/O式
客戶端代碼public class Client {
public static void main(String[] args) {
Socket socket = null;
try {
System.out.println("socket begin " + System.currentTimeMillis());
// 隨機綁定本地地址與端口
socket = new Socket("localhost", 8888);
System.out.println("socket end " + System.currentTimeMillis());
OutputStream os = socket.getOutputStream();
Random ran = new Random();
for (int n = 0; n < 10; n++) {
System.out.println("send message " + n);
os.write(("hello server form " + socket.getLocalAddress().getHostAddress() + " - " + n).getBytes());
try {
TimeUnit.SECONDS.sleep(ran.nextInt(10));
} catch (InterruptedException e) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (socket != null) {
// 自動關閉綁定流
socket.close();
}
System.out.println("exit");
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
服務端代碼public class Server {
public static void main(String[] args) {
ServerSocket serverSocket = null;
Socket socket = null;
ThreadPoolExecutor executor = null;
try {
// 初始化線程池
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
0L, TimeUnit.SECONDS, new LinkedBlockingDeque(), new ThreadBuilder());
// 監聽通配符地址
serverSocket = new ServerSocket(8888);
System.out.println("accept begin " + System.currentTimeMillis());
while ((socket = serverSocket.accept()) != null) {
executor.execute(new Task(socket));
}
System.out.println("accept end " + System.currentTimeMillis());
} catch (IOException e) {
e.printStackTrace();
} finally {
executor.shutdown();
try {
if (serverSocket != null) {
serverSocket.close();
}
System.out.println("exit");
} catch (IOException e) {
e.printStackTrace();
}
}
}
static class ThreadBuilder implements ThreadFactory {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "server-thread-" + counter.getAndIncrement());
thread.setUncaughtExceptionHandler((t, e) -> {
if (e instanceof TaskException) {
System.err.println(t.getName() + "|" + e.getCause().getMessage());
} else {
System.err.println(t.getName() + "|" + e.getMessage());
}
});
return thread;
}
}
static class Task implements Runnable {
private byte[] buffer = new byte[10 * 1024];
private Socket socket;
public Task(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
InputStream is = socket.getInputStream();
System.out.println("--------------------------------------------------");
System.out.println("read begin " + System.currentTimeMillis());
System.out.println("***");
int len = is.read(buffer);// 呈阻塞效果
while (len != -1) {
String str = new String(buffer, 0, len);
System.out.println(str);
len = is.read(buffer);
}
System.out.println("***");
System.out.println("read end " + System.currentTimeMillis());
System.out.println("--------------------------------------------------");
} catch (IOException e) {
throw new TaskException(e);
} finally {
if (socket != null) {
try {
// 自動關閉綁定流
socket.close();
System.out.println("socket closed");
} catch (IOException e) {
System.err.println("socket close failed");
}
}
}
}
}
static class TaskException extends RuntimeException {
public TaskException(Throwable cause) {
super(cause);
}
}
}
2、非阻塞式I/O
客戶端代碼同上
服務端代碼public class Server {
public static void main(String[] args) {
ServerSocketChannel serverSocket = null;
SocketChannel socket = null;
ThreadPoolExecutor executor = null;
try {
// 初始化線程池
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
0L, TimeUnit.SECONDS, new LinkedBlockingDeque(), new ThreadBuilder());
serverSocket = ServerSocketChannel.open();
// 設置阻塞
serverSocket.configureBlocking(true);
// 監聽通配符地址
serverSocket.bind(new InetSocketAddress(8888));
System.out.println("accept begin " + System.currentTimeMillis());
while ((socket = serverSocket.accept()) != null) {
// 設置非阻塞
socket.configureBlocking(false);
executor.execute(new Task(socket));
}
System.out.println("accept end " + System.currentTimeMillis());
} catch (IOException e) {
e.printStackTrace();
} finally {
executor.shutdown();
try {
if (serverSocket != null) {
serverSocket.close();
}
System.out.println("exit");
} catch (IOException e) {
e.printStackTrace();
}
}
}
static class ThreadBuilder implements ThreadFactory {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "server-thread-" + counter.getAndIncrement());
thread.setUncaughtExceptionHandler((t, e) -> {
if (e instanceof TaskException) {
System.err.println(t.getName() + "|" + e.getCause().getMessage());
} else {
System.err.println(t.getName() + "|" + e.getMessage());
}
});
return thread;
}
}
static class Task implements Runnable {
private ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
private SocketChannel socket;
public Task(SocketChannel socket) {
this.socket = socket;
}
@Override
public void run() {
try {
System.out.println("--------------------------------------------------");
System.out.println("read begin " + System.currentTimeMillis());
System.out.println("***");
socket.read(buffer);// 呈阻塞效果
while (true) {
if (buffer.position() == 0) {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
continue;
}
} else {
buffer.flip();
String str = new String(buffer.array(), 0, buffer.limit());
System.out.println(str);
if ("exit".equals(str)) {
break;
}
buffer.clear();
}
socket.read(buffer);
}
System.out.println("***");
System.out.println("read end " + System.currentTimeMillis());
System.out.println("--------------------------------------------------");
} catch (IOException e) {
throw new TaskException(e);
} finally {
if (socket != null) {
try {
// 自動關閉綁定流
socket.close();
System.out.println("socket closed");
} catch (IOException e) {
System.err.println("socket close failed");
}
}
}
}
}
static class TaskException extends RuntimeException {
public TaskException(Throwable cause) {
super(cause);
}
}
}
3、多路復用式I/O(基于非阻塞式I/O)
客戶端代碼同上
服務端代碼public class Server {
public static void main(String[] args) {
Selector selector = null;
ServerSocketChannel serverSocket = null;
SocketChannel socket = null;
ThreadPoolExecutor executor = null;
try {
// 初始化線程池
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
0L, TimeUnit.SECONDS, new LinkedBlockingDeque(), new ThreadBuilder());
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
// 設置非阻塞
serverSocket.configureBlocking(false);
// 監聽通配符地址
serverSocket.bind(new InetSocketAddress(8888));
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("accept begin " + System.currentTimeMillis());
while (true) {
selector.select();
Iterator iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
// 設置非阻塞
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
executor.execute(new Task(socketChannel));
key.cancel();
} else {
// TODO 寫事件注冊
}
iterator.remove();
}
}
// System.out.println("accept end " + System.currentTimeMillis());
} catch (IOException e) {
e.printStackTrace();
} finally {
executor.shutdown();
try {
if (serverSocket != null) {
serverSocket.close();
}
System.out.println("exit");
} catch (IOException e) {
e.printStackTrace();
}
}
}
static class ThreadBuilder implements ThreadFactory {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "server-thread-" + counter.getAndIncrement());
thread.setUncaughtExceptionHandler((t, e) -> {
if (e instanceof TaskException) {
System.err.println(t.getName() + "|" + e.getCause().getMessage());
} else {
System.err.println(t.getName() + "|" + e.getMessage());
}
});
return thread;
}
}
static class Task implements Runnable {
private ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
private SocketChannel socket;
public Task(SocketChannel socket) {
this.socket = socket;
}
@Override
public void run() {
try {
System.out.println("--------------------------------------------------");
System.out.println("read begin " + System.currentTimeMillis());
System.out.println("***");
socket.read(buffer);// 呈阻塞效果
while (true) {
if (buffer.position() == 0) {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
continue;
}
} else {
buffer.flip();
String str = new String(buffer.array(), 0, buffer.limit());
System.out.println(str);
if ("exit".equals(str)) {
break;
}
buffer.clear();
}
socket.read(buffer);
}
System.out.println("***");
System.out.println("read end " + System.currentTimeMillis());
System.out.println("--------------------------------------------------");
} catch (IOException e) {
throw new TaskException(e);
} finally {
if (socket != null) {
try {
// 自動關閉綁定流
socket.close();
System.out.println("socket closed");
} catch (IOException e) {
System.err.println("socket close failed");
}
}
}
}
}
static class TaskException extends RuntimeException {
public TaskException(Throwable cause) {
super(cause);
}
}
}
4、信號驅動式I/O
JAVA沒有實現
5、異步I/O
客戶端代碼同上
服務端代碼public class Server {
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8888));
serverSocketChannel.accept(null, new CompletionHandler() {
public void completed(AsynchronousSocketChannel asc, Void att) {
serverSocketChannel.accept(null, this);
ByteBuffer byteBuffer = ByteBuffer.allocate(10 * 1024);
asc.read(byteBuffer, null, new CompletionHandler() {
@Override
public void completed(Integer result, Void attachment) {
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(), 0, byteBuffer.limit()));
byteBuffer.clear();
try {
asc.close();
} catch (IOException e) {
}
}
@Override
public void failed(Throwable exc, Void attachment) {
}
});
}
public void failed(Throwable exc, Void att) {
}
});
for (; ; ) {
}
}
}
結
牛奶工送牛奶場景阻塞式:每天早上自己去小區門口等牛奶工
非阻塞式:每天早上在家從窗戶早上隔3分鐘看看牛奶工到了沒,到了的話去拿
多路復用式:每天早上由小區門衛室接待所有牛奶工,到了會給住戶發短信,你馬上去拿
信號驅動式:每天早上牛奶工到了會給你發短信,你馬上去拿
異步式:每天早上牛奶工直接放到小區住戶牛奶柜并發短信,不需要現在去拿
它
程序分為CPU計算型和I/O讀寫型,線程尤其是被內核調度的線程是及其珍貴的資源(JAVA計劃在JDK將來的版本實現由JVM”自己“調度的輕型線程),在有限的線程資源下CPU計算型程序不但不會有明顯提升,反而由于頻繁的上下文切換導致性能下降(這也是Redis這種基于內存的數據庫采用單工作線程并且速度非常快的原因,另一個重要的原因是單線程導致了不用為共享資源給線程加/解鎖造成人為阻塞),而在I/O讀寫型的程序中,多線程工作在以上五種模式下性能是逐步提升的(最后多說一句,還是以Redis舉例,不管是Jedis-Pool這種池化客戶端還是Lettuce這種單連接客戶端,當多用戶接入Redis服務器時一定是多連接的,這時候就要用到多路復用來處理用戶請求了,至于為什么沒有用異步,一個原因是工作線程是單線程,另一個原因是異步I/O模型在性能提升方面有限并且復雜度高,以至于Netty在新版本的包中把這種模式刪除了)
總結
以上是生活随笔為你收集整理的java如何阻塞和同步_同步与异步,阻塞与非阻塞的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: android content item
- 下一篇: ENSP配置 实例八 三层交换机DHC