烦恼一般都是想太多了。

0%

skynet-任务调度及消息处理

通过将配置文件传入skynet后,其就会根据我们的脚本逻辑业务单元来启动对应的服务,然后把收到的消息进行分发处理。一般来说,这个框架针对的是网络游戏服务器,所以肯定会面向网络套接字信息这样的,但是也有进程间消息传递的处理机制。下面我们来看一下。

skyent main()

在文件skynet_main.c文件中,定义了一个main()函数。

int
main(int argc, char *argv[]) {
const char * config_file = NULL ;
if (argc > 1) {
config_file = argv[1];
} else {
fprintf(stderr, "Need a config file. Please read skynet wiki : https://github.com/cloudwu/skynet/wiki/Config\n"
"usage: skynet configfilename\n");
return 1;
}

skynet_globalinit();
skynet_env_init();

sigign();

struct skynet_config config;

#ifdef LUA_CACHELIB
// init the lock of code cache
luaL_initcodecache();
#endif

struct lua_State *L = luaL_newstate();
luaL_openlibs(L); // link lua lib

int err = luaL_loadbufferx(L, load_config, strlen(load_config), "=[skynet config]", "t");
assert(err == LUA_OK);
lua_pushstring(L, config_file);

err = lua_pcall(L, 1, 1, 0);
if (err) {
fprintf(stderr,"%s\n",lua_tostring(L,-1));
lua_close(L);
return 1;
}
_init_env(L);

config.thread = optint("thread",8);
config.module_path = optstring("cpath","./cservice/?.so");
config.harbor = optint("harbor", 1);
config.bootstrap = optstring("bootstrap","snlua bootstrap");
config.daemon = optstring("daemon", NULL);
config.logger = optstring("logger", NULL);
config.logservice = optstring("logservice", "logger");
config.profile = optboolean("profile", 1);

lua_close(L);

skynet_start(&config);
skynet_globalexit();

return 0;
}

这个函数做了几件事情:

  1. 初始化全局环境。skynet_globalinit()
  2. 初始化环境变量。skynet_env_init()
  3. 使用 Lua 加载代码来来加载我们的配置文件。luaL_loadbufferx()
  4. 就是读取传入的配置文件,解析参数,然后以skynet_start(&config)进行启动。

skynet_globalinit() - 全局节点初始化

这个函数,会初始化全局的节点信息,被设置主线程内的控制键值。

// skynet_server.c

struct skynet_node {
int total;
int init;
uint32_t monitor_exit;
pthread_key_t handle_key;
bool profile; // default is off
};

static struct skynet_node G_NODE;

void
skynet_globalinit(void) {
G_NODE.total = 0;
G_NODE.monitor_exit = 0;
G_NODE.init = 1;
if (pthread_key_create(&G_NODE.handle_key, NULL)) {
fprintf(stderr, "pthread_key_create failed");
exit(1);
}
// set mainthread's key
skynet_initthread(THREAD_MAIN);
}

void
skynet_initthread(int m) {
uintptr_t v = (uint32_t)(-m);
pthread_setspecific(G_NODE.handle_key, (void *)v);
}

看起来,在主线程上创建了一个线程存储键,并初始化为 THREAD_MAIN,这个常量定义为 1。

初始化全局的节点信息,这个应该是为了分布式或多节点而来的。

skynet_env_init()-全局环境变量

这个文件,初始化一个全局的环境变量E,可以看到这个全局变量其实也是用一个 lua_State 来保存我们的配置的。

// skynet_env.c
struct skynet_env {
struct spinlock lock;
lua_State *L;
};

static struct skynet_env *E = NULL;


void
skynet_env_init() {
E = skynet_malloc(sizeof(*E));
SPIN_INIT(E)
E->L = luaL_newstate();
}

struct spinlock {
int lock;
};

spinlock 就是一个整型,不知道是做什么用处的,可能是和锁相关的内容。
想要对全局环境进行修改的时候,势必要获取这个锁。

luaL_initcodecache

这个函数,原生的 Lua 是不具备的,是在 lauxlib.c 内增加 的这么一个函数。

// 3rd/lua/lauxlib.c

LUALIB_API void
luaL_initcodecache(void) {
SPIN_INIT(&CC);
}

struct codecache {
struct spinlock lock;
lua_State *L;
};

static struct codecache CC;

一个 CC 静态变量,用来存储一些需要有多个 lua_State 公用的代码,当然也是加了锁的。

配置加载

一些库的载入,是打开一个虚拟机,然后通过Lua脚本的形式载入的。我们看主函数中的代码:

// skynet_main.c 

struct skynet_config config;

struct lua_State *L = luaL_newstate();
luaL_openlibs(L); // link lua lib

int err = luaL_loadbufferx(L, load_config, strlen(load_config), "=[skynet config]", "t");
assert(err == LUA_OK);
lua_pushstring(L, config_file);

err = lua_pcall(L, 1, 1, 0);
if (err) {
fprintf(stderr,"%s\n",lua_tostring(L,-1));
lua_close(L);
return 1;
}

我们需要注意的是 luaL_loadbufferx会将字符串压入栈上,第三个参数一般用来做调试的时候打印信息用,第四个参数表示加载的是 二进制(b) 还是 文本 (t),或者两者都有(bt)。

先是建立一个新的Lua State,然后把 下面的代码载入其内;接着,把配置文件压入栈,然后执行配置文件。这个是我们的 config.lua 配置文件的加载逻辑。

static const char * load_config = "\
local result = {}\n\
local function getenv(name) return assert(os.getenv(name), [[os.getenv() failed: ]] .. name) end\n\
local sep = package.config:sub(1,1)\n\
local current_path = [[.]]..sep\n\
local function include(filename)\n\
local last_path = current_path\n\
local path, name = filename:match([[(.*]]..sep..[[)(.*)$]])\n\
if path then\n\
if path:sub(1,1) == sep then -- root\n\
current_path = path\n\
else\n\
current_path = current_path .. path\n\
end\n\
else\n\
name = filename\n\
end\n\
local f = assert(io.open(current_path .. name))\n\
local code = assert(f:read [[*a]])\n\
code = string.gsub(code, [[%$([%w_%d]+)]], getenv)\n\
f:close()\n\
assert(load(code,[[@]]..filename,[[t]],result))()\n\
current_path = last_path\n\
end\n\
setmetatable(result, { __index = { include = include } })\n\
local config_name = ...\n\
include(config_name)\n\
setmetatable(result, nil)\n\
return result\n\
";

需要注意的是,我们的配置,这个时候是加载在一在个本地的 Lua State 里面的,需要在后面进行配置到全局。

这段代码的运行逻辑看起来有点累哈,实际就是设置了一个 include 函数,这个函数会把我们指定的配置文件以 result 为环境加 load 到栈上,但是并不执行,返回值就是我们建立的 result。这即是加载后的配置表。

__init_env(L)

接下来的事情就比较奇妙了,在先前建立的Lua State内,已经保存了我们的配置信息,已经载入的库等。接下来就是把这个Lua State内的配置,都设置到全局变量内(事实上这些完全可以在C代码内完成的,为什么要用Lua呢)。

关于就在于我们的 skynet_setenv() 函数,会将我们 Lua 内配置的内容都设置到全局配置 E 内去。

static void
_init_env(lua_State *L) {
lua_pushnil(L); /* first key */
while (lua_next(L, -2) != 0) {
int keyt = lua_type(L, -2);
if (keyt != LUA_TSTRING) {
fprintf(stderr, "Invalid config table\n");
exit(1);
}
const char * key = lua_tostring(L,-2);
if (lua_type(L,-1) == LUA_TBOOLEAN) {
int b = lua_toboolean(L,-1);
skynet_setenv(key,b ? "true" : "false" );
} else {
const char * value = lua_tostring(L,-1);
if (value == NULL) {
fprintf(stderr, "Invalid config table key = %s\n", key);
exit(1);
}
skynet_setenv(key,value);
}
lua_pop(L,1);
}
lua_pop(L,1);
}

这个代码就是要理解 lua_next(L,index) 这个函数会从栈顶 弹出一个键,然后从 Index 的表处,压入两个值,key_value 对。

skynet_setenv(const char *key, const char *value) {
SPIN_LOCK(E)

lua_State *L = E->L;
lua_getglobal(L, key);
assert(lua_isnil(L, -1));
lua_pop(L,1);
lua_pushstring(L,value);
lua_setglobal(L,key);

SPIN_UNLOCK(E)
}

可以看到,我们的配置文件信息,其实是放在全局环境的注册表内的。

skynet_start(&config)

当我们把我们的 config.lua 内的内容加载到全局环境变量 E 中后,就会根据配置来构造我们的启动了。

类似 optint(), optstring() 这些函数其实都是使用 skynet_getenv 了从 E 内取内容。

skynet_start.c中,我们可以看到代码:

skynet_start(struct skynet_config * config) {
// register SIGHUP for log file reopen
struct sigaction sa;
sa.sa_handler = &handle_hup;
sa.sa_flags = SA_RESTART;
sigfillset(&sa.sa_mask);
sigaction(SIGHUP, &sa, NULL);

if (config->daemon) {
if (daemon_init(config->daemon)) {
exit(1);
}
}
skynet_harbor_init(config->harbor);
skynet_handle_init(config->harbor);
skynet_mq_init();
skynet_module_init(config->module_path);
skynet_timer_init();
skynet_socket_init();
skynet_profile_enable(config->profile);

struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
if (ctx == NULL) {
fprintf(stderr, "Can't launch %s service\n", config->logservice);
exit(1);
}

bootstrap(ctx, config->bootstrap);

start(config->thread);

// harbor_exit may call socket send, so it should exit before socket_free
skynet_harbor_exit();
skynet_socket_free();
if (config->daemon) {
daemon_exit(config->daemon);
}
}

其主要工作为:

  1. 设置SIGHUP的信号处理程序。
  2. 初始化句柄。skynet_handle_init()
  3. 初始化消息队列。skynet_mq_init()
  4. 模块加载。skynet_module_init(config->module_path) 我们指定的 C 编译的 so 库的目录下的内容。
  5. 定时器设置。skynet_timer_init()
  6. 套接字初始化。skynet_socket_ini()
  7. 开启日志服务。
  8. 启动bootstrap脚本。bootstrap(ctr, confit->bootstrap)
  9. 启动线程。start(config->thread)

skynet_harbor_init(int harbor)

初始化节点化的ID。

harbor 可以是 1-255 间的任意整数。一个 skynet 网络最多支持 255 个节点。每个节点有必须有一个唯一的编号。

如果 harbor 为 0 ,skynet 工作在单节点模式下。此时 master 和 address 以及 standalone 都不必设置。

void
skynet_harbor_init(int harbor) {
HARBOR = (unsigned int)harbor << HANDLE_REMOTE_SHIFT;
}

skynet_handle_init()

handle是什么我一直没有搞清楚,这是干什么的我也没有弄明白。只有参考了一下作者的博客 。

把一个符合规范的 C 模块,从动态库(so 文件)中启动起来,绑定一个永不重复(即使模块退出)的数字 id 做为其 handle 。模块被称为服务(Service),服务间可以自由发送消息。每个模块可以向 Skynet 框架注册一个 callback 函数,用来接收发给它的消息。每个服务都是被一个个消息包驱动,当没有包到来的时候,它们就会处于挂起状态,对 CPU 资源零消耗。如果需要自主逻辑,则可以利用 Skynet 系统提供的 timeout 消息,定期触发。

模块的实例是服务,实例的ID是 handle,每个服务都对应一个唯一的handle_id。

也就是对于很多类型的服务,都是首先加载进来,然后注册,为服务模块,可以启动多个实例。

// skynet_handle.c
struct handle_storage {
struct rwlock lock;

uint32_t harbor;
uint32_t handle_index;
int slot_size;
struct skynet_context ** slot;

int name_cap;
int name_count;
struct handle_name *name;
};

static struct handle_storage *H = NULL;

void
skynet_handle_init(int harbor) {
assert(H==NULL);
struct handle_storage * s = skynet_malloc(sizeof(*H));
s->slot_size = DEFAULT_SLOT_SIZE;
s->slot = skynet_malloc(s->slot_size * sizeof(struct skynet_context *));
memset(s->slot, 0, s->slot_size * sizeof(struct skynet_context *));

rwlock_init(&s->lock);
// reserve 0 for system
s->harbor = (uint32_t) (harbor & 0xff) << HANDLE_REMOTE_SHIFT;
s->handle_index = 1;
s->name_cap = 2;
s->name_count = 0;
s->name = skynet_malloc(s->name_cap * sizeof(struct handle_name));

H = s;

// Don't need to free H
}

设置一个全局控制柄变量H,默认有四个位置可以存储四个 skynet_context服务结构的指针。

skynet_module_init()

服务表已经有了,现在我们需要把我们的模块加载进来。

这个函数定义在skynet_module.c中,其作用,就是载入配置文件中lua_cpath= ...指定的动态库路径。同时,将全局的模块变量M指像这个路径。

每个模块的结构定义在skynet_module.h中:

// skynet_module.h

struct skynet_module {
const char * name;
void * module;
skynet_dl_create create;
skynet_dl_init init;
skynet_dl_release release;
skynet_dl_signal signal;
};

其中,后面四个函数是由动态库提供的。

// skynet_modlue.c
struct modules {
int count;
struct spinlock lock;
const char * path;
struct skynet_module m[MAX_MODULE_TYPE];
};

static struct modules * M = NULL;


void
skynet_module_init(const char *path) {
struct modules *m = skynet_malloc(sizeof(*m));
m->count = 0;
m->path = skynet_strdup(path);

SPIN_INIT(m)

M = m;
}

skynet_mq_init()

在文件skynet_mq.c中,定义了这个函数:

void
skynet_mq_init() {
struct global_queue *q = skynet_malloc(sizeof(*q));
memset(q,0,sizeof(*q));
SPIN_INIT(q);
Q=q;
}

这是建立了一个全局的消息队列 Q,此队列保存了每个服务的消息队列。

skynet_timer_init();

定时器初始化:

void 
skynet_timer_init(void) {
TI = timer_create_timer();
uint32_t current = 0;
systime(&TI->starttime, &current);
TI->current = current;
TI->current_point = gettime();
}

skynet_socket_init();

套接字服务器初始化,这个的作用是当需要启动网络服务的时候,由这个服务器来启动对应的监听。

void 
skynet_socket_init() {
SOCKET_SERVER = socket_server_create(skynet_now());
}

struct socket_server *
socket_server_create(uint64_t time) {
int i;
int fd[2];
poll_fd efd = sp_create();
if (sp_invalid(efd)) {
fprintf(stderr, "socket-server: create event pool failed.\n");
return NULL;
}
if (pipe(fd)) {
sp_release(efd);
fprintf(stderr, "socket-server: create socket pair failed.\n");
return NULL;
}
if (sp_add(efd, fd[0], NULL)) {
// add recvctrl_fd to event poll
fprintf(stderr, "socket-server: can't add server fd to event pool.\n");
close(fd[0]);
close(fd[1]);
sp_release(efd);
return NULL;
}

struct socket_server *ss = MALLOC(sizeof(*ss));
ss->time = time;
ss->event_fd = efd;
ss->recvctrl_fd = fd[0];
ss->sendctrl_fd = fd[1];
ss->checkctrl = 1;

for (i=0;i<MAX_SOCKET;i++) {
struct socket *s = &ss->slot[i];
s->type = SOCKET_TYPE_INVALID;
clear_wb_list(&s->high);
clear_wb_list(&s->low);
spinlock_init(&s->dw_lock);
}
ss->alloc_id = 0;
ss->event_n = 0;
ss->event_index = 0;
memset(&ss->soi, 0, sizeof(ss->soi));
FD_ZERO(&ss->rfds);
assert(ss->recvctrl_fd < FD_SETSIZE);

return ss;
}

skynet_context 结构

在skynet.c中,结构skynet_context为每个服务保存了一个内部结构:

struct skynet_context {
void * instance;
struct skynet_module * mod;
void * cb_ud;
skynet_cb cb;
struct message_queue *queue;
FILE * logfile;
uint64_t cpu_cost; // in microsec
uint64_t cpu_start; // in microsec
char result[32];
uint32_t handle;
int session_id;
int ref;
int message_count;
bool init;
bool endless;
bool profile;

CHECKCALLING_DECL
};

结构定义了每个服务的实例地址,模块地址,消息队列,日志文件,会话ID,引用数,消息数等字段。

服务 H 表

我们第一个启动的服务就是 logger 服务。

struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
if (ctx == NULL) {
fprintf(stderr, "Can't launch %s service\n", config->logservice);
exit(1);
}

skynet_handle_namehandle(skynet_context_handle(ctx), "logger");

skynet_create_new()

