Linux进程间通信(IPC)编程实践(十二)Posix消息队列--基本API的

2016-12-02 12:51:48来源:网络收集作者:Worker人点击


 



posix消息队列与system v消息队列的差别:


(1)对posix消息队列的读总是返回最高优先级的最早消息,对system v消息队列的读则可以返回任意指定优先级的消息。
(2)当往一个空队列放置一个消息时,posix消息队列允许产生一个信号或启动一个线程,system v消息队列则不提供类似机制。



队列中的每个消息具有如下属性:


1、一个无符号整数优先级(posix)或一个长整数类型(system v)
2、消息的数据部分长度(可以为0)
3、数据本身(如果长度大于0)


Posix消息队列操作函数如下:



1.创建/获取一个消息队列



mqd_t mq_open(const char *name, int oflag); //专用于打开一个消息队列
mqd_t mq_open(const char *name, int oflag, mode_t mode,
struct mq_attr *attr);





参数:



name: 消息队列名字;



oflag: 与open函数类型,可以是O_RDONLY,O_WRONLY,O_RDWR,还可以按位或上O_CREAT,O_EXCL,O_NONBLOCK.



mode: 如果oflag指定了O_CREAT,需要指定mode参数;



attr: 指定消息队列的属性;





返回值:



成功:返回消息队列文件描述符;



失败:返回-1;



注意-PosixIPC名字限制:



1.必须以”/”开头,并且后面不能还有”/”,形如:/file-name;



2.名字长度不能超过NAME_MAX



3.链接时:Linkwith-lrt(Makefile中使用实时链接库-lrt)



2.关闭一个消息队列



#include
int mq_close(mqd_t mqdes);



返回: 成功时为0,出错时为-1。

功能: 关闭已打开的消息队列。



注意:System V没有此功能函数调用



3.删除一个消息队列



int mq_unlink(const char *name);
/** System V 消息队列
通过msgctl函数, 并将cmd指定为IPC_RMID来实现
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
**/ 返回: 成功时为0,出错时为-1

功能: 从系统中删除消息队列。
对上述三个函数的综合使用:
int main()
{
mqd_t mqid = mq_open("/abc", O_CREAT|O_RDONLY, 0666, NULL);
if (mqid == -1)
err_exit("mq_open error");
cout << "mq_open success" << endl;
mq_close(mqid);
mq_unlink("/abc");
cout << "unlink success" << endl;
}4.获取/设置消息队列属性

#include
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *attr);均返回:成功时为0, 出错时为-1



参数:





newattr:需要设置的属性



oldattr:原来的属性




每个消息队列有四个属性:

struct mq_attr
{
long mq_flags;/* message queue flag : 0, O_NONBLOCK */
long mq_maxmsg; /* max number of messages allowed on queue*/
long mq_msgsize;/* max size of a message (in bytes)*/
long mq_curmsgs;/* number of messages currently on queue */
};
int main(int argc,char **argv)
{
mqd_t mqid = mq_open("/test", O_RDONLY|O_CREAT, 0666, NULL);
if (mqid == -1)
err_exit("mq_open error");
struct mq_attr attr;
if (mq_getattr(mqid, &attr) == -1)
err_exit("mq_getattr error");
cout << "Max messages on queue: " << attr.mq_maxmsg << endl;
cout << "Max message size: " << attr.mq_msgsize << endl;
cout << "current messages: " << attr.mq_curmsgs << endl;
mq_close(mqid);
return 0;
}对比System V:


通过msgctl函数,并将cmd指定为IPC_STAT/IPC_SET来实现



intmsgctl(intmsqid,intcmd,structmsqid_ds*buf);


另外每个消息均有一个优先级,它是一个小于MQ_PRIO_MAX的无符号整数

#define MQ_PRIO_MAX 32768




5.发送消息/读取消息#include
int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);



返回:成功时为0,出错为-1



返回:成功时为消息中的字节数,出错为-1

参数: 最后一个是消息的优先级



消息队列的限制:

MQ_OPEN_MAX : 一个进程能够同时拥有的打开着消息队列的最大数目

MQ_PRIO_MAX : 任意消息的最大优先级值加1

