文章目录
模拟实现
启动结果如下: ![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/71841546ad8043f1bd51e4408df791de.png)![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/f6e3e72ff9a4483c978ec48e24f075c2.png)
模拟实现
模拟消费者
package com. example. demo. demo ;
import com. example. demo. common. Consumer ;
import com. example. demo. common. MqException ;
import com. example. demo. mqclient. Channel ;
import com. example. demo. mqclient. Connection ;
import com. example. demo. mqclient. ConnectionFactory ;
import com. example. demo. mqsever. core. BasicProperties ;
import com. example. demo. mqsever. core. ExchangeType ;
import java. io. IOException ;
public class DemoConsumer {
public static void main ( String [ ] args) throws IOException , MqException , InterruptedException {
System . out. println ( "启动消费者!" ) ;
ConnectionFactory factory = new ConnectionFactory ( ) ;
factory. setHost ( "127.0.0.1" ) ;
factory. setPort ( 9090 ) ;
Connection connection = factory. newConnection ( ) ;
Channel channel = connection. createChannel ( ) ;
channel. exchangeDeclare ( "testExchange" , ExchangeType . DIRECT , true , false , null ) ;
channel. queueDeclare ( "testQueue" , true , false , false , null ) ;
channel. basicConsume ( "testQueue" , true , new Consumer ( ) {
@Override
public void handleDelivery ( String consumerTag, BasicProperties basicProperties, byte [ ] body) throws MqException , IOException {
System . out. println ( "[消费数据] 开始!" ) ;
System . out. println ( "consumerTag=" + consumerTag) ;
System . out. println ( "basicProperties=" + basicProperties) ;
String bodyString = new String ( body, 0 , body. length) ;
System . out. println ( "body=" + bodyString) ;
System . out. println ( "[消费数据] 结束!" ) ;
}
} ) ;
while ( true ) {
Thread . sleep ( 500 ) ;
}
}
}
模拟生产者
package com. example. demo. demo ;
import com. example. demo. mqclient. Channel ;
import com. example. demo. mqclient. Connection ;
import com. example. demo. mqclient. ConnectionFactory ;
import com. example. demo. mqsever. core. ExchangeType ;
import java. io. IOException ;
public class DemoProducer {
public static void main ( String [ ] args) throws IOException , InterruptedException {
System . out. println ( "启动生产者" ) ;
ConnectionFactory factory = new ConnectionFactory ( ) ;
factory. setHost ( "127.0.0.1" ) ;
factory. setPort ( 9090 ) ;
Connection connection = factory. newConnection ( ) ;
Channel channel = connection. createChannel ( ) ;
channel. exchangeDeclare ( "testExchange" , ExchangeType . DIRECT , true , false , null ) ;
channel. queueDeclare ( "testQueue" , true , false , false , null ) ;
byte [ ] body = "hello" . getBytes ( ) ;
boolean ok = channel. basicPublish ( "testExchange" , "testQueue" , null , body) ;
System . out. println ( "消息投递完成! ok=" + ok) ;
Thread . sleep ( 500 ) ;
channel. close ( ) ;
connection. close ( ) ;
}
}
效果展示
启动结果如下: