NetMQ学习一

2017-07-09 19:19:43来源:CSDN作者:hellokobe人点击

1:zeromq是什么

NetMQ (ZeroMQ to .Net),ZMQ号称史上最快中间件。
它对socket通信进行了封装,使得我们不需要写socket函数调用就能完成复杂的网络通信。
它跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。
它是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。和一般意义上的消息队列产品不同的是,它没有消息队列服务器,而更像是一个网络通信库。从网络通信的角度看,它处于会话层之上,应用层之下,属于传输层。

2:zeromq的消息模型

zeromq将消息通信分为4种模型,分别是一对一结对模型(Exclusive-Pair)、请求回应模型(Request-Reply)、发布订阅模型(Publish-Subscribe)、推拉模型(Push-Pull)。这4种模型总结出了通用的网络通信模型,在实际中可以根据应用需要,组合其中的2种或多种模型来形成自己的解决方案。

2.1 一对一结对模型 Exclusive-Pair

最简单的1:1消息通信模型,用来支持传统的 TCP socket模型,主要用于进程内部线程间通信。可以认为是一个TCP Connection,但是TCP Server只能接受一个连接。采用了lock free实现,速度很快。数据可以双向流动,这点不同于后面的请求响应模型。(不推荐使用,没有例子)

2.2 请求回应模型 Request-Reply

由请求端发起请求,然后等待回应端应答。一个请求必须对应一个回应,从请求端的角度来看是发-收配对,从回应端的角度是收-发对。跟一对一结对模型的区别在于请求端可以是1~N个。
请求端和回应端都可以是1:N的模型。通常把1认为是server,N认为是ClientZeroMQ可以很好的支持路由功能(实现路由功能的组件叫作Device),把1:N扩展为N:M(只需要加入若干路由节点)。从这个模型看,更底层的端点地址是对上层隐藏的。每个请求都隐含有回应地址,而应用则不关心它。通常把该模型主要用于远程调用及任务分配等。

2.3 发布订阅模型 Publisher-Subscriber

发布端单向分发数据,且不关心是否把全部信息发送给订阅端。如果发布端开始发布信息时,订阅端尚未连接上来,则这些信息会被直接丢弃。订阅端未连接导致信息丢失的问题,可以通过与请求回应模型组合来解决。订阅端只负责接收,而不能反馈,且在订阅端消费速度慢于发布端的情况下,会在订阅端堆积数据。该模型主要用于数据分发。天气预报、微博明星粉丝可以应用这种经典模型。 

2.4 推拉模型 Push-Pull

Server端作为Push端,而Client端作为Pull端,如果有多个Client端同时连接到Server端,则Server端会在内部做一个负载均衡,采用平均分配的算法,将所有消息均衡发布到Client端上。与发布订阅模型相比,推拉模型在没有消费者的情况下,发布的消息不会被消耗掉;在消费者能力不够的情况下,能够提供多消费者并行消费解决方案。该模型主要用于多任务并行。

3:zeromq的优势

  • TCP:ZeroMQ基于消息,消息模式,而非字节流。
  • XMPP:ZeroMQ更简单、快速、更底层。Jabber可建在ZeroMQ之上。
  • AMQP:完成相同的工作,ZeroMQ要快100倍,而且不需要代理(规范更简洁——少278页)
  • IPC:ZeroMQ可以跨多个主机盒,而非单台机器。
  • CORBA:ZeroMQ不会将复杂到恐怖的消息格式强加于你。
  • RPC:ZeroMQ完全是异步的,你可以随时增加/删除参与者。
  • RFC 1149:ZeroMQ比它快多了!
  • 29west LBM:ZeroMQ是自由软件!
  • IBM低延迟:ZeroMQ是自由软件!
  • Tibco:仍然是自由软件!
4:实现代码
对NetMQ进行封装,使用<package id="AsyncIO" version="0.1.26.0" targetFramework="net452" />
<package id="NetMQ" version="4.0.0.1" targetFramework="net452" />
using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace MessageQueueUseNetMQ{    public interface IMyPublisher:IDisposable    {        void Publish(string topicName, string data);    }}
using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace MessageQueueUseNetMQ{    public interface IMySubscriber:IDisposable    {        event Action<string,string> Notify;        void RegisterSubscriber(List<string> topics);        void RegisterSubscriberAll();        void RemoveSubscriberAll();    }}

