0%

Skynet中的消息处理

网络信息与普通消息的封装似乎有所不同,所以关注一下这个过程是非常的有必要的。我们先从网络服务的注册开始说起。skynet 封装了一个 socket 库作为 Lua 模块来给我们使用。我们可以看一下对于一个 socket 的注册是怎么样的。

服务注册流程

  1. 注册网络服务时调用 socketdriver:listen(address, port);
  2. 调用 C 库函数 llisten,获取 每个 Lua State 中保存的 服务结构 ctx,其是通过存储为上值实现的。然后调用skynet_socket_listen(ctx, host, port, backlog);
  3. skynet_socket_listen(ctx, host, port, backlog)中,根据 ctx 获得服务的 handle,然后调用socket_server_listen(SOCKET_SERVER, handle, host, port, backlog)
  4. socket_server_listen 会获取一个handle,全局未使用的代表网络服务结构的 id,及监听套接字 fd,构造一个请求发送给 SOCKET_SERVERsend_request(ss, &request, 'L', sizeof(request.u.listen)); 并返回网络服务 id;
  5. 第四步的操作,其实就把请求发送到了 SOCKET_SERVER监听的管道中。

C 层的消息处理

  1. thread_socket工作线程会监听所有的套接字接控制管理消息。具体是在skynet_socket_poll工作;
  2. skynet_socket_poll会调用 socket_server_poll获得取到的消息类型及消息体,然后根据返回的消息类型,转发消息forward_message
  3. skynet_socket_poll是一个非常重要的函数。其做了两个工作:

    1. 如果有控制命令,就是在管道中有消息。则会调用 ctrl_cmd进行对应的操作。比如开启、监听、绑定、关闭、打开一个套接字。
    2. 或者根据 epoll_wait 返回的事件,来进行操作。如果是读事件,获取事件的 socket 结构,调用forward_message_tcp(ss, s, &l, result);进行转发 tcp 消息。同样会返回一个消息体。
  4. forward_message根据消息体中的 handle,把消息 push 到对应的 ctx 中的消息队列中。

  5. 工作线程会轮流取消息后,调用服务注册的对应回调函数进行处理。

在另外一篇文章 有相关描述。

Lua 层的消息处理

我们知道,其实所有 Lua 服务的回调函数都是一个:skynet.dispatch_message(),在 中我们说过这一点。每一个服务的开始,都是从 skynet.start(func) 开始的。

我们一般会用 skynet.register_protocol 来注册一类消息的处理机制,或者直接用 skynet.dispatch() 来为一类消息设置处理函数。

当 C 层的消息到达时,就会根据 skynet_context 来调用对应的回调,在 Lua 层,最先走到的就是 skynet.dispatch_message
这里首先调用的是 raw_dispatch_message

raw_dispatch_message

这里还是贴一下代码吧:

local function raw_dispatch_message(prototype, msg, sz, session, source)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
--- 如果是响应消息
if prototype == 1 then
--- 根据sessionID 找到挂起的协程
local co = session_id_coroutine[session]
--- 如果协程已中断,那么置空
if co == "BREAK" then
session_id_coroutine[session] = nil
--- 如果找不到协程,那就当作未知响应消息处理
elseif co == nil then
unknown_response(session, source, msg, sz)
--- 找到协程
else
--- 如果此协程需要进行 trace,则进行处理
local tag = session_coroutine_tracetag[co]
if tag then c.trace(tag, "resume") end
session_id_coroutine[session] = nil
--- 挂起执行完毕的协程
suspend(co, coroutine_resume(co, true, msg, sz, session))
end
else
--- 不是响应类型,查找消息类型的处理机制
local p = proto[prototype]
if p == nil then
--- 找不到,但是 PTYPE_TRACE
if prototype == skynet.PTYPE_TRACE then
-- trace next request
trace_source[source] = c.tostring(msg,sz)
--- 如果找不到协议处理机制,且有会话ID,那向服务的发送方返回错误
elseif session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
--- 没有会话ID,就当未知的请求处理
else
unknown_request(session, source, msg, sz, prototype)
end
return
end

local f = p.dispatch
--- 找到消息的处理机制了
if f then
--- 建立协程
local co = co_create(f)
session_coroutine_id[co] = session
session_coroutine_address[co] = source
local traceflag = p.trace
if traceflag == false then
-- force off
trace_source[source] = nil
session_coroutine_tracetag[co] = false
else
local tag = trace_source[source]
if tag then
trace_source[source] = nil
c.trace(tag, "request")
session_coroutine_tracetag[co] = tag
elseif traceflag then
-- set running_thread for trace
running_thread = co
skynet.trace()
end
end
--- 挂起
suspgdend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
else
trace_source[source] = nil
if session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, proto[prototype].name)
end
end
end
end

