套接字通信
1.1 通信效率问题
-
服务器端
-
单线程 / 单进程
- 无法使用,不支持多客户端
-
多线程 / 多进程
- 写程序优先考虑多线程:
- 什么时候考虑多进程?
- 启动了一个可执行程序 A ,要在 A 中启动一个可执行程序 B
- 支持多客户端连接
-
IO 多路转接
-
单线程 / 进程
-
支持多客户端连接但是效率不是最高的
- 所有的客户端请求都是顺序处理的 -> 排队
-
多线程
int main() { // 1. 监听 fd int lfd = socket(); }; // 2. 绑定 bind(); // 3. 监听 listen(); // 4. 初始化 epoll() int epfd = epoll_create(x); // 5. epooll 添加检测节点 -> lfd epoll_ctl(epfd, epoll_ctl_add, lfd, &ev); while (1) { int num = epoll_wait(epfd, evs, 1024, NULL); for (int i = 0; i < num; i++) { if (curfd == lfd) { pthread_create(&tid, NULL, acceptConn, &epfd); // accept(); } else { ... // read(); // write(); } } } }
-
线程池
-
多个线程的一个集合,可以回收用完的线程
- 线程池的个数取决于业务逻辑
-
密集型业务逻辑:需要大量 cpu 时间进行数据处理
- 线程个数 == 电脑核心数
-
进行 io 操作
- 线程个数 == 两倍 cpu 核心数
-
- 线程池的个数取决于业务逻辑
-
不需要重复频繁地创建销毁线程
-
设计思路
1. 需要两个角色 - 管理者 -> 1 个 - 工作的线程 -> N 个 2. 管理者 - 不工作(不处理业务逻辑,监测工作的线程的状态,管理线程的个数) - 假设工作线程不够用了,动态创建新的线程 - 假设工作的线程太多了,销毁一部分工作的线程 - 动态监测工作的线程的状态 3. 工作的线程 - 处理业务逻辑 4. 需要一个任务队列 - 存储任务 -> 唤醒阻塞线程 -> 条件变量 - 工作的线程处理任务队列中的任务 - 没有任务 -> 阻塞
-
-
-
-
-
-
客户端
// 创建TcpSocket对象 == 一个连接,这个对象就可以和服务器通信了,多个连接需要创建多个这样的对象 class TcpSocket { public: TcpSocket() { m_connfd = socket(af_inet, sock_stream, 0); } TcpSocket(int fd) { m_connfd = fd; // 传递进行的fd是可以直接通信的文件描述符,不需要连接操作 } ~TcpSocket(); /* 客户端 连接服务器 */ int conectToHost(string ip, unsigned short port, int connecttime) { connect(m_connfd, &serverAddress, &len); } /* 客户端 关闭和服务端的连接 */ int disConnect(); /* 客户端 发送报文 */ int sendMsg(string sendMsg, int sendtime = 10000) { senf(m_connfd, data, datalen, 0); } /* 客户端 接受报文 */ string recvMsg(int timeout) { recv(m_connfd, buffer, size, 0); return buffer; } private: int m_connfd; };
-
服务器
// 思想: 服务端不负责通信,只负责监听,如果通信使用客户端类 class TcpServer { public: // 初始化监听的套接字: 创建,绑定,监听 TcpServer(); ~TcpServer(); // 在这里边关闭监听的fd TcpSocket* acceptConn(int timeout = 999999) { int fd = accept(m_lfd, &address, &len); // 通信fd -> 类 TcpSocket* tcp = new TcpSocket(fd); if (tcp != nullptr) { return tcp; } return nullptr; } private: int m_lfd; // 监听的fd };
// 使用 void * callback(void* arg) { TcpSocket * tcp = (TcpSocket *) arg; tcp->sendMsg(); tcp->recvMsg(); tcp->disConnect(); delete tcp; } int main() { TcpServer * server = new Tcpserver; while (1) { TcpSocket * tcp = server->acceptConn(); // 创建子进程 -> 通信 pthread_create(&tid, NULL, callback, arg) } delete server; return 0; }
// 客户端程序 int main() { TcpSocket * tcp = new TcpSocket; tcp->ConnectToHost(ip, port, timeout); tcp->sendMsg(); tcp->recvMsg(); tcp->disConnect(); delete tcp; }
2 套接字超时
套接字通信过程中的默认的阻塞函数
等待并接受客户端连接
通信、接受数据、发送数据、连接服务器时
设置超时处理的原因:不想让进程(线程)一直在对应为止阻塞
超时处理的思路:
- 定时器
sleep(10)
- 以上两种不可用,在指定时间内阻塞函数满足条件直接解除阻塞,以上两种不满足要求
- IO 多路转接函数
- 这些函数最后一个参数是设置阻塞的时长,如果有
fd
发生变化,函数直接返回- 帮助委托内核检测
fd
状态:读写异常
2.1 accept 超时
// 等待并接受客户端连接
// 如果没有客户端连接,一直阻塞
// 检测 accept 函数的 fd 读缓冲区就可以了
int accept(int sockfd, struct sockaddr * addr, socklen_t * addrlen);
// 使用select 函数检测状态
int select(int nfds, fd_set * readfds,
fd_set * writefds,
fd_set * exceptfds,
struct timeval * timeout);
// 通信的fd放到 fd_set 中检测
if (ret == 0)
{
// 超时
}
else if (ret == 1)
{
// 有新连接
accept(); // 绝对不阻塞
}
else
{
// error, -1
}
读超时、写超时略
2.2 connect 超时
// 连接服务器 -> 处于连接过程中,函数不返回 -> 程序阻塞在这个函数上,可通过返回值判断是不是连接成功了
// 返回值 0,成功,-1,失败
// 该函数默认有一个超时处理:75s 或 175s
- 设置 connect 函数操作的文件描述符为非阻塞
- 使用 select 检测
- 设置 connect 函数操作的文件描述符为阻塞 -> 状态还原
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
// 与上述同理
2.3 tcp 粘包问题
造成原因:
- 发送的时候,内核进行了优化,数据达到一定量发一次
- 网络环境不好,有延迟
- 接收方频率低,一次性读到了多条客户端发送的数据
解决方案:
- 发送的时候强制缓存区发送数据
- 在发送数据的时候添加包头
- 包头:一块内存,存储了当前消息的属性信息
- 属于谁:char[12]
- 有多大:int
- …
3 进程间通信:共享内存
-
使用流程
1. 向内核申请一块内存 -> 指定大小 2. 如果有两个进程需要进行通信,可以使用共享内存通信,先创建两个进程 3. 进程 A 和进程 B 分别和共享内存进程关联 - 拿到共享内存的地址 -> 首地址 4. 两个进程可以公国这个首地址对共享内存进行读(写)操作 5. 如果这个进程不再使用这块共享内存,需要和共享内存断开连接 - 进程退出对共享内存没有任何影响 6. 当不再使用共享内存的时候,需要将共享内存销毁
-
共享内存头文件
#include <sys/ipc.h> #include <sys/shm.h>
-
共享内存操作函数
-
创建或打开一块共享内存区
// 创建共享内存 // 共享内存已经存在 // 可以创建多块共享内存 int shmget(key_t key, size_t size, int sh mhflg); key:通过 key 记录共享内存在内核中的位置,为一个大于零的整数,等于 0 不行,随便指定一个就可以 size:创建共享内存的时候指定共享内存的大小。如果已经创建,设为 0 shmflg:创建共享内存的时候使用,指定打开文件的方式 返回值: 成功:创建(打开)成功,得到一个整形数 失败:-1 // 应用 // 1. 创建共享内存 int shmid = shmget(100, 4096, IPC_CREAT | 0664); int shmid = shmget(200, 4096, IPC_CREAT | 0664); // 2. open share memory int shmid = shmget(100, 0, 0)
-
将当前进程与共享内存关联
void * shmat(int shmid, const void * shmaddr, int shmflg); 参数: - shmid:通过这个参数访问共享内存,shmget() 的返回值 - shmaddr:指定共享内存在内核中的位置,指定为空,委托内核寻找 - shmflg:关联成功后对内存的操作权限 - SHM_RDONLY:只读 - 0:读写 成功: 内存的地址 失败: (void *)-1 // 函数调用: shmat(shmid, NULL, 0); // write on shm memcpy(ptr, "xxx", len); printf("%s", (char*)ptr);
-
depart shm
int shmdt(const void * shmaddr); arg: address of shm return: suc: 0 fai: -1
-
control shm
int shmctl(int shmid, int com, struct shmid_ds * buf); arg: -shimd: reuturn value of shmget - cmd: operation to shm - IPC_STAT: get status of cmd - IPC_SET: set shm - IPC_RMID: mark shm to be destroyed - buf: as a struct can describe the status of shm reutrn value: succ: 0 fail: -1 // demo: destroy shm shmctl(shmid, IPC_RMID, nullptr);
demo: communication of processes
read:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <sys/ipc.h> #include <sys/shm.h> int main() { // 1. open shm int shmid = shmget(100, 0, 0); // 2. relate shm with cur process void* ptr = shmat(shmid, NULL, 0); // 3. write on shm printf("content: %s\n", (char *)ptr); printf("press any key to continue ...\n"); getchar(); // 4. release relevance shmdt(ptr); // 5. destory shm shmctl(shmid, IPC_RMID, NULL); return 0; }
write:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <sys/ipc.h> #include <sys/shm.h> int main() { // 1. create shm int shmid = shmget(100, 4096, IPC_CREAT|0664); // 2. relate shm with cur process void* ptr = shmat(shmid, NULL, 0); // 3. write on shm const char * tmp = "this is a damo for shm ..."; memcpy(ptr, tmp, strlen(tmp) + 1); printf("press any key to continue ...\n"); getchar(); // 4. release relevance shmdt(ptr); // 5. destory shm shmctl(shmid, IPC_RMID, NULL); return 0; }
-
-
question:
- question 1: how dose the operation system know the number of process that relate with a shm?
- shm keep a struct
struct shmid_ds
, that has a membershm_nattch
. shm_nattch
records the number of being related process.
- shm keep a struct
- question 2: whether can call
shmctl
to destroyed a shm more than once?- yes
- because
shmctl
function is just marking a shm to be destroy, not destroying it directly. - when it was destroyed truly?
- when
shm_nattch == 0
- when
the commands to observe the shm:
ipcs -m
-
ftok
function prototypekey_t ftok(const char *pathname, int proj_id); - pathname: the absolute path - proj_id: only use the top 1 byte - value range: 1 - 255 key_t t = fotk("/home/", 'a');
-
difference of shm and mmp
. . .
4 the API encapsulation of shm
class BashShm
{
public:
BashShm(int key); // open a shm according a given key.
BashShm(string path); // **with no size**, according to string path -> int key, open a shm.
BashShm(int key, int size); // create a shm by the given key and size.
BashShm(string path, int size); // string -> key, according to size.
void * mapshm()
{
m_ptr = shmat(shmid);
return shmat();
}
int unmapshm()
{
return shmdt(m_ptr);
}
int delshm()
{
return shmctl(shmid);
}
private:
int m_shmid; // reuturn value of `shmat()`
void * m_ptr
}