/** 示例: 向消息队列中发送消息, prio需要从命令行参数中读取 **/
struct Student
{
char name[36];
int age;
};
int main(int argc,char **argv)
{
if (argc != 2)
err_quit("./send ");
mqd_t mqid = mq_open("/test", O_WRONLY|O_CREAT, 0666, NULL);
if (mqid == -1)
err_exit("mq_open error");
struct Student stu = {"xiaofang", 23};
unsigned prio = atoi(argv[1]);
if (mq_send(mqid, (const char *)&stu, sizeof(stu), prio) == -1)
err_exit("mq_send error");
mq_close(mqid);
return 0;
}
/** 示例: 从消息队列中获取消息 **/
int main(int argc,char **argv)
{
mqd_t mqid = mq_open("/test", O_RDONLY);
if (mqid == -1)
err_exit("mq_open error");
struct Student buf;
int nrcv;
unsigned prio;
struct mq_attr attr;
if (mq_getattr(mqid, &attr) == -1)
err_exit("mq_getattr error");
if ((nrcv = mq_receive(mqid, (char *)&buf, attr.mq_msgsize, &prio)) == -1)
err_exit("mq_receive error");
cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "
<< buf.name << ", age: " << buf.age << endl;
mq_close(mqid);
return 0;
}6.建立/删除消息到达通知事件

#include
int mq_notify(mqd_t mqdes, const struct sigevent *notification);返回: 成功时为0,出错时为-1

功能: 给指定队列建立或删除异步事件通知



sigev_notify代表通知的方式:一般常用两种取值:SIGEV_SIGNAL,以信号方式通知;SIGEV_THREAD,以线程方式通知



如果以信号方式通知:则需要设定一下两个参数:



sigev_signo:信号的代码



sigev_value:信号的附加数据(实时信号)



如果以线程方式通知:则需要设定以下两个参数:



sigev_notify_function



sigev_notify_attributes


union sigval
{
int sival_int;/* Integer value */
void *sival_ptr;/* pointer value */
};
struct sigevent
{
int sigev_notify; /* SIGEV_{ NONE, ISGNAL, THREAD} */
int sigev_signo;/* signal number if SIGEV_SIGNAL */
union sigval sigev_value; /* passed to signal handler or thread */
void(*sigev_notify_function)(union sigval);
pthread_attr_t *sigev_notify_attribute;
};


参数sevp:



NULL:表示撤销已注册通知;



非空:表示当消息到达且消息队列当前为空,那么将得到通知;



通知方式:



1.产生一个信号,需要自己绑定



2.创建一个线程,执行指定的函数



注意:这种注册的方式只是在消息队列从空到非空时才产生消息通知事件,而且这种注册方式是一次性的!


**PosixIPC所特有的功能,SystemV没有**/


/**示例: 将下面程序多运行几遍, 尤其是当消息队列”从空->非空”, 多次”从空->非空”, 当消息队列不空时运行该程序时, 观察该程序的状态;
**/
mqd_t mqid;
long size;
void sigHandlerForUSR1(int signo)
{
//将数据的读取转移到对信号SIGUSR1的响应函数中来
struct Student buf;
int nrcv;
unsigned prio;
if ((nrcv = mq_receive(mqid, (char *)&buf, size, &prio)) == -1)
err_exit("mq_receive error");
cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "
<< buf.name << ", age: " << buf.age << endl;
}
int main(int argc,char **argv)
{
// 安装信号响应函数
if (signal(SIGUSR1, sigHandlerForUSR1) == SIG_ERR)
err_exit("signal error");
mqid = mq_open("/test", O_RDONLY);
if (mqid == -1)
err_exit("mq_open error");
// 获取消息的最大长度
struct mq_attr attr;
if (mq_getattr(mqid, &attr) == -1)
err_exit("mq_getattr error");
size = attr.mq_msgsize;
// 注册消息到达通知事件
struct sigevent event;
event.sigev_notify = SIGEV_SIGNAL;//指定以信号方式通知
event.sigev_signo = SIGUSR1; //指定以SIGUSR1通知
if (mq_notify(mqid, &event) == -1)
err_exit("mq_notify error");
//死循环, 等待信号到来
while (true)
pause();
mq_close(mqid);
return 0;
}
/** 示例:多次注册notify, 这样就能过多次接收消息, 但是还是不能从队列非空的时候进行接收, 将程序改造如下:
**/
mqd_t mqid;
long size;
struct sigevent event;
void sigHandlerForUSR1(int signo)
{
// 注意: 是在消息被读走之前进行注册,
// 不然该程序就感应不到消息队列"从空->非空"的一个过程变化了
if (mq_notify(mqid, &event) == -1)
err_exit("mq_notify error");
//将数据的读取转移到对信号SIGUSR1的响应函数中来
struct Student buf;
int nrcv;
unsigned prio;
if ((nrcv = mq_receive(mqid, (char *)&buf, size, &prio)) == -1)
err_exit("mq_receive error");
cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "
<< buf.name << ", age: " << buf.age << endl;
}
int main(int argc,char **argv)
{
// 安装信号响应函数
if (signal(SIGUSR1, sigHandlerForUSR1) == SIG_ERR)
err_exit("signal error");
mqid = mq_open("/test", O_RDONLY);
if (mqid == -1)
err_exit("mq_open error");
// 获取消息的最大长度
struct mq_attr attr;
if (mq_getattr(mqid, &attr) == -1)
err_exit("mq_getattr error");
size = attr.mq_msgsize;
// 注册消息到达通知事件
event.sigev_notify = SIGEV_SIGNAL;//指定以信号方式通知
event.sigev_signo = SIGUSR1; //指定以SIGUSR1通知
if (mq_notify(mqid, &event) == -1)
err_exit("mq_notify error");
//死循环, 等待信号到来
while (true)
pause();
mq_close(mqid);
return 0;
}



