C++20协程 迈向异步 Part3最终回-配合io_uring的协程库设计

前情提要

上篇C++20 迈向异步 Part2 - 可等待体预告说使用socket套接字API实现TCP/UDP通讯并整合进C++协使用select原语实现事件循环。建立起一个简单的异步IO库。

但是后来写完发现步子还是太小了(指写起来很简单没啥难度),于是直接一步到位使用内核里新的io_uring设施实现消息循环。所有代码放在github上了OrbitZore/libzio。本文就简单阐述一下这个库为什么这么设计的。

技术背景说明

io_uring

io_uring 是2019年进入Linux内核(5.0)的一个新高性能异步IO设施,被设计用来替代原有的POSIX异步函数(aio_系列),是继C10k之后新时代C10m问题的一个解决方案。

io_uring 设计了三个全新的系统调用(io_uring_setupio_uring_enterio_uring_register)。应用程序调用io_uring_setup在用户程序的内存空间里创建两个无锁循环队列(submit queue和completion queue),应用程序通过写入submit queue写IO操作;读completion queue读取IO操作返回值。内核读submit queue执行IO操作;写completion queue返回IO操作返回值。应用程序可通过调用io_uring_enter手动通知内核消费submit queue/阻塞等待completion queue/消费completion queue。可以使用io_uring_register注册缓冲区供io_uring使用。

和select/poll/epoll这三种多路复用技术对比来说,io_uring是真正的异步IO;select维护fdset,poll维护fd数组,epoll维护红黑树。而io_uring仅仅维护两个无锁循环队列。简单而高效。io_uring现在已经成为LinuxIO的一个常用设施,在OceanBase数据库大赛等高性能IO场合里经常能见到io_uring的身影。

io_uring作者Jens Axboe将这三个系统调用封装成C库liburing并鼓励使用liburing而不是裸系统调用。本框架也将使用liburing编写。

整体设计 uringtest.h

await

await标识一个通用可等待体。被继承引出为三类:promise_base分支是代表一个协程、io_await代表io_uring的一个IO操作、timeout_await代表一个超时可等待体。对于xx被xx等待来说,主语可以是继承自通用可等待体的任意可等待体,宾语则只能是协程可等待体。所以await内部保存一个promise_base指针,用于可等待体完成时唤醒等待的协程。当然这样实现一个可等待体只能被一个协程等待,你也可以换成vector<promise_base*>之类。这里完全是因为课设够用。

done标志可等待体是否完成,from_ctx标志是否被io_context持有,如果是协程可等体from_ctx为true时,当io_context检测到执行完毕后会进行内存回收。,ctx指示可等待体所属io_context。这样当一个协程等待可等待体时能找到对应io_context提交可等待体。

1
2
3
4
5
struct await {
io_context* ctx{NULL};
promise_base* waiter{NULL};
bool done{false}, from_ctx{false};
};

promise_base/promise

promise_base内部仅多出wait_cnt表示等待可等待体的数量,一个可等待体完成后会将await::waiter->wait_cnt--。如果await::waiter->wait_cnt==0则将协程放入待执行队列表示等待完成。借助这个语义,wait_all实现起来就是直接将wait_cnt设置成对应的数量;而wait_any就直接把wait_cnt设置成1。

promise<T>promise_base添加了类型信息以存放协程的返回值/yield值。同样作为coroutine_traits::promise_type,实现了initial_suspend,final_suspend,unhandled_exception,await_transform,return_value,yield_value这一系列函数。其中在await_transform函数中,因为协程get_return返回的是一个optional<T>,但是co_await一个协程能保证其中保有值,所以用了一点concept选择出awaitable类型返回解引用后的值。为了避免T=void出现一些类型不完全故障,实现了promise<T=void>特化在此略过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
struct promise_base : public await {
virtual ~promise_base();
int wait_cnt{0};
coroutine_handle<promise_base> handle();
};
template <class T>
struct promise : promise_base {
auto initial_suspend() { return suspend_always{}; }
auto final_suspend() noexcept { return suspend_always{}; }
void unhandled_exception() { terminate(); }
template <waitable awaitableT>
struct awaitble {
awaitableT a;
constexpr bool await_ready() { return !a; }
auto await_suspend(coroutine_handle<promise> h) {
return a.await_suspend(h);
}
auto await_resume()
requires types::IS_AWAITABLE<awaitableT>
{
return *a.get_return();
}
auto await_resume()
requires (!types::IS_AWAITABLE<awaitableT>)
{
return a.get_return();
}
};
template <waitable awaitableT>
auto await_transform(awaitableT&& a) {
a.set_context(*ctx);
return awaitble<awaitableT&&>{std::forward<awaitableT>(a)};
}
awaitable<T> get_return_object() { return {this}; }
void return_value(T& value) {
value_ = value;
done = true;
}
void return_value(T value) {
value_ = ::move(value);
done = true;
}
auto yield_value(T& value) {
value_ = value;
return suspend_always{};
}
auto yield_value(T value) {
value_ = ::move(value);
return suspend_always{};
}
optional<T> value_;
};

