这一节介绍下redis中的多线程机制。
先看看多线程换出的机制。
servercron函数中调用 vmswaponeobjectthreaded开始多线程方式换出value,vmswaponeobjectthreaded会调用 vmswaponeobject(参看上一节的解释),而vmswaponeobject最终会调用vmswapobjectthreaded。
/*
 * Begin a threaded swap-out of `val`, the value stored under `key` in `db`.
 *
 * A redis_iojob_prepare_swap job is allocated and filled in, the key is
 * flagged as redis_vm_swapping, and the job is handed to queueiojob()
 * while the threaded I/O lock is held.
 *
 * Preconditions (asserted): the key is in redis_vm_memory state and its
 * refcount is exactly 1.
 *
 * Always returns redis_ok.
 */
static int vmswapobjectthreaded(robj *key, robj *val, redisdb *db) {
    iojob *job;

    assert(key->storage == redis_vm_memory);
    assert(key->refcount == 1);

    job = zmalloc(sizeof(*job));
    job->type = redis_iojob_prepare_swap;
    job->canceled = 0;
    job->thread = (pthread_t) -1;   /* placeholder until a worker stores its own id */
    job->db = db;
    job->key = key;
    job->val = val;
    incrrefcount(val);              /* extra reference taken for the job's val pointer */

    key->storage = redis_vm_swapping;

    /* queueiojob() must run with the threaded I/O lock held. */
    lockthreadedio();
    queueiojob(job);
    unlockthreadedio();

    return redis_ok;
}
vmswapobjectthreaded 会创建一个类型为redis_iojob_prepare_swap的job,然后使用queueiojob来排队。而queueiojob所做的主要工作就是将新job加入到server.io_newjobs,并在创建的线程数还没超过配置值时,创建新的线程。
/* this function must be called while with threaded io locked */static void queueiojob(iojob *j) { redislog(redis_debug,queued io job %p type %d about key '%s'\n, (void*)j, j->type, (char*)j->key->ptr); listaddnodetail(server.io_newjobs,j); if (server.io_active_threads 从spawniothread中可以知道,新线程的入口点是iothreadentrypoint。
static void spawniothread(void) { pthread_t thread; sigset_t mask, omask; int err; sigemptyset(&mask); sigaddset(&mask,sigchld); sigaddset(&mask,sighup); sigaddset(&mask,sigpipe); pthread_sigmask(sig_setmask, &mask, &omask); while ((err = pthread_create(&thread,&server.io_threads_attr,iothreadentrypoint,null)) != 0) { redislog(redis_warning,unable to spawn an i/o thread: %s, strerror(err)); usleep(1000000); } pthread_sigmask(sig_setmask, &omask, null); server.io_active_threads++;}
iothreadentrypoint会将io_newjobs中的job移入server.io_processing,然后在做完job类型的工作后(加载value/计算value所需交换页数/换出value),将job从server.io_processing移入io_processed中。然后往 server.io_ready_pipe_write所在的管道(io_ready_pipe_read、io_ready_pipe_write组成管道的两端)写入一个字节,让睡眠中的vmthreadediocompletedjob继续运行,该函数会做些后续工作。
static void *iothreadentrypoint(void *arg) { iojob *j; listnode *ln; redis_notused(arg); pthread_detach(pthread_self()); while(1) { /* get a new job to process */ lockthreadedio(); if (listlength(server.io_newjobs) == 0) { /* no new jobs in queue, exit. */ redislog(redis_debug,thread %ld exiting, nothing to do, (long) pthread_self()); server.io_active_threads--; unlockthreadedio(); return null; } ln = listfirst(server.io_newjobs); j = ln->value; listdelnode(server.io_newjobs,ln); /* add the job in the processing queue */ j->thread = pthread_self(); listaddnodetail(server.io_processing,j); ln = listlast(server.io_processing); /* we use ln later to remove it */ unlockthreadedio(); redislog(redis_debug,thread %ld got a new job (type %d): %p about key '%s', (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr); /* process the job */ if (j->type == redis_iojob_load) { j->val = vmreadobjectfromswap(j->page,j->key->vtype); } else if (j->type == redis_iojob_prepare_swap) { file *fp = fopen(/dev/null,w+); j->pages = rdbsavedobjectpages(j->val,fp); fclose(fp); } else if (j->type == redis_iojob_do_swap) { if (vmwriteobjectonswap(j->val,j->page) == redis_err) j->canceled = 1; } /* done: insert the job into the processed queue */ redislog(redis_debug,thread %ld completed the job: %p (key %s), (long) pthread_self(), (void*)j, (char*)j->key->ptr); lockthreadedio(); listdelnode(server.io_processing,ln); listaddnodetail(server.io_processed,j); unlockthreadedio(); /* signal the main thread there is new stuff to process */ assert(write(server.io_ready_pipe_write,x,1) == 1); } return null; /* never reached */}static void vmthreadediocompletedjob(aeeventloop *el, int fd, void *privdata, int mask){ char buf[1]; int retval, processed = 0, toprocess = -1, trytoswap = 1; redis_notused(el); redis_notused(mask); redis_notused(privdata); if (privdata != null) trytoswap = 0; /* check the comments above... 
*/ /* for every byte we read in the read side of the pipe, there is one * i/o job completed to process. */ while((retval = read(fd,buf,1)) == 1) { iojob *j; listnode *ln; robj *key; struct dictentry *de; redislog(redis_debug,processing i/o completed job); /* get the processed element (the oldest one) */ lockthreadedio(); assert(listlength(server.io_processed) != 0); if (toprocess == -1) { toprocess = (listlength(server.io_processed)*redis_max_completed_jobs_processed)/100; if (toprocess value; listdelnode(server.io_processed,ln); unlockthreadedio(); /* if this job is marked as canceled, just ignore it */ if (j->canceled) { freeiojob(j); continue; } /* post process it in the main thread, as there are things we * can do just here to avoid race conditions and/or invasive locks */ redislog(redis_debug,job %p type: %d, key at %p (%s) refcount: %d\n, (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount); de = dictfind(j->db->dict,j->key); assert(de != null); key = dictgetentrykey(de); if (j->type == redis_iojob_load) { redisdb *db; /* key loaded, bring it at home */ key->storage = redis_vm_memory; key->vm.atime = server.unixtime; vmmarkpagesfree(key->vm.page,key->vm.usedpages); redislog(redis_debug, vm: object %s loaded from disk (threaded), (unsigned char*) key->ptr); server.vm_stats_swapped_objects--; server.vm_stats_swapins++; dictgetentryval(de) = j->val; incrrefcount(j->val); db = j->db; freeiojob(j); /* handle clients waiting for this key to be loaded. */ handleclientsblockedonswappedkey(db,key); } else if (j->type == redis_iojob_prepare_swap) { /* now we know the amount of pages required to swap this object. * let's find some space for it, and queue this task again * rebranded as redis_iojob_do_swap. */ if (!vmcanswapout() || vmfindcontiguouspages(&j->page,j->pages) == redis_err) { /* ooops... no space or we can't swap as there is * a fork()ed redis trying to save stuff on disk. 
*/ freeiojob(j); key->storage = redis_vm_memory; /* undo operation */ } else { /* note that we need to mark this pages as used now, * if the job will be canceled, we'll mark them as freed * again. */ vmmarkpagesused(j->page,j->pages); j->type = redis_iojob_do_swap; lockthreadedio(); queueiojob(j); unlockthreadedio(); } } else if (j->type == redis_iojob_do_swap) { robj *val; /* key swapped. we can finally free some memory. */ if (key->storage != redis_vm_swapping) { printf(key->storage: %d\n,key->storage); printf(key->name: %s\n,(char*)key->ptr); printf(key->refcount: %d\n,key->refcount); printf(val: %p\n,(void*)j->val); printf(val->type: %d\n,j->val->type); printf(val->ptr: %s\n,(char*)j->val->ptr); } redisassert(key->storage == redis_vm_swapping); val = dictgetentryval(de); key->vm.page = j->page; key->vm.usedpages = j->pages; key->storage = redis_vm_swapped; key->vtype = j->val->type; decrrefcount(val); /* deallocate the object from memory. */ dictgetentryval(de) = null; redislog(redis_debug, vm: object %s swapped out at %lld (%lld pages) (threaded), (unsigned char*) key->ptr, (unsigned long long) j->page, (unsigned long long) j->pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; freeiojob(j); /* put a few more swap requests in queue if we are still * out of memory */ if (trytoswap && vmcanswapout() && zmalloc_used_memory() > server.vm_max_memory) { int more = 1; while(more) { lockthreadedio(); more = listlength(server.io_newjobs) 原文地址:redis源代码分析25–vm(下), 感谢原作者分享。
