上一篇文章中,簡單的介紹了一下RabbitMQ的work模型。這篇文章來學習一下RabbitMQ中的發(fā)布訂閱模型。
發(fā)布訂閱模型(Publish/Subscribe):簡單的說就是隊列里面的消息會被多個消費者同時接受到,消費者接收到的信息一致。
發(fā)布訂閱模型適合于做模塊之間的異步通信。

img
適用場景
- 發(fā)送并記錄日志信息
- springcloud的config組件里面通知配置自動更新
- 緩存同步
- 微信訂閱號
演示
生產者
public class Producer {
private static final String EXCHANGE_NAME = "exchange_publish_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
for (int i = 0; i < 100; i++) {
channel.basicPublish(EXCHANGE_NAME, "", null, ("發(fā)布訂閱模型的第 " + i + " 條消息").getBytes());
}
channel.close();
connection.close();
}
}
消費者
public class Consumer {
private static final String QUEUE_NAME = "queue_publish_1";
private static final String EXCHANGE_NAME = "exchange_publish_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("隊列1接收到的消息是:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
public class Consumer2 {
private static final String QUEUE_NAME = "queue_publish_2";
private static final String EXCHANGE_NAME = "exchange_publish_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("隊列2接收到的消息是:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
測試
先啟動2個消費者,再啟動生產者


可以看出來消費者1和消費者2接收到的消息是一模一樣的 ,每個消費者都收到了生產者發(fā)送的消息;
發(fā)布訂閱模型,用到了一個新的東西-交換機,這里也解釋一下相關方法的參數:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map< String, Object > arguments) throws IOException;
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
小結
本文到這里就結束了,介紹了RabbitMQ通信模型中的發(fā)布訂閱模型,適合于做模塊之間的異步通信。