awaitable_base<T>/awaitable<T>

awaitable_base<T>awaitable<T>完全能够合并成一个类,当初设计成两个是为了使awaitable_base擦除类型,但是完全没必要。后来就放弃了这个设计目标。到现在awaitable_base<T>就做了等待一个协程应该做什么操作。awaitable<T>实现了get_return()使创建协程者在wait_allwait_any的情况下可以直接获取到协程的返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
template <class T>
struct awaitable_base {
promise<T>* p;
promise<T>* get_promise() { return p; }
void set_context(io_context& ctx) {
if (p)
p->ctx = &ctx;
}
// true for not done
// false for done or has value
operator bool() const {
if constexpr (!is_same_v<T, void>) {
return p && !p->done && !p->value_;
} else {
return p && !p->done;
}
}
template <class U>
void await_suspend(coroutine_handle<U> h) const {
p->ctx->work_queue.push_back(p);
p->waiter = &h.promise();
h.promise().wait_cnt++;
}
coroutine_handle<promise<T>> handle() {
return coroutine_handle<promise<T>>::from_promise(*p);
}
void detached() { p = nullptr; }
~awaitable_base() {
if (p) {
handle().destroy();
}
}
};
template <class T>
struct awaitable : public awaitable_base<T> {
using awaitable_base<T>::p;
using awaitable_base<T>::get_promise;
using awaitable_base<T>::handle;
using awaitable_base<T>::await_suspend;
void is_awaitble();
optional<T> get_return() {
if (!p)
return {};
auto ret = std::move(p->value_);
p->value_.reset();
if (p->done) {
handle().destroy();
p = {};
}
return ret;
}
};

io_await系列

得益于io_uring中所有IO操作的返回值都是一个int(即代表原系统调用的返回值,负数代表可能的errno)。所以io_await统一用int ret存放返回值,额外有一bool submit标记IO操作是否提交以防止重复提交进io_uring。prepare函数被io_await_read等各个io操作子类重写,按照各自需求初始化io_uring submit queue entity。在await_suspend阶段调用prepare,当然也可以不用虚函数,这里用虚函数更符合人类直觉一点。这里不用写虚析构函数单纯是因为所有io_await子类生命周期都在协程内,协程内部完全无需用到io_await管理io_await子类们。

1
2
3
4
5
6
7
8
9
10
struct io_await : public await {
int ret;
bool submit{false};
operator bool() const { return !done; }
virtual void prepare(io_uring_sqe* sqe) = 0;
void complete(int _ret);
template <class U>
void await_suspend(coroutine_handle<U> h);
int get_return();
};

一个典型子类

1
2
3
4
5
6
7
8
9
10
11
struct io_await_read : public io_await {
io_await_read();
int fd;
char* c;
size_t n;
u64 offset;
io_await_read(int fd, char* c, size_t n, u64 offset){/*...*/}
virtual void prepare(io_uring_sqe* sqe){
io_uring_prep_read(sqe, fd, c, n, offset);
}
};

timeout_await

内部保存UNIX时间戳,以便在multimap<time_t, timeout_await*> sleep_queue中保证小的先唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
struct timeout_await : public await {
time_t t;
timeout_await(time_t t);
bool operator<(const timeout_await& ta) const { return t < ta.t; }
operator bool() const { return time(NULL) <= t; }
template <class U>
void await_suspend(coroutine_handle<U> h) {
waiter = &h.promise();
h.promise().wait_cnt++;
ctx->sleep_queue.insert({t, this});
}
void get_return();
};

io_context

