大一暑假完成了一个C++在线聊天室,对epoll的应用仅仅在于那三个函数:
- epoll_create
- epoll_ctl
- epoll_wait
看看epoll的源码实现,理解epoll是如何实现IO多路复用的。这里我看的是linux2.6.39.4版本的源码,在在线网站可以查看。如果有错误敬请斧正。
accept 系统调用创建新的 socket 对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| SYSCALL_DEFINE4(accept4, int, fd, struct sockaddr __user *, upeer_sockaddr, int __user *, upeer_addrlen, int, flags) { struct socket *sock, *newsock; struct file *newfile; int err, len, newfd, fput_needed; struct sockaddr_storage address; if (flags & ~(SOCK_CLOEXEC | SOCK_NONBLOCK)) return -EINVAL;
if (SOCK_NONBLOCK != O_NONBLOCK && (flags & SOCK_NONBLOCK)) flags = (flags & ~SOCK_NONBLOCK) | O_NONBLOCK; sock = sockfd_lookup_light(fd, &err, &fput_needed); if (!sock) goto out;
err = -ENFILE; newsock = sock_alloc(); if (!newsock) goto out_put; newsock->type = sock->type; newsock->ops = sock->ops;
__module_get(newsock->ops->owner); newfd = sock_alloc_file(newsock, &newfile, flags); if (unlikely(newfd < 0)) { err = newfd; sock_release(newsock); err = security_socket_accept(sock, newsock); if (err) goto out_fd; err = sock->ops->accept(sock, newsock, sock->file->f_flags); if (err < 0) goto out_fd;
if (upeer_sockaddr) { if (newsock->ops->getname(newsock, (struct sockaddr *)&address, &len, 2) < 0) { err = -ECONNABORTED; goto out_fd; } err = move_addr_to_user((struct sockaddr *)&address, len, upeer_sockaddr, upeer_addrlen); if (err < 0) goto out_fd; }
fd_install(newfd, newfile); err = newfd;
out_put: fput_light(sock->file, fput_needed); out: return err; out_fd: fput(newfile); put_unused_fd(newfd); goto out_put; } }
SYSCALL_DEFINE3(accept, int, fd, struct sockaddr __user *, upeer_sockaddr, int __user *, upeer_addrlen) { return sys_accept4(fd, upeer_sockaddr, upeer_addrlen, 0); }
|
0. 关于 flags 的处理
可以看到这里accept
和accept4
的区别仅仅是多了一个flags
的参数,如果想了解这些flags
的设置可以自行搜索。如果使用accept
,本质上运行的是flags
参数为0的accept4
。
1. 根据 fd 查找到监听的 socket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
#define EBADF 9
static struct socket *sockfd_lookup_light(int fd, int *err, int *fput_needed) { struct file *file; struct socket *sock;
*err = -EBADF; file = fget_light(fd, fput_needed); if (file) { sock = sock_from_file(file, err); if (sock) return sock; fput_light(file, *fput_needed); } return NULL; }
|
这里还有另一个参数fput_needed
,该参数是在fget_light
内被初始化的,感兴趣的读者可以去看看,路径是/fs/file_table.c
第331行,简单说一下其作用:
- 如果 fput_needed 的值为 1,表示文件引用计数已经成功增加,调用者在使用完文件后需要调用
fput
函数来减少引用计数。
- 如果 fput_needed 的值为 0,表示文件引用计数未增加,可能由于并发操作中文件已被释放,调用者无需调用
fput
。
套接字分为两类:监听套接字和连接套接字。监听套接字是使用socket()
创建一个套接字,bind()
将其绑定到一个特定的本地地址和端口,listen()
开始监听连接请求。连接套接字是使用accept()
来创建与特定客户进行通信。而创建连接套接字的时候会使用监听套接字的部分设置(比如地址簇A、套接字类型、协议类型等等),当然连接套接字也有自己的一些设置,比如新的本地端口号等等。这里sockfd_lookup_light
函数的作用是通过fd
来找到监听套接字的socket
结构体指针。准备后面的部分设置共享。
2. alloc 一个新的 socket 对象
这里首先调用sock_alloc
函数申请一个新的socket
对象。然后将监听套接字的socket
对象的type
和协议操作函数集合ops赋给新socket
对象。这里的ops
里面主要是协议族,内核模块,以及一些需要的函数,感兴趣的可以去看看。需要注意的是这里的ops
是一个指针,并不是将其整体复制了一份。
1 2 3 4 5 6 7 8 9 10 11
| struct socket { socket_state state; kmemcheck_bitfield_begin(type); short type; kmemcheck_bitfield_end(type); unsigned long flags; struct socket_wq __rcu *wq; struct file *file; struct sock *sk; const struct proto_ops *ops; };
|
3. 为新的 socket 分配一个新的 file 对象
这里的file
对象其实就表示打开的文件对象,也就是文件描述符。使得连接套接字能通过文件描述符进行访问和操作。对其感兴趣可以点击该链接查看file结构体的具体细节。
因为连接套接字的socket
对象的file
指针还是空的。接下来调用sock_alloc_file
函数来申请并且初始化该对象。并且将其设置在sock->file
对象。具体代码点链接。
新的socket对象->file->f_op->poll函数指向的是sock_poll,之后会用到的。
4. 接收连接
这里介绍另一个socket
对象中的核心成员sock
。该结构体非常大,其中发送队列、接收数据包的队列、错误队列、缓存锁、异步等待的队列、数据包的哈希值,数量等核心数据结构都在这里。
1 2 3 4 5 6 7 8
| err = security_socket_accept(sock, newsock); if (err) goto out_fd;
err = sock->ops->accept(sock, newsock, sock->file->f_flags); if (err < 0) goto out_fd;
|
这里的accept对应的是inet-accept
。它会从握手队列中直接获取创建好的sock
。
5. 将新的文件添加到当前进程的打开文件列表
当 file、socket、sock 等关键内核对象创建完毕以后,剩下要做的一件事情就是把它挂到当前进程的打开文件列表中就行了。
1 2 3 4 5 6 7 8 9 10 11 12 13
| void fd_install(unsigned int fd, struct file *file) { __fd_install(current->files, fd, file); }
void __fd_install(struct files_struct *files, unsigned int fd, struct file *file) { fdt = files_fdtable(files); BUG_ON(fdt->fd[fd] != NULL); rcu_assign_pointer(fdt->fd[fd], file); }
|
epoll_create 的实现
用户进程在调用epoll_create
时,内核会创建一个eventpoll
对象。将其关联到当前进程的已打开文件列表中。epoll_create
源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| SYSCALL_DEFINE1(epoll_create1, int, flags) { int error; struct eventpoll *ep = NULL;
error = ep_alloc(&ep);
error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC)); return error; }
|
1. ep_alloc
eventpoll
的实现,这里我们只看主要的成员:
1 2 3 4 5 6 7 8 9
| struct eventpoll { wait_queue_head_t wq; struct list_head rdllist; struct rb_root rbr; };
|
成员作用:
- wq:软中断数据就绪的时候会通过wq来找到阻塞在epoll对象上的用户进程。等待队列链表用于管理等待事件发生的进程。
- rbr:红黑树用于存储被监控的文件描述符(fd)的数据结构及其状态信息。
- rdllist:当有连接就绪时,会将就绪的连接放在该链表中。这样就不用遍历整棵树。
然后在ep_alloc
里面实现初始化工作,源码:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| static int ep_alloc(struct eventpoll **pep) { int error; struct user_struct *user; struct eventpoll *ep;
user = get_current_user(); error = -ENOMEM; ep = kzalloc(sizeof(*ep), GFP_KERNEL); if (unlikely(!ep)) goto free_uid;
spin_lock_init(&ep->lock); mutex_init(&ep->mtx); init_waitqueue_head(&ep->wq); init_waitqueue_head(&ep->poll_wait); INIT_LIST_HEAD(&ep->rdllist); ep->rbr = RB_ROOT; ep->ovflist = EP_UNACTIVE_PTR; ep->user = user;
*pep = ep;
return 0;
free_uid: free_uid(user); return error; }
|
2. 创建匿名的文件对象以及文件描述符
匿名文件对象是内核用于表示一些临时或者有特殊用途的文件对象,其是在内存中动态创建和管理的,常见用于进程间通信(比如pipe
、fifo
)、临时文件、内核模块之间的通信以及一些特殊的操作。通过将文件挂接在单个 inode 上来创建新文件。这对于不需要拥有完整 inode 即可正确运行的文件非常有用。使用 anon_inode_getfd() 创建的所有文件将共享一个 inode,从而节省内存并避免文件/inode/dentry 设置的代码重复。将epoll
和文件描述符关联起来,内核也更方便管理和处理被监视的文件描述符的事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| int anon_inode_getfd(const char *name, const struct file_operations *fops, void *priv, int flags) { int error, fd; struct file *file; error = get_unused_fd_flags(flags); if (error < 0) return error; fd = error; file = anon_inode_getfile(name, fops, priv, flags); if (IS_ERR(file)) { error = PTR_ERR(file); goto err_put_unused_fd; } fd_install(fd, file);
return fd;
err_put_unused_fd: put_unused_fd(fd); return error; } EXPORT_SYMBOL_GPL(anon_inode_getfd);
|
总结epoll_create函数所做的事:调用epoll_create后,在内核中分配一个eventpoll结构和代表epoll文件的file结构,并且将这两个结构关联在一块,同时,返回一个也与file结构相关联的epoll文件描述符fd。当应用程序操作epoll时,需要传入一个epoll文件描述符fd,内核根据这个fd,找到epoll的file结构,然后通过file,获取之前epoll_create申请eventpoll结构变量,epoll相关的重要信息都存储在这个结构里面。接下来,所有epoll接口函数的操作,都是在eventpoll结构变量上进行的。
所以,epoll_create的作用就是为进程在内核中建立一个从epoll文件描述符到eventpoll结构变量的通道。
epoll_ctl 的实现
这里是理解epoll的核心区域(这么久了,终于到了….)。
首先epoll_ctl的作用是添加、修改、删除文件的监听事件,内核代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event) { int error; int did_lock_epmutex = 0; struct file *file, *tfile; struct eventpoll *ep; struct epitem *epi; struct epoll_event epds; error = -EFAULT; if (ep_op_has_event(op) && copy_from_user(&epds, event, sizeof(struct epoll_event))) goto error_return; error = -EBADF; file = fget(epfd); if (!file) goto error_return; tfile = fget(fd); if (!tfile) goto error_fput; error = -EPERM; if (!tfile->f_op || !tfile->f_op->poll) goto error_tgt_fput;
error = -EINVAL; if (file == tfile || !is_file_epoll(file)) goto error_tgt_fput;
ep = file->private_data;
if (unlikely(is_file_epoll(tfile) && op == EPOLL_CTL_ADD)) { mutex_lock(&epmutex); did_lock_epmutex = 1; error = -ELOOP; if (ep_loop_check(ep, tfile) != 0) goto error_tgt_fput; }
mutex_lock(&ep->mtx); epi = ep_find(ep, tfile, fd);
error = -EINVAL; switch (op) {(!is_file_epoll(f.file)); case EPOLL_CTL_ADD: if (!epi) { epds.events |= POLLERR | POLLHUP; error = nsert(ep, &epds, tfile, fd); } else error = -EEXIST; break; case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi); else error = -ENOENT; break;flags case EPOLL_CTL_MOD: if (epi) { epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &epds); } else error = -ENOENT; break; } mutex_unlock(&ep->mtx);
error_tgt_fput: if (unlikely(did_lock_epmutex)) mutex_unlock(&epmutex);
fput(tfile); error_fput: fput(file); error_return:
return error; }
|
1. 根据操作判断是否拷贝到内核以及获取 file 对象
这里的op
是对epoll
操作动作(添加、删除、修改),ep_op_has_event(op)
是判断是否是删除操作,如果op != EPOLL_CTL_DEL 为 true
时,则需要调用copy_from_user()
将event
事件拷贝到内核的epds
变量中。因为只有删除操作不需要内核使用进程传入的event
事件。
1 2 3 4
| static inline int ep_op_has_event(int op) { return op != EPOLL_CTL_DEL; }
|
接下来调用两次fget
分别获取epoll文件和连接套接字文件的file结构变量。
接下来就是对参数的一些检查,出现如下情况,就可以认为传入的参数有问题,直接返回出错:
- 目标文件不支持poll操作(!tf.file->f_op->poll);
- 监听的目标文件就是epoll文件本身(f.file == tf.file);
- 用户传入的epoll文件(epfd代表的文件)并不是一个真正的epoll的文件(!is_file_epoll(f.file));
2. 循环引用的处理
这里会有一个问题,假设有两个epoll文件描述符 A 和 B,且A已经插入到B的文件描述符中,而某个时刻B直接或者间接插入到A中,这就形成闭环,导致事件发生时递归处理,从来形成文件的事件循环,这就是循环引用。它会导致死锁或者其他的问题。这里为了解决循环引用,代码中是下面的措施:
1 2 3 4 5 6 7
| if (unlikely(is_file_epoll(tfile) && op == EPOLL_CTL_ADD)) { mutex_lock(&epmutex); did_lock_epmutex = 1; error = -ELOOP; if (ep_loop_check(ep, tfile) != 0) goto error_tgt_fput; }
|
这里先判断插入的文件描述符是否是epoll文件描述符,是否是EPOLL_CTL_ADD
操作,条件满足时会先mutex_lock(&epmutex);
获取全局锁防止处理时发生竞争,接下来判断是否存在闭环ep_loop_check(ep, tfile) != 0
,如果存在进入对应错误处理。
3. ep_find
在ep里面,维护着一个红黑树rbr
,每次添加注册事件时,都会申请一个epitem结构的变量表示事件的监听项,然后插入ep的红黑树里面。在epoll_ctl里面,会调用ep_find函数从ep的红黑树里面查找目标文件表示的监听项,返回的监听项可能为空。而epoll_filefd
结构体中只有两个成员:file结构体和其对应的fd。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| static struct epitem *ep_find(struct eventpoll *ep, struct file *file, int fd) { int kcmp; struct rb_node *rbp; struct epitem *epi, *epir = NULL; struct epoll_filefd ffd; ep_set_ffd(&ffd, file, fd); for (rbp = ep->rbr.rb_node; rbp; ) { epi = rb_entry(rbp, struct epitem, rbn); kcmp = ep_cmp_ffd(&ffd, &epi->ffd); if (kcmp > 0) rbp = rbp->rb_right; else if (kcmp < 0) rbp = rbp->rb_left; else { epir = epi; break; } }
return epir; }
|
4. EPOLL_CTL_ADD情况
接下来switch这块区域的代码就是整个epoll_ctl函数的核心,对op进行switch出来的有添加(EPOLL_CTL_ADD)、删除(EPOLL_CTL_DEL)和修改(EPOLL_CTL_MOD)三种情况,这里我以添加为例讲解,其他两种情况类似,知道了如何添加监听事件,其他删除和修改监听事件都可以举一反三。
1 2 3 4 5 6 7 8
| case EPOLL_CTL_ADD: if (!epi) { epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds, tfile, fd); } else error = -EEXIST; break;
|
为目标文件添加监控事件时,首先要保证当前ep里面还没有对该目标文件进行监听,如果存在(epi不为空),就返回-EEXIST错误。否则说明参数正常,然后先默认设置对目标文件的POLLERR和POLLHUP监听事件,然后调用ep_insert函数,将对目标文件的监听事件插入到ep维护的红黑树里面,下面是ep_insert的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd) { int error, revents, pwake = 0; unsigned long flags; long user_watches; struct epitem *epi; struct ep_pqueue epq; user_watches = atomic_long_read(&ep->user->epoll_watches); if (unlikely(user_watches >= max_user_watches)) return -ENOSPC; if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) return -ENOMEM;
INIT_LIST_HEAD(&epi->rdllink); INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; ep_set_ffd(&epi->ffd, tfile, fd); epi->event = *event; epi->nwait = 0; epi->next = EP_UNACTIVE_PTR;
epq.epi = epi; init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = tfile->f_op->poll(tfile, &epq.pt);
error = -ENOMEM; if (epi->nwait < 0) goto error_unregister;
spin_lock(&tfile->f_lock); list_add_tail(&epi->fllink, &tfile->f_ep_links); spin_unlock(&tfile->f_lock);
ep_rbtree_insert(ep, epi);
spin_lock_irqsave(&ep->lock, flags);
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist);
if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; }
spin_unlock_irqrestore(&ep->lock, flags);
atomic_long_inc(&ep->user->epoll_watches);
if (pwake) ep_poll_safewake(&ep->poll_wait);
return 0;
error_unregister: ep_unregister_pollwait(ep, epi);
spin_lock_irqsave(&ep->lock, flags); if (ep_is_linked(&epi->rdllink)) list_del_init(&epi->rdllink); spin_unlock_irqrestore(&ep->lock, flags);
kmem_cache_free(epi_cache, epi);
return error; }
|
在调用epoll_ctl时,可能会产生相关进程需要监听的事件,如果有监听的事件产生,(revents & event->events 为 true),并且目标文件相关的监听项没有链接到ep的准备链表rdlist里面的话,就将该监听项添加到ep的rdlist准备链表里面,rdlist链接的是该epoll描述符监听的所有已经就绪的目标文件的监听项。并且,如果有任务在等待产生事件时,就调用wake_up_locked函数唤醒所有正在等待的任务,处理相应的事件。当进程调用epoll_wait时,该进程就出现在ep的wq等待队列里面。
回调函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| #include <linux/rcupdate.h>
struct my_data { int value; struct rcu_head rcu; };
void enqueue_for_removal(struct my_data *data) { call_rcu(&data->rcu, my_data_free_callback); }
void my_data_free_callback(struct rcu_head *head) { struct my_data *data = container_of(head, struct my_data, rcu); kfree(data); }
|
sk_data_ready
sock_def_readable