大一暑假完成了一个 C++ 在线聊天室,对 epoll 的应用仅仅在于那三个函数:
- epoll_create
- epoll_ctl
- epoll_wait
看看 epoll 的源码实现,理解 epoll 是如何实现 IO 多路复用的。这里我看的是 linux2.6.39.4 版本的源码,在在线网站可以查看。如果有错误敬请斧正。
accept 系统调用创建新的 socket 对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| SYSCALL_DEFINE4(accept4, int, fd, struct sockaddr __user *, upeer_sockaddr, int __user *, upeer_addrlen, int, flags) { struct socket *sock, *newsock; struct file *newfile; int err, len, newfd, fput_needed; struct sockaddr_storage address; if (flags & ~(SOCK_CLOEXEC | SOCK_NONBLOCK)) return -EINVAL;
if (SOCK_NONBLOCK != O_NONBLOCK && (flags & SOCK_NONBLOCK)) flags = (flags & ~SOCK_NONBLOCK) | O_NONBLOCK; sock = sockfd_lookup_light(fd, &err, &fput_needed); if (!sock) goto out;
err = -ENFILE; newsock = sock_alloc(); if (!newsock) goto out_put; newsock->type = sock->type; newsock->ops = sock->ops;
__module_get(newsock->ops->owner); newfd = sock_alloc_file(newsock, &newfile, flags); if (unlikely(newfd < 0)) { err = newfd; sock_release(newsock); err = security_socket_accept(sock, newsock); if (err) goto out_fd; err = sock->ops->accept(sock, newsock, sock->file->f_flags); if (err < 0) goto out_fd;
if (upeer_sockaddr) { if (newsock->ops->getname(newsock, (struct sockaddr *)&address, &len, 2) < 0) { err = -ECONNABORTED; goto out_fd; } err = move_addr_to_user((struct sockaddr *)&address, len, upeer_sockaddr, upeer_addrlen); if (err < 0) goto out_fd; }
fd_install(newfd, newfile); err = newfd;
out_put: fput_light(sock->file, fput_needed); out: return err; out_fd: fput(newfile); put_unused_fd(newfd); goto out_put; } }
SYSCALL_DEFINE3(accept, int, fd, struct sockaddr __user *, upeer_sockaddr, int __user *, upeer_addrlen) { return sys_accept4(fd, upeer_sockaddr, upeer_addrlen, 0); }
|
0. 关于 flags 的处理
可以看到这里 accept
和 accept4
的区别仅仅是多了一个 flags
的参数,如果想了解这些 flags
的设置可以自行搜索。如果使用 accept
,本质上运行的是 flags
参数为 0 的 accept4
。
1. 根据 fd 查找到监听的 socket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
#define EBADF 9
static struct socket *sockfd_lookup_light(int fd, int *err, int *fput_needed) { struct file *file; struct socket *sock;
*err = -EBADF; file = fget_light(fd, fput_needed); if (file) { sock = sock_from_file(file, err); if (sock) return sock; fput_light(file, *fput_needed); } return NULL; }
|
这里还有另一个参数 fput_needed
,该参数是在 fget_light
内被初始化的,感兴趣的读者可以去看看,路径是 /fs/file_table.c
第 331 行,简单说一下其作用:
- 如果 fput_needed 的值为 1,表示文件引用计数已经成功增加,调用者在使用完文件后需要调用
fput
函数来减少引用计数。
- 如果 fput_needed 的值为 0,表示文件引用计数未增加,可能由于并发操作中文件已被释放,调用者无需调用
fput
。
套接字分为两类:监听套接字和连接套接字。监听套接字是使用 socket()
创建一个套接字,bind()
将其绑定到一个特定的本地地址和端口,listen()
开始监听连接请求。连接套接字是使用 accept()
来创建与特定客户进行通信。而创建连接套接字的时候会使用监听套接字的部分设置(比如地址簇 A、套接字类型、协议类型等等),当然连接套接字也有自己的一些设置,比如新的本地端口号等等。这里 sockfd_lookup_light
函数的作用是通过 fd
来找到监听套接字的 socket
结构体指针。准备后面的部分设置共享。
2. alloc 一个新的 socket 对象
这里首先调用 sock_alloc
函数申请一个新的 socket
对象。然后将监听套接字的 socket
对象的 type
和协议操作函数集合 ops 赋给新 socket
对象。这里的 ops
里面主要是协议族,内核模块,以及一些需要的函数,感兴趣的可以去看看。需要注意的是这里的 ops
是一个指针,并不是将其整体复制了一份。
1 2 3 4 5 6 7 8 9 10 11
| struct socket { socket_state state; kmemcheck_bitfield_begin(type); short type; kmemcheck_bitfield_end(type); unsigned long flags; struct socket_wq __rcu *wq; struct file *file; struct sock *sk; const struct proto_ops *ops; };
|
3. 为新的 socket 分配一个新的 file 对象
这里的 file
对象其实就表示打开的文件对象,也就是文件描述符。使得连接套接字能通过文件描述符进行访问和操作。对其感兴趣可以点击该链接查看 file 结构体的具体细节。
因为连接套接字的 socket
对象的 file
指针还是空的。接下来调用 sock_alloc_file
函数来申请并且初始化该对象。并且将其设置在 sock->file
对象。具体代码点链接。
新的 socket 对象 ->file->f_op->poll 函数指向的是 sock_poll,之后会用到的。
4. 接收连接
这里介绍另一个 socket
对象中的核心成员 sock
。该结构体非常大,其中发送队列、接收数据包的队列、错误队列、缓存锁、异步等待的队列、数据包的哈希值,数量等核心数据结构都在这里。
1 2 3 4 5 6 7 8
| err = security_socket_accept(sock, newsock); if (err) goto out_fd;
err = sock->ops->accept(sock, newsock, sock->file->f_flags); if (err < 0) goto out_fd;
|
这里的 accept 对应的是 inet-accept
。它会从握手队列中直接获取创建好的 sock
。
5. 将新的文件添加到当前进程的打开文件列表
当 file、socket、sock 等关键内核对象创建完毕以后,剩下要做的一件事情就是把它挂到当前进程的打开文件列表中就行了。
1 2 3 4 5 6 7 8 9 10 11 12 13
| void fd_install(unsigned int fd, struct file *file) { __fd_install(current->files, fd, file); }
void __fd_install(struct files_struct *files, unsigned int fd, struct file *file) { fdt = files_fdtable(files); BUG_ON(fdt->fd[fd] != NULL); rcu_assign_pointer(fdt->fd[fd], file); }
|
epoll_create 的实现
用户进程在调用 epoll_create
时,内核会创建一个 eventpoll
对象。将其关联到当前进程的已打开文件列表中。epoll_create
源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| SYSCALL_DEFINE1(epoll_create1, int, flags) { int error; struct eventpoll *ep = NULL;
error = ep_alloc(&ep);
error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC)); return error; }
|
1. ep_alloc
eventpoll
的实现,这里我们只看主要的成员:
1 2 3 4 5 6 7 8 9
| struct eventpoll { wait_queue_head_t wq; struct list_head rdllist; struct rb_root rbr; };
|
成员作用:
- wq:软中断数据就绪的时候会通过 wq 来找到阻塞在 epoll 对象上的用户进程。等待队列链表用于管理等待事件发生的进程。
- rbr:红黑树用于存储被监控的文件描述符(fd)的数据结构及其状态信息。
- rdllist:当有连接就绪时,会将就绪的连接放在该链表中。这样就不用遍历整棵树。
然后在 ep_alloc
里面实现初始化工作,源码:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| static int ep_alloc(struct eventpoll **pep) { int error; struct user_struct *user; struct eventpoll *ep;
user = get_current_user(); error = -ENOMEM; ep = kzalloc(sizeof(*ep), GFP_KERNEL); if (unlikely(!ep)) goto free_uid;
spin_lock_init(&ep->lock); mutex_init(&ep->mtx); init_waitqueue_head(&ep->wq); init_waitqueue_head(&ep->poll_wait); INIT_LIST_HEAD(&ep->rdllist); ep->rbr = RB_ROOT; ep->ovflist = EP_UNACTIVE_PTR; ep->user = user;
*pep = ep;
return 0;
free_uid: free_uid(user); return error; }
|
2. 创建匿名的文件对象以及文件描述符
匿名文件对象是内核用于表示一些临时或者有特殊用途的文件对象,其是在内存中动态创建和管理的,常见用于进程间通信(比如 pipe
、fifo
)、临时文件、内核模块之间的通信以及一些特殊的操作。通过将文件挂接在单个 inode 上来创建新文件。这对于不需要拥有完整 inode 即可正确运行的文件非常有用。使用 anon_inode_getfd () 创建的所有文件将共享一个 inode,从而节省内存并避免文件 /inode/dentry 设置的代码重复。将 epoll
和文件描述符关联起来,内核也更方便管理和处理被监视的文件描述符的事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| int anon_inode_getfd(const char *name, const struct file_operations *fops, void *priv, int flags) { int error, fd; struct file *file; error = get_unused_fd_flags(flags); if (error < 0) return error; fd = error; file = anon_inode_getfile(name, fops, priv, flags); if (IS_ERR(file)) { error = PTR_ERR(file); goto err_put_unused_fd; } fd_install(fd, file);
return fd;
err_put_unused_fd: put_unused_fd(fd); return error; } EXPORT_SYMBOL_GPL(anon_inode_getfd);
|
总结 epoll_create 函数所做的事:调用 epoll_create 后,在内核中分配一个 eventpoll 结构和代表 epoll 文件的 file 结构,并且将这两个结构关联在一块,同时,返回一个也与 file 结构相关联的 epoll 文件描述符 fd。当应用程序操作 epoll 时,需要传入一个 epoll 文件描述符 fd,内核根据这个 fd,找到 epoll 的 file 结构,然后通过 file,获取之前 epoll_create 申请 eventpoll 结构变量,epoll 相关的重要信息都存储在这个结构里面。接下来,所有 epoll 接口函数的操作,都是在 eventpoll 结构变量上进行的。
所以,epoll_create 的作用就是为进程在内核中建立一个从 epoll 文件描述符到 eventpoll 结构变量的通道。
epoll_ctl 的实现
这里是理解 epoll 的核心区域(这么久了,终于到了….)。
首先 epoll_ctl 的作用是添加、修改、删除文件的监听事件,内核代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event) { int error; int did_lock_epmutex = 0; struct file *file, *tfile; struct eventpoll *ep; struct epitem *epi; struct epoll_event epds; error = -EFAULT; if (ep_op_has_event(op) && copy_from_user(&epds, event, sizeof(struct epoll_event))) goto error_return; error = -EBADF; file = fget(epfd); if (!file) goto error_return; tfile = fget(fd); if (!tfile) goto error_fput; error = -EPERM; if (!tfile->f_op || !tfile->f_op->poll) goto error_tgt_fput;
error = -EINVAL; if (file == tfile || !is_file_epoll(file)) goto error_tgt_fput;
ep = file->private_data;
if (unlikely(is_file_epoll(tfile) && op == EPOLL_CTL_ADD)) { mutex_lock(&epmutex); did_lock_epmutex = 1; error = -ELOOP; if (ep_loop_check(ep, tfile) != 0) goto error_tgt_fput; }
mutex_lock(&ep->mtx); epi = ep_find(ep, tfile, fd);
error = -EINVAL; switch (op) {(!is_file_epoll(f.file)); case EPOLL_CTL_ADD: if (!epi) { epds.events |= POLLERR | POLLHUP; error = nsert(ep, &epds, tfile, fd); } else error = -EEXIST; break; case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi); else error = -ENOENT; break;flags case EPOLL_CTL_MOD: if (epi) { epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &epds); } else error = -ENOENT; break; } mutex_unlock(&ep->mtx);
error_tgt_fput: if (unlikely(did_lock_epmutex)) mutex_unlock(&epmutex);
fput(tfile); error_fput: fput(file); error_return:
return error; }
|
1. 根据操作判断是否拷贝到内核以及获取 file 对象
这里的 op
是对 epoll
操作动作(添加、删除、修改),ep_op_has_event(op)
是判断是否是删除操作,如果 op != EPOLL_CTL_DEL 为 true
时,则需要调用 copy_from_user()
将 event
事件拷贝到内核的 epds
变量中。因为只有删除操作不需要内核使用进程传入的 event
事件。
1 2 3 4
| static inline int ep_op_has_event(int op) { return op != EPOLL_CTL_DEL; }
|
接下来调用两次 fget
分别获取 epoll 文件和连接套接字文件的 file 结构变量。
接下来就是对参数的一些检查,出现如下情况,就可以认为传入的参数有问题,直接返回出错:
- 目标文件不支持 poll 操作 (!tf.file->f_op->poll);
- 监听的目标文件就是 epoll 文件本身 (f.file == tf.file);
- 用户传入的 epoll 文件 (epfd 代表的文件)并不是一个真正的 epoll 的文件 (!is_file_epoll (f.file));
2. 循环引用的处理
这里会有一个问题,假设有两个 epoll 文件描述符 A 和 B,且 A 已经插入到 B 的文件描述符中,而某个时刻 B 直接或者间接插入到 A 中,这就形成闭环,导致事件发生时递归处理,从来形成文件的事件循环,这就是循环引用。它会导致死锁或者其他的问题。这里为了解决循环引用,代码中是下面的措施:
1 2 3 4 5 6 7
| if (unlikely(is_file_epoll(tfile) && op == EPOLL_CTL_ADD)) { mutex_lock(&epmutex); did_lock_epmutex = 1; error = -ELOOP; if (ep_loop_check(ep, tfile) != 0) goto error_tgt_fput; }
|
这里先判断插入的文件描述符是否是 epoll 文件描述符,是否是 EPOLL_CTL_ADD
操作,条件满足时会先 mutex_lock(&epmutex);
获取全局锁防止处理时发生竞争,接下来判断是否存在闭环 ep_loop_check(ep, tfile) != 0
,如果存在进入对应错误处理。
3. ep_find
在 ep 里面,维护着一个红黑树 rbr
,每次添加注册事件时,都会申请一个 epitem 结构的变量表示事件的监听项,然后插入 ep 的红黑树里面。在 epoll_ctl 里面,会调用 ep_find 函数从 ep 的红黑树里面查找目标文件表示的监听项,返回的监听项可能为空。而 epoll_filefd
结构体中只有两个成员:file 结构体和其对应的 fd。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| static struct epitem *ep_find(struct eventpoll *ep, struct file *file, int fd) { int kcmp; struct rb_node *rbp; struct epitem *epi, *epir = NULL; struct epoll_filefd ffd; ep_set_ffd(&ffd, file, fd); for (rbp = ep->rbr.rb_node; rbp; ) { epi = rb_entry(rbp, struct epitem, rbn); kcmp = ep_cmp_ffd(&ffd, &epi->ffd); if (kcmp > 0) rbp = rbp->rb_right; else if (kcmp < 0) rbp = rbp->rb_left; else { epir = epi; break; } }
return epir; }
|
4. EPOLL_CTL_ADD 情况
接下来 switch 这块区域的代码就是整个 epoll_ctl 函数的核心,对 op 进行 switch 出来的有添加 (EPOLL_CTL_ADD)、删除 (EPOLL_CTL_DEL) 和修改 (EPOLL_CTL_MOD) 三种情况,这里我以添加为例讲解,其他两种情况类似,知道了如何添加监听事件,其他删除和修改监听事件都可以举一反三。
1 2 3 4 5 6 7 8
| case EPOLL_CTL_ADD: if (!epi) { epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds, tfile, fd); } else error = -EEXIST; break;
|
为目标文件添加监控事件时,首先要保证当前 ep 里面还没有对该目标文件进行监听,如果存在 (epi 不为空),就返回 - EEXIST 错误。否则说明参数正常,然后先默认设置对目标文件的 POLLERR 和 POLLHUP 监听事件,然后调用 ep_insert 函数,将对目标文件的监听事件插入到 ep 维护的红黑树里面,下面是 ep_insert 的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd) { int error, revents, pwake = 0; unsigned long flags; long user_watches; struct epitem *epi; struct ep_pqueue epq; user_watches = atomic_long_read(&ep->user->epoll_watches); if (unlikely(user_watches >= max_user_watches)) return -ENOSPC; if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) return -ENOMEM;
INIT_LIST_HEAD(&epi->rdllink); INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; ep_set_ffd(&epi->ffd, tfile, fd); epi->event = *event; epi->nwait = 0; epi->next = EP_UNACTIVE_PTR;
epq.epi = epi; init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = tfile->f_op->poll(tfile, &epq.pt);
error = -ENOMEM; if (epi->nwait < 0) goto error_unregister;
spin_lock(&tfile->f_lock); list_add_tail(&epi->fllink, &tfile->f_ep_links); spin_unlock(&tfile->f_lock);
ep_rbtree_insert(ep, epi);
spin_lock_irqsave(&ep->lock, flags);
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist);
if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; }
spin_unlock_irqrestore(&ep->lock, flags);
atomic_long_inc(&ep->user->epoll_watches);
if (pwake) ep_poll_safewake(&ep->poll_wait);
return 0;
error_unregister: ep_unregister_pollwait(ep, epi);
spin_lock_irqsave(&ep->lock, flags); if (ep_is_linked(&epi->rdllink)) list_del_init(&epi->rdllink); spin_unlock_irqrestore(&ep->lock, flags);
kmem_cache_free(epi_cache, epi);
return error; }
|
在调用 epoll_ctl 时,可能会产生相关进程需要监听的事件,如果有监听的事件产生,(revents & event->events 为 true),并且目标文件相关的监听项没有链接到 ep 的准备链表 rdlist 里面的话,就将该监听项添加到 ep 的 rdlist 准备链表里面,rdlist 链接的是该 epoll 描述符监听的所有已经就绪的目标文件的监听项。并且,如果有任务在等待产生事件时,就调用 wake_up_locked 函数唤醒所有正在等待的任务,处理相应的事件。当进程调用 epoll_wait 时,该进程就出现在 ep 的 wq 等待队列里面。
回调函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| #include <linux/rcupdate.h>
struct my_data { int value; struct rcu_head rcu; };
void enqueue_for_removal(struct my_data *data) { call_rcu(&data->rcu, my_data_free_callback); }
void my_data_free_callback(struct rcu_head *head) { struct my_data *data = container_of(head, struct my_data, rcu); kfree(data); }
|
sk_data_ready
sock_def_readable