skynet.c中,skynet_create_new()函数如下:

// skynet.c

struct skynet_context *
skynet_context_new(const char * name, const char *param) {
// 查询模块是否加载
struct skynet_module * mod = skynet_module_query(name);

if (mod == NULL)
return NULL;
// 用模块建立一个实例 返回的是实例地址
void *inst = skynet_module_instance_create(mod);
if (inst == NULL)
return NULL;
// 建立一个服务结构
struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
CHECKCALLING_INIT(ctx)

ctx->mod = mod;
ctx->instance = inst;
ctx->ref = 2;
ctx->cb = NULL;
ctx->cb_ud = NULL;
ctx->session_id = 0;
ctx->logfile = NULL;

ctx->init = false;
ctx->endless = false;

ctx->cpu_cost = 0;
ctx->cpu_start = 0;
ctx->message_count = 0;
ctx->profile = G_NODE.profile;
// Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
ctx->handle = 0;
// 注册一个handle
ctx->handle = skynet_handle_register(ctx);
struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
// init function maybe use ctx->handle, so it must init at last
context_inc();

CHECKCALLING_BEGIN(ctx)
// 初始化服务
int r = skynet_module_instance_init(mod, inst, ctx, param);
CHECKCALLING_END(ctx)
if (r == 0) {
struct skynet_context * ret = skynet_context_release(ctx);
if (ret) {
ctx->init = true;
}
skynet_globalmq_push(queue);
if (ret) {
skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
}
return ret;
} else {
skynet_error(ctx, "FAILED launch %s", name);
uint32_t handle = ctx->handle;
skynet_context_release(ctx);
skynet_handle_retire(handle);
struct drop_t d = { handle };
skynet_mq_release(queue, drop_message, &d);
return NULL;
}
}

此函数,

  1. 首先会查看一下全局服务变量M中是否存在对应模块,如果存在,就初始化一个实例;
  2. 初始化 服务的 skynet_context 结构,包括会分配一个唯一的 handle。skynet_handle_register(ctx) ,然后将服务注册到 H 中。
  3. 建立服务的消息队列。skynet_mq_create
  4. 初始化服务。skynet_module_instance_init(mod, inst, ctx, param); 其结果是调用模块自身的 init函数。
  5. 然后把消息队列放在全局消息队列中。

skynet_module_instance_create

void * 
skynet_module_instance_create(struct skynet_module *m) {
if (m->create) {
return m->create();
} else {
return (void *)(intptr_t)(~0);
}
}

模块实例的建立,实际上就是C 服务中的 create 函数的调用,其结果,一般都是返回一个服务需要的数据结构。如 logger, snlua, gate 服务:

struct gate *
gate_create(void) {
struct gate * g = skynet_malloc(sizeof(*g));
memset(g,0,sizeof(*g));
g->listen_id = -1;
return g;
}

struct logger *
logger_create(void) {
struct logger * inst = skynet_malloc(sizeof(*inst));
inst->handle = NULL;
inst->close = 0;
inst->filename = NULL;

return inst;
}

struct snlua *
snlua_create(void) {
struct snlua * l = skynet_malloc(sizeof(*l));
memset(l,0,sizeof(*l));
l->mem_report = MEMORY_WARNING_REPORT;
l->mem_limit = 0;
l->L = lua_newstate(lalloc, l);
return l;
}

skynet_handle_register

我们通过模块建立了一个服务后,事实上对于 skynet 来说,这个服务,就是一个 skynet_context ,其对具体的模块实际上是不关心的。

H 表中 slot 是服务的插槽,每个插槽都指向一个 skynet_context 结构。

下面这个逻辑会首先看一下插槽够用不,不够用就增大插槽,然后再注册后返回。

uint32_t
skynet_handle_register(struct skynet_context *ctx) {
struct handle_storage *s = H;

rwlock_wlock(&s->lock);

for (;;) {
int i;
uint32_t handle = s->handle_index;
for (i=0;i<s->slot_size;i++,handle++) {
if (handle > HANDLE_MASK) {
// 0 is reserved
handle = 1;
}
int hash = handle & (s->slot_size-1);
if (s->slot[hash] == NULL) {
s->slot[hash] = ctx;
s->handle_index = handle + 1;

rwlock_wunlock(&s->lock);

handle |= s->harbor;
return handle;
}
}
assert((s->slot_size*2 - 1) <= HANDLE_MASK);
struct skynet_context ** new_slot = skynet_malloc(s->slot_size * 2 * sizeof(struct skynet_context *));
memset(new_slot, 0, s->slot_size * 2 * sizeof(struct skynet_context *));
for (i=0;i<s->slot_size;i++) {
int hash = skynet_context_handle(s->slot[i]) & (s->slot_size * 2 - 1);
assert(new_slot[hash] == NULL);
new_slot[hash] = s->slot[i];
}
skynet_free(s->slot);
s->slot = new_slot;
s->slot_size *= 2;
}
}

模块(服务)的初始化

每个模块的实例建立了以后,就会调用模块自己的初始化函数进行初始化设置。对于我们所有的Lua服务来说,其都是 snlua模块的一个实例。对于 skyent_context_new()中调用的函数skynet_module_instance_init(),我们看一下它的代码:

// skynet_module.c

int
skynet_module_instance_init(struct skynet_module *m, void * inst, struct skynet_context *ctx, const char * parm) {
return m->init(inst, ctx, parm);
}

这个其实就是 c server模块中的 init函数。

现在框架提供的服务有 :logger, gate,snlua, harbor,默认4个,所以 H 表流了四个 slot 在。

snlua

snlua 实际上已经建立了一个 lua_State。

// service_src/service_snlua.c