using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;using NetMQ;using NetMQ.Sockets;namespace MessageQueueUseNetMQ.Implement{    public class MyPublisher:IMyPublisher    {        private object _lockObj=new object();        private PublisherSocket _publisher = null;        public MyPublisher(string endpoint)        {            _publisher=new PublisherSocket();            _publisher.Options.SendHighWatermark = 1000;            _publisher.Bind(endpoint);        }        public void Dispose()        {            lock (_lockObj)            {                _publisher.Close();                _publisher.Dispose();            }        }        public void Publish(string topicName, string data)        {            lock (_lockObj)            {                _publisher.SendMoreFrame(topicName).SendFrame(data);            }        }    }}
using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;using NetMQ;using NetMQ.Sockets;namespace MessageQueueUseNetMQ.Implement{    public class MySubscriber : IMySubscriber    {        private SubscriberSocket _subSocket = null;        private string _endpoint = null;        public MySubscriber(string endpoint)        {            _subSocket = new SubscriberSocket();            _endpoint = endpoint;        }        public void Dispose()        {            InnerStop();        }        public event Action<string, string> Notify = delegate { };        public void RegisterSubscriber(List<string> topics)        {            InnerRegisterSubscriber(topics);        }        public void RegisterSubscriberAll()        {            InnerRegisterSubscriber();        }        public void RemoveSubscriberAll()        {            InnerStop();        }        private void InnerRegisterSubscriber(List<string> topics=null)        {            InnerStop();            _subSocket = new SubscriberSocket();            _subSocket.Options.ReceiveHighWatermark = 1000;            _subSocket.Connect(_endpoint);            if (topics == null)            {                _subSocket.SubscribeToAnyTopic();            }            else            {                topics.ForEach(item=>_subSocket.Subscribe(item));            }            Task.Factory.StartNew(() =>            {                while (true)                {                    string messageTopicReceived = _subSocket.ReceiveFrameString();                    string messageReceived = _subSocket.ReceiveFrameString();                    Notify(messageTopicReceived, messageReceived);                }            });        }        private void InnerStop()        {            _subSocket.Close();        }    }}
测试代码
publisher
using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;using System.Windows;using System.Windows.Controls;using System.Windows.Data;using System.Windows.Documents;using System.Windows.Input;using System.Windows.Media;using System.Windows.Media.Imaging;using System.Windows.Navigation;using System.Windows.Shapes;using MessageQueueUseNetMQ;using MessageQueueUseNetMQ.Implement;namespace publisher{    /// <summary>    /// MainWindow.xaml 的交互逻辑    /// </summary>    public partial class MainWindow : Window    {        private IMyPublisher _myPublisher = null;        public MainWindow()        {            InitializeComponent();            _myPublisher=new MyPublisher(IpAddressCommonStr.Publisher_IP_Address);        }        private void button_Click(object sender, RoutedEventArgs e)        {            string record = String.Format("{0}/t{1}", System.DateTime.Now, this.textBox.Text.Trim());            _myPublisher.Publish("publisher",record);            this.listBox.Items.Add(record);        }    }}
subsriber
using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;using System.Windows;using System.Windows.Controls;using System.Windows.Data;using System.Windows.Documents;using System.Windows.Input;using System.Windows.Media;using System.Windows.Media.Imaging;using System.Windows.Navigation;using System.Windows.Shapes;using MessageQueueUseNetMQ;using MessageQueueUseNetMQ.Implement;namespace subscriber{    /// <summary>    /// MainWindow.xaml 的交互逻辑    /// </summary>    public partial class MainWindow : Window    {        private IMySubscriber _mySubscriber = null;        public MainWindow()        {            InitializeComponent();            _mySubscriber=new MySubscriber(IpAddressCommonStr.Subscriber_IP_Address);            _mySubscriber.RegisterSubscriberAll();            _mySubscriber.Notify += OnmySubscriber_Notify;                    }        private void OnmySubscriber_Notify(string arg1, string arg2)        {            this.listBox.Dispatcher.BeginInvoke((Action) (() => this.listBox.Items.Add(arg2)));        }    }}
测试效果








最新文章

123

最新摄影

闪念基因

微信扫一扫

第七城市微信公众平台