io_context管理io_uring结构体/超时等待队列/就绪协程队列。同时可以管理协程生命周期,一个协程创建出来可以被另一个协程co_await放入就绪队列等待执行,也可以直接丢给io_context::reg()将生命周期直接交给io_context管理。一个io_await操作交给io_context::reg()后可以直接提交给io_uring。并设置user_data为NULL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
struct io_context {
multimap<time_t, timeout_await*> sleep_queue;
deque<promise_base*> work_queue;
io_uring ring;
int uring_submit_cnt = 0;
io_context();
template <class T>
void reg(awaitable<T>& a) {
a.get_promise()->ctx = this;
work_queue.push_back(a.get_promise());
}
template <class T>
void reg(awaitable<T>&& a) {
a.get_promise()->from_ctx = true;
a.get_promise()->ctx = this;
work_queue.push_back(a.get_promise());
a.detached();
}
inline io_uring_sqe* get_sqe() {
auto sqe = io_uring_get_sqe(&ring);
while (!sqe) {
uring_submit_cnt += io_uring_submit(&ring);
sqe = io_uring_get_sqe(&ring);
}
return sqe;
}
inline void reg(io_await&& ioa) {
auto sqe = get_sqe();
ioa.prepare(sqe);
io_uring_sqe_set_data(sqe, NULL);
}
void run();
};

io_context::run()驱动事件循环。内部处理这三个队列,同时等待io_uring事件。io_uring_sqe内部有一个uint64_t user_data可以存放用户数据,这里直接存放对应io_await地址(被io_context::reg()提交的将会被置NULL)。在io_uring completion queue中获取到completion queue entity时user_data就是提交时user_data的值。根据user_data即可设置io_await完成状态,并有可能的将等待协程放入就绪队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void io_context::run() {
while (true) {
bool is_advanced = false;
while (work_queue.size()) {
is_advanced = true;
auto p = work_queue.front();
work_queue.pop_front();
p->handle()();
if (p->done)
p->wake_others();
if (p->done && p->from_ctx) {
p->handle().destroy();
}
}
uring_submit_cnt += io_uring_submit(&ring);
io_uring_cqe* cqe;
__kernel_timespec t{};
t.tv_nsec = 1e-4 / 1e-9;
while (!io_uring_wait_cqe_timeout(&ring, &cqe, &t)) {
is_advanced = true;
if (io_await* c = (io_await*)io_uring_cqe_get_data(cqe)) {
c->complete(cqe->res);
c->wake_others();
}
io_uring_cqe_seen(&ring, cqe);
uring_submit_cnt--;
}
while (sleep_queue.size() && time(NULL) >= sleep_queue.begin()->first) {
is_advanced = true;
sleep_queue.begin()->second->wake_others();
sleep_queue.erase(sleep_queue.begin());
}
if (!(is_advanced || sleep_queue.size() || work_queue.size() ||
uring_submit_cnt)) {
return;
}
}
}

其他外围封装说明

connection

connection标记一个成功建立连接的socket fd(move only的,也懒得做引用计数啥的,需要再用shared_ptr包一下吧)。标注模板信息只是为了做一个提示而已。也没做类型退化啥的,没这个需求(ipv4一把梭,实在需要ipv4/v6共存直接用之后的ipvx就行了)。

status用于保存accept()或者connect()(当然是异步版本的)返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
template <types::Address Address, types::Protocol Protocol>
struct connection {
int fd{-1};
int status{0};
explicit connection(int fd, int status) : fd(fd), status(status) {}
connection(const connection&) = delete;
connection(connection&& con) {
fd = con.fd;
status = con.status;
con.fd = -1;
}
~connection() { close(); }
operator bool() { return fd != -1 && status == 0; }
void close() {
if (fd >= 0)
::close(fd);
}

Address getpeer(){
Address address;
unsigned int len=sizeof(address);
getpeername(fd,(sockaddr*)&address, &len);
address.resize(len);
return address;
}

Address getaddr(){
Address address;
unsigned int len=sizeof(address);
getsockname(fd,(sockaddr*)&address, &len);
address.resize(len);
return address;
}

int getopt(int level, int opt, void* optval, socklen_t* optlen) {
return getsockopt(fd, level, opt, optval, optlen);
}
int setopt(int level, int opt, const void* optval, socklen_t optlen) {
return setsockopt(fd, level, opt, optval, optlen);
}

io_await_write async_write(const char* c, size_t n);
io_await_read async_read(char* c, size_t n);
io_await_recv async_recv(char* c, size_t n, int flags = 0);
io_await_send async_send(const char* c, size_t n, int flags = 0);
io_await_sendmsg async_sendmsg(message_header* msg, int flags = 0);
io_await_recvmsg async_recvmsg(message_header* msg, int flags = 0);
io_await_send_zc async_send_zc(const char* c, size_t n, int flags = 0);
};