struct snlua {
lua_State * L;
struct skynet_context * ctx;
size_t mem;
size_t mem_report;
size_t mem_limit;
};

int
snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) {
int sz = strlen(args);
char * tmp = skynet_malloc(sz);
memcpy(tmp, args, sz);
// 设置 ctx 回调函数为 launch_cb
skynet_callback(ctx, l , launch_cb);
// 这个就是返回一个自己的服务名字。如果我们传入 最后一个参数不是NULL,不是空字符,也不是以 . 开头就会出错。因为不以 . 开头的是系统服务。这个命令,是向 **H** 注册一个名字的意思。默认我 NULL,就是返回一个 : handleId 这样的值。 [:00000009]
const char * self = skynet_command(ctx, "REG", NULL);
uint32_t handle_id = strtoul(self+1, NULL, 16);
// it must be first message
// 模块向服务发送消息。参数:ctx, 源(0代表自己),目标handle,类型,seesionId,数据,大小。第一条消息会被发送给设置的回调。这个消息,会发送到 handle_Id 标识的 ctx 消息队列中。由消息调度线程查询消息后分发。
skynet_send(ctx, 0, handle_id, PTYPE_TAG_DONTCOPY,0, tmp, sz);
return 0;
}

static int
launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , const void * msg, size_t sz) {
assert(type == 0 && session == 0);
struct snlua *l = ud;
skynet_callback(context, NULL, NULL);
int err = init_cb(l, context, msg, sz);
if (err) {
skynet_command(context, "EXIT", NULL);
}

return 0;
}

static int
init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {
lua_State *L = l->L;
l->ctx = ctx;
lua_gc(L, LUA_GCSTOP, 0);
lua_pushboolean(L, 1); /* signal for libraries to ignore env. vars. */
lua_setfield(L, LUA_REGISTRYINDEX, "LUA_NOENV");
luaL_openlibs(L);
lua_pushlightuserdata(L, ctx);
lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context");
luaL_requiref(L, "skynet.codecache", codecache , 0);
lua_pop(L,1);

const char *path = optstring(ctx, "lua_path","./lualib/?.lua;./lualib/?/init.lua");
lua_pushstring(L, path);
lua_setglobal(L, "LUA_PATH");
const char *cpath = optstring(ctx, "lua_cpath","./luaclib/?.so");
lua_pushstring(L, cpath);
lua_setglobal(L, "LUA_CPATH");
const char *service = optstring(ctx, "luaservice", "./service/?.lua");
lua_pushstring(L, service);
lua_setglobal(L, "LUA_SERVICE");
const char *preload = skynet_command(ctx, "GETENV", "preload");
lua_pushstring(L, preload);
lua_setglobal(L, "LUA_PRELOAD");

lua_pushcfunction(L, traceback);
assert(lua_gettop(L) == 1);

const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");

int r = luaL_loadfile(L,loader);
if (r != LUA_OK) {
skynet_error(ctx, "Can't load %s : %s", loader, lua_tostring(L, -1));
report_launcher_error(ctx);
return 1;
}
lua_pushlstring(L, args, sz);
r = lua_pcall(L,1,0,1);
if (r != LUA_OK) {
skynet_error(ctx, "lua loader error : %s", lua_tostring(L, -1));
report_launcher_error(ctx);
return 1;
}
lua_settop(L,0);
if (lua_getfield(L, LUA_REGISTRYINDEX, "memlimit") == LUA_TNUMBER) {
size_t limit = lua_tointeger(L, -1);
l->mem_limit = limit;
skynet_error(ctx, "Set memory limit to %.2f M", (float)limit / (1024 * 1024));
lua_pushnil(L);
lua_setfield(L, LUA_REGISTRYINDEX, "memlimit");
}
lua_pop(L, 1);

lua_gc(L, LUA_GCRESTART, 0);

return 0;
}

可以看到,每个snlua结构都有自己的 Lua State,所以所有的代码都是snlua自己的内部执行的,不会影响其他的Lua State。

这个函数都干了什么:

  1. 设置对应服务 ctx 的回调函数为 launch_cb
  2. 注册自身;
  3. 向 ctx 发送消息(入列)。
  4. 收到消息后会回调 launch_cb,而这个函数会又取消调回调函数。转而调用 init_cb函数。进行初始化 snlua中的 Lua State;
  5. 这个函数才调用loader.lua来真正的加载服务脚本等操作。

bootstrap()

之前我们只启动了一个 logger 服务。现在我们要开始启动新的服务了。

struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
if (ctx == NULL) {
fprintf(stderr, "Can't launch %s service\n", config->logservice);
exit(1);
}

默认情况下,会以logger, NULL为参数调用 skynet_context_new(“logger”, NULL)函数,建立一个关于 logger服务的skynet_context的结构,其实就是启动了 logger服务。

然后调用bootstrap(ctx, config->bootstrap)函数。

//skynet_start.c
static void
bootstrap(struct skynet_context * logger, const char * cmdline) {
int sz = strlen(cmdline);
char name[sz+1];
char args[sz+1];
sscanf(cmdline, "%s %s", name, args);
struct skynet_context *ctx = skynet_context_new(name, args);
if (ctx == NULL) {
skynet_error(NULL, "Bootstrap error : %s\n", cmdline);
skynet_context_dispatchall(logger);
exit(1);
}
}

默认情况下,config->bootstrapsnlua bootstrap。事实上最终执行的函数是skynet_context_new("snlua", "bootstrap"),如此又建立一个服务结构,并用bootstrap参数进行初始化。如果服务建立失败,就会将所有的信息发送到 logger,并处理全部的 logger服务的消息。

snlua作为沙盒服务,会再入lua脚本并进行执行。

