0%

ltask-一个Lua的多任务库

这是云风最新设计的一个多任务库,设计的思路见云风的博客,github 上的地址为 https://github.com/cloudwu/ltask。云风都设计大多是简单但不好理解,所以需要看一下才能把他方便的使用起来。

实际上 ltask 提供了四个库:

  1. ltask 常规使用的API
  2. ltask.bootstrap 框架搭建,设置,启动服务但不运行
  3. ltask.exclusive
  4. ltask.root

如果用官方的 Lua 解析器作为入口,那么入口代码只可以使用 ltask.bootstrap 这个子库。它提供的 api 可以完成一系列框架的搭建工作。可以视为把 skynet 设置和启动线程的部分,以库的形式提供了出来,方便用 lua 代码驱动。

所有设定工作完成后,ltask.bootstrap.run 这个 api 会启动调度器并阻塞住主线程,之后的一切任务都是由设定完成的工作线程驱动。

和 skynet 类似,一切任务( task )都被放在 service 中完成。每个 service 是一个独立的 lua 虚拟机。它们均配有一个和外界通讯的通信管道。我们用 32bit 来标识服务及所属的通信管道。和 skynet 不同,内核不负责管理服务的名字。

0 号 id 是保留的,可以用来表示无效的服务或系统服务。1 号服务是一个特殊的 root 服务,它会有一些特权。我计划保留 2-1023 号服务,用来做特定的系统任务。例如,名字解析,timer ,socket 等等。这样,一些必要的服务就可以直接用固定的数字 id 而不需要起字符串名字。

ltask.bootstrap 里有一系列 api 可以用来做上面的配置工作:启动服务但不运行。

有点需要注意的是,服务的ID是由我们来指定的,而不是由 ltask 管理的。这和 skynet 不同。

看其示例的 main.lua 脚本大概可以看到如下流程:

  • 扫描配置信息
  • 将配置信息给到 ltask.bootstrap.init(config),进行整个框架的配置工作
  • boot.init_timer() 初始化定时器
  • 创建独占的定时器服务
  • 初始化 root 服务
  • boot.run() 开始运行框架

关于配置,大概有这么多项:

  • worker 默认256 或者是核心数减1
  • queue 默认 4096
  • queue_sending 默认 和 queue 一样 用来发送消息的队列?
  • max_service 最大服务数 默认 65536

ltask.bootstrap.init

这个函数,会在 LUA 虚拟机,建立一个全局的服务对象 LTASK_GLOBAL,这个全局对象中有:

  • 日志队列
  • 工作线程对象
  • max_service 个 service 结构
  • schedule max_service 个调度队列

然后就会初始化工作线程。

root 服务

这个是云风内建的服务。

local function new_service(label, id)
local sid = boot.new_service(label, config.init_service, id)
assert(sid == id)
return sid
end
local function bootstrap()
new_service("root", SERVICE_ROOT)
boot.init_root(SERVICE_ROOT)
-- send init message to root service
local init_msg, sz = ltask.pack("init", {
lua_path = config.lua_path,
lua_cpath = config.lua_cpath,
service_path = config.service_path,
name = "root",
args = { config },
})
-- self bootstrap
boot.post_message {
from = SERVICE_ROOT,
to = SERVICE_ROOT,
session = 0, -- 0 for root init
type = MESSSAGE_SYSTEM,
message = init_msg,
size = sz,
}
end

boot 服务启动后,即调用 C API 进行初始化,然后还发去一条消息,init,root 服务收到这条消息才会执行起来,具体体现在会执行:

local function boot()
local request = ltask.request()
for i, t in ipairs(config.exclusive) do
local name, args
if type(t) == "table" then
name = table.remove(t, 1)
args = t
else
name = t
args = {}
end
local id = i + 1
register_service(id, name)
request:add { id, proto = "system", "init", {
lua_path = config.lua_path,
lua_cpath = config.lua_cpath,
service_path = config.service_path,
name = name,
args = args,
exclusive = true,
}}
end
for req, resp in request:select() do
if not resp then
print(string.format("exclusive %d init error: %s", req[1], req.error))
return
end
end
S.uniqueservice(table.unpack(config.logger))
S.spawn(table.unpack(config.bootstrap))
end

会向所有的读占线程发消息,并启动唯一服务 logger,然后再启动我们指定的 bootstrap ,启动器服务。

关于服务

实际上,任何服务,都通过了 ‘service/service.lua’ 的包装,在配置的时候将这个作为 init_service,当 root 服务新建服务的时候,都会首先调用这个脚本, root 服务也不例外。

local function init_service(address, name, ...)
root.init_service(address, name, config.init_service)
ltask.syscall(address, "init", {
lua_path = config.lua_path,
lua_cpath = config.lua_cpath,
service_path = config.service_path,
name = name,
args = {...},
})
end

local function new_service(name, ...)
local address = assert(ltask.post_message(0, 0, MESSAGE_SCHEDULE_NEW))
anonymous_services[address] = true
local ok, err = pcall(init_service, address, name, ...)
if not ok then
S.kill(address)
return nil, err
end
return address
end

服务本身有一些系统消息的处理,比如:init,这些在 server.lua 里面实现:

function sys_service.init(t)
-- The first system message
assert(service == nil)
if t.lua_path then
package.path = t.lua_path
end
if t.lua_cpath then
package.cpath = t.lua_cpath
end
local filename = assert(package.searchpath(t.name, t.service_path))
local f = assert(loadfile(filename))
_G.require = yieldable_require
if t.exclusive then
init_exclusive()
end
local r = f(table.unpack(t.args))
if service == nil then
service = r
end
if service == nil then
ltask.quit()
end
end

while true do
local s = ltask.schedule_message()
if s == SCHEDULE_QUIT then
break
end
yield_service()
end

每次建立一个一个服务,具体承载的脚本都是由这个消息来驱动执行的。每个服务建立后,实际上都是处于一个死循环中读取 ltask 的消息,然后进行对应的处理。

当收到消息的时候,就会从对应的模块中进行查找处理:

function ltask.schedule_message()
local from, session, type, msg, sz = ltask.recv_message()
local f = SESSION[type]
if f then
-- new session for this message
local co = new_session(f, from, session)
wakeup_session(co, type, msg, sz)
elseif session then
local co = session_coroutine_suspend_lookup[session]
if co == nil then
print("Unknown response session : ", session)
ltask.remove(msg, sz)
ltask.quit()
else
session_coroutine_suspend_lookup[session] = nil
wakeup_session(co, type, session, msg, sz)
end
else
return SCHEDULE_IDLE
end
dispatch_wakeup()
if quit then
return SCHEDULE_QUIT
end
return SCHEDULE_SUCCESS
end

local function request(command, ...)
local s = service[command]
if not s then
error("Unknown request message : " .. command)
return
end
send_response(s(...))
end

SESSION[MESSAGE_REQUEST] = function (type, msg, sz)
request(ltask.unpack_remove(msg, sz))
end

基本上的应用就是这样了。