目录
一:epoll_wait如何判断服务端是异步否返回
二:流程图
三:伪代码
四:注意
五:简化代码(可运行)
今天使用异步操作来实现请求,这里要用到epoll_wait来监听服务端是请求请求否返回数据来实现异步
一:epoll_wait如何判断服务端是否返回
通过fd是否可读
二:流程图
三:伪代码
struct Context{ epollfd;}struct sockItem{ socketfd; cb result_cb;}Context* asy_init(){ Context* context = new Context; epollfd = epoll_create(1); context.epollfd = eppllfd; std::thread th(handleCallback, context); return context;}void asy_commit(Context* context, cb result_cb){ sockfd = socket(); sockaddr_in addr; connect(sockfd, addr); Request request; sendto(sockfd, request, sizeof(request), 0); sockItem* sockitem = new sockItem; sockitem.sockfd = sockfd; sockitem.result_cb = result_cb; epoll_event event; event.data.ptr = sockitem; event.events = EPOLLIN; epoll_ctl(context.epollfd, sockfd, EPOLL_CTL_ADD);}void handleCallback(Context* context){ epollfd = context.epollfd; epoll_event events[1024]; int nums = epoll_wait(epollfd, events, 1024, 0); for(int i = 0; i < nums; ++i) { sockitem = event.data.ptr; sockfd = sockitem.sockfd; result_cb = sockitem.result_cb; if(event.data.fd & EPOLLIN) { int n = recvfrom(sockfd, response, sizeof(response), 0); } result_cb(response); }}void result_callback(Response response) //用户定义的, 用于处理收到的数据的回调{ std::cout << response << std::endl;}int main(){ Context* context = asy_init(); for(int i = 0; i < nums; ++i) { asy_commit(context, result_callback); } return 0;}
四:注意
- 两个context通过在堆区分配内存来防止被回收,进而实现函数间数据的异步传输
- 五个函数,两个和callback有关的请求请求函数,一个是异步handleCallback(在这个函数的最后面执行用户回调),一个是请求请求userCallback(将被执行的用户回调,让用户来决定怎么处理拿到的异步数据)
- event.data.ptr可以传输一个数据,这个在很多地方都是请求请求用于实现回调函数的注册
- 通过这样实现了commit只管请求,不管数据处理,异步从而实现了异步(不用等待返回数据处理,请求请求才能再次commit)
五:简化代码(可运行)
#include #include #include #include #include #include #include struct Context{ int epollfd;};//模拟构建request和response格式using Response = std::string;using Requset = std::string;//用户回调的异步函数格式using CallBack = std::function;struct sockItem{ int sockfd; CallBack result_cb;};void result_callback(Response response) //用户定义的, 用于处理收到的数据的回调{ std::cout << response << std::endl;}void handleCallback(Context* context){ int epollfd = context->epollfd; epoll_event events[1024]; int nums = epoll_wait(epollfd, events, 1024, 0); for(int i = 0; i < nums; ++i) { sockItem* sockitem = (sockItem* )events[i].data.ptr; int sockfd = sockitem->sockfd; CallBack result_cb = sockitem->result_cb; // if(events[i].data.fd & EPOLLIN) // { // int ret = recvfrom(sockfd, response, sizeof(response), 0); // } std::string response = "response";//假设拿到了response result_cb(response); }}class asyInit{ private: std::thread th;public: ~asyInit() { th.join(); } Context* asy_init() { Context* context = new Context; int epollfd = epoll_create(1); context->epollfd = epollfd; th = std::thread(handleCallback, context); return context; }};void Send(Response response) { }void asy_commit(Context* context, CallBack result_cb_){ int sockfd = socket(AF_INET, SOCK_STREAM, 0); // sockaddr_in addr; // int ret = connect(sockfd, addr, sizeof(addr)); //connect(sockfd, addr, sizeof(addr)); //假设已经建立连接 Requset request = "request"; // sendto(sockfd, request.c_str(), sizeof(request.c_str()), 0); Send(request);//假设发送数据 // sockItem* sockitem;//这里定义了指针没有分配内存, 直接报错 sockItem* sockitem = new sockItem; sockitem->sockfd = sockfd; sockitem->result_cb = result_cb_; epoll_event event; event.data.ptr = sockitem; event.events = EPOLLIN; epoll_ctl(context->epollfd, EPOLL_CTL_ADD, sockfd, &event);}int main(){ asyInit asy; Context* context = asy.asy_init(); for(int i = 0; i < 5; ++i) { asy_commit(context, result_callback); } return 0;}