实际上 snlua 会利用 lualib/loader.lua 来加载我们的 bootstrap 脚本。然后进行启动。

bootstrap 这个配置项关系着 skynet 运行的第二个服务。通常通过这个服务把整个系统启动起来。默认的 bootstrap 配置项为 “snlua bootstrap” ,这意味着,skynet 会启动 snlua 这个服务,并将 bootstrap 作为参数传给它。snlua 是 lua 沙盒服务,bootstrap 会根据配置的 luaservice 匹配到最终的 lua 脚本。如果按默认配置,这个脚本应该是 service/bootstrap.lua 。

如无必要,你不需要更改 bootstrap 配置项,让默认的 bootstrap 脚本工作。

最后,它从 config 中读取 start 这个配置项,作为用户定义的服务启动入口脚本运行。成功后,把自己退出。

这个 start 配置项,才是用户定义的启动脚本,默认值为 “main” 。如果你只是试玩一下 skynet ,可能有多份不同的启动脚本,那么建议你多写几份 config 文件,在里面配置不同的 start 项。examples 目录下有很多这样的例子。

最终,bootstrap 会将我们配置在配置文件中的 start 脚本加载进来,启动。其他服务。

start_thread(config->thread)

此函数用来启动我们设置的线程数。一共启动了config->thread + 3个线程。

// skynet_start.c
struct monitor {
int count;
struct skynet_monitor ** m;
pthread_cond_t cond;
pthread_mutex_t mutex;
int sleep;
int quit;
};


start(int thread) {
pthread_t pid[thread+3];

struct monitor *m = skynet_malloc(sizeof(*m));
memset(m, 0, sizeof(*m));
m->count = thread;
m->sleep = 0;

m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *));
int i;
for (i=0;i<thread;i++) {
m->m[i] = skynet_monitor_new();
}
if (pthread_mutex_init(&m->mutex, NULL)) {
fprintf(stderr, "Init mutex error");
exit(1);
}
if (pthread_cond_init(&m->cond, NULL)) {
fprintf(stderr, "Init cond error");
exit(1);
}

create_thread(&pid[0], thread_monitor, m);
create_thread(&pid[1], thread_timer, m);
create_thread(&pid[2], thread_socket, m);

static int weight[] = {
-1, -1, -1, -1, 0, 0, 0, 0,
1, 1, 1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3, };
struct worker_parm wp[thread];
for (i=0;i<thread;i++) {
wp[i].m = m;
wp[i].id = i;
if (i < sizeof(weight)/sizeof(weight[0])) {
wp[i].weight= weight[i];
} else {
wp[i].weight = 0;
}
create_thread(&pid[i+3], thread_worker, &wp[i]);
}

for (i=0;i<thread+3;i++) {
pthread_join(pid[i], NULL);
}

free_monitor(m);
}

我们看到,函数先建立了一个监控器m,并初始化了线程数,睡眠数分别为(8, 0),然后,再为每个线程建立了skynet自己的监控数据。

// skynet_monitor.c
struct skynet_monitor {
int version;
int check_version;
uint32_t source;
uint32_t destination;
};

接下来我们可以看到,是通过 互斥量 和条件变量来进行线程的抢占使用的。接着建立了三个线程和thread(默认是8)个工作线程:

// skynet_start.c

struct worker_parm {
struct monitor *m;
int id;
int weight;
};

create_thread(&pid[0], thread_monitor, m);
create_thread(&pid[1], thread_timer, m);
create_thread(&pid[2], thread_socket, m);

static int weight[] = {
-1, -1, -1, -1, 0, 0, 0, 0,
1, 1, 1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3, };
struct worker_parm wp[thread];
for (i=0;i<thread;i++) {
wp[i].m = m;
wp[i].id = i;
if (i < sizeof(weight)/sizeof(weight[0])) {
wp[i].weight= weight[i];
} else {
wp[i].weight = 0;
}
create_thread(&pid[i+3], thread_worker, &wp[i]);
}

建立了一个工作线程参数表struct worker_parm wp[thread];,并设置每个线程的权重,然后启动线程,线程ID保存在pid数组内。

thread_worker

工作线程循环从队列内取出消息进行处理,如果没有消息,就从全局消息队列去取。

static void *
thread_worker(void *p) {
struct worker_parm *wp = p;
int id = wp->id;
int weight = wp->weight;
struct monitor *m = wp->m;
struct skynet_monitor *sm = m->m[id];
skynet_initthread(THREAD_WORKER);
struct message_queue * q = NULL;
while (!m->quit) {
q = skynet_context_message_dispatch(sm, q, weight);
if (q == NULL) {
if (pthread_mutex_lock(&m->mutex) == 0) {
++ m->sleep;
// "spurious wakeup" is harmless,
// because skynet_context_message_dispatch() can be call at any time.
if (!m->quit)
pthread_cond_wait(&m->cond, &m->mutex);
-- m->sleep;
if (pthread_mutex_unlock(&m->mutex)) {
fprintf(stderr, "unlock mutex error");
exit(1);
}
}
}
}
return NULL;
}

skynet_initthread(THREAD_WORKER);用于在线程特定数据内保存自己的handlekey

// skynet_server.c
void
skynet_initthread(int m) {
uintptr_t v = (uint32_t)(-m);
pthread_setspecific(G_NODE.handle_key, (void *)v);
}

每个工作线程,都是调用skynet_context_message_dispatch(sm, q, wight)来从消息队列获取信息,如果获取不到,就进入睡眠状态。直到条件变量条件满足,才进行唤醒。

当线程的消息队列不存在的时候,就会从全局消息队列获取一个消息队列:

// skynet_mq.c

