目录
一、管道通信
二、共享内存
三、消息队列
一、管道通信
管道是由操作系统维护的一个文件,管道通信的本质就是将管道文件作为临界资源,实现不同进程之间的数据读写,但是管道只允许父子进程或者兄弟进程之间的通信。
管道文件本身是全双工机制的,但是在管道通信中,它的工作模式是半双工的,在管道通信之前,读端会关闭对管道文件的写通道,写端会关闭对管道文件的读通道。
当读端在读数据时,若写端关闭,读端会将管道文件中剩余数据读取结束之后再关闭;当写端在写数据时,若读端关闭,写端会被操作系统直接关闭。
那么管道通道如何实现数据交互呢?方法是创建两个管道进行通信。
管道通信方式分为匿名管道和命名管道,由 pipe() 系统调用创建并打开,命名管道由 mkfifo() 系统调用创建并由 open() 打开,他们的通信本质是一样的。
// comm.hpp 头文件
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <fcntl.h>
#include <fstream>
#define NAMED_PIPE "./named_pipe"
bool create_fifo(const std::string& path) {
umask(0);
int n = mkfifo(path.c_str(), 0666);
if (n == 0) {
return true;
}
else {
std::cout << "errno: " << errno << "err string: " << strerror(errno) << std::endl;
return false;
}
}
void remove_fifo(const std::string& path) {
int n = unlink(path.c_str());
// unlink 函数功能是删除文件,但会在判断此文件状态之后再删除
// 若有进程打开此文件,则不会立即删除,等到无进程打开该文件时才会删除
// 若此文件有多个链接,则进行连接数减一操作
// 执行成功返回 0,失败返回 -1
assert(n == 0);
}
// Server
#include "comm.hpp"
int main() {
std::cout << "server begin" << std::endl;
int rfd = open(NAMED_PIPE, O_RDONLY);
if (rfd < 0) {
bool r = create_fifo(NAMED_PIPE);
assert(r);
rfd = open(NAMED_PIPE, O_RDONLY);
}
// read
char buffer[1024];
while (true) {
ssize_t s = read(rfd, buffer, sizeof(buffer) - 1);
if (s > 0) {
buffer[s] = 0;
std::cout << "client->server# " << buffer << std::endl;
}
else if (s == 0) {
std::cout << "client quit, me too" << std::endl;
break;
}
else {
std::cout << "err string: " << strerror(errno) << std::endl;
break;
}
}
close(rfd);
std::cout << "server end" << std::endl;
remove_fifo(NAMED_PIPE);
return 0;
}
// Client
#include "comm.hpp"
int main() {
std::cout << "client begin" << std::endl;
int wfd = open(NAMED_PIPE, O_WRONLY, 0666);
if (wfd < 0) {
exit(1);
}
// write
char buffer[1024];
while (true) {
std::cout << "please say# ";
fgets(buffer, sizeof(buffer), stdin);
if (strlen(buffer) > 0) {
buffer[strlen(buffer) - 1] = 0;
}
if (strcmp(buffer, "quit") == 0) {
break;
}
ssize_t n = write(wfd, buffer, strlen(buffer));
assert(n == strlen(buffer));
}
close(wfd);
std::cout << "client end" << std::endl;
return 0;
}
二、共享内存
共享内存是由操作系统维护的一块地址空间,它的工作机制是全双工。
// comm.hpp
#pragma once
#include <iostream>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <cerrno>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#define NAME_PATH "."
#define PROJ_ID 0X66
#define MAX_SIZE 4096
// 创建共享内存时,通过 ftok() 函数获得 key 值保证共享内存在系统中的唯一性
key_t get_key() {
key_t k = ftok(NAME_PATH, PROJ_ID);
if (k < 0) {
std::cout << errno << ": " << strerror(errno) << std::endl;
exit(1);
}
return k;
}
int get_shm_helper(key_t k, int flags) {
// shmget : 创建共享内存
int shmID = shmget(k, MAX_SIZE, flags);
if (shmID < 0) {
std::cout << errno << ": " << strerror(errno) << std::endl;
exit(1);
}
return shmID;
}
int get_shm(key_t k) {
return get_shm_helper(k, IPC_CREAT);
}
int create_shm(key_t k) {
return get_shm_helper(k, IPC_CREAT | IPC_EXCL | 0600);
// IPC_EXCL 不能单独使用,若共享内存不存在则创建,存在则报错
}
// 删除共享内存
void del_shm(int shmID) {
// shmctl : 控制共享内存
if (shmctl(shmID, IPC_RMID, NULL) == -1) {
std::cout << errno << ": " << strerror(errno) << std::endl;
exit(1);
}
}
// 关联共享内存
void* attach_shm(int shmID) {
void* start = shmat(shmID, NULL, 0);
if ((long long)start == -1) {
std::cout << errno << ": " << strerror(errno) << std::endl;
exit(1);
}
return start;
}
// 取消关联
void del_attach_shm(void* start) {
if (shmdt(start) == -1) {
std::cout << errno << ": " << strerror(errno) << std::endl;
exit(1);
}
}
// Server
#include "comm.hpp"
int main() {
key_t k = get_key();
printf("0x%x\n", k);
int shmID = create_shm(k); // 创建共享内存
printf("shmID = %d\n", shmID);
// 内核为每个共享内存维护一个结构体:struct shmid_ds
// struct shmid_ds
// {
// struct ipc_perm shm_perm; /* Ownership and permissions */
// size_t shm_segsz; /* Size of segment (bytes) */
// time_t shm_atime; /* Last attach time */
// time_t shm_dtime; /* Last detach time */
// time_t shm_ctime; /* Last change time */
// pid_t shm_cpid; /* PID of creator */
// pid_t shm_lpid; /* PID of last shmat(2)/shmdt(2) */
// shmatt_t shm_nattch; /* No. of current attaches */
// };
struct shmid_ds ds;
shmctl(shmID, IPC_STAT, &ds);
printf("获取属性: size(%d) pid(%d) myself(%d) key(0x%x)\n",
ds.shm_segsz, ds.shm_cpid, getpid(), ds.shm_perm.__key);
sleep(2);
char* start = (char*)attach_shm(shmID); // 关联共享内存
printf("attach success, address start: %p\n", start);
// 从共享内存中读取数据
while (true) {
if (*start != '\0') {
printf("clent say# %s\n", start);
snprintf(start, MAX_SIZE, "\0"); // 打印消息后清空共享内存
}
}
sleep(2);
del_attach_shm(start);
return 0;
}
// Client
#include "comm.hpp"
// 查看ipc资源
// 查看:ipcs -m
// 删除:ipcrm -m shmID
int main() {
key_t k = get_key();
printf("0x%x\n", k);
int shmID = get_shm(k);
printf("%d\n", shmID);
sleep(2);
char* start = (char*)attach_shm(shmID); // 关联共享内存
printf("attch success, address start: %p\n", start);
// 将数据写入共享内存
int count = 1;
while (true) {
printf("[please input]# ");
char message[1024] = {0};
fgets(message, sizeof(message) - 1, stdin);
message[strlen(message) - 1] = 0;
snprintf(start, MAX_SIZE, "%s [pid(%d)][messageID: %d]\n", message, getpid(), count++);
}
sleep(2);
del_attach_shm(start);
return 0;
}
优点:相比于管道而言,共享内存能够支持任意进程之间的通信,而且访问数据的速度也比管道要快。这得益于通信直接访问内存,而管道则需要先通过操作系统访问文件再获得内存数据。
缺点:用于进程间通信时,共享内存本身不支持阻塞等待操作。这是因为当读端读取数据后,数据并不会在内存中清空。因此读端和写端可以同时访问内存空间,即全双工。因为共享内存本质是进程直接访问内存,无法主动停止读取,如果读端不加以限制,那么将持续读取数据。同理,写端也会持续写入数据。换句话说,共享内存本身没有访问控制。
三、消息队列
消息队列是由操作系统维护的一个数据结构,遵循队列的FIFO原则,半双工机制。
通过消息队列通信的两个进程也分为读端和写端,读端只负责从消息队列中拿数据,写端只负责向消息队列中写数据。
消息队列的优点是异步性和系统解耦,异步性是指发送消息的进程不需等待接收消息的进程的响应,可以继续执行自己的任务,系统解耦是指发送方和接收方都不关心对方进程的状态,只关注消息的发送和接收。
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include<string.h>
#include<unistd.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<sys/shm.h>
#include<fcntl.h>
#include<semaphore.h>
struct msg_buffer {
long type; //1表示sender发送的正常数据,2表示sender1发送的结束信号,3表示sender2发送的结束信号
char text[100];
};
void *sender1_thread() {
struct msg_buffer msg;
char buf[100];
int msqid = msgget((key_t) 2023, 0666|IPC_CREAT);
if(msqid == -1) {
printf("消息队列创建失败!");
exit(-1);
}
sem_t *mutex = sem_open("mutex", O_CREAT | O_RDWR, 0666, 0);
sem_t *sender1 = sem_open("sender1", O_CREAT | O_RDWR, 0666, 0);
sem_t *receive1 = sem_open("receive1", O_CREAT | O_RDWR, 0666, 0);
sem_t *next_input = sem_open("next_input", O_CREAT | O_RDWR, 0666, 0);
while(1) {
sem_wait(mutex);
sem_wait(next_input);
printf("sender1 input: ");
gets(buf);
msg.type = 1;
if(strcmp(buf, "exit") == 0) {
strcpy(msg.text, "end1");
msgsnd(msqid, (void *)&msg, 100, 0); // 将消息发送到消息队列中
sem_wait(receive1);
msgrcv(msqid, (void *)&msg, 100, 2, 0);
printf("sender1 receive: %s\n\n", msg.text);
sem_post(sender1);
sem_post(mutex);
sleep(1);
return 0;
}
else {
strcpy(msg.text, buf);
msgsnd(msqid, (void *)&msg, 100, 0);
sem_post(mutex);
sleep(1);
}
}
}
void *sender2_thread() {
struct msg_buffer msg;
char buf[100];
int msgid = msgget((key_t) 2023, 0666|IPC_CREAT);
if(msgid == -1) {
printf("消息队列创建失败!");
exit(-1);
}
sem_t *mutex = sem_open("mutex", O_CREAT | O_RDWR, 0666, 0);
sem_t *sender2 = sem_open("sender2", O_CREAT | O_RDWR, 0666, 0);
sem_t *receive2 = sem_open("receive2", O_CREAT | O_RDWR, 0666, 0);
sem_t *next_input = sem_open("next_input", O_CREAT | O_RDWR, 0666, 0);
while(1) {
sem_wait(mutex);
sem_wait(next_input);
printf("sender2 input: ");
gets(buf);
msg.type = 1;
if(strcmp(buf,"exit") == 0) {
strcpy(msg.text, "end2");
msgsnd(msgid, (void *)&msg, 100, 0);
sem_wait(receive2);
msgrcv(msgid, (void *)&msg, 100, 3, 0);
printf("sender2 receive: %s\n\n", msg.text);
sem_post(sender2);
sem_post(mutex);
sleep(1);
return 0;
}
else {
strcpy(msg.text, buf);
msgsnd(msgid, (void *)&msg, 100, 0);
sem_post(mutex);
sleep(1);
}
}
}
void *receiver_thread() {
int finish1 = 0;
int finish2 = 0;
struct msg_buffer msg;
char buf[100];
sem_t *sender1 = sem_open("sender1", O_CREAT | O_RDWR, 0666, 0);
sem_t *receive1 = sem_open("receive1", O_CREAT | O_RDWR, 0666, 0);
sem_t *sender2 = sem_open("sender2", O_CREAT | O_RDWR, 0666, 0);
sem_t *receive2 = sem_open("receive2", O_CREAT | O_RDWR, 0666, 0);
sem_t *next_input = sem_open("next_input", O_CREAT | O_RDWR, 0666, 0);
int msqid = msgget((key_t) 2023, 0666 | IPC_CREAT);
if( msqid == -1) {
printf("create failed");
exit(-1);
}
int n;
while(1){
if(msgrcv(msqid, (void *)&msg, 100, 0, 0) > 0) {
printf("Receiver receive: %s\n", msg.text);
sem_post(next_input);
if(strcmp(msg.text, "end1") == 0) {
msg.type = 2;
strcpy(msg.text, "over1");
n = msgsnd(msqid, (void *)&msg, 100, 0);
if(n != 0) {
sem_post(receive1);
sem_wait(sender1);
}
finish1 = 1;
}
else if(strcmp(msg.text, "end2") == 0) {
msg.type = 3;
strcpy(msg.text, "over2");
n = msgsnd(msqid, (void *)&msg, 100, 0); // 将详细发送到消息队列中
if(n != 0) {
sem_post(receive2);
sem_wait(sender2);
}
finish2 = 1;
}
}
if(finish1 == 1 && finish2 == 1) {
msgctl(msqid, IPC_RMID, 0); // 删除消息队列
exit(0);
}
}
}
int main() {
// msgget函数用于创建一个新的消息队列或者获取一个已经存在的消息队列的标识符
int msqid = msgget((key_t) 2023, 0666 | IPC_CREAT);
msgctl(msqid, IPC_RMID, 0); // 删除消息队列,释放相应的系统资源
sem_unlink("mutex");
sem_unlink("sender1");
sem_unlink("sender2");
sem_unlink("receiver1");
sem_unlink("receiver2");
sem_unlink("next_input");
sem_t *mutex = sem_open("mutex", O_CREAT | O_RDWR, 0666, 0);
sem_t *sender1 = sem_open("sender1", O_CREAT | O_RDWR, 0666, 0);
sem_t *receive1 = sem_open("receive1", O_CREAT | O_RDWR, 0666, 0);
sem_t *sender2 = sem_open("sender2", O_CREAT | O_RDWR, 0666, 0);
sem_t *receive2 = sem_open("receive2", O_CREAT | O_RDWR, 0666, 0);
sem_t *next_input = sem_open("next_input", O_CREAT | O_RDWR, 0666, 0);
pthread_t p1, p2, p3;
if(pthread_create(&p1, NULL, sender1_thread, NULL) != 0) {
printf("Sender1线程创建失败!");
exit(-1);
}
if(pthread_create(&p2, NULL, sender2_thread, NULL) != 0) {
printf("Sender2线程创建失败!");
exit(-1);
}
if(pthread_create(&p3, NULL, receiver_thread, NULL) != 0) {
printf("receiver线程创建失败!");
exit(-1);
}
sem_post(mutex);
sem_post(next_input);
pthread_join(p1, NULL);
pthread_join(p2, NULL);
pthread_join(p3, NULL);
return 0;
}