mirror of
https://github.com/koreader/koreader.git
synced 2025-08-10 00:52:38 +00:00
add async http client
it uses non-blocking turbo I/O looper to process http request so that multiple http request can be handled simultaneously and http request won't block user input, and most importantly, in Lua's way.
This commit is contained in:
2
base
2
base
Submodule base updated: 9aa76dc818...8d31203c7f
52
frontend/httpclient.lua
Normal file
52
frontend/httpclient.lua
Normal file
@@ -0,0 +1,52 @@
|
||||
local UIManager = require("ui/uimanager")
|
||||
local DEBUG = require("dbg")
|
||||
|
||||
local HTTPClient = {
|
||||
headers = {},
|
||||
input_timeouts = 0,
|
||||
INPUT_TIMEOUT = 100*1000,
|
||||
}
|
||||
|
||||
function HTTPClient:new()
|
||||
local o = {}
|
||||
setmetatable(o, self)
|
||||
self.__index = self
|
||||
return o
|
||||
end
|
||||
|
||||
function HTTPClient:addHeader(header, value)
|
||||
self.headers[header] = value
|
||||
end
|
||||
|
||||
function HTTPClient:removeHeader(header)
|
||||
self.headers[header] = nil
|
||||
end
|
||||
|
||||
function HTTPClient:request(request, response_callback, error_callback)
|
||||
request.on_headers = function(headers)
|
||||
for header, value in pairs(self.headers) do
|
||||
headers[header] = value
|
||||
end
|
||||
end
|
||||
request.connect_timeout = 10
|
||||
request.request_timeout = 20
|
||||
UIManager:initLooper()
|
||||
UIManager:handleTask(function()
|
||||
-- avoid endless waiting for input
|
||||
UIManager.INPUT_TIMEOUT = self.INPUT_TIMEOUT
|
||||
self.input_timeouts = self.input_timeouts + 1
|
||||
local turbo = require("turbo")
|
||||
local res = coroutine.yield(
|
||||
turbo.async.HTTPClient():fetch(request.url, request))
|
||||
-- reset INPUT_TIMEOUT to nil when all HTTP requests are fullfilled.
|
||||
self.input_timeouts = self.input_timeouts - 1
|
||||
UIManager.INPUT_TIMEOUT = self.input_timeouts > 0 and self.INPUT_TIMEOUT or nil
|
||||
if res.error and error_callback then
|
||||
error_callback(res)
|
||||
elseif response_callback then
|
||||
response_callback(res)
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
return HTTPClient
|
||||
@@ -6,6 +6,7 @@ local Geom = require("ui/geometry")
|
||||
local util = require("ffi/util")
|
||||
local DEBUG = require("dbg")
|
||||
local _ = require("gettext")
|
||||
local ffi = require("ffi")
|
||||
|
||||
-- there is only one instance of this
|
||||
local UIManager = {
|
||||
@@ -246,6 +247,7 @@ end
|
||||
function UIManager:quit()
|
||||
DEBUG("quit uimanager")
|
||||
self._running = false
|
||||
self._run_forever = nil
|
||||
for i = #self._window_stack, 1, -1 do
|
||||
table.remove(self._window_stack, i)
|
||||
end
|
||||
@@ -256,6 +258,10 @@ function UIManager:quit()
|
||||
self._zeromqs[i]:stop()
|
||||
table.remove(self._zeromqs, i)
|
||||
end
|
||||
if self.looper then
|
||||
self.looper:close()
|
||||
self.looper = nil
|
||||
end
|
||||
end
|
||||
|
||||
-- transmit an event to registered widgets
|
||||
@@ -430,75 +436,112 @@ function UIManager:_repaint()
|
||||
self.refresh_counted = false
|
||||
end
|
||||
|
||||
function UIManager:handleInput()
|
||||
local wait_until, now
|
||||
-- run this in a loop, so that paints can trigger events
|
||||
-- that will be honored when calculating the time to wait
|
||||
-- for input events:
|
||||
repeat
|
||||
wait_until, now = self:_checkTasks()
|
||||
|
||||
--DEBUG("---------------------------------------------------")
|
||||
--DEBUG("exec stack", self._execution_stack)
|
||||
--DEBUG("window stack", self._window_stack)
|
||||
--DEBUG("dirty stack", self._dirty)
|
||||
--DEBUG("---------------------------------------------------")
|
||||
|
||||
-- stop when we have no window to show
|
||||
if #self._window_stack == 0 and not self._run_forever then
|
||||
DEBUG("no dialog left to show")
|
||||
self:quit()
|
||||
return nil
|
||||
end
|
||||
|
||||
self:_repaint()
|
||||
until not self._execution_stack_dirty
|
||||
|
||||
-- wait for next event
|
||||
-- note that we will skip that if we have tasks that are ready to run
|
||||
local input_event = nil
|
||||
if not wait_until then
|
||||
if #self._zeromqs > 0 then
|
||||
-- pending message queue, wait 100ms for input
|
||||
input_event = Input:waitEvent(1000*100)
|
||||
if not input_event or input_event.handler == "onInputError" then
|
||||
for _, zeromq in ipairs(self._zeromqs) do
|
||||
input_event = zeromq:waitEvent()
|
||||
if input_event then break end
|
||||
end
|
||||
end
|
||||
else
|
||||
-- no pending task, wait without timeout
|
||||
input_event = Input:waitEvent(self.INPUT_TIMEOUT)
|
||||
end
|
||||
elseif wait_until[1] > now[1]
|
||||
or wait_until[1] == now[1] and wait_until[2] > now[2] then
|
||||
local wait_for = { s = wait_until[1] - now[1], us = wait_until[2] - now[2] }
|
||||
if wait_for.us < 0 then
|
||||
wait_for.s = wait_for.s - 1
|
||||
wait_for.us = 1000000 + wait_for.us
|
||||
end
|
||||
-- wait until next task is pending
|
||||
input_event = Input:waitEvent(wait_for.us, wait_for.s)
|
||||
end
|
||||
|
||||
-- delegate input_event to handler
|
||||
if input_event then
|
||||
local handler = self.event_handlers[input_event]
|
||||
if handler then
|
||||
handler(input_event)
|
||||
else
|
||||
self.event_handlers["__default__"](input_event)
|
||||
end
|
||||
end
|
||||
|
||||
-- handle next input
|
||||
self:handleTask(function() self:handleInput() end)
|
||||
end
|
||||
|
||||
-- handle task(callback function) in Turbo I/O looper
|
||||
-- or run task immediately if looper is not available
|
||||
function UIManager:handleTask(task)
|
||||
if self.looper then
|
||||
DEBUG("handle task in turbo I/O looper")
|
||||
self.looper:add_callback(task)
|
||||
else
|
||||
DEBUG("run task")
|
||||
task()
|
||||
end
|
||||
end
|
||||
|
||||
function UIManager:initLooper()
|
||||
if not self.looper then
|
||||
TURBO_SSL = true
|
||||
local turbo = require("turbo")
|
||||
self.looper = turbo.ioloop.instance()
|
||||
end
|
||||
end
|
||||
|
||||
-- this is the main loop of the UI controller
|
||||
-- it is intended to manage input events and delegate
|
||||
-- them to dialogs
|
||||
function UIManager:run()
|
||||
self._running = true
|
||||
while self._running do
|
||||
local wait_until, now
|
||||
-- run this in a loop, so that paints can trigger events
|
||||
-- that will be honored when calculating the time to wait
|
||||
-- for input events:
|
||||
repeat
|
||||
wait_until, now = self:_checkTasks()
|
||||
|
||||
--DEBUG("---------------------------------------------------")
|
||||
--DEBUG("exec stack", self._execution_stack)
|
||||
--DEBUG("window stack", self._window_stack)
|
||||
--DEBUG("dirty stack", self._dirty)
|
||||
--DEBUG("---------------------------------------------------")
|
||||
|
||||
-- stop when we have no window to show
|
||||
if #self._window_stack == 0 then
|
||||
DEBUG("no dialog left to show")
|
||||
self:quit()
|
||||
return nil
|
||||
end
|
||||
|
||||
self:_repaint()
|
||||
until not self._execution_stack_dirty
|
||||
|
||||
-- wait for next event
|
||||
-- note that we will skip that if we have tasks that are ready to run
|
||||
local input_event = nil
|
||||
if not wait_until then
|
||||
if #self._zeromqs > 0 then
|
||||
-- pending message queue, wait 100ms for input
|
||||
input_event = Input:waitEvent(1000*100)
|
||||
if not input_event or input_event.handler == "onInputError" then
|
||||
for _, zeromq in ipairs(self._zeromqs) do
|
||||
input_event = zeromq:waitEvent()
|
||||
if input_event then break end
|
||||
end
|
||||
end
|
||||
else
|
||||
-- no pending task, wait without timeout
|
||||
input_event = Input:waitEvent()
|
||||
end
|
||||
elseif wait_until[1] > now[1]
|
||||
or wait_until[1] == now[1] and wait_until[2] > now[2] then
|
||||
local wait_for = { s = wait_until[1] - now[1], us = wait_until[2] - now[2] }
|
||||
if wait_for.us < 0 then
|
||||
wait_for.s = wait_for.s - 1
|
||||
wait_for.us = 1000000 + wait_for.us
|
||||
end
|
||||
-- wait until next task is pending
|
||||
input_event = Input:waitEvent(wait_for.us, wait_for.s)
|
||||
end
|
||||
|
||||
-- delegate input_event to handler
|
||||
if input_event then
|
||||
local handler = self.event_handlers[input_event]
|
||||
if handler then
|
||||
handler(input_event)
|
||||
else
|
||||
self.event_handlers["__default__"](input_event)
|
||||
end
|
||||
end
|
||||
if ffi.os == "Windows" then
|
||||
self:handleInput()
|
||||
else
|
||||
self:initLooper()
|
||||
self:handleTask(function() self:handleInput() end)
|
||||
self.looper:start()
|
||||
end
|
||||
end
|
||||
|
||||
-- run uimanager forever for testing purpose
|
||||
function UIManager:runForever()
|
||||
self._run_forever = true
|
||||
self:run()
|
||||
end
|
||||
|
||||
UIManager:init()
|
||||
return UIManager
|
||||
|
||||
|
||||
36
spec/unit/httpclient_spec.lua
Normal file
36
spec/unit/httpclient_spec.lua
Normal file
@@ -0,0 +1,36 @@
|
||||
require("commonrequire")
|
||||
local UIManager = require("ui/uimanager")
|
||||
local HTTPClient = require("httpclient")
|
||||
local DEBUG = require("dbg")
|
||||
DEBUG:turnOn()
|
||||
|
||||
describe("HTTP client module", function()
|
||||
local requests = 0
|
||||
local function response_callback(res)
|
||||
requests = requests - 1
|
||||
if requests == 0 then UIManager:quit() end
|
||||
assert(res.body)
|
||||
end
|
||||
local function error_callback(res)
|
||||
requests = requests - 1
|
||||
if requests == 0 then UIManager:quit() end
|
||||
assert(false, "error occurs")
|
||||
end
|
||||
local async_client = HTTPClient:new()
|
||||
it("should get response from async GET request", function()
|
||||
UIManager:quit()
|
||||
local urls = {
|
||||
"http://www.example.com",
|
||||
"http://www.example.org",
|
||||
"https://www.example.com",
|
||||
"https://www.example.org",
|
||||
}
|
||||
requests = #urls
|
||||
for _, url in ipairs(urls) do
|
||||
async_client:request({
|
||||
url = url,
|
||||
}, response_callback, error_callback)
|
||||
end
|
||||
UIManager:runForever()
|
||||
end)
|
||||
end)
|
||||
Reference in New Issue
Block a user