博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
在C#中使用消息队列RabbitMQ
阅读量:5860 次
发布时间:2019-06-19

本文共 4988 字,大约阅读时间需要 16 分钟。

1、什么是RabbitMQ。详见 。

    作用就是提高系统的并发性,将一些不需要及时响应客户端且占用较多资源的操作,放入队列,再由另外一个线程,去异步处理这些队列,可极大的提高系统的并发能力。

2、安装

    RabbitMQ服务:。

    (安装完RabbitMQ服务后,会在Windows服务中看到。如果没有Erlang运行环境,在安装过程中会提醒先安装Erlang环境。http://www.erlang.org/downloads)

    .net客户端类库:

3、插件

     RabbitMQ提供了很多好用的插件,最常用的就是web管理工具,启动此插件。

     CMD中运行命令:rabbitmq-plugins enable rabbitmq_management

     注:rabbitmq-plugins 所在路径为:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin

     web管理工具的地址是:http://localhost:15672,初始用户名:guest 初始密码:guest

4、配置

    配置文件地址为:C:\Documents and Settings\Administrator\Application Data\RabbitMQ\rabbitmq.config,默认没有rabbit.config文件,需要手工新建(默认会有rabbitmq.config.example 作为参考)。基于安全,做了两个配置,如下:

    

[{rabbit,[{loopback_users, [<<"guest">>]},{tcp_listeners, [{"127.0.0.1", 1234},{"10.121.1.48", 8009}]}]}].

loopback_users:设置只能在与RabbitMq服务同一台机器上访问服务的用户。

tcp_listeners:设置RabbitMQ监听的IP地址与端口。只监听局域网内网iP、修改默认端口,防止被入侵攻击。

设置完后,别忘记了以下操作,否则配置不起作用。

  • 停止RabbitMQ服务;
  • 重新安装服务使配置生效:rabbitmq-service.bat install

        此命令要切换到路径:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin

  • 启动RabbitMQ服务;

5、Demo练习。

消息生产者:class Program    {        static void Main(string[] args)        {            try            {                ConnectionFactory factory = new ConnectionFactory();                factory.HostName = Constants.MqHost;                factory.Port = Constants.MqPort;                factory.UserName = Constants.MqUserName;                factory.Password = Constants.MqPwd;                using (IConnection conn = factory.CreateConnection())                {                    using (IModel channel = conn.CreateModel())                    {                        //在MQ上定义一个持久化队列,如果名称相同不会重复创建                        channel.QueueDeclare("MyFirstQueue", true, false, false, null);                        while (true)                        {                            string customStr = Console.ReadLine();                            RequestMsg requestMsg = new RequestMsg();                            requestMsg.Name = string.Format("Name_{0}", customStr);                            requestMsg.Code = string.Format("Code_{0}", customStr);                            string jsonStr = JsonConvert.SerializeObject(requestMsg);                            byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);                                                        //设置消息持久化                            IBasicProperties properties = channel.CreateBasicProperties();                            properties.DeliveryMode = 2;                            channel.BasicPublish("", "MyFirstQueue", properties, bytes);                            //channel.BasicPublish("", "MyFirstQueue", null, bytes);                            Console.WriteLine("消息已发送:" + requestMsg.ToString());                        }                    }                }            }            catch (Exception e1)            {                Console.WriteLine(e1.ToString());            }            Console.ReadLine();        }    }

 

class Program    {        static void Main(string[] args)        {            try            {                ConnectionFactory factory = new ConnectionFactory();                factory.HostName = Constants.MqHost;                factory.Port = Constants.MqPort;                factory.UserName = Constants.MqUserName;                factory.Password = Constants.MqPwd;                using (IConnection conn = factory.CreateConnection())                {                    using (IModel channel = conn.CreateModel())                    {                        //在MQ上定义一个持久化队列,如果名称相同不会重复创建                        channel.QueueDeclare("MyFirstQueue", true, false, false, null);                        //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息                        channel.BasicQos(0, 1, false);                                                Console.WriteLine("Listening...");                        //在队列上定义一个消费者                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);                        //消费队列,并设置应答模式为程序主动应答                        channel.BasicConsume("MyFirstQueue", false, consumer);                        while (true)                        {                            //阻塞函数,获取队列中的消息                            BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();                            byte[] bytes = ea.Body;                            string str = Encoding.UTF8.GetString(bytes);                            RequestMsg msg = JsonConvert.DeserializeObject
(str); Console.WriteLine("HandleMsg:" + msg.ToString()); //回复确认 channel.BasicAck(ea.DeliveryTag, false); } } } } catch (Exception e1) { Console.WriteLine(e1.ToString()); } Console.ReadLine(); } }

 

原文出自于:http://www.cnblogs.com/qy1141/p/4054135.html

转载于:https://www.cnblogs.com/zwyAndDong/p/8477471.html

你可能感兴趣的文章
DBA很忙—MySQL的性能优化及自动化运维实践
查看>>
通过调用笑话列表API获取笑话列表
查看>>
POI事件模式指北(三)- 读取Excel实战
查看>>
Java —— jdk环境变量搭建
查看>>
前端如何实现图片懒加载(lazyload) 提高用户体验
查看>>
puppet连载六:创建测试模块test
查看>>
光宇游戏CTO沈崴:《问道》“2019”年度数字大服为何选择阿里云!
查看>>
Java 工程师成神之路 | 2019正式版
查看>>
用条码标签打印软件批量打印物料标签
查看>>
资本寒冬下的android面经
查看>>
ASP.NET CORE 中用单元测试测试控制器
查看>>
.NET中使用APlayer组件自制播放器
查看>>
Nginx 1.15.10 主线版发布,高性能 Web 服务器
查看>>
android 记一次解决键盘遮挡问题
查看>>
Dubbo 源码分析 - 集群容错之 Directory
查看>>
微服务设计指南
查看>>
搞定JVM垃圾回收就是这么简单
查看>>
Android开发之ViewPager简单使用
查看>>
使用rekit脚手架创建react项目
查看>>
LiveVideoStackCon讲师热身分享 ( 十三 ) —— Intel QSV技术在FFmpeg中的实现与使用
查看>>