function skynet.dispatch_message(...)
local succ, err = pcall(raw_dispatch_message,...)
while true do
if fork_queue.h > fork_queue.t then
-- queue is empty
fork_queue.h = 1
fork_queue.t = 0
break
end
-- pop queue
local h = fork_queue.h
local co = fork_queue[h]
fork_queue[h] = nil
fork_queue.h = h + 1

local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
if not fork_succ then
if succ then
succ = false
err = tostring(fork_err)
else
err = tostring(err) .. "\n" .. tostring(fork_err)
end
end
end
assert(succ, tostring(err))
end

prototype

关于消息的类型,内置了几种:

-- skynet.lua
local skynet = {
-- read skynet.h
PTYPE_TEXT = 0,
PTYPE_RESPONSE = 1,
PTYPE_MULTICAST = 2,
PTYPE_CLIENT = 3,
PTYPE_SYSTEM = 4,
PTYPE_HARBOR = 5,
PTYPE_SOCKET = 6,
PTYPE_ERROR = 7,
PTYPE_QUEUE = 8, -- used in deprecated mqueue, use skynet.queue instead
PTYPE_DEBUG = 9,
PTYPE_LUA = 10,
PTYPE_SNAX = 11,
PTYPE_TRACE = 12, -- use for debug trace
}

在 C 层也进行了定义:

// skynet.h
#define PTYPE_TEXT 0
#define PTYPE_RESPONSE 1
#define PTYPE_MULTICAST 2
#define PTYPE_CLIENT 3
#define PTYPE_SYSTEM 4
#define PTYPE_HARBOR 5
#define PTYPE_SOCKET 6
// read lualib/skynet.lua examples/simplemonitor.lua
#define PTYPE_ERROR 7
// read lualib/skynet.lua lualib/mqueue.lua lualib/snax.lua
#define PTYPE_RESERVED_QUEUE 8
#define PTYPE_RESERVED_DEBUG 9
#define PTYPE_RESERVED_LUA 10
#define PTYPE_RESERVED_SNAX 11

#define PTYPE_TAG_DONTCOPY 0x10000
#define PTYPE_TAG_ALLOCSESSION 0x20000

skynet.register_protocol 与 skynet.dispatch

skynet.register_protocol 设置一类消息的完整处理机制,大概如下进行注册:

skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
unpack = function ( msg, sz )
return netpack.filter( queue, msg, sz)
end,
dispatch = function (_, _, q, type, ...)
queue = q
if type then
MSG[type](...)
end
end
}

skynet.dispatch 是直接为某类已注册的消息,设置一个 dispatch 函数:

function skynet.dispatch(typename, func)
local p = proto[typename]
if func then
local ret = p.dispatch
p.dispatch = func
return ret
else
return p and p.dispatch
end
end

默认情况下,注册了三种类型的消息:

do
local REG = skynet.register_protocol

REG {
name = "lua",
id = skynet.PTYPE_LUA,
pack = skynet.pack,
unpack = skynet.unpack,
}

REG {
name = "response",
id = skynet.PTYPE_RESPONSE,
}

REG {
name = "error",
id = skynet.PTYPE_ERROR,
unpack = function(...) return ... end,
dispatch = _error_dispatch,
}
end

其他类型的消息如 socket, client 就需要我们手动进行处理了。

在注册消息的时候,我们还设置好 unpack, pack, dispatch 这个函数。

co_create

这个用来建立一个协程,这是一个对 Lua 自己协程库的包装。

local function co_create(f)
--- 从协程池移除一个协程
local co = tremove(coroutine_pool)
--- 如果池里没有了
if co == nil then
--- 新建一个协程 这里的使用很是奇妙,必须清晰的了解
--- 协程建立后,并不会立即执行,而是需要我们手动调用 resume 才会执行
co = coroutine_create(function(...)
f(...)
while true do
local session = session_coroutine_id[co]
if session and session ~= 0 then
local source = debug.getinfo(f,"S")
skynet.error(string.format("Maybe forgot response session %s from %s : %s:%d",
session,
skynet.address(session_coroutine_address[co]),
source.source, source.linedefined))
end
-- coroutine exit
local tag = session_coroutine_tracetag[co]
if tag ~= nil then
if tag then c.trace(tag, "end") end
session_coroutine_tracetag[co] = nil
end
local address = session_coroutine_address[co]
if address then
session_coroutine_id[co] = nil
session_coroutine_address[co] = nil
end

-- recycle co into pool
f = nil
coroutine_pool[#coroutine_pool+1] = co
-- recv new main function f
--- 返回值 COMMAND SUSPEND 给 suspend 函数用
f = coroutine_yield "SUSPEND"
--- 直接 yield 不会返回值
f(coroutine_yield())
end
end)
else
-- pass the main function f to coroutine, and restore running thread
-- 池里有协程
local running = running_thread
-- 用协程恢复执行函数
coroutine_resume(co, f)
running_thread = running
end
return co
end

这个函数,实际上是对于 Lua 原生的 coroutine.create 的包装。
首先,会从 coroutine_pool 协程池中取出一个协程来,如果没有,就新建一个,包装协议的 dispatch 函数。

我们看到,当这个协程开始运行后,会调用我们的 dispatch 一次,然后就在一个 while true 循环中执行,还加上一个 yield 函数,是不是很懵?其实这就是协程池实现的关键了。

其中两句关键代码:

f = coroutine_yield "SUSPEND"
f(coroutine_yield())

第一句,从当前协程返回,返回值是 SUSPEND;当再次执行这个协程的时候,会从此处继续执行,我们的 f,代表了一个消息分发函数(重新传递过来的),将会获得一个新值。
第二句,对于新的 f ,直接 yield,下次就可以从此传递参数过来,这在 co_create 直接从协程池取出协程来的时候有用。

请求的处理

当协程处理后,就会进行执行:

suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))

