java openresty 调用_玩转 OpenResty 协程 API
注意:本文中列出的所有代碼只是 Proof Of Concept,基本上都沒有進行錯誤處理。另外對于一些邊際情況,也可能沒有考慮清楚。所以對于直接復制文中代碼到項目中所造成的一切后果,請自負責任。
OK,言歸正題。OpenResty 提供了以 ngx.thread.*,coroutine.* 和 ngx.semaphore 等一系列協程 API。雖然受限于 Nginx 的請求處理方式,表現力不如通用語言的協程 API 那么強大。但是開開腦洞,還是可以玩出一些花樣來的。
借助這些 API,讓我們嘗試模擬下其他編程平臺里面的調度方式。
模擬 Java 里面的 Future
Java 里的 Future 可以讓我們創建一個任務,然后在需要的時候才去 get 任務的返回值。另外 Future 還有超時功能。
我們可以啟用一個協程來完成具體的任務,再加一個定時結束的協程,用于實現超時。
像這樣:
local function task()
ngx.sleep(3)
ngx.say("Done")
end
local task_thread = ngx.thread.spawn(task)
local timeout_thread = ngx.thread.spawn(function(timeout)
ngx.sleep(timeout)
error("timeout")
end, 2)
local ok, res = ngx.thread.wait(task_thread, timeout_thread)
if not ok then
if res == "timeout" then
ngx.thread.kill(task_thread)
ngx.say("task cancelled by timeout")
return
end
ngx.say("task failed, result: ", res)
end
ngx.thread.kill(timeout_thread)
注意一點,在某一協程退出之后,我們需要 kill 掉另外一個協程。因為如果沒有調用 ngx.exit 之類的方法顯式退出的話,一直到所有協程退出為止,當前階段都不會結束。
引用文檔里相關的內容:
By default, the corresponding Nginx handler (e.g., rewrite_by_lua handler) will not terminate until
both the "entry thread" and all the user "light threads" terminates,
a "light thread" (either the "entry thread" or a user "light thread" aborts by calling ngx.exit, ngx.exec, ngx.redirect, or ngx.req.set_uri(uri, true), or
the "entry thread" terminates with a Lua error.
模擬 Javascript 里面的 Promise.race/all
Promise.race/all 可以接收多個 Promise,然后打包成一個新的 Promise 返回。引用相關的文檔:
The Promise.race(iterable) method returns a promise that resolves or rejects as soon as one of the promises in the iterable resolves or rejects, with the value or reason from that promise.
The Promise.all(iterable) method returns a promise that resolves when all of the promises in the iterable argument have resolved, or rejects with the reason of the first passed promise that rejects.
這里 reject 等價于協程運行中拋出 error,而 resolve 相對于協程返回了結果。這兩個 API 對于 reject 的處理是一致的,都是有任一出錯則立刻返回異常結果。對于正常結果,race 會在第一個結果出來時返回,而 all 則會在所有結果都出來后返回。
值得注意的是,Javascript 原生的 Promise 暫時沒有 cancell 的功能。所以即使其中一個 Promise reject 了,其他 Promise 依然會繼續運行。對此我們也照搬過來。
Promise.race 的實現:
local function apple()
ngx.sleep(0.1)
--error("apple lost")
return "apple done"
end
local function banana()
ngx.sleep(0.2)
return "banana done"
end
local function carrot()
ngx.sleep(0.3)
return "carrot done"
end
local function race(...)
local functions = {...}
local threads = {}
for _, f in ipairs(functions) do
local th, err = ngx.thread.spawn(f)
if not th then
-- Promise.race 沒有實現 cancell 接口,
-- 所以我偷下懶,不管已經創建的協程了
return nil, err
end
table.insert(threads, th)
end
local ok, res = ngx.thread.wait(unpack(threads))
if not ok then
return nil, res
end
return res
end
local res, err = race(apple, banana, carrot)
ngx.say("res: ", res, " err: ", err)
ngx.exit(ngx.OK)
Promise.all 的實現:
local function all(...)
local functions = {...}
local threads = {}
for _, f in ipairs(functions) do
local th, err = ngx.thread.spawn(f)
if not th then
return nil, err
end
table.insert(threads, th)
end
local res_group = {}
for _ = 1, #threads do
local ok, res = ngx.thread.wait(unpack(threads))
if not ok then
return nil, res
end
table.insert(res_group, res)
end
return res_group
end
模擬 Go 里面的 channel (僅部分實現)
再進一步,試試模擬下 Go 里面的 channel。
我們需要實現如下的語義:
當數據沒有被消費時,生產者會在發送數據之后中斷運行。
當數據沒有被生產時,消費者會在接收數據之前中斷運行。
當存在等待消費者接收數據的生產者時,其他生產者會在發送數據之前中斷運行。
這次要用到 ngx.semaphore。
local semaphore = require "ngx.semaphore"
local Chan = {
new = function(self)
local chan_attrs = {
_read_sema = semaphore:new(),
_write_sema = semaphore:new(),
_exclude_sema = semaphore:new(),
_buffer = nil,
_waiting_thread_num = 0,
}
return setmetatable(chan_attrs, {__index = self})
end,
send = function(self, value, timeout)
timeout = timeout or 60
while self._buffer do
self._waiting_thread_num = self._waiting_thread_num + 1
self._exclude_sema:wait(timeout)
self._waiting_thread_num = self._waiting_thread_num - 1
end
self._buffer = value
self._read_sema:post()
self._write_sema:wait(timeout)
end,
receive = function(self, timeout)
timeout = timeout or 60
self._read_sema:wait(timeout)
local value = self._buffer
self._buffer = nil
self._write_sema:post()
if self._waiting_thread_num > 0 then
self._exclude_sema:post()
end
return value
end,
}
local chan = Chan:new()
-- 以下是使用方法
local function worker_a(ch)
for i = 1, 10 do
ngx.sleep(math.random() / 10)
ch:send(i, 1)
end
end
local function worker_c(ch)
for i = 11, 20 do
ngx.sleep(math.random() / 10)
ch:send(i, 1)
end
end
local function worker_d(ch)
for i = 21, 30 do
ngx.sleep(math.random() / 10)
ch:send(i, 1)
end
end
local function worker_b(ch)
for _ = 1, 20 do
ngx.sleep(math.random() / 10)
local v = ch:receive(1)
ngx.say("recv ", v)
end
end
local function worker_e(ch)
for _ = 1, 10 do
ngx.sleep(math.random() / 10)
local v = ch:receive(1)
ngx.say("recv ", v)
end
end
ngx.thread.spawn(worker_a, chan)
ngx.thread.spawn(worker_b, chan)
ngx.thread.spawn(worker_c, chan)
ngx.thread.spawn(worker_d, chan)
ngx.thread.spawn(worker_e, chan)
模擬 Buffered channel 也是可行的。
local ok, new_tab = pcall(require, "table.new")
if not ok then
new_tab = function (_, _) return {} end
end
local BufferedChan = {
new = function(self, buffer_size)
if not buffer_size or buffer_size <= 0 then
error("Invalid buffer_size " .. (buffer_size or "nil") .. " given")
end
local chan_attrs = {
_read_sema = semaphore:new(),
_write_sema = semaphore:new(),
_waiting_thread_num = 0,
_buffer_size = buffer_size,
}
chan_attrs._buffer = new_tab(buffer_size, 0)
return setmetatable(chan_attrs, {__index = self})
end,
send = function (self, value, timeout)
timeout = timeout or 60
while #self._buffer >= self._buffer_size do
self._waiting_thread_num = self._waiting_thread_num + 1
self._write_sema:wait(timeout)
self._waiting_thread_num = self._waiting_thread_num - 1
end
table.insert(self._buffer, value)
self._read_sema:post()
end,
receive = function(self, timeout)
timeout = timeout or 60
self._read_sema:wait(timeout)
local value = table.remove(self._buffer)
if self._waiting_thread_num > 0 then
self._write_sema:post()
end
return value
end,
}
local chan = BufferedChan:new(2)
-- ...
當然上面的山寨貨還是有很多問題的。比如它缺少至關重要的 select 支持,另外也沒有實現 close 相關的特性。
總結
以上是生活随笔為你收集整理的java openresty 调用_玩转 OpenResty 协程 API的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python 登录接口_使用python
- 下一篇: 巨坑!这公司的行为,挺适合清明节!