0%

Skynet中消息传递的释放

我们经常会遇到,在反序列化的时候,调用 skynet 的 API,或者是送到套接字发送的时候,有时会需要手动释放,而有时又不需要释放,这是为什么?这就需要我们了解 Skynet 内部对一条消息它的处理流程。

我们知道,Skynet 内部有两个消息队列,一个全局消息队列,一个每个服务的消息队列,由工作线程来将消息推送到对应的服务,执行回调,然后就继续下一个。

dispatch_message 的时候,我们可以看到:

// skynet-server.c
static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
assert(ctx->init);
CHECKCALLING_BEGIN(ctx)
pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
int type = msg->sz >> MESSAGE_TYPE_SHIFT;
size_t sz = msg->sz & MESSAGE_TYPE_MASK;
FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);
if (f) {
skynet_log_output(f, msg->source, type, msg->session, msg->data, sz);
}
++ctx->message_count;
int reserve_msg;
if (ctx->profile) {
ctx->cpu_start = skynet_thread_time();
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
ctx->cpu_cost += cost_time;
} else {
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
}
if (!reserve_msg) {
skynet_free(msg->data);
}
CHECKCALLING_END(ctx)
}

针对消息传递后的返回值,会决定是否进行释放此条消息。而其返回值,是由某个服务的 C 层回调来返回。

// lua-skynet.c

static int
forward_cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
_cb(context, ud, type, session, source, msg, sz);
// don't delete msg in forward mode.
return 1;
}

static int
_cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
...

return 0;
}

如果回调是 _cb 那么就会释放,而是 forward_cb,就不会释放。至于使用的是哪个回调,由我们在 Lua 服务中进行设置的:

static int
lcallback(lua_State *L) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
int forward = lua_toboolean(L, 2);
luaL_checktype(L,1,LUA_TFUNCTION);
lua_settop(L,1);
lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);

lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
lua_State *gL = lua_tothread(L,-1);

if (forward) {
skynet_callback(context, gL, forward_cb);
} else {
skynet_callback(context, gL, _cb);
}

return 0;
}

他根据我们传递给 skynet.callback 的第二个参数来确定,是否释放。

而在设置回调的时候设置不释放的,只有一个地方:

function skynet.forward_type(map, start_func)
c.callback(function(ptype, msg, sz, ...)
local prototype = map[ptype]
if prototype then
dispatch_message(prototype, msg, sz, ...)
else
local ok, err = pcall(dispatch_message, ptype, msg, sz, ...)
c.trash(msg, sz)
if not ok then
error(err)
end
end
end, true)
skynet.timeout(0, function()
skynet.init_service(start_func)
end)
end

总结

正常来说,对于一条消息,我们发送过去后,我们由 skynet.start 的启动的服务,消息都会由引擎自动释放。

一个例外,就是 skynet.forward_type,这个启动的,就需要我们手动释放。

因此,我们万不可以搞错了逻辑,在不需要的时候手动释放了,那么就会造成 double free 了。

Socket

对于 Socket 的消息发送也有些需要注意的地方。因为我们调用 socket 库发送的消息内容可能有多种类型:

static void
get_buffer(lua_State *L, int index, struct socket_sendbuffer *buf) {
void *buffer;
switch(lua_type(L, index)) {
size_t len;
case LUA_TUSERDATA:
// lua full useobject must be a raw pointer, it can't be a socket object or a memory object.
buf->type = SOCKET_BUFFER_RAWPOINTER;
buf->buffer = lua_touserdata(L, index);
if (lua_isinteger(L, index+1)) {
buf->sz = lua_tointeger(L, index+1);
} else {
buf->sz = lua_rawlen(L, index);
}
break;
case LUA_TLIGHTUSERDATA: {
int sz = -1;
if (lua_isinteger(L, index+1)) {
sz = lua_tointeger(L,index+1);
}
if (sz < 0) {
buf->type = SOCKET_BUFFER_OBJECT;
} else {
buf->type = SOCKET_BUFFER_MEMORY;
}
buf->buffer = lua_touserdata(L,index);
buf->sz = (size_t)sz;
break;
}
case LUA_TTABLE:
// concat the table as a string
len = count_size(L, index);
buffer = skynet_malloc(len);
concat_table(L, index, buffer, len);
buf->type = SOCKET_BUFFER_MEMORY;
buf->buffer = buffer;
buf->sz = len;
break;
default:
buf->type = SOCKET_BUFFER_RAWPOINTER;
buf->buffer = luaL_checklstring(L, index, &buf->sz);
break;
}

可以是

  • USERDATA SOCKET_BUFFER_RAWPOINTER
  • LIGHTUSERDATA SOCKET_BUFFER_OBJECT/SOCKET_BUFFER_MEMORY
  • TABLE SOCKET_BUFFER_MEMORY
  • STRING SOCKET_BUFFER_RAWPOINTER

这些都会被封装成一个 BUFFER,发送后调用对应 BUFFER 的 free_func,不同的类型有不同的释放函数:

SOCKET_BUFFER_RAWPOINTER 不释放
SOCKET_BUFFER_MEMORY skynet_free
SOCKET_BUFFER_OBJECT ss->soi.free 我们自己设置的用户对象的释放函数,一般没看到

所以如果我们传递的是字符串或者是表啥的,完全不用担心, socketdriver 会为我们释放掉它,但是传输的自定义数据就要注意了。

对于传递到服务的消息,不能用 netpack.tostring ,这个会将消息进行释放,我们应该调用 skynet.tostring 来替代。

rawcall 与 redirect

那当我们采用直接传递一个指针或者是重定向一个消息的时候又是什么情况呢。

直接这样来转移指针是不可行的,因为指针在当前服务中执行返回后就会释放了,其他服务拿到这个指针已经不合法了。

gateserver

gateserver 使用了 socketdriver netpack库,它将消息(lightuserdata,sz) 传递给对应的服务后,会由 netpack.filter 来将消息复制一份来做分包处理,最终也是以 msg,sz 的形式返回回来,这个需要我们手动释放。至于原本套接字上来的服务会由 skynet 的机制释放。