189 8069 5689

RocketMQ笔记:-创新互联

一、RocketMQ可以解决哪些问题? 1、微服务之间的解耦:

  如果微服务是直接相互通过Feign或其他方式互相调用的,那每个微服务都将在自己的代码中实现调用的逻辑,如果被调用端需要提供的参数或者逻辑发生了变动,则调用端可能也需要修改自己的代码。用图来理解,如果每个微服务都需要互相调用,若有n个微服务,则每个微服务都需要实现其他n-1个微服务的调用逻辑。

创新互联建站自2013年创立以来,是专业互联网技术服务公司,拥有项目网站建设、成都网站建设网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元泰山做网站,已为上家服务,为泰山各地企业和个人服务,联系电话:18980820575

  如果使用RocketMQ,将微服务直接的调用转换成调用者直接发送对应的消息给RocketMQ中,而被调用者直接从RocketMQ取属于自己的消息去消费,这样就无需再互相实现对方的调用逻辑了。而且这次请求也从同步变成了异步。

2、流量削峰:

  由于RocketMQ是一个队列,队列本身就具有缓和队头和队尾速度差的作用,如果没有队列,所有的请求都由生产者直接调用消费者,由于消费者消费请求需要时间,如果消费者是NIO模型,就会为之创建管道,将管道注册进Selector中,如果同时有一万个请求,则消费者的内存会很快溢出,造成消费者崩溃。而如果将这些请求放在MQ的队列中等待,则在MQ中它们只是以一个消息的方式存在,比起管道占用的内存就小了很多,消费者就可以慢慢从MQ中消费消息。

3、异步处理:

假设一个买票场景,假设前端请求后端要执行四个任务:前端发送所需信息调用后端接口,后端收到后:

1、支付系统等待用户支付;

2、订单系统修改订单状态为已支付;

3、向用户发送短信通知。

如果不使用消息队列,则可以这样处理:1处理完成后开始2,2处理完成后开始3......(以此类推)如果这样处理,则用户必须要等待四个任务全部都处理完成了,才能得到响应的结果,而且如果其中一个步骤出现了问题,则3个步骤都得回滚重新发起请求。但是这三个步骤并不需要一起完成后再反馈给用户,可以在支付系统收到请求之后,就反馈给用户已提交,剩下的异步处理(分别发送消息给订单系统和通知系统),完成后再通知用户即可,这样就提高了吞吐量。

二、RocketMQ如何保证信息不丢失:

  什么时候可能会发生信息丢失:

  1、生产者发送消息给Broker的时候

  2、Broker在进行集群间同步(主从同步)的时候

  3、Broker在持久化的时候

  4、消费者从Broker取消息的时候

  如何设计对应的解决方案,使这四个情况都不会造成消息丢失即可。

对于1,在RocketMQ中可以使用事物消息机制:

  ①:首先生产者发送half消息,即要提交的消息本身。

  ②:Broker回复一个half消息告诉生产者收到了消息,但是消息会被MQ标记为“暂不能投递”状态,需要生产者的本地事物完成并再发送二次确认的结果:Commit或者Rollback,才可以被投递或回滚。

  ③:生产者在本地事物完成以后再发送Commit或者Rollback,使之可以被交付或者回滚。

  在此期间Broker可以回查生产者本地事物的状态,生产者如果事物还未完成可以发送unknown给Broker,则Broker继续等待,过一会接着回查。如果在此期间生产者断网或重启了,在时间间隔内没有收到确认结果,则会向生产者集群中的任意一个发起回查。

对于2,即在MQ主从消息同步保证消息不丢失:

  RocketMQ提供了同步双写方式和异步复制方式。

  同步双写方式:在生产者发送消息给Master,Master收到消息后将消息复制到Slave,当Slave和Master都写入成功以后,才会像生产者发送收到回复。(此方式不会发生消息丢失,但是性能回避异步复制方式低10%左右)

  异步复制方式:在生产者发送消息给Master之后,Master写入成功就会返回成功,Master再异步将消息复制到Slave中去。

对于3,即在MQ进行持久化时保证消息不丢失

  同步刷盘:在生产者写入RocketMQ的时候,MQ立刻进行刷盘操作,在刷盘成功之后,再给生产者返回成功确认。不会造成消息丢失。

  异步刷盘:在生产者写入RocketMQ的时候,MQ在将消息发送任务给另一个负责存盘的线程,存入该线程内存中,内存写入成功就先返回成功确认,再由该存盘线程负责把内存中的消息刷盘。

对于4,即在消费者从MQ取消息的时候保证消息不丢失

  Consumer需要在收到消息后,执行本地事务,在本地事物完成之后,返回给MQ确认,MQ在收到确认之后,才会移动offset。(MQ在收到确认之后才会移动Offset)

三、如何保证消息消费的幂等性

什么是幂等性?

  防止消费者重复消费。

所有MQ产品并没有提供主动解决幂等性的机制,需要由消费者自行控制。

RocketMQ给每个消息分配了一个MessageID,该ID可以作为判断幂等的工具。本地事物在存入数据库的时候将该ID存入数据库,再在执行逻辑前检查收到的消息的MessageID是否已经存在数据库中。由于rocketMQ中在并发量很大时的MessageID并不能保证全局唯一。

或者可以自己带一个业务ID,通过该ID来判断。

四、RocketMQ如何保证消息顺序

  全局有序和局部有序:MQ只需要保证局部有序,不需要保证全局有序。比如对于同一个订单的不同步骤(订单内的步骤间)需要有序,而订单与订单之间的顺序无需关心。

  RocketMQ如何实现?在sendMessage时生产者重新生成一个MessageQueueSelector类的对象,但是需重写此类的select方法(根据订单号OrderID用队列的数量mqs.size()取余即可)

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(Listmqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();//对于同一个订单,由于其OrderID相同,因此会发送到同一个队列中
        return mqs.get(index);
    }
}, orderId);

  除了要保证生产者在发送消息给MQ时,还需要保证消费者在取消息时保证从队列中同时取。

  由于生产者在写入MQ的时候MQ中存在多个队列,MQ可能将这些消息负载均衡到多个队列中,我们可以通过在发送时多添加一个参数:MessageQueueSelector的对象,并重写select方法。

  对于消费者,在从MQ中消费消息时,令消费者一次取出某队列中的所有消息进行消费即可。

五、如何保证消息的高效读写?

  kafka和RocketMQ使用零拷贝技术:

  零拷贝技术有两种方式:1、Mmap方式;2、transFile方式。

  使用内存映射在用户程序中操作文件,通过MappedByteBuffer操作对象。

  Mmap(Memory Map内存映射)方式适合比较小的文件,通常文件大小在1.5-2G之间。

File file = new File("test.txt");
final RandomAccessFile rw = new RandomAccessFile(file,"rw");
final MappedByteBuffer map = rw.getChannel().map(FileChannel.MapMode.READ_WRITE,0,2048);
map.put("mmap content".getBytes());
rw.close();

  kafka使用transFile方式将硬盘数据加载到网卡。

六、使用MQ如何保证分布式事物的最终一致性?

  1、要保证生产者100%消息投递。事物消息机制。

  2、消费者保证幂等消费。唯一ID+业务自己实现。

  分布式MQ的三种语义:

  at least one:每个算子至少被处理一遍

  at most one:每个算子至多被处理一遍

  exactly one:每个算子只被处理一遍

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


新闻标题:RocketMQ笔记:-创新互联
URL链接:http://gzruizhi.cn/article/cdsihi.html

其他资讯