要想理解为什么要使用IOCP, 首先我们要理解同步/异步
什么是同步/异步
同步和异步的概念很好理解:
-
同步的操作在函数返回的时候就已经完成了, 比如读一个文件, 当函数返回时所有的数据已经全部读到了, 如果IO比较耗时(比通过网络读取远程的文件)就一直等待直到本次的读操作完成
-
异步的操作在函数返回的时候不一定完成了, 比如读一个文件, 如果IO比较耗时, 函数不会等待读操作完成, 它会立刻结束, 当本次的读操作真正完成后在通知调用方(通常是调用回调函数)
需要注意的是同步/异步操作和线程无关, 一个异步操作完成时候的通知(callback)可能在其它线程中, 也可能在本线程的事件循环中的某一次事件中. 总之, 我们可以这样理解: 异步操作完成不是在函数调用结束, 而是在未来的某一个时刻完成.
为什么要用异步
假设我们正在开发一个服务器用来执行客户端发过来的请求, 需要以下步骤:
- 等待 client 消息请求
- 执行一些耗时的IO (这里的耗时并不是指需要大量CPU运算, 而是指需要等待IO完成, 比如请求sql, 读取磁盘文件, 等待网络返回等) …
- 执行后续操作
如果这个耗时的IO是同步的, 其伪代码如下:
void sync_io(...);
int main() {
while (!stop) {
// 1. 等待 client 消息请求
wait_client_msg(...);
...
// 2. 执行一些耗时IO
sync_io(...);
sync_io(...);
...
// 3. 执行后续操作
other_option(...);
return 0;
}
}
上面的代码如果只处理一个请求的话是没有问题的, 但如果有多个请求同时发过来的时候, 它只能按顺序一个一个的执行:
请求1 --> 请求2 --> 请求3 -->...
这是相当低效的, 实际上当我们处理一个请求时, 程序的大多时间都花在了等待sync_io完成上面, 真正占用的资源并不多, 即便PC的性能再高, 后续的请求也依旧需要等待.
因此我们需要将同步IO函数改为异步IO函数: sync_io(...) -> async_io(..., callback)
为什么不能使用多线程来实现异步
我们很容易想到使用线程将sync_io包起来来实现 sync_io:
void async_io(..., callback) {
// 让我们暂时忽略线程的生命周期管理
std::thread([]() {
auto r = sync_io(...);
callback(r);
}).detach();
}
这样的话, 每次调用async_io都要创建一个线程, 如果有成千上万的并发, 那么就得创建成千上万的线程, 这会有几个致命缺陷:
- 创建线程的代价是也是极大的, 比如每创建一个线程就要为其分配一个栈(一般是4M), 所以一台机器能够创建的线程是有限的
- 当线程数大于CPU核心数时候, 线程的上下文进行切换也是极为消耗资源的, 尤其是线程数量巨大时候, 线程的上下文进行切换的消耗也尤为明显
顺便一提, 线程池的方案是明显不行的, 使用线程池不过是并行请求数量从1变为线程池的线程数量而已
什么是IOCP
现在我们知道不能来一个io操作就为其开一个线程, 那么我们有什么解决方案呢? 实际上这个问题的关键是如何知道一个IO操作完成了, 在使用多线程方案中是在一个线程中等待IO完成, 我们容易想到以下思路:
- 如果有方法能够检查一个IO是否完成, 我们就可以在一个线程中遍历多个IO操作了, 如果完成了就调用其callback.
- 或者一个IO完成了能不能给一个通知呢, 这样只需要接到通知的时候调用callback就行了.
顺便一提, 这俩个思路很类似, 第一个思路封装一下就是第二个
这两个想法都是可以实现的, 在windows上比如Select, iocp, 奇怪一点的比如::WaitForMultipleEvent, linux上的epol等.
本文主要讲iocp, 它属于第二种思路: 使用支持iocp的io接口时, 当有IO操作完成的时候, 它会通知IOCP. 在具体说iocp如何使用前, 我们大概可以预想下应该要有以下操作:
- 创建iocp
- 执行io操作(支持iocp的接口)时候, 将其绑定到1步中创建的iocp上, 并传入一个key用来表示是哪一个io操作
- 在一个线程中等待io完成, 并根据传入的key判断是那个io操作完成了
伪代码如下:
struct io_ctx {
callback;
}
void async_io(..., iocp, callback) {
auto io_handle = create_io_handle();
// 2. 执行io操作时候, 将其绑定到1步中创建的iocp上, 并传入一个key用来表示是哪一个io操作
bind_iocp(iocp, io_handle);
do_io_opt(io_handle, io_ctx{callback});
}
int main() {
// 1. 创建iocp
auto iocp = create_iocp();
std::thread([](iocp){
// 3. 在一个线程中等待io完成, 并根据传入的key判断是那个io操作完成了
while (!stop) {
auto key = iocp.wait();
key.callback();
}
}).detach();
async_io(... iocp, []() {
...
});
return 0;
}
这里需要说明的是要使用iocp, 所有的io接口(比如read/write等)都必须支持iocp, windows操作系统已经提供了这一系列接口.
如何使用iocp
思路已经很明确了, 我们需要学习上述步骤中的相关接口
1. 创建iocp
HANDLE CreateIoCompletionPort(
[in] HANDLE FileHandle,
[in, optional] HANDLE ExistingCompletionPort,
[in] ULONG_PTR CompletionKey,
[in] DWORD NumberOfConcurrentThreads
);
这个接口既可以用来创建iocp, 也可以用来将io和iocp绑定, 用作创造iocp时候, 参数要求如下:
FileHandle: 必须为INVALID_HANDLE_VALUE
ExistingCompletionPort: 必须为NULL
CompletionKey: 被忽略, 可以传入任意值, 建议为0
NumberOfConcurrentThreads: 系统调度iocp所使用的线程数量, 可以指定为0表示使用和处理器数一样多的线程
该接口返回新创建的iocp句柄.
HANDLE create_iocp() {
return ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
}
2. 将io操作句柄和iocp绑定
HANDLE CreateIoCompletionPort(
[in] HANDLE FileHandle,
[in, optional] HANDLE ExistingCompletionPort,
[in] ULONG_PTR CompletionKey,
[in] DWORD NumberOfConcurrentThreads
);
依旧使用这个接口, 参数要求如下:
FileHandle: 支持io操作的句柄, 比如使用CreateFile接口并指定FILE_FLAG_OVERLAPPED标志所打开的文件句柄等
ExistingCompletionPort: 上一步创建的iocp返回句柄
CompletionKey: 用户指定的值, 可以用来帮助程序跟踪哪个 I/O 操作已完成
NumberOfConcurrentThreads: 被忽略
该接口返回上一步创建的iocp返回句柄.
bool bind_to_iocp(HANDLE io_handle, HANDLE iocp_handle, ULONG_PTR key) {
return ::CreateIoCompletionPort(io_handle, iocp_handle, key, 0) != NULL;
}
等待io完成
BOOL GetQueuedCompletionStatus(
[in] HANDLE CompletionPort,
LPDWORD lpNumberOfBytesTransferred,
[out] PULONG_PTR lpCompletionKey,
[out] LPOVERLAPPED *lpOverlapped,
[in] DWORD dwMilliseconds
);
该接口是同步接口, 参数解释如下:
CompletionPort: CreateIoCompletionPort 返回的iocp句柄
lpNumberOfBytesTransferred: io完成的具体字节数量, 读/写了多少数据
lpCompletionKey: 将io操作句柄和iocp绑定时候传入的key
lpOverlapped: 调用io接口时候传入的值, CompletionKey用来表示是在哪一个io句柄上的操作, Overlapped在该io句柄上的哪次操作, 比如对一个io句柄调用了多次Read/Write, 使用这个参数来确定是哪一个操作成功了.
dwMilliseconds: 毫秒为单位的等待超时时间. 指定INFINITE 为永不超时
该接口返回TRUE表示成功
enum class iocp_notify_type
{
complete = 0, // 有端口完成了
closed, // iocp被关闭了, 此时应该停止pump退出循环
timeout // 超时
};
struct iocp_item
{
bool has_error = false; // 如果为true, 说明某个端口完成了, 但是有错误, 比如连接断开等
DWORD error_code = 0;
u32 transferred_number = 0;
ULONG_PTR key = NULL;
OVERLAPPED* overlapped = NULL;
};
iocp_notify_type wait(HANDLE iocp, iocp_item& item, u32 timeout /* = INFINITE */) {
BOOL result = ::GetQueuedCompletionStatus(iocp, (LPDWORD)&(item.transferred_number), (PULONG_PTR)&item.key, &item.overlapped, timeout);
if (item.overlapped == NULL) {
// 当完成端口未完成返回时候有以下几种情况:
// 1. ioco被关闭, ::GetLastError() == ERROR_ABANDONED_WAIT_0;
// 2. 超时返回, ::GetLastError() == WAIT_TIMEOUT;
// 3. 其它情况按超时处理
if (::GetLastError() == ERROR_ABANDONED_WAIT_0) {
return iocp_notify_type::closed;
}
// ::GetLastError() == WAIT_TIMEOUT and other case.
return iocp_notify_type::timeout;
}
// 当完成端口完成时候有以下几种情况:
// 1. 当result为TRUE, 正常完成;
// 2. 当result为FALSE, 异常完成, 比如socket连接断开等;
item.has_error = !result;
if (item.has_error) {
item.error_code = ::GetLaseError();
item.transferred_number = 0;
} else {
(void)::GetOverlappedResult(item.handle, item.overlapped, (LPDWORD)(&item.transferred_number), true);
}
return iocp_notify_type::complete;
}
现在, 我们来完整的实现下:
struct io_ctx {
HANDLE io_handle = NULL;
OVERLAPPED ov {0};
std::function<void(bool successful, int readed)> callback;
};
void async_io(HANDLE iocp_handle, const std::function<void(bool successful, int readed)>& callback) {
io_ctx * ctx = new io_ctx();
ctx->callback = callback;
do {
auto io_handle = ::CreateFileA("text.txt", GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
if (io_handle == INVALID_HANDLE_VALUE) {
break;
}
ctx->io_handle = io_handle;
if (!bind_to_iocp(io_handle, iocp_handle, (ULONG_PTR)ctx)) {
break;
}
if (!::ReadFile(handle, buff, (DWORD)buff_size, NULL, &ctx.ov) &&
::GetLastError() != ERROR_IO_PENDING) {
break;
}
} while (0);
::CloseHandle(iocp_handle);
if (ctx->io_handle != INVALID_HANDLE_VALUE) {
::CloseHandle(ctx->io_handle);
}
delete ctx;
}
int main() {
// 1. 创建iocp
auto iocp_handle = create_iocp();
if (iocp_handle == NULL) {
return -1;
}
// 2. 异步io
async_io(iocp_handle, [](bool successful, int readed) {
// ...
});
// 3. 在主线程中等待io的完成
while (true) {
iocp_item item;
iocp_notify_type type = wait(iocp_handle, item);
if (type == iocp_notify_type::closed) {
break;
} else if (type == iocp_notify_type::complete) {
io_ctx * ctx = (io_ctx *)item.key;
ctx->callback(!item.has_error, item.transferred_number);
::CloseHandle(ctx->io_handle);
delete ctx;
}
}
return 0;
}
我们创建了io_ctx这样的结构来保存上下文, 事实上我们可以将这些封装到一个类中, 完整的代码参考:
iocp 实现
iocp 测试
iocp + coroutine + winsocket http(s) server 测试