zio.ip.hpp

很常规的对arpa/inet.h的封装。值得一提的就是枚举类ipvx,将ipv4和ipv6聚合在了一起,被C风格多态折服

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
struct ipv4 : public sockaddr_in {
static inline constexpr auto DEFAULT_AF=AF_INET;
static inline constexpr auto AF(){return AF_INET;};
static inline constexpr unsigned int length() { return sizeof(ipv4); }
static inline constexpr void resize(unsigned int){}
inline void set_port(uint16_t hport) { sin_port = htons(hport); }
static inline optional<ipv4> from_url(const char* a) {
return ip::from_url<ipv4>(a);
}
static inline ipv4 from_pret(const char* c, uint16_t port) {
ipv4 addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF();
inet_pton(AF(), c, &addr.sin_addr);
addr.sin_port = htons(port);
return addr;
}
string inline to_pret() const {
char str[INET_ADDRSTRLEN];
inet_ntop(AF(), &this->sin_addr, &str[0], INET_ADDRSTRLEN);
string a = str;
a += ":" + to_string(ntohs(sin_port));
return a;
}
};
struct ipv6 : public sockaddr_in6 {
static inline constexpr auto DEFAULT_AF=AF_INET6;
static inline constexpr auto AF(){return AF_INET6;};
static inline constexpr unsigned int length() { return sizeof(ipv6); }
static inline constexpr void resize(unsigned int){}
inline void set_port(uint16_t hport) { sin6_port = htons(hport); }
static inline optional<ipv6> from_url(const char* a) {
return ip::from_url<ipv6>(a);
}
static inline ipv6 from_pret(const char* c, uint16_t port) {
ipv6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF();
inet_pton(AF(), c, &addr.sin6_addr);
addr.sin6_port = htons(port);
return addr;
}
string inline to_pret() const {
char str[INET_ADDRSTRLEN];
inet_ntop(AF(), &this->sin6_addr, &str[0], INET6_ADDRSTRLEN);
string a = str;
a += ":" + to_string(ntohs(sin6_port));
return a;
}
};
union ipvx{
ipv4 v4;
ipv6 v6;
inline auto AF(){
if (v4.sin_family==ipv4::AF()){
return v4.AF();
}else if (v6.sin6_family==ipv6::AF()){
return v6.AF();
}
throw std::domain_error("No such ip address");
}
static inline constexpr void resize(unsigned int){}
inline unsigned int length(){
if (v4.sin_family==ipv4::AF()){
return v4.length();
}else if (v6.sin6_family==ipv6::AF()){
return v6.length();
}
throw std::domain_error("No such ip address");
}
inline void set_port(uint16_t hport) {
if (v4.sin_family==ipv4::AF()){
return v4.set_port(hport);
}else if (v6.sin6_family==ipv6::AF()){
return v6.set_port(hport);
}
throw std::domain_error("No such ip address");
}
string inline to_pret() const {
if (v4.sin_family==ipv4::AF()){
return v4.to_pret();
}else if (v6.sin6_family==ipv6::AF()){
return v6.to_pret();
}
throw std::domain_error("No such ip address");
}
};
struct tcp {
static inline constexpr auto SOCK = SOCK_STREAM;
static inline constexpr auto PROTO = IPPROTO_TCP;
template <IP_Address T>
using acceptor = zio::acceptor<T, zio::ip::tcp>;
template <IP_Address T>
static auto async_connect(T&& addr) {
return zio::async_connect<T, tcp>(std::forward<T>(addr));
}
};

struct udp {
static constexpr auto SOCK = SOCK_DGRAM;
static constexpr auto PROTO = IPPROTO_UDP;
template <IP_Address T>
using acceptor = zio::acceptor<T, zio::ip::udp>;
template <IP_Address T>
static auto async_connect(T&& addr) {
return zio::async_connect<T, udp>(std::forward<T>(addr));
}
template <IP_Address T>
static auto open(T&& addr) {
return zio::open<T, udp>(std::forward<T>(addr));
}
};

zio_http.hpp

简单对http进行一些封装,包括

  • zio::http::url::decodezio::http::url::encode对url的解编码
  • zio::http::STATUS_CODE将状态码文本化的map
  • zio::http::MIME::data_typezio::http::MIME::data_type_name对MIME的简单列举
  • struct zio::http::http_requestzio::http::http_request::recv_requestzio::http::http_request::recv_parse和可能的zio::http::http_request::recv_content对http请求的封装
  • zio::http::make_http_header,制作http内容的头信息

