通过将配置文件传入 skynet 后,其就会根据我们的脚本逻辑业务单元来启动对应的服务,然后把收到的消息进行分发处理。一般来说,这个框架针对的是网络游戏服务器,所以肯定会面向网络套接字信息这样的,但是也有进程间消息传递的处理机制。下面我们来看一下。
skyent main() 在文件skynet_main.c
文件中,定义了一个main() 函数。
int main(int argc, char *argv[]) { const char * config_file = NULL ; if (argc > 1 ) { config_file = argv[1 ]; } else { fprintf (stderr , "Need a config file. Please read skynet wiki : https://github.com/cloudwu/skynet/wiki/Config\n" "usage: skynet configfilename\n" ); return 1 ; } skynet_globalinit(); skynet_env_init(); sigign(); struct skynet_config config ; #ifdef LUA_CACHELIB luaL_initcodecache(); #endif struct lua_State *L = luaL_newstate (); luaL_openlibs(L); int err = luaL_loadbufferx(L, load_config, strlen (load_config), "=[skynet config]" , "t" ); assert(err == LUA_OK); lua_pushstring(L, config_file); err = lua_pcall(L, 1 , 1 , 0 ); if (err) { fprintf (stderr ,"%s\n" ,lua_tostring(L,-1 )); lua_close(L); return 1 ; } _init_env(L); config.thread = optint("thread" ,8 ); config.module_path = optstring("cpath" ,"./cservice/?.so" ); config.harbor = optint("harbor" , 1 ); config.bootstrap = optstring("bootstrap" ,"snlua bootstrap" ); config.daemon = optstring("daemon" , NULL ); config.logger = optstring("logger" , NULL ); config.logservice = optstring("logservice" , "logger" ); config.profile = optboolean("profile" , 1 ); lua_close(L); skynet_start(&config); skynet_globalexit(); return 0 ; }
这个函数做了几件事情:
初始化全局环境。skynet_globalinit()
初始化环境变量。skynet_env_init()
使用 Lua 加载代码来来加载我们的配置文件。luaL_loadbufferx()
就是读取传入的配置文件,解析参数,然后以skynet_start(&config)
进行启动。
skynet_globalinit() - 全局节点初始化 这个函数,会初始化全局的节点信息,被设置主线程内的控制键值。
struct skynet_node { int total; int init; uint32_t monitor_exit; pthread_key_t handle_key; bool profile; }; static struct skynet_node G_NODE ;void skynet_globalinit(void ) { G_NODE.total = 0 ; G_NODE.monitor_exit = 0 ; G_NODE.init = 1 ; if (pthread_key_create(&G_NODE.handle_key, NULL )) { fprintf (stderr , "pthread_key_create failed" ); exit (1 ); } skynet_initthread(THREAD_MAIN); } void skynet_initthread(int m) { uintptr_t v = (uint32_t )(-m); pthread_setspecific(G_NODE.handle_key, (void *)v); }
看起来,在主线程上创建了一个线程存储键,并初始化为 THREAD_MAIN,这个常量定义为 1。
初始化全局的节点信息,这个应该是为了分布式或多节点而来的。
skynet_env_init()-全局环境变量 这个文件,初始化一个全局的环境变量E ,可以看到这个全局变量其实也是用一个 lua_State 来保存我们的配置的。
struct skynet_env { struct spinlock lock ; lua_State *L; }; static struct skynet_env *E = NULL ;void skynet_env_init() { E = skynet_malloc(sizeof (*E)); SPIN_INIT(E) E->L = luaL_newstate(); } struct spinlock { int lock; };
spinlock 就是一个整型,不知道是做什么用处的,可能是和锁相关的内容。 想要对全局环境进行修改的时候,势必要获取这个锁。
luaL_initcodecache 这个函数,原生的 Lua 是不具备的,是在 lauxlib.c 内增加 的这么一个函数。
LUALIB_API void luaL_initcodecache(void ) { SPIN_INIT(&CC); } struct codecache { struct spinlock lock ; lua_State *L; }; static struct codecache CC ;
一个 CC 静态变量,用来存储一些需要有多个 lua_State 公用的代码,当然也是加了锁的。
配置加载 一些库的载入,是打开一个虚拟机,然后通过 Lua 脚本的形式载入的。我们看主函数中的代码:
struct skynet_config config ; struct lua_State *L = luaL_newstate (); luaL_openlibs(L); int err = luaL_loadbufferx(L, load_config, strlen (load_config), "=[skynet config]" , "t" ); assert(err == LUA_OK); lua_pushstring(L, config_file); err = lua_pcall(L, 1 , 1 , 0 ); if (err) { fprintf (stderr ,"%s\n" ,lua_tostring(L,-1 )); lua_close(L); return 1 ; }
我们需要注意的是 luaL_loadbufferx
会将字符串压入栈上,第三个参数一般用来做调试的时候打印信息用,第四个参数表示加载的是 二进制(b ) 还是 文本 (t ),或者两者都有(bt )。
先是建立一个新的 Lua State,然后把 下面的代码载入其内;接着,把配置文件压入栈,然后执行配置文件。这个是我们的 config.lua 配置文件的加载逻辑。
static const char * load_config = "\ local result = {}\n\ local function getenv(name) return assert(os.getenv(name), [[os.getenv() failed: ]] .. name) end\n\ local sep = package.config:sub(1,1)\n\ local current_path = [[.]]..sep\n\ local function include(filename)\n\ local last_path = current_path\n\ local path, name = filename:match([[(.*]]..sep..[[)(.*)$]])\n\ if path then\n\ if path:sub(1,1) == sep then -- root\n\ current_path = path\n\ else\n\ current_path = current_path .. path\n\ end\n\ else\n\ name = filename\n\ end\n\ local f = assert(io.open(current_path .. name))\n\ local code = assert(f:read [[*a]])\n\ code = string.gsub(code, [[%$([%w_%d]+)]], getenv)\n\ f:close()\n\ assert(load(code,[[@]]..filename,[[t]],result))()\n\ current_path = last_path\n\ end\n\ setmetatable(result, { __index = { include = include } })\n\ local config_name = ...\n\ include(config_name)\n\ setmetatable(result, nil)\n\ return result\n\ " ;
需要注意的是,我们的配置,这个时候是加载在一在个本地的 Lua State 里面的,需要在后面进行配置到全局。
这段代码的运行逻辑看起来有点累哈,实际就是设置了一个 include
函数,这个函数会把我们指定的配置文件以 result 为环境加 load 到栈上,但是并不执行,返回值就是我们建立的 result。这即是加载后的配置表。
__init_env(L) 接下来的事情就比较奇妙了,在先前建立的 Lua State 内,已经保存了我们的配置信息,已经载入的库等。接下来就是把这个 Lua State 内的配置,都设置到全局变量内(事实上这些完全可以在 C 代码内完成的,为什么要用 Lua 呢)。
关于就在于我们的 skynet_setenv() 函数,会将我们 Lua 内配置的内容都设置到全局配置 E 内去。
static void _init_env(lua_State *L) { lua_pushnil(L); while (lua_next(L, -2 ) != 0 ) { int keyt = lua_type(L, -2 ); if (keyt != LUA_TSTRING) { fprintf (stderr , "Invalid config table\n" ); exit (1 ); } const char * key = lua_tostring(L,-2 ); if (lua_type(L,-1 ) == LUA_TBOOLEAN) { int b = lua_toboolean(L,-1 ); skynet_setenv(key,b ? "true" : "false" ); } else { const char * value = lua_tostring(L,-1 ); if (value == NULL ) { fprintf (stderr , "Invalid config table key = %s\n" , key); exit (1 ); } skynet_setenv(key,value); } lua_pop(L,1 ); } lua_pop(L,1 ); }
这个代码就是要理解 lua_next(L,index)
这个函数会从栈顶 弹出一个键,然后从 Index 的表处,压入两个值,key_value 对。
skynet_setenv(const char *key, const char *value) { SPIN_LOCK(E) lua_State *L = E->L; lua_getglobal(L, key); assert(lua_isnil(L, -1 )); lua_pop(L,1 ); lua_pushstring(L,value); lua_setglobal(L,key); SPIN_UNLOCK(E) }
可以看到,我们的配置文件信息,其实是放在全局环境的注册表内的。
skynet_start(&config) 当我们把我们的 config.lua 内的内容加载到全局环境变量 E 中后,就会根据配置来构造我们的启动了。
类似 optint(), optstring()
这些函数其实都是使用 skynet_getenv 了从 E 内取内容。
在skynet_start.c
中,我们可以看到代码:
skynet_start(struct skynet_config * config) { struct sigaction sa ; sa.sa_handler = &handle_hup; sa.sa_flags = SA_RESTART; sigfillset(&sa.sa_mask); sigaction(SIGHUP, &sa, NULL ); if (config->daemon) { if (daemon_init(config->daemon)) { exit (1 ); } } skynet_harbor_init(config->harbor); skynet_handle_init(config->harbor); skynet_mq_init(); skynet_module_init(config->module_path); skynet_timer_init(); skynet_socket_init(); skynet_profile_enable(config->profile); struct skynet_context *ctx = skynet_context_new (config ->logservice , config ->logger ); if (ctx == NULL ) { fprintf (stderr , "Can't launch %s service\n" , config->logservice); exit (1 ); } bootstrap(ctx, config->bootstrap); start(config->thread); skynet_harbor_exit(); skynet_socket_free(); if (config->daemon) { daemon_exit(config->daemon); } }
其主要工作为:
设置SIGHUP
的信号处理程序。
初始化句柄。skynet_handle_init()
初始化消息队列。skynet_mq_init()
模块加载。skynet_module_init(config->module_path)
我们指定的 C 编译的 so 库的目录下的内容。
定时器设置。skynet_timer_init()
套接字初始化。skynet_socket_ini()
开启日志服务。
启动bootstrap 脚本。bootstrap(ctr, confit->bootstrap)
启动线程。start(config->thread)
。
skynet_harbor_init(int harbor) 初始化节点化的 ID。
harbor 可以是 1-255 间的任意整数。一个 skynet 网络最多支持 255 个节点。每个节点有必须有一个唯一的编号。
如果 harbor 为 0 ,skynet 工作在单节点模式下。此时 master 和 address 以及 standalone 都不必设置。
void skynet_harbor_init(int harbor) { HARBOR = (unsigned int )harbor << HANDLE_REMOTE_SHIFT; }
skynet_handle_init() handle 是什么我一直没有搞清楚,这是干什么的我也没有弄明白。只有参考了一下作者的博客 。
把一个符合规范的 C 模块,从动态库(so 文件)中启动起来,绑定一个永不重复(即使模块退出)的数字 id 做为其 handle 。模块被称为服务(Service),服务间可以自由发送消息。每个模块可以向 Skynet 框架注册一个 callback 函数,用来接收发给它的消息。每个服务都是被一个个消息包驱动,当没有包到来的时候,它们就会处于挂起状态,对 CPU 资源零消耗。如果需要自主逻辑,则可以利用 Skynet 系统提供的 timeout 消息,定期触发。
模块的实例是服务,实例的 ID 是 handle,每个服务都对应一个唯一的 handle_id。
也就是对于很多类型的服务,都是首先加载进来,然后注册,为服务模块,可以启动多个实例。
struct handle_storage { struct rwlock lock ; uint32_t harbor; uint32_t handle_index; int slot_size; struct skynet_context ** slot ; int name_cap; int name_count; struct handle_name *name ; }; static struct handle_storage *H = NULL ;void skynet_handle_init(int harbor) { assert(H==NULL ); struct handle_storage * s = skynet_malloc (sizeof (*H )); s->slot_size = DEFAULT_SLOT_SIZE; s->slot = skynet_malloc(s->slot_size * sizeof (struct skynet_context *)); memset (s->slot, 0 , s->slot_size * sizeof (struct skynet_context *)); rwlock_init(&s->lock); s->harbor = (uint32_t ) (harbor & 0xff ) << HANDLE_REMOTE_SHIFT; s->handle_index = 1 ; s->name_cap = 2 ; s->name_count = 0 ; s->name = skynet_malloc(s->name_cap * sizeof (struct handle_name)); H = s; }
设置一个全局控制柄变量H ,默认有四个位置可以存储四个 skynet_context
服务结构的指针。
skynet_module_init() 服务表已经有了,现在我们需要把我们的模块加载进来。
这个函数定义在skynet_module.c
中,其作用,就是载入配置文件中lua_cpath= ...
指定的动态库路径。同时,将全局的模块变量M 指像这个路径。
每个模块的结构定义在skynet_module.h
中:
struct skynet_module { const char * name; void * module ; skynet_dl_create create; skynet_dl_init init; skynet_dl_release release; skynet_dl_signal signal; };
其中,后面四个函数是由动态库提供的。
struct modules { int count; struct spinlock lock ; const char * path; struct skynet_module m [MAX_MODULE_TYPE ]; }; static struct modules * M = NULL ;void skynet_module_init(const char *path) { struct modules *m = skynet_malloc (sizeof (*m )); m->count = 0 ; m->path = skynet_strdup(path); SPIN_INIT(m) M = m; }
skynet_mq_init() 在文件skynet_mq.c
中,定义了这个函数:
void skynet_mq_init() { struct global_queue *q = skynet_malloc (sizeof (*q )); memset (q,0 ,sizeof (*q)); SPIN_INIT(q); Q=q; }
这是建立了一个全局的消息队列 Q ,此队列保存了每个服务的消息队列。
skynet_timer_init(); 定时器初始化:
void skynet_timer_init(void ) { TI = timer_create_timer(); uint32_t current = 0 ; systime(&TI->starttime, ¤t); TI->current = current; TI->current_point = gettime(); }
skynet_socket_init(); 套接字服务器初始化,这个的作用是当需要启动网络服务的时候,由这个服务器来启动对应的监听。
void skynet_socket_init() { SOCKET_SERVER = socket_server_create(skynet_now()); } struct socket_server *socket_server_create (uint64_t time ) { int i; int fd[2 ]; poll_fd efd = sp_create(); if (sp_invalid(efd)) { fprintf (stderr , "socket-server: create event pool failed.\n" ); return NULL ; } if (pipe(fd)) { sp_release(efd); fprintf (stderr , "socket-server: create socket pair failed.\n" ); return NULL ; } if (sp_add(efd, fd[0 ], NULL )) { fprintf (stderr , "socket-server: can't add server fd to event pool.\n" ); close(fd[0 ]); close(fd[1 ]); sp_release(efd); return NULL ; } struct socket_server *ss = MALLOC (sizeof (*ss )); ss->time = time; ss->event_fd = efd; ss->recvctrl_fd = fd[0 ]; ss->sendctrl_fd = fd[1 ]; ss->checkctrl = 1 ; for (i=0 ;i<MAX_SOCKET;i++) { struct socket *s = &ss ->slot [i ]; s->type = SOCKET_TYPE_INVALID; clear_wb_list(&s->high); clear_wb_list(&s->low); spinlock_init(&s->dw_lock); } ss->alloc_id = 0 ; ss->event_n = 0 ; ss->event_index = 0 ; memset (&ss->soi, 0 , sizeof (ss->soi)); FD_ZERO(&ss->rfds); assert(ss->recvctrl_fd < FD_SETSIZE); return ss; }
skynet_context 结构 在 skynet.c 中,结构skynet_context
为每个服务保存了一个内部结构:
struct skynet_context { void * instance; struct skynet_module * mod ; void * cb_ud; skynet_cb cb; struct message_queue *queue ; FILE * logfile; uint64_t cpu_cost; uint64_t cpu_start; char result[32 ]; uint32_t handle; int session_id; int ref; int message_count; bool init; bool endless; bool profile; CHECKCALLING_DECL };
结构定义了每个服务的实例地址,模块地址,消息队列,日志文件,会话 ID,引用数,消息数 等字段。
服务 H 表 我们第一个启动的服务就是 logger 服务。
struct skynet_context *ctx = skynet_context_new (config ->logservice , config ->logger );if (ctx == NULL ) { fprintf (stderr , "Can't launch %s service\n" , config->logservice); exit (1 ); } skynet_handle_namehandle(skynet_context_handle(ctx), "logger" );
skynet_create_new() skynet.c
中,skynet_create_new()
函数如下:
struct skynet_context *skynet_context_new (const char * name , const char *param ) { struct skynet_module * mod = skynet_module_query (name ); if (mod == NULL ) return NULL ; void *inst = skynet_module_instance_create(mod); if (inst == NULL ) return NULL ; struct skynet_context * ctx = skynet_malloc (sizeof (*ctx )); CHECKCALLING_INIT(ctx) ctx->mod = mod; ctx->instance = inst; ctx->ref = 2 ; ctx->cb = NULL ; ctx->cb_ud = NULL ; ctx->session_id = 0 ; ctx->logfile = NULL ; ctx->init = false ; ctx->endless = false ; ctx->cpu_cost = 0 ; ctx->cpu_start = 0 ; ctx->message_count = 0 ; ctx->profile = G_NODE.profile; ctx->handle = 0 ; ctx->handle = skynet_handle_register(ctx); struct message_queue * queue = ctx ->queue = skynet_mq_create (ctx ->handle ); context_inc(); CHECKCALLING_BEGIN(ctx) int r = skynet_module_instance_init(mod, inst, ctx, param); CHECKCALLING_END(ctx) if (r == 0 ) { struct skynet_context * ret = skynet_context_release (ctx ); if (ret) { ctx->init = true ; } skynet_globalmq_push(queue ); if (ret) { skynet_error(ret, "LAUNCH %s %s" , name, param ? param : "" ); } return ret; } else { skynet_error(ctx, "FAILED launch %s" , name); uint32_t handle = ctx->handle; skynet_context_release(ctx); skynet_handle_retire(handle); struct drop_t d = { handle }; skynet_mq_release(queue , drop_message, &d); return NULL ; } }
此函数,
首先会查看一下全局服务变量M 中是否存在对应模块,如果存在,就初始化一个实例;
初始化 服务的 skynet_context 结构,包括会分配一个唯一的 handle。skynet_handle_register(ctx)
,然后将服务注册到 H 中。
建立服务的消息队列。skynet_mq_create
初始化服务。skynet_module_instance_init(mod, inst, ctx, param);
其结果是调用模块自身的 init
函数。
然后把消息队列放在全局消息队列中。
skynet_module_instance_create void *skynet_module_instance_create(struct skynet_module *m) { if (m->create) { return m->create(); } else { return (void *)(intptr_t )(~0 ); } }
模块实例的建立,实际上就是 C 服务中的 create 函数的调用,其结果,一般都是返回一个服务需要的数据结构。如 logger, snlua, gate 服务:
struct gate *gate_create (void ) { struct gate * g = skynet_malloc (sizeof (*g )); memset (g,0 ,sizeof (*g)); g->listen_id = -1 ; return g; } struct logger *logger_create (void ) { struct logger * inst = skynet_malloc (sizeof (*inst )); inst->handle = NULL ; inst->close = 0 ; inst->filename = NULL ; return inst; } struct snlua *snlua_create (void ) { struct snlua * l = skynet_malloc (sizeof (*l )); memset (l,0 ,sizeof (*l)); l->mem_report = MEMORY_WARNING_REPORT; l->mem_limit = 0 ; l->L = lua_newstate(lalloc, l); return l; }
skynet_handle_register 我们通过模块建立了一个服务后,事实上对于 skynet 来说,这个服务,就是一个 skynet_context ,其对具体的模块实际上是不关心的。
在 H 表中 slot 是服务的插槽,每个插槽都指向一个 skynet_context 结构。
下面这个逻辑会首先看一下插槽够用不,不够用就增大插槽,然后再注册后返回。
uint32_t skynet_handle_register(struct skynet_context *ctx) { struct handle_storage *s = H ; rwlock_wlock(&s->lock); for (;;) { int i; uint32_t handle = s->handle_index; for (i=0 ;i<s->slot_size;i++,handle++) { if (handle > HANDLE_MASK) { handle = 1 ; } int hash = handle & (s->slot_size-1 ); if (s->slot[hash] == NULL ) { s->slot[hash] = ctx; s->handle_index = handle + 1 ; rwlock_wunlock(&s->lock); handle |= s->harbor; return handle; } } assert((s->slot_size*2 - 1 ) <= HANDLE_MASK); struct skynet_context ** new_slot = skynet_malloc (s ->slot_size * 2 * sizeof (struct skynet_context *)); memset (new_slot, 0 , s->slot_size * 2 * sizeof (struct skynet_context *)); for (i=0 ;i<s->slot_size;i++) { int hash = skynet_context_handle(s->slot[i]) & (s->slot_size * 2 - 1 ); assert(new_slot[hash] == NULL ); new_slot[hash] = s->slot[i]; } skynet_free(s->slot); s->slot = new_slot; s->slot_size *= 2 ; } }
模块(服务)的初始化 每个模块的实例建立了以后,就会调用模块自己的初始化函数进行初始化设置。对于我们所有的 Lua 服务来说,其都是 snlua
模块的一个实例。对于 skyent_context_new()
中调用的函数skynet_module_instance_init()
,我们看一下它的代码:
int skynet_module_instance_init(struct skynet_module *m, void * inst, struct skynet_context *ctx, const char * parm) { return m->init(inst, ctx, parm); }
这个其实就是 c server
模块中的 init
函数。
现在框架提供的服务有 :logger, gate,snlua, harbor,默认 4 个,所以 H 表流了四个 slot 在。
snlua snlua 实际上已经建立了一个 lua_State。
struct snlua { lua_State * L; struct skynet_context * ctx ; size_t mem; size_t mem_report; size_t mem_limit; }; int snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) { int sz = strlen (args); char * tmp = skynet_malloc(sz); memcpy (tmp, args, sz); skynet_callback(ctx, l , launch_cb); const char * self = skynet_command(ctx, "REG" , NULL ); uint32_t handle_id = strtoul(self+1 , NULL , 16 ); skynet_send(ctx, 0 , handle_id, PTYPE_TAG_DONTCOPY,0 , tmp, sz); return 0 ; } static int launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , const void * msg, size_t sz) { assert(type == 0 && session == 0 ); struct snlua *l = ud ; skynet_callback(context, NULL , NULL ); int err = init_cb(l, context, msg, sz); if (err) { skynet_command(context, "EXIT" , NULL ); } return 0 ; } static int init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) { lua_State *L = l->L; l->ctx = ctx; lua_gc(L, LUA_GCSTOP, 0 ); lua_pushboolean(L, 1 ); lua_setfield(L, LUA_REGISTRYINDEX, "LUA_NOENV" ); luaL_openlibs(L); lua_pushlightuserdata(L, ctx); lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context" ); luaL_requiref(L, "skynet.codecache" , codecache , 0 ); lua_pop(L,1 ); const char *path = optstring(ctx, "lua_path" ,"./lualib/?.lua;./lualib/?/init.lua" ); lua_pushstring(L, path); lua_setglobal(L, "LUA_PATH" ); const char *cpath = optstring(ctx, "lua_cpath" ,"./luaclib/?.so" ); lua_pushstring(L, cpath); lua_setglobal(L, "LUA_CPATH" ); const char *service = optstring(ctx, "luaservice" , "./service/?.lua" ); lua_pushstring(L, service); lua_setglobal(L, "LUA_SERVICE" ); const char *preload = skynet_command(ctx, "GETENV" , "preload" ); lua_pushstring(L, preload); lua_setglobal(L, "LUA_PRELOAD" ); lua_pushcfunction(L, traceback); assert(lua_gettop(L) == 1 ); const char * loader = optstring(ctx, "lualoader" , "./lualib/loader.lua" ); int r = luaL_loadfile(L,loader); if (r != LUA_OK) { skynet_error(ctx, "Can't load %s : %s" , loader, lua_tostring(L, -1 )); report_launcher_error(ctx); return 1 ; } lua_pushlstring(L, args, sz); r = lua_pcall(L,1 ,0 ,1 ); if (r != LUA_OK) { skynet_error(ctx, "lua loader error : %s" , lua_tostring(L, -1 )); report_launcher_error(ctx); return 1 ; } lua_settop(L,0 ); if (lua_getfield(L, LUA_REGISTRYINDEX, "memlimit" ) == LUA_TNUMBER) { size_t limit = lua_tointeger(L, -1 ); l->mem_limit = limit; skynet_error(ctx, "Set memory limit to %.2f M" , (float )limit / (1024 * 1024 )); lua_pushnil(L); lua_setfield(L, LUA_REGISTRYINDEX, "memlimit" ); } lua_pop(L, 1 ); lua_gc(L, LUA_GCRESTART, 0 ); return 0 ; }
可以看到,每个 snlua 结构都有自己的 Lua State,所以所有的代码都是 snlua 自己的内部执行的,不会影响其他的 Lua State。
这个函数都干了什么:
设置对应服务 ctx 的回调函数为 launch_cb
;
注册自身;
向 ctx 发送消息(入列)。
收到消息后会回调 launch_cb
,而这个函数会又取消调回调函数。转而调用 init_cb
函数。进行初始化 snlua 中的 Lua State;
这个函数才调用loader.lua
来真正的加载服务脚本等操作。
bootstrap() 之前我们只启动了一个 logger 服务。现在我们要开始启动新的服务了。
struct skynet_context *ctx = skynet_context_new (config ->logservice , config ->logger ); if (ctx == NULL ) { fprintf (stderr , "Can't launch %s service\n" , config->logservice); exit (1 ); }
默认情况下,会以logger, NULL 为参数调用 skynet_context_new(“logger”, NULL)
函数,建立一个关于 logger 服务的skynet_context
的结构,其实就是启动了 logger 服务。
然后调用bootstrap(ctx, config->bootstrap)
函数。
static void bootstrap(struct skynet_context * logger, const char * cmdline) { int sz = strlen (cmdline); char name[sz+1 ]; char args[sz+1 ]; sscanf (cmdline, "%s %s" , name, args); struct skynet_context *ctx = skynet_context_new (name , args ); if (ctx == NULL ) { skynet_error(NULL , "Bootstrap error : %s\n" , cmdline); skynet_context_dispatchall(logger); exit (1 ); } }
默认情况下,config->bootstrap
是snlua bootstrap
。事实上最终执行的函数是skynet_context_new("snlua", "bootstrap")
,如此又建立一个服务结构,并用bootstrap 参数进行初始化。如果服务建立失败,就会将所有的信息发送到 logger ,并处理全部的 logger 服务的消息。
snlua 作为沙盒服务,会再入lua
脚本并进行执行。
实际上 snlua 会利用 lualib/loader.lua 来加载我们的 bootstrap 脚本。然后进行启动。
bootstrap 这个配置项关系着 skynet 运行的第二个服务。通常通过这个服务把整个系统启动起来。默认的 bootstrap 配置项为 “snlua bootstrap” ,这意味着,skynet 会启动 snlua 这个服务,并将 bootstrap 作为参数传给它。snlua 是 lua 沙盒服务,bootstrap 会根据配置的 luaservice 匹配到最终的 lua 脚本。如果按默认配置,这个脚本应该是 service/bootstrap.lua 。
如无必要,你不需要更改 bootstrap 配置项,让默认的 bootstrap 脚本工作。
最后,它从 config 中读取 start 这个配置项,作为用户定义的服务启动入口脚本运行。成功后,把自己退出。
这个 start 配置项,才是用户定义的启动脚本,默认值为 “main” 。如果你只是试玩一下 skynet ,可能有多份不同的启动脚本,那么建议你多写几份 config 文件,在里面配置不同的 start 项。examples 目录下有很多这样的例子。
最终,bootstrap 会将我们配置在配置文件中的 start 脚本加载进来,启动。其他服务。
start_thread(config->thread) 此函数用来启动我们设置的线程数。一共启动了config->thread + 3 个线程。
struct monitor { int count; struct skynet_monitor ** m ; pthread_cond_t cond; pthread_mutex_t mutex; int sleep; int quit; }; start(int thread) { pthread_t pid[thread+3 ]; struct monitor *m = skynet_malloc (sizeof (*m )); memset (m, 0 , sizeof (*m)); m->count = thread; m->sleep = 0 ; m->m = skynet_malloc(thread * sizeof (struct skynet_monitor *)); int i; for (i=0 ;i<thread;i++) { m->m[i] = skynet_monitor_new(); } if (pthread_mutex_init(&m->mutex, NULL )) { fprintf (stderr , "Init mutex error" ); exit (1 ); } if (pthread_cond_init(&m->cond, NULL )) { fprintf (stderr , "Init cond error" ); exit (1 ); } create_thread(&pid[0 ], thread_monitor, m); create_thread(&pid[1 ], thread_timer, m); create_thread(&pid[2 ], thread_socket, m); static int weight[] = { -1 , -1 , -1 , -1 , 0 , 0 , 0 , 0 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 2 , 2 , 2 , 2 , 2 , 2 , 2 , 2 , 3 , 3 , 3 , 3 , 3 , 3 , 3 , 3 , }; struct worker_parm wp [thread ]; for (i=0 ;i<thread;i++) { wp[i].m = m; wp[i].id = i; if (i < sizeof (weight)/sizeof (weight[0 ])) { wp[i].weight= weight[i]; } else { wp[i].weight = 0 ; } create_thread(&pid[i+3 ], thread_worker, &wp[i]); } for (i=0 ;i<thread+3 ;i++) { pthread_join(pid[i], NULL ); } free_monitor(m); }
我们看到,函数先建立了一个监控器m ,并初始化了线程数,睡眠数分别为(8, 0) ,然后,再为每个线程建立了 skynet 自己的监控数据。
struct skynet_monitor { int version; int check_version; uint32_t source; uint32_t destination; };
接下来我们可以看到,是通过 互斥量 和条件变量来进行线程的抢占使用的。接着建立了三个线程和thread(默认是 8) 个工作线程:
struct worker_parm { struct monitor *m ; int id; int weight; }; create_thread(&pid[0 ], thread_monitor, m); create_thread(&pid[1 ], thread_timer, m); create_thread(&pid[2 ], thread_socket, m); static int weight[] = { -1 , -1 , -1 , -1 , 0 , 0 , 0 , 0 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 2 , 2 , 2 , 2 , 2 , 2 , 2 , 2 , 3 , 3 , 3 , 3 , 3 , 3 , 3 , 3 , }; struct worker_parm wp [thread ]; for (i=0 ;i<thread;i++) { wp[i].m = m; wp[i].id = i; if (i < sizeof (weight)/sizeof (weight[0 ])) { wp[i].weight= weight[i]; } else { wp[i].weight = 0 ; } create_thread(&pid[i+3 ], thread_worker, &wp[i]); }
建立了一个工作线程参数表struct worker_parm wp[thread];
,并设置每个线程的权重,然后启动线程,线程 ID 保存在pid 数组内。
thread_worker 工作线程循环从队列内取出消息进行处理,如果没有消息,就从全局消息队列去取。
static void *thread_worker(void *p) { struct worker_parm *wp = p ; int id = wp->id; int weight = wp->weight; struct monitor *m = wp ->m ; struct skynet_monitor *sm = m ->m [id ]; skynet_initthread(THREAD_WORKER); struct message_queue * q = NULL ; while (!m->quit) { q = skynet_context_message_dispatch(sm, q, weight); if (q == NULL ) { if (pthread_mutex_lock(&m->mutex) == 0 ) { ++ m->sleep; if (!m->quit) pthread_cond_wait(&m->cond, &m->mutex); -- m->sleep; if (pthread_mutex_unlock(&m->mutex)) { fprintf (stderr , "unlock mutex error" ); exit (1 ); } } } } return NULL ; }
skynet_initthread(THREAD_WORKER);
用于在线程特定数据内保存自己的handlekey 。
void skynet_initthread(int m) { uintptr_t v = (uint32_t )(-m); pthread_setspecific(G_NODE.handle_key, (void *)v); }
每个工作线程,都是调用skynet_context_message_dispatch(sm, q, wight)
来从消息队列获取信息,如果获取不到,就进入睡眠状态。直到条件变量条件满足,才进行唤醒。
当线程的消息队列不存在的时候,就会从全局消息队列获取一个消息队列:
struct message_queue { struct spinlock lock ; uint32_t handle; int cap; int head; int tail; int release; int in_global; int overload; int overload_threshold; struct skynet_message *queue ; struct message_queue *next ; }; struct global_queue { struct message_queue *head ; struct message_queue *tail ; struct spinlock lock ; }; struct message_queue *skynet_globalmq_pop () { struct global_queue *q = Q ; SPIN_LOCK(q) struct message_queue *mq = q ->head ; if (mq) { q->head = mq->next; if (q->head == NULL ) { assert(mq == q->tail); q->tail = NULL ; } mq->next = NULL ; } SPIN_UNLOCK(q) return mq; }
struct message_queue *skynet_context_message_dispatch (struct skynet_monitor *sm , struct message_queue *q , int weight ) { if (q == NULL ) { q = skynet_globalmq_pop(); if (q==NULL ) return NULL ; } uint32_t handle = skynet_mq_handle(q); struct skynet_context * ctx = skynet_handle_grab (handle ); if (ctx == NULL ) { struct drop_t d = { handle }; skynet_mq_release(q, drop_message, &d); return skynet_globalmq_pop(); } int i,n=1 ; struct skynet_message msg ; for (i=0 ;i<n;i++) { if (skynet_mq_pop(q,&msg)) { skynet_context_release(ctx); return skynet_globalmq_pop(); } else if (i==0 && weight >= 0 ) { n = skynet_mq_length(q); n >>= weight; } int overload = skynet_mq_overload(q); if (overload) { skynet_error(ctx, "May overload, message queue length = %d" , overload); } skynet_monitor_trigger(sm, msg.source , handle); if (ctx->cb == NULL ) { skynet_free(msg.data); } else { dispatch_message(ctx, &msg); } skynet_monitor_trigger(sm, 0 ,0 ); } assert(q == ctx->queue ); struct message_queue *nq = skynet_globalmq_pop (); if (nq) { skynet_globalmq_push(q); q = nq; } skynet_context_release(ctx); return q; }
处理消息最主要的就是这个函数了:
如果传入的消息队列q 为空,就会重新从全局队列弹出一个。
q = skynet_globalmq_pop();
获得弹出消息队列的 handle,根据 handle 来找到对应的 服务 ctx;
uint32_t handle = skynet_mq_handle(q);
从队列取消息。 如果从特定的消息队列弹出消息失败,就会从全局队列返回一个新的消息队列。
int i,n=1 ;struct skynet_message msg ;for (i=0 ;i<n;i++) { if (skynet_mq_pop(q,&msg)) { skynet_context_release(ctx); return skynet_globalmq_pop(); } else if (i==0 && weight >= 0 ) { n = skynet_mq_length(q); n >>= weight; } int overload = skynet_mq_overload(q); if (overload) { skynet_error(ctx, "May overload, message queue length = %d" , overload); } skynet_monitor_trigger(sm, msg.source , handle); if (ctx->cb == NULL ) { skynet_free(msg.data); } else { dispatch_message(ctx, &msg); } skynet_monitor_trigger(sm, 0 ,0 ); }
调用dispatch_message(ctx, &msg)
; 此函数会调用注册的回调函数处理消息。
消息处理完毕后,检查全局消息队列是否为空,如果为空的话,就继续返回当前队列;如果全局消息队列不为空, 就返回一个新的消息队列。
也就是说,默认情况下,每个线程每次只会处理全局消息队列中,某一消息队列的一条消息。
thread_socket 这个线程,会轮询所有的的套接字,具体的工作在skynet_socket_poll()
内完成。
static void *thread_socket(void *p) { struct monitor * m = p ; skynet_initthread(THREAD_SOCKET); for (;;) { int r = skynet_socket_poll(); if (r==0 ) break ; if (r<0 ) { CHECK_ABORT continue ; } wakeup(m,0 ); } return NULL ; }
此函数会获取获取消息类型和消息内容,并进行消息转发。
int skynet_socket_poll() { struct socket_server *ss = SOCKET_SERVER ; assert(ss); struct socket_message result ; int more = 1 ; int type = socket_server_poll(ss, &result, &more); switch (type) { case SOCKET_EXIT: return 0 ; case SOCKET_DATA: forward_message(SKYNET_SOCKET_TYPE_DATA, false , &result); break ; case SOCKET_CLOSE: forward_message(SKYNET_SOCKET_TYPE_CLOSE, false , &result); break ; case SOCKET_OPEN: forward_message(SKYNET_SOCKET_TYPE_CONNECT, true , &result); break ; case SOCKET_ERR: forward_message(SKYNET_SOCKET_TYPE_ERROR, true , &result); break ; case SOCKET_ACCEPT: forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true , &result); break ; case SOCKET_UDP: forward_message(SKYNET_SOCKET_TYPE_UDP, false , &result); break ; case SOCKET_WARNING: forward_message(SKYNET_SOCKET_TYPE_WARNING, false , &result); break ; default : skynet_error(NULL , "Unknown socket message type %d." ,type); return -1 ; } if (more) { return -1 ; } return 1 ; }
这个函数的主要工作还是由 socket_service_poll(ss, &result, &more)
来完成的。该函数会返回套接字消息的类型,消息本身及是否有更多消息。
int socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) { for (;;) { if (ss->checkctrl) { if (has_cmd(ss)) { int type = ctrl_cmd(ss, result); if (type != -1 ) { clear_closed_event(ss, result, type); return type; } else continue ; } else { ss->checkctrl = 0 ; } } if (ss->event_index == ss->event_n) { ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT); ss->checkctrl = 1 ; if (more) { *more = 0 ; } ss->event_index = 0 ; if (ss->event_n <= 0 ) { ss->event_n = 0 ; if (errno == EINTR) { continue ; } return -1 ; } } struct event *e = &ss ->ev [ss ->event_index ++]; struct socket *s = e ->s ; if (s == NULL ) { continue ; } struct socket_lock l ; socket_lock_init(s, &l); switch (s->type) { case SOCKET_TYPE_CONNECTING: return report_connect(ss, s, &l, result); case SOCKET_TYPE_LISTEN: { int ok = report_accept(ss, s, result); if (ok > 0 ) { return SOCKET_ACCEPT; } if (ok < 0 ) { return SOCKET_ERR; } break ; } case SOCKET_TYPE_INVALID: fprintf (stderr , "socket-server: invalid socket\n" ); break ; default : if (e->read) { int type; if (s->protocol == PROTOCOL_TCP) { type = forward_message_tcp(ss, s, &l, result); } else { type = forward_message_udp(ss, s, &l, result); if (type == SOCKET_UDP) { --ss->event_index; return SOCKET_UDP; } } if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) { e->read = false ; --ss->event_index; } if (type == -1 ) break ; return type; } if (e->write) { int type = send_buffer(ss, s, &l, result); if (type == -1 ) break ; return type; } if (e->error) { int error; socklen_t len = sizeof (error); int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len); const char * err = NULL ; if (code < 0 ) { err = strerror(errno); } else if (error != 0 ) { err = strerror(error); } else { err = "Unknown error" ; } force_close(ss, s, &l, result); result->data = (char *)err; return SOCKET_ERR; } break ; } } }
forward_message
会被对应类型的消息,压入到对应服务结构中的消息队列去。
static void forward_message(int type, bool padding, struct socket_message * result) { struct skynet_socket_message *sm ; size_t sz = sizeof (*sm); if (padding) { if (result->data) { size_t msg_sz = strlen (result->data); if (msg_sz > 128 ) { msg_sz = 128 ; } sz += msg_sz; } else { result->data = "" ; } } sm = (struct skynet_socket_message *)skynet_malloc(sz); sm->type = type; sm->id = result->id; sm->ud = result->ud; if (padding) { sm->buffer = NULL ; memcpy (sm+1 , result->data, sz - sizeof (*sm)); } else { sm->buffer = result->data; } struct skynet_message message ; message.source = 0 ; message.session = 0 ; message.data = sm; message.sz = sz | ((size_t )PTYPE_SOCKET << MESSAGE_TYPE_SHIFT); if (skynet_context_push((uint32_t )result->opaque, &message)) { skynet_free(sm->buffer); skynet_free(sm); } }
其是通过 skynet_context_push()
函数实现的。
int skynet_context_push(uint32_t handle, struct skynet_message *message) { struct skynet_context * ctx = skynet_handle_grab (handle ); if (ctx == NULL ) { return -1 ; } skynet_mq_push(ctx->queue , message); skynet_context_release(ctx); return 0 ; }
通过 handle 来找到服务的 ctx->queue,然后压进去。
从服务的注册看起。 我们通过 Lua 编写的所有服务,都是通过 skynet.newservice
函数注册的。这是一个 Lua 函数:
function skynet.newservice (name, ...) return skynet.call(".launcher" , "lua" , "LAUNCH" , "snlua" , name, ...) end function skynet.call (addr, typename, ...) local p = proto[typename] local session = c.send(addr, p.id , nil , p.pack(...)) if session == nil then error ("call to invalid address " .. skynet.address(addr)) end return p.unpack (yield_call(addr, session)) end
其本质,是通过 skynet.call
函数向 .launcher 服务,发送 lua 类型的消息,加上一系列参数实现的。
而skynet.call
则调用的是导出的 C 函数 skynet_core.send
,更具体的 C 函数就是lsend
。具体函数的导出参看另外一篇文章skynet 的启动与服务载入流程
static int lsend(lua_State *L) { return send_message(L, 0 , 2 ); } static int send_message(lua_State *L, int source, int idx_type) { struct skynet_context * context = lua_touserdata (L , lua_upvalueindex (1)); uint32_t dest = (uint32_t )lua_tointeger(L, 1 ); const char * dest_string = NULL ; if (dest == 0 ) { if (lua_type(L,1 ) == LUA_TNUMBER) { return luaL_error(L, "Invalid service address 0" ); } dest_string = get_dest_string(L, 1 ); } int type = luaL_checkinteger(L, idx_type+0 ); int session = 0 ; if (lua_isnil(L,idx_type+1 )) { type |= PTYPE_TAG_ALLOCSESSION; } else { session = luaL_checkinteger(L,idx_type+1 ); } int mtype = lua_type(L,idx_type+2 ); switch (mtype) { case LUA_TSTRING: { size_t len = 0 ; void * msg = (void *)lua_tolstring(L,idx_type+2 ,&len); if (len == 0 ) { msg = NULL ; } if (dest_string) { session = skynet_sendname(context, source, dest_string, type, session , msg, len); } else { session = skynet_send(context, source, dest, type, session , msg, len); } break ; } case LUA_TLIGHTUSERDATA: { void * msg = lua_touserdata(L,idx_type+2 ); int size = luaL_checkinteger(L,idx_type+3 ); if (dest_string) { session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size); } else { session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size); } break ; } default : luaL_error(L, "invalid param %s" , lua_typename(L, lua_type(L,idx_type+2 ))); } if (session < 0 ) { return 0 ; } lua_pushinteger(L,session); return 1 ; }
最终,是通过skynet_server.c
中的的 skynet_context_push
函数将消息推到对应的skynet_context
结构中的消息队列中去,接着就返回 sessionId。
消息分发函数 每个业务脚本(加载成为 skynet 服务)都会注册一个消息处理函数:
关于 Lua 内的消息分发可以参考 skynet中的消息处理
function skynet.start (start_func) c.callback(skynet.dispatch_message) skynet.timeout(0 , function () skynet.init_service(start_func) end ) end
关于 Skynet LUA 服务内的线程调度 如上所说, Skynet 会在 Lua 内,使用多个协程来实现并行操作。
每个会话(session)会与一个协程相关联
function skynet.timeout (ti, func) local session = c.intcommand("TIMEOUT" ,ti) assert (session) local co = co_create_for_timeout(func, ti) assert (session_id_coroutine[session] == nil ) session_id_coroutine[session] = co return co end