struct message_queue {
struct spinlock lock;
uint32_t handle;
int cap;
int head;
int tail;
int release;
int in_global;
int overload;
int overload_threshold;
struct skynet_message *queue;
struct message_queue *next;
};

struct global_queue {
struct message_queue *head;
struct message_queue *tail;
struct spinlock lock;
};
struct message_queue *
skynet_globalmq_pop() {
struct global_queue *q = Q;

SPIN_LOCK(q)
struct message_queue *mq = q->head;
if(mq) {
q->head = mq->next;
if(q->head == NULL) {
assert(mq == q->tail);
q->tail = NULL;
}
mq->next = NULL;
}
SPIN_UNLOCK(q)

return mq;
}
// skynet_server.c
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
if (q == NULL) {
q = skynet_globalmq_pop();
if (q==NULL)
return NULL;
}

uint32_t handle = skynet_mq_handle(q);

struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {
struct drop_t d = { handle };
skynet_mq_release(q, drop_message, &d);
return skynet_globalmq_pop();
}

int i,n=1;
struct skynet_message msg;

for (i=0;i<n;i++) {
if (skynet_mq_pop(q,&msg)) {
skynet_context_release(ctx);
return skynet_globalmq_pop();
} else if (i==0 && weight >= 0) {
n = skynet_mq_length(q);
n >>= weight;
}
int overload = skynet_mq_overload(q);
if (overload) {
skynet_error(ctx, "May overload, message queue length = %d", overload);
}

skynet_monitor_trigger(sm, msg.source , handle);

if (ctx->cb == NULL) {
skynet_free(msg.data);
} else {
dispatch_message(ctx, &msg);
}

skynet_monitor_trigger(sm, 0,0);
}

assert(q == ctx->queue);
struct message_queue *nq = skynet_globalmq_pop();
if (nq) {
// If global mq is not empty , push q back, and return next queue (nq)
// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
skynet_globalmq_push(q);
q = nq;
}
skynet_context_release(ctx);

return q;
}

处理消息最主要的就是这个函数了:

  1. 如果传入的消息队列q为空,就会重新从全局队列弹出一个。
q = skynet_globalmq_pop();
  1. 获得弹出消息队列的 handle,根据handle来找到对应的 服务 ctx;
uint32_t handle = skynet_mq_handle(q);
  1. 从队列取消息。 如果从特定的消息队列弹出消息失败,就会从全局队列返回一个新的消息队列。
int i,n=1;
struct skynet_message msg;

for (i=0;i<n;i++) {
if (skynet_mq_pop(q,&msg)) {
skynet_context_release(ctx);
// 弹出消息失败,返回一个新的消息队列
return skynet_globalmq_pop();
// 根据线程权重设置来获取消息数
} else if (i==0 && weight >= 0) {
n = skynet_mq_length(q);
n >>= weight;
}
int overload = skynet_mq_overload(q);
if (overload) {
skynet_error(ctx, "May overload, message queue length = %d", overload);
}

skynet_monitor_trigger(sm, msg.source , handle);

if (ctx->cb == NULL) {
skynet_free(msg.data);
} else {
dispatch_message(ctx, &msg);
}

skynet_monitor_trigger(sm, 0,0);
}
  1. 调用dispatch_message(ctx, &msg); 此函数会调用注册的回调函数处理消息。
  2. 消息处理完毕后,检查全局消息队列是否为空,如果为空的话,就继续返回当前队列;如果全局消息队列不为空, 就返回一个新的消息队列。

也就是说,默认情况下,每个线程每次只会处理全局消息队列中,某一消息队列的一条消息。

thread_socket

这个线程,会轮询所有的的套接字,具体的工作在skynet_socket_poll()内完成。

// skynet_start.c
static void *
thread_socket(void *p) {
struct monitor * m = p;
skynet_initthread(THREAD_SOCKET);
for (;;) {
int r = skynet_socket_poll();
if (r==0)
break;
if (r<0) {
CHECK_ABORT
continue;
}
wakeup(m,0);
}
return NULL;
}

此函数会获取获取消息类型和消息内容,并进行消息转发。

// skynet_socket.c
int
skynet_socket_poll() {
struct socket_server *ss = SOCKET_SERVER;
assert(ss);
struct socket_message result;
int more = 1;
int type = socket_server_poll(ss, &result, &more);
switch (type) {
case SOCKET_EXIT:
return 0;
case SOCKET_DATA:
forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
break;
case SOCKET_CLOSE:
forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
break;
case SOCKET_OPEN:
forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
break;
case SOCKET_ERR:
forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result);
break;
case SOCKET_ACCEPT:
forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
break;
case SOCKET_UDP:
forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result);
break;
case SOCKET_WARNING:
forward_message(SKYNET_SOCKET_TYPE_WARNING, false, &result);
break;
default:
skynet_error(NULL, "Unknown socket message type %d.",type);
return -1;
}
if (more) {
return -1;
}
return 1;
}

这个函数的主要工作还是由 socket_service_poll(ss, &result, &more)来完成的。该函数会返回套接字消息的类型,消息本身及是否有更多消息。