我们看到,在执行完协程后,会将协程的返回值,传递给 suspend 函数,然后他将协程进行挂起。

suspend

我们会以两种形式调用这个函数:

-- 处理响应
suspend(co, coroutine_resume(co, true, msg, sz, session))
-- 处理请求
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))

重要的是,这个函数,会根据是否收到最后一个参数(由协程中的 yield 返回) 来决定,不同的操作。

在这里看来,好像 suspend 并没有什么意义啊?
实际上,它是针对一些需要进行类似定时,睡眠这些回调的。

supend 函数的参数解释:

  • 需要挂起的 co
  • result coroutine_resume 执行协程的结果,true or false
  • command `coroutine_resume 执行的协程中返回的值,我们在 co_create 中看到,在执行完毕 dispatch后,这个一般会返回 SUSPEND

如果执行不成功,那么就会上报错误信息,并 fork 一个协程出来(这个是空的,所以在执行直接就会返回 SUSPEND),唤醒 dispatch_wakeu 继而,上报错误队列 dispatch_error_queue

-- suspend is local function
function suspend(co, result, command)
--- 需要挂起的协程执行出错
if not result then
local session = session_coroutine_id[co]
--- 有会话ID
if session then -- coroutine may fork by others (session is nil)
local addr = session_coroutine_address[co]
if session ~= 0 then
-- only call response error
local tag = session_coroutine_tracetag[co]
if tag then c.trace(tag, "error") end
--- 向消息的来源发送错误消息
c.send(addr, skynet.PTYPE_ERROR, session, "")
end
session_coroutine_id[co] = nil
end
session_coroutine_address[co] = nil
session_coroutine_tracetag[co] = nil
--- fork 一个空的协程 放到 fork_queue 里面
skynet.fork(function() end) -- trigger command "SUSPEND"
local tb = traceback(co,tostring(command))
coroutine.close(co)
error(tb)
end
--- 协程执行后挂起
if command == "SUSPEND" then
return dispatch_wakeup()
elseif command == "QUIT" then
coroutine.close(co)
-- service exit
return
elseif command == "USER" then
-- See skynet.coutine for detail
error("Call skynet.coroutine.yield out of skynet.coroutine.resume\n" .. traceback(co))
elseif command == nil then
-- debug trace
return
else
error("Unknown command : " .. command .. "\n" .. traceback(co))
end
end

在每个协程执行完毕后,都会尝试执行一下被 wakeup 唤醒的需要执行的协程。
在这些执行完毕后,最后才会来执行我们手动 fork 出来的协程。

消息的发送

通常,我们是调用 skynet.send 或者 skynet.call 来进行消息的发送。

function skynet.send(addr, typename, ...)
local p = proto[typename]
return c.send(addr, p.id, 0 , p.pack(...))
end

function skynet.call(addr, typename, ...)
local tag = session_coroutine_tracetag[running_thread]
if tag then
c.trace(tag, "call", 2)
c.send(addr, skynet.PTYPE_TRACE, 0, tag)
end

local p = proto[typename]
local session = c.send(addr, p.id , nil , p.pack(...))
if session == nil then
error("call to invalid address " .. skynet.address(addr))
end
return p.unpack(yield_call(addr, session))
end

区别在于 call 是同步的,而 send 是异步的,发送的时候,都会带上消息的类型。

Lua 消息

看看它的注册消息

skynet.pack = assert(c.pack)
skynet.packstring = assert(c.packstring)
skynet.unpack = assert(c.unpack)
skynet.tostring = assert(c.tostring)
skynet.trash = assert(c.trash)

REG {
name = "lua",
id = skynet.PTYPE_LUA,
pack = skynet.pack,
unpack = skynet.unpack,
}

它是调用了 skynet 的 pack/unpack 来进行打包,解包的。解包的时候正好相反。