headers = new HashMap<>();
headers.put("name","张三");
headers.put("phone","123456789");
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(headers).build();
channel.basicPublish(HEADERS,"",properties,"hello rabbitmq ".getBytes());
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
System.out.println("服务端启动.");
# 非官方 7 事务
事务几乎无处不在,而现在谈及事务绝不是简单的事务,而是分布式事务.遗憾的是这里的事务跟分布式事务没有必然联系.
这里单纯的谈及rabbitmq的事务.首先说一下,rabbitmq是基于tcp协议的,tcp三次握手四次挥手,这里就涉及到消息的确认
机制.而rabbitmq的事务也是依赖这个确认机制的.再来说一下确认机制,我们在使用rabbitmq或者jms默认都是
有确认机制的,只不过是默认实现,我们可以通过一些ack的参数或接口设置.一般都是默认批量自动ack,
什么时候ack呢?rabbitmq中没有消息过期的概念,只有消息被正常处理了,客户端发送了ack,才会删除.
批量ack,则是在ack到一定数量之后才一块发送ack,减少带宽,但是失败则影响较大.传统的事务,是先
开启事务,进行操作,事务提交,事务回滚,速度将减慢到原来的2倍(经过本地测试,差不多这个数).
rabbitmq提供了一个高级的Publisher Confirm机制,跟传统不太一样,实际上是将事务的提交拆分了,
等所有事务提交完毕,在最终确认.速度介于并接近非事务速度(可能测试用例问题,跟传统tx差不多?!).
当开启publisher confirm时,该信道上会为每一个消息分配一个id,当消息被发送到消费端时,rabbitmq就
会发确认到生产端,消息的发送和确认是异步.
[无事务消息代码](http://dwz.cn/6lD4ad)
static class NoPublisher implements Runnable {
/**
* When an object implementing interface Runnable
is used
* to create a thread, starting the thread causes the object's
* run
method to be called in that separately executing
* thread.
*
* The general contract of the method run
is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
try {
try (Connection connection = factory.newConnection()) {
Channel channel = connection.createChannel();
channel.queueDeclare(NO_TRANSACTION, false, false, false, null);
long start = System.currentTimeMillis();
try {
for (int i = 0; i < MSG_NUM; i++) {
String msg = "rabbitmq msg!";
channel.basicPublish("", NO_TRANSACTION, null, msg.getBytes());
}
channel.basicPublish("", NO_TRANSACTION, null, "end".getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
channel.close();
}
long end = System.currentTimeMillis();
System.out.println("[发送方]发送方耗时:" + (end - start));
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
static class NoConsumer implements Runnable {
/**
* When an object implementing interface Runnable
is used
* to create a thread, starting the thread causes the object's
* run
method to be called in that separately executing
* thread.
*
* The general contract of the method run
is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(NO_TRANSACTION, false, false, false, null);
//每次1条
channel.basicQos(1);
long start = System.currentTimeMillis();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
channel.basicAck(envelope.getDeliveryTag(),false);
if (msg.equalsIgnoreCase("end")){
long end = System.currentTimeMillis();
System.out.println("[接收方]接收完毕"+(end-start));
try {
channel.close();
connection.close();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
};
//手动ack
channel.basicConsume(NO_TRANSACTION, false, consumer);
System.out.println("[接收方]客户端等待中......");
latch.countDown();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
输出:
[接收方]客户端等待中......
[发送方]发送方耗时:4080
[接收方]接收完毕16904
[事务消息代码](http://dwz.cn/6lD4ad)
static class TranPublisher implements Runnable {
/**
* When an object implementing interface Runnable
is used
* to create a thread, starting the thread causes the object's
* run
method to be called in that separately executing
* thread.
*
* The general contract of the method run
is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
try {
try (Connection connection = factory.newConnection()) {
Channel channel = connection.createChannel();
channel.queueDeclare(TRANSACTION, false, false, false, null);
long start = System.currentTimeMillis();
try {
for (int i = 0; i < MSG_NUM;) {
if (i%BATCH ==0){
//开启事务
channel.txSelect();
for (int j = 0; j < BATCH; j++) {
String msg = "rabbitmq msg!";
if(i + j != MSG_NUM -1 ){
channel.basicPublish("", TRANSACTION, null, msg.getBytes());
}else{
channel.basicPublish("", TRANSACTION, null, "end".getBytes());
}
}
i += BATCH;
//commit
channel.txCommit();
}
}
} catch (Exception e) {
//回滚事务
channel.txRollback();
e.printStackTrace();
} finally {
channel.close();
}
long end = System.currentTimeMillis();
System.out.println("[tx发送方]发送方耗时:" + (end - start)+" 批量大小="+BATCH);
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
static class TranConsumer implements Runnable {
/**
* When an object implementing interface Runnable
is used
* to create a thread, starting the thread causes the object's
* run
method to be called in that separately executing
* thread.
*
* The general contract of the method run
is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TRANSACTION, false, false, false, null);
//每次1条
channel.basicQos(1);
long start = System.currentTimeMillis();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
//发送ack
channel.basicAck(envelope.getDeliveryTag(), false);
if (msg.equalsIgnoreCase("end")){
long end = System.currentTimeMillis();
System.out.println("[tx接收方]接收完毕"+(end-start));
try {
channel.close();
connection.close();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
};
//手动ack
channel.basicConsume(TRANSACTION, false, consumer);
System.out.println("[tx接收方]客户端等待中......");
latch.countDown();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
输出:
[tx接收方]客户端等待中......
[tx发送方]发送方耗时:8703 批量大小=100
[tx接收方]接收完毕22160
[消息确认代码](http://dwz.cn/6lD4ad)
static class ConfirmPublisher implements Runnable {
/**
* When an object implementing interface Runnable
is used
* to create a thread, starting the thread causes the object's
* run
method to be called in that separately executing
* thread.
*
* The general contract of the method run
is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
try {
try (Connection connection = factory.newConnection()) {
Channel channel = connection.createChannel();
long start = System.currentTimeMillis();
try {
for (int i = 0; i < MSG_NUM; ) {
if (i%BATCH ==0){
//开启confirm3
channel.confirmSelect();
for (int j = 0; j < BATCH; j++) {
String msg = "rabbitmq msg!";
if(i + j != MSG_NUM -1){
channel.basicPublish("", CONFIRM, null, msg.getBytes());
}else{
channel.basicPublish("", CONFIRM, null, "end".getBytes());
}
}
i += BATCH;
//confirm
// waitForConfirmsOrDie 相对于 waitForConfirms 来说,只要有nack就好抛出异常,同时也是一种阻塞式
channel.waitForConfirmsOrDie();
//channel.addConfirmListener(new ConfirmListener() {
// @Override
// public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//// System.out.println("ack deliveryTag = " + deliveryTag);
// }
//
// @Override
// public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//// System.out.println("nack deliveryTag = " + deliveryTag);
// }
// });
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
channel.close();
}
long end = System.currentTimeMillis();
System.out.println("[confirm发送方]发送方耗时:" + (end - start)+" 批量大小="+BATCH);
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
static class ConfirmConsumer implements Runnable {
/**
* When an object implementing interface Runnable
is used
* to create a thread, starting the thread causes the object's
* run
method to be called in that separately executing
* thread.
*
* The general contract of the method run
is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(CONFIRM, false, false, false, null);
//每次1条
channel.basicQos(1);
long start = System.currentTimeMillis();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
//发送ack
channel.basicAck(envelope.getDeliveryTag(), false);
// System.out.println("确认"+msg);
if (msg.equals("end")){
long end = System.currentTimeMillis();
System.out.println("[confirm接收方]接收完毕"+(end-start));
try {
channel.close();
connection.close();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
};
//手动ack
channel.basicConsume(CONFIRM, false, consumer);
System.out.println("[confirm接收方]客户端等待中......");
latch.countDown();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
输出:
[confirm接收方]客户端等待中......
[confirm发送方]发送方耗时:5358 批量大小=100
[confirm接收方]接收完毕22502
10w简单消息发送时间
无事务:15s左右
tx事务:20s左右
confirm事务:`20s左右`
本地测试,所以这里没有网络的延迟.
`这里有个疑问,confirm事务没有像官方说明的一样,接近无事务的效率.`