// socket_server.c
int
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
for (;;) {
if (ss->checkctrl) {
if (has_cmd(ss)) {
int type = ctrl_cmd(ss, result);
if (type != -1) {
clear_closed_event(ss, result, type);
return type;
} else
continue;
} else {
ss->checkctrl = 0;
}
}
if (ss->event_index == ss->event_n) {
ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
ss->checkctrl = 1;
if (more) {
*more = 0;
}
ss->event_index = 0;
if (ss->event_n <= 0) {
ss->event_n = 0;
if (errno == EINTR) {
continue;
}
return -1;
}
}
struct event *e = &ss->ev[ss->event_index++];
struct socket *s = e->s;
if (s == NULL) {
// dispatch pipe message at beginning
continue;
}
struct socket_lock l;
socket_lock_init(s, &l);
switch (s->type) {
case SOCKET_TYPE_CONNECTING:
return report_connect(ss, s, &l, result);
case SOCKET_TYPE_LISTEN: {
int ok = report_accept(ss, s, result);
if (ok > 0) {
return SOCKET_ACCEPT;
} if (ok < 0 ) {
return SOCKET_ERR;
}
// when ok == 0, retry
break;
}
case SOCKET_TYPE_INVALID:
fprintf(stderr, "socket-server: invalid socket\n");
break;
default:
if (e->read) {
int type;
if (s->protocol == PROTOCOL_TCP) {
type = forward_message_tcp(ss, s, &l, result);
} else {
type = forward_message_udp(ss, s, &l, result);
if (type == SOCKET_UDP) {
// try read again
--ss->event_index;
return SOCKET_UDP;
}
}
if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) {
// Try to dispatch write message next step if write flag set.
e->read = false;
--ss->event_index;
}
if (type == -1)
break;
return type;
}
if (e->write) {
int type = send_buffer(ss, s, &l, result);
if (type == -1)
break;
return type;
}
if (e->error) {
// close when error
int error;
socklen_t len = sizeof(error);
int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len);
const char * err = NULL;
if (code < 0) {
err = strerror(errno);
} else if (error != 0) {
err = strerror(error);
} else {
err = "Unknown error";
}
force_close(ss, s, &l, result);
result->data = (char *)err;
return SOCKET_ERR;
}
break;
}
}
}

forward_message会被对应类型的消息,压入到对应服务结构中的消息队列去。

// skynet_socket.c
static void
forward_message(int type, bool padding, struct socket_message * result) {
struct skynet_socket_message *sm;
size_t sz = sizeof(*sm);
if (padding) {
if (result->data) {
size_t msg_sz = strlen(result->data);
if (msg_sz > 128) {
msg_sz = 128;
}
sz += msg_sz;
} else {
result->data = "";
}
}
sm = (struct skynet_socket_message *)skynet_malloc(sz);
sm->type = type;
sm->id = result->id;
sm->ud = result->ud;
if (padding) {
sm->buffer = NULL;
memcpy(sm+1, result->data, sz - sizeof(*sm));
} else {
sm->buffer = result->data;
}

struct skynet_message message;
message.source = 0;
message.session = 0;
message.data = sm;
message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);

if (skynet_context_push((uint32_t)result->opaque, &message)) {
// todo: report somewhere to close socket
// don't call skynet_socket_close here (It will block mainloop)
skynet_free(sm->buffer);
skynet_free(sm);
}
}

其是通过 skynet_context_push()函数实现的。

// skynet_server.c

int
skynet_context_push(uint32_t handle, struct skynet_message *message) {
struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {
return -1;
}
skynet_mq_push(ctx->queue, message);
skynet_context_release(ctx);

return 0;
}

通过handle来找到服务的ctx->queue,然后压进去。

从服务的注册看起。

我们通过Lua编写的所有服务,都是通过 skynet.newservice函数注册的。这是一个Lua函数:

-- lualib/skynet.lua

function skynet.newservice(name, ...)
return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
end

function skynet.call(addr, typename, ...)
local p = proto[typename]
local session = c.send(addr, p.id , nil , p.pack(...))
if session == nil then
error("call to invalid address " .. skynet.address(addr))
end
return p.unpack(yield_call(addr, session))
end

其本质,是通过 skynet.call函数向 .launcher服务,发送 lua类型的消息,加上一系列参数实现的。

skynet.call则调用的是导出的C函数 skynet_core.send,更具体的C函数就是lsend。具体函数的导出参看另外一篇文章skynet的启动与服务载入流程

// lualib-src/lua-skynet.c
static int
lsend(lua_State *L) {
return send_message(L, 0, 2);
}

static int
send_message(lua_State *L, int source, int idx_type) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
uint32_t dest = (uint32_t)lua_tointeger(L, 1);
const char * dest_string = NULL;
if (dest == 0) {
if (lua_type(L,1) == LUA_TNUMBER) {
return luaL_error(L, "Invalid service address 0");
}
dest_string = get_dest_string(L, 1);
}

int type = luaL_checkinteger(L, idx_type+0);
int session = 0;
if (lua_isnil(L,idx_type+1)) {
type |= PTYPE_TAG_ALLOCSESSION;
} else {
session = luaL_checkinteger(L,idx_type+1);
}

int mtype = lua_type(L,idx_type+2);
switch (mtype) {
case LUA_TSTRING: {
size_t len = 0;
void * msg = (void *)lua_tolstring(L,idx_type+2,&len);
if (len == 0) {
msg = NULL;
}
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type, session , msg, len);
} else {
session = skynet_send(context, source, dest, type, session , msg, len);
}
break;
}
case LUA_TLIGHTUSERDATA: {
void * msg = lua_touserdata(L,idx_type+2);
int size = luaL_checkinteger(L,idx_type+3);
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
} else {
session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
}
break;
}
default:
luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));
}
if (session < 0) {
// send to invalid address
// todo: maybe throw an error would be better
return 0;
}
lua_pushinteger(L,session);
return 1;
}

最终,是通过skynet_server.c中的的 skynet_context_push函数将消息推到对应的skynet_context结构中的消息队列中去,接着就返回 sessionId。

消息分发函数

每个业务脚本都会注册一个消息处理函数:

-- lualib/skynet.lua
function skynet.start(start_func)
c.callback(skynet.dispatch_message)
skynet.timeout(0, function()
skynet.init_service(start_func)
end)
end

每次收到消息的时候就会调用这个消息处理函数。