mq_notify注意点总结:



1.任何时刻只能有一个进程可以被注册为接收某个给定队列的通知;



2.当有一个消息到达某个先前为空的队列,而且已有一个进程被注册为接收该队列的通知时,只有没有任何线程阻塞在该队列的mq_receive调用的前提下,通知才会发出;



3.当通知被发送给它的注册进程时,该进程的注册被撤销.进程必须再次调用mq_notify以重新注册(如果需要的话),但是要注意:重新注册要放在从消息队列读出消息之前而不是之后(如同示例程序);




异步信号安全函数

#include
int sigwait(const sigset_t *set, int *sig);


可以使用sigwait函数代替信号处理程序的信号通知,将信号阻塞到某个函数中,仅仅等待该信号的递交。采用sigwait实现上面的程序如下:



#include
#include
#include
#include
#include
#include
#include
int main(int argc,char *argv[])
{
mqd_tmqd;
intsigno;
void *buff;
ssize_t n;
sigset_tnewmask;
struct mq_attrattr;
struct sigevent sigev;
if(argc != 2)
{
printf("usage :mqnotify ");
exit(0);
}
mqd = mq_open(argv[1],O_RDONLY);
mq_getattr(mqd,&attr);
buff = malloc(attr.mq_msgsize);
sigemptyset(&newmask);
sigaddset(&newmask,SIGUSR1);
sigprocmask(SIG_BLOCK,&newmask,NULL);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
if(mq_notify(mqd,&sigev) == -1)
{
perror("mq_notify error");
exit(-1);
}
for(; ;)
{
sigwait(&newmask,&signo); //阻塞并等待该信号
if(signo == SIGUSR1)
{
mq_notify(mqd,&sigev);
while((n = mq_receive(mqd,buff,attr.mq_msgsize,NULL))>=0)
printf("read %ld bytes/n",(long) n);
if(errno != EAGAIN)
{
perror("mq_receive error");
exit(-1);
}
}
}
eixt(0);
}
启动线程处理消息通知,程序如下:



#include
#include
#include
#include
#include
#include
#include
mqd_tmqd;
struct mq_attrattr;
struct sigevent sigev;
static void notify_thread(union sigval);
int main(int argc,char *argv[])
{
if(argc != 2)
{
printf("usage :mqnotify ");
exit(0);
}
mqd = mq_open(argv[1],O_RDONLY | O_NONBLOCK);
mq_getattr(mqd,&attr);
sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_value.sival_ptr = NULL;
sigev.sigev_notify_function = notify_thread;
sigev.sigev_notify_attributes = NULL;
if(mq_notify(mqd,&sigev) == -1)
{
perror("mq_notify error");
exit(-1);
}
for(; ;)
{
pause();
}
eixt(0);
}
static void notify_thread(union sigval arg)
{
ssize_t n;
void *buff;
printf("notify_thread started/n");
buff = malloc(attr.mq_msgsize);
mq_notify(mqd,&sigev);
while((n = mq_receive(mqd,buff,attr.mq_msgsize,NULL))>=0)
printf("read %ld bytes/n",(long) n);
if(errno != EAGAIN)
{
perror("mq_receive error");
exit(-1);
}
free(buff);
pthread_exit(NULL);
}



附-查看已经成功创建的Posix消息队列



#其存在与一个虚拟文件系统中,需要将其挂载到系统中才能查看



MountingthemessagequeuefilesystemOnLinux,messagequeuesarecreatedinavirtualfilesystem.



(Otherimplementationsmayalsoprovidesuchafeature,butthedetailsarelikelytodiffer.)This



filesystemcanbemounted(bythesuperuser,注意是使用root用户才能成功)usingthefollowingcommands:



mkdir/dev/mqueue



mount-tmqueuenone/dev/mqueue



还可以使用cat查看该消息队列的状态,rm删除:



cat/dev/mqueue/abc



rmabc



还可umount该文件系统



umount/dev/mqueue


最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台