是一些使用起来很方便就能构建一个http/1.1服务器的工具函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#pragma once
#include <charconv>
#include <string_view>
#include <unordered_map>
#include "uringtest.h"
#include "zio_buffer.hpp"
#include "zio_ip.hpp"
#include "zio_types.hpp"

using namespace std;
namespace zio::http {
namespace url {
inline string encode(string_view a) {
ostringstream os;
os.fill('0');
os << hex;
for (auto c : a) {
if (isalnum(c) || c == '-' || c == '_' || c == '.' || c == '~') {
os << c;
} else {
os << uppercase << '%' << setw(2) << int((unsigned char)c) << nouppercase;
}
}
return os.str();
}

inline string decode(string_view a) {
string os;
for (auto i = begin(a); i != end(a);) {
if (*i == '%' && end(a) - i >= 3) {
os += tools::asciihex_to_uint(*next(i)) * 16 +
tools::asciihex_to_uint(*next(i, 2));
i += 3;
} else {
os += *i;
i++;
}
}
return os;
}
} // namespace url
namespace STATUS_CODE {
inline map<int, string> status_code_to_name = {
{200, "OK"},
{404, "Not Found"},
};
}
namespace MIME {
enum data_type {
html,
file,
};

inline map<data_type, string> data_type_name = {
{html, "text/html; charset=utf-8"},
{file, "application/octet-stream"}};
} // namespace MIME

struct http_request {
string _data, content;
string_view method, url, version;
unordered_map<string_view, string_view> header;
inline http_request() = default;
inline http_request(http_request&&) = default;
inline http_request(const http_request&) = delete;
inline http_request& operator=(http_request&&) = default;
inline http_request& operator=(http_request&) = delete;
inline operator bool() { return _data.size(); }
inline void prase() {
const char* l = _data.data();
bool xfirst = true;
for (const char& c : _data) {
if (c == '\r' || c == '\n') {
if (&c - l >= 2) {
if (xfirst) {
const char* l1 = ranges::find(string_view(l, &c), ' ');
if (l1 >= &c - 2) {
_data.clear();
return;
}
const char* l2 = ranges::find(string_view(l1 + 2, &c), ' ');
if (l2 >= &c - 1) {
_data.clear();
return;
}
method = {l, l1};
url = {l1 + 1, l2};
version = {l2 + 1, &c};
xfirst = false;
} else {
const char* l1 = ranges::find(string_view(l, &c), ':');
if (l1 == &c) {
_data.clear();
return;
}
header[tools::strip(string_view(l, l1))] =
tools::strip(string_view{l1 + 1, &c});
}
}
l = &c;
}
}
}
inline awaitable<void> recv_content(
zio::buffer::zio_istream<ip::ipv4, ip::tcp>& s) {
if (auto cl = header.find("Content-Length"); cl != header.end()) {
if (int length;
from_chars(cl->second.begin(), cl->second.end(), length).ec ==
errc{}) {
auto x = tools::to_int(cl->second);
if (x)
if (int len = *x; len > 0) {
for (int i = 0; i < len; i++)
if (auto c = co_await s.get())
content += *c;
else
break;
}
}
}
}
inline static awaitable<http_request> read_request(
zio::buffer::zio_istream<ip::ipv4, ip::tcp>& s) {
string buffer;
bool f = true;
while (auto x = co_await s.get()) {
buffer += *x;
if (buffer.size() >= 1024 * 1024)
break;
if (buffer.ends_with("\r\n\r\n") || buffer.ends_with("\n\n") ||
buffer.ends_with("\r\r")) {
f = false;
break;
}
}
if (f)
co_return {};
http_request h;
h._data = std::move(buffer);
h.prase();
co_return std::move(h);
}
};

inline string make_http_header(int status_code,
size_t size,
MIME::data_type type,
vector<string> extra = {}) {
string header;
auto add_line = [&](string_view str) {
header += str;
header += "\r\n";
};
add_line("HTTP/1.1 " + to_string(status_code) + " " +
STATUS_CODE::status_code_to_name[status_code]);
add_line(
"Server: "
"SimpleHTTP/0.1 "
"with libzio");
add_line("Date: " + tools::format_now("%c %Z"));
add_line("Content-Length: " + to_string(size));
add_line("Content-type: " + MIME::data_type_name[type]);
for (auto& i : extra)
add_line(i);
add_line("");
return header;
}
} // namespace zio::http