网站首页 编程语言 正文
目录
- 一.功能介绍
- 二.string类型数据交互
- 2.1 程序源码
- 2.2 编译&&执行
- 2.3 程序执行结果
- 三.byte类型数据交互
- 3.1 程序源码
- 3.2 编译&&执行
- 3.3 程序执行结果
一.功能介绍
基于boost asio实现server端通信,采用one by one的同步处理方式,并且设置连接等待超时。下面给出了string和byte两种数据类型的通信方式,可覆盖基本通信场景需求。
二.string类型数据交互
规定server与client双方交互的数据格式是string,并且server采用read_until的方式接收来自client的消息,通过delimiter(分隔符)来判断一帧数据接收完成,当未收到来自client的delimiter,那么server会一直等待,直到收到delimiter或超时。此处设置了本次回话的连接超时时间SESSION_TIMEOUT,程序中定义为2min,每次收到数据后重新计时,若是连续2min中内有收到来自client的任何消息,那么server会自动断开本次连接,并且析构本次的session。
通过string发送和接收的数据采用ASCII码的编码方式,因此不能直接发送byte数据,不然会产生乱码(第三部分为byte数据交互);采用string数据传输方式可以方便的进行序列化与反序列化,例如采用json对象的传输的方式,可以方便的组织交互协议。
下面是功能实现的完整程序,程序编译的前提是已经安装了boost库,boost库的安装及使用方法在我的前述博客已有提到:boost库安装及使用
2.1 程序源码
mian.cpp
#include "software.hpp"
int main(int argc, char** argv)
{
if(2 != argc){
std::cout<<"Usage: "<<argv[0]<< " port"<<std::endl;
return -1;
}
try {
boost::asio::io_context io;
int port = atoi(argv[1]); // get server port
software::server(io, port); // 开启一个server,ip地址为server主机地址,port为mian函数传入
}
catch (std::exception& e) {
std::cout<<"main exception: " << e.what()<<std::endl;
}
return 0;
}
software.hpp
#ifndef __SOFTWARE_HPP__
#define __SOFTWARE_HPP__
#include <string>
#include <iostream>
#include <boost/asio.hpp>
namespace software {
//! Session deadline duration
constexpr auto SESSION_TIMEOUT = std::chrono::minutes(2);
//! Protocol delimiter to software client;分隔符:接收来自client的string数据必须以"}\n"结尾
static constexpr char const* delimiter = "}";
namespace asio = boost::asio;
using tcp = asio::ip::tcp;
/**
* @brief Session for software
* Inherit @class enable_shared_from_this<>
* in order to give the lifecycle to io context,
* it'll causes the lifecycle automatically end when connection break
* (async operation will return when connection break)
* @code
* asio::io_context io;
* session sess(io, std::move(socket));
* io.run();
* @endcode
*/
class session
{
public:
/* session constructor function */
session(asio::io_context& io, tcp::socket socket);
/* session destructor function */
~session();
private:
/*! Async read session socket */
void do_read();
/*! Async wait deadline */
void async_deadline_wait();
/*! software on message handler */
void on_message(std::string&& message);
private:
tcp::socket socket_; // tcp socket
std::string recv_data_; // recv buffer[string]
asio::steady_timer deadline_; // wait deadline time,expire it will disconnect auto
};
/**
* @brief Start server to software(同步方式accept)
* Will serve client one by one(同步方式)
* @param[in] io The asio io context
* @param[in] port The listen port
*/
inline void server(asio::io_context& io, unsigned short port)
{
std::cout<<"sync server start, listen port: " << port << std::endl;
tcp::acceptor acceptor(io, tcp::endpoint(tcp::v4(), port));
// 一次处理一个连接[one by one]
while (true) {
using namespace std;
// client请求放在队列中,循环逐个处理,处理完继续阻塞
tcp::socket sock(io);
acceptor.accept(sock); // 一开始会阻塞在这,等待software client连接
io.restart();
session sess(io, std::move(sock)); // io socket
io.run(); // run until session async operations done,调用run()函数进入io事件循环
}
}
} // namespace software
#endif
software.cpp
#include "software.hpp"
using namespace std;
namespace software {
/**
* @brief Session construct function
* @param[in] io The io context
* @param[in] socket The connected session socket
*/
session::session(asio::io_context& io, tcp::socket socket)
: socket_(std::move(socket))
, deadline_(io)
{
std::cout<<"session created: " << socket_.remote_endpoint() <<std::endl;
do_read(); //在构造函数中调用do_read()函数完成对software数据的读取
async_deadline_wait(); //set on-request-deadline
}
session::~session()
{
std::cout<<"session destruct!" << std::endl;
}
/**
* @brief 从software异步读取数据并存放在recv_data_中
*/
void session::do_read()
{
auto handler = [this](std::error_code ec, std::size_t length) {
// recv data success, dispose the received data in [on_message] func
if (!ec && socket_.is_open() && length != 0) {
on_message(recv_data_.substr(0, length));
recv_data_.erase(0, length); // 将recv_data_擦除为0
do_read(); // Register async read operation again,重新执行读取操作
}
// error occured, shutdown the session
else if (socket_.is_open()) {
std::cout<<"client offline, close session" << std::endl;
socket_.shutdown(asio::socket_base::shutdown_both); // 关闭socket
socket_.close(); // 关闭socket
deadline_.cancel(); // deadline wait计时取消
}
};
std::cout<<"server waiting message..." << std::endl;
// block here until received the delimiter
asio::async_read_until(socket_, asio::dynamic_buffer(recv_data_),
delimiter, // 读取终止条件(分隔符号)
handler); // 消息处理句柄函数
deadline_.expires_after(SESSION_TIMEOUT); // close session if no request,超时2min自动关闭session
}
/**
* @brief Async wait for the deadline,计时等待函数
* @pre @a deadline_.expires_xxx() must called
*/
void session::async_deadline_wait()
{
using namespace std::chrono;
deadline_.async_wait(
//! lambda function
[this](std::error_code) {
if (!socket_.is_open())
return;
if (deadline_.expiry() <= asio::steady_timer::clock_type::now()) {
std::cout<< "client no data more than <"
<< duration_cast<milliseconds>(SESSION_TIMEOUT).count()
<< "> ms, shutdown" << std::endl;
socket_.shutdown(asio::socket_base::shutdown_both);
socket_.close();
return;
}
async_deadline_wait();
}
);
}
/**
* @brief SOFTWARE on message handler
* @param[in] message The received message
* &&表示右值引用,可以将字面常量、临时对象等右值绑定到右值引用上(也可以绑定到const 左值引用上,但是左值不能绑定到右值引用上)
* 右值引用也可以看作起名,只是它起名的对象是一个将亡值。然后延续这个将亡值的生命,直到这个引用销毁的右值的生命也结束了。
*/
void session::on_message(std::string&& message)
{
using namespace std;
try {
// print receive data
std::cout<<"recv from client is: "<<message<<std::endl;
// response to client
string send_buf = "hello client, you send data is: " + message;
asio::write(socket_, asio::buffer(send_buf));
}
catch (exception& ex) {
std::cout<<"some exception occured: "<< ex.what() << std::endl;
}
}
} // namespace software
分析一下系统执行流程:
- 在main函数中传入io和port,调用 software.hpp中的server(asio::io_context& io, unsigned short port)函数。
- 在server()函数中while(True)循环体中accept来自client的连接,每次接收到一个client的连接会创建一个session对象,在session对象中处理本次的连接socket。注意,此处采用的是one by one的同步处理方式,只有上一个session处理完成才能处理下一个session的请求,但是同步发送的请求消息不会丢失,只是暂时不会处理和返回;总的来说,server会按照请求的顺序进行one by one处理。
- session对象创建时会调用构造函数,其构造函数主要做了两件事情:一是调用do_read()函数进行等待读取来自client的数据并处理;二是通过async_deadline_wait()设置本次session连接的超时处理方法,超时时间默认设置为SESSION_TIMEOUT:deadline_.expires_after(SESSION_TIMEOUT)。
- 在do_read()函数中采用async_read_until()函数读取来自client的数据,async_read_until()函数会将传入的delimiter分隔符作为本次接收的结束标识。
- 当判断本次接收数据完成后,会调用handler句柄对消息进行处理,在handler句柄中主要做了两件事情:一是将收到的string信息传入到on_message()消息处理函数中进行处理,只有当本条消息处理完成后才能接收下一条消息并处理,消息会阻塞等待,但是不会丢失;二是在消息处理完成后再次调用do_read()函数,进入read_until()等待消息,如此循环…
- 当发生错误或异常,在hander中会关闭本次socket连接,并且不会再调用其他循环体,表示本次session通信结束,之后调用析构函数析构session对象。
socket_.shutdown(asio::socket_base::shutdown_both); // 关闭socket
socket_.close(); // 关闭socket
deadline_.cancel(); // deadline wait计时取消
- 在on_message()消息处理函数中会对收到的string数据进行处理(上述程序中以打印代替),然后调用asio::write(socket_, asio::buffer(send_buf))将response发送给client。
2.2 编译&&执行
编译:g++ main.cpp software.cpp -o iotest -lpthread -lboost_system -std=c++17
执行:./iotest 11112 (监听端口为11112)
2.3 程序执行结果
可以看出,client发送的每条消息都要以"}"结束,这是设定的delimter分隔符。
可以看出,当超过2min没有收到来自clinet的消息,server会自动断开连接。
tips:client1和clinet2可同时与server建立连接并发送数据,但是server会按照连接建立的先后顺序对client发送的请求进行one by one处理,比如clinet1先与server建立了连接,那么只有等到clinet1的所有请求执行完成才会处理client2发送的请求;在等待期间client2发送的请求不会处理,但不会丢失。
三.byte类型数据交互
上述给出了string类型数据的交互,但是string类型的数据只能采用ASCII码的方式传输,在某些场景中,例如传感器,需要交互byte类型的数据。因此下面给出了byte[hex]类型数据的交互。与上述的string数据交互流程基本一致,几点区别在下面阐述:
- 将session类中的string recv_data_;替换成u_int8_t recv_data_[MAX_RECV_LEN];
- 数据读取方式由read_until()改为:socket_.async_receive(asio::buffer(recv_data_,MAX_RECV_LEN),handler);
- on_message()数据处理函数变为:void on_message(const u_int8_t* recv_buf,std::size_t recv_len);
- 数据发送方式变为:socket_.async_send(asio::buffer(recv_buf,recv_len),[](error_code ec, size_t size){});
3.1 程序源码
mian.cpp
同上
software.hpp
#ifndef __SOFTWARE_HPP__
#define __SOFTWARE_HPP__
#include <string>
#include <iostream>
#include <boost/asio.hpp>
#define MAX_RECV_LEN 2048
namespace software {
//! Session deadline duration
constexpr auto SESSION_TIMEOUT = std::chrono::minutes(2);
//! Protocol delimiter to software client;分隔符:接收来自client的string数据必须以"}\n"结尾
static constexpr char const* delimiter = "}";
namespace asio = boost::asio;
using tcp = asio::ip::tcp;
/**
* @brief Session for software
* Inherit @class enable_shared_from_this<>
* in order to give the lifecycle to io context,
* it'll causes the lifecycle automatically end when connection break
* (async operation will return when connection break)
* @code
* asio::io_context io;
* session sess(io, std::move(socket));
* io.run();
* @endcode
*/
class session
{
public:
/* session constructor function */
session(asio::io_context& io, tcp::socket socket);
/* session destructor function */
~session();
private:
/*! Async read session socket */
void do_read();
/*! Async wait deadline */
void async_deadline_wait();
/*! software on message handler */
void on_message(const u_int8_t* recv_buf,std::size_t recv_len);
private:
tcp::socket socket_; // tcp socket
u_int8_t recv_data_[MAX_RECV_LEN]; // recv buffer[byte]
asio::steady_timer deadline_; // wait deadline time,expire it will disconnect auto
};
/**
* @brief Start server to software(同步方式accept)
* Will serve client one by one(同步方式)
* @param[in] io The asio io context
* @param[in] port The listen port
*/
inline void server(asio::io_context& io, unsigned short port)
{
std::cout<<"sync server start, listen port: " << port << std::endl;
tcp::acceptor acceptor(io, tcp::endpoint(tcp::v4(), port));
// 一次处理一个连接[one by one]
while (true) {
using namespace std;
// client请求放在队列中,循环逐个处理,处理完继续阻塞
tcp::socket sock(io);
acceptor.accept(sock); // 一开始会阻塞在这,等待software client连接
io.restart();
session sess(io, std::move(sock)); // io socket
io.run(); // run until session async operations done,调用run()函数进入io事件循环
}
}
} // namespace software
#endif
software.cpp
#include "software.hpp"
using namespace std;
namespace software {
/**
* @brief Session construct function
* @param[in] io The io context
* @param[in] socket The connected session socket
*/
session::session(asio::io_context& io, tcp::socket socket)
: socket_(std::move(socket))
, deadline_(io)
{
std::cout<<"session created: " << socket_.remote_endpoint() <<std::endl;
do_read(); //在构造函数中调用do_read()函数完成对software数据的读取
async_deadline_wait(); //set on-request-deadline
}
session::~session()
{
std::cout<<"session destruct!" << std::endl;
}
/**
* @brief 从software异步读取数据并存放在recv_data_中
*/
void session::do_read()
{
auto handler = [this](std::error_code ec, std::size_t length) {
// recv data success, dispose the received data in [on_message] func
if (!ec && socket_.is_open() && length != 0) {
on_message(recv_data_, length);
memset(recv_data_,0,sizeof(recv_data_));// 将recv_data_擦除为0
do_read(); // Register async read operation again,重新执行读取操作
}
// error occured, shutdown the session
else if (socket_.is_open()) {
std::cout<<"client offline, close session" << std::endl;
socket_.shutdown(asio::socket_base::shutdown_both); // 关闭socket
socket_.close(); // 关闭socket
deadline_.cancel(); // deadline wait计时取消
}
};
std::cout<<"server waiting message..." << std::endl;
//block here to receive some byte from client
socket_.async_receive(asio::buffer(recv_data_,MAX_RECV_LEN),handler);
deadline_.expires_after(SESSION_TIMEOUT); // close session if no request,超时2min自动关闭session
}
/**
* @brief Async wait for the deadline,计时等待函数
* @pre @a deadline_.expires_xxx() must called
*/
void session::async_deadline_wait()
{
using namespace std::chrono;
deadline_.async_wait(
//! lambda function
[this](std::error_code) {
if (!socket_.is_open())
return;
if (deadline_.expiry() <= asio::steady_timer::clock_type::now()) {
std::cout<< "client no data more than <"
<< duration_cast<milliseconds>(SESSION_TIMEOUT).count()
<< "> ms, shutdown" << std::endl;
socket_.shutdown(asio::socket_base::shutdown_both);
socket_.close();
return;
}
async_deadline_wait();
}
);
}
/**
* @brief SOFTWARE on message handler
* @param[in] recv_buf The received byte array address
* @param[in] recv_len The received byte length
*/
void session::on_message(const u_int8_t* recv_buf,std::size_t recv_len)
{
using namespace std;
try {
// print receive data
std::cout<<"recv data length is: "<<recv_len<<" data is: ";
for(int i = 0; i<recv_len; i++)
printf("%x ",recv_buf[i]);
std::cout<<std::endl;
// response to client
socket_.async_send(asio::buffer(recv_buf,recv_len),[](error_code ec, size_t size){});
}
catch (exception& ex) {
std::cout<<"some exception occured: "<< ex.what() << std::endl;
}
}
} // namespace software
3.2 编译&&执行
编译:g++ main.cpp software.cpp -o iotest -lpthread -lboost_system -std=c++17
执行:./iotest 11112 (监听端口为11112)
3.3 程序执行结果
原文链接:https://blog.csdn.net/weixin_42700740/article/details/125873199
相关推荐
- 2021-12-16 方案缺陷-HAProxy + Sentinel +redis
- 2022-12-13 python字典如何获取最大和最小value对应的key_python
- 2023-07-10 SpringBoot AOP+注解方式实现多数据源切换可能遇到的问题
- 2023-01-28 GoLang反射机制深入讲解_Golang
- 2023-01-31 Python函数用法和底层原理分析_python
- 2022-06-11 嵌入式C语言二级指针在链表中的应用_C 语言
- 2022-12-08 C#调用C++ DLL bool返回值始终为true的问题_C#教程
- 2023-02-18 C#调用EXE文件实现传参和获取返回结果_C#教程
- 最近更新
-
- window11 系统安装 yarn
- 超详细win安装深度学习环境2025年最新版(
- Linux 中运行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存储小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基础操作-- 运算符,流程控制 Flo
- 1. Int 和Integer 的区别,Jav
- spring @retryable不生效的一种
- Spring Security之认证信息的处理
- Spring Security之认证过滤器
- Spring Security概述快速入门
- Spring Security之配置体系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置权
- redisson分布式锁中waittime的设
- maven:解决release错误:Artif
- restTemplate使用总结
- Spring Security之安全异常处理
- MybatisPlus优雅实现加密?
- Spring ioc容器与Bean的生命周期。
- 【探索SpringCloud】服务发现-Nac
- Spring Security之基于HttpR
- Redis 底层数据结构-简单动态字符串(SD
- arthas操作spring被代理目标对象命令
- Spring中的单例模式应用详解
- 聊聊消息队列,发送消息的4种方式
- bootspring第三方资源配置管理
- GIT同步修改后的远程分支