(WebFlux)003、多数据源R2dbc事务失效分析
在项目改造过程中,我们将SpringMVC替换为SpringWebflux,同时将Mybatis升级为R2dbc。rpm编译源码项目进展顺利,直到新需求引入MongoDb,问题浮现。面对Mysql和MongoDb的多数据源挑战,事物操作出现异常。本文将深入分析问题原因与解决方案。
在本地测试时,强烈推荐使用虚拟机和Docker安装MySql与MongoDb,以避免Mac直连Docker带来的麻烦。SpringBoot版本为2.6.,本文基于已集成R2DBC与MongoDb的环境。
首先,我们创建了一个测试库r2dbc_test,包含user表。引入R2dbc并进行基本测试,实现事务操作,确保数据完整性。测试结果显示,R2dbc事务操作正常,当尝试删除并插入数据时,期望的异常和数据状态得到验证。
接着引入MongoDb,并开启事务支持。根据官方文档,除非手动配置MongoTransactionManager,否则事务支持默认禁用。在项目中添加相应代码,为Webflux环境配置MongoDB事务。然而,aosp7.0源码引入MongoDb后,事务操作再次出现问题,未按预期回滚。
为了解决此问题,我们深入分析了事务失效的原因。经过排查,发现事务管理器未能正确初始化,导致TransactionalOperator无法正常工作。通过查看源码,发现R2dbcTransactionManager的初始化依赖于是否存在ReactiveTransactionManager。由于MongoDb事务已先期初始化,导致R2dbcTransactionManager未能正确创建,从而影响了事物操作。
为解决此问题,我们采取了以下措施:创建两个配置类,分别为MongoConfig和R2dbcConfig,用于自定义事务管理器的初始化。通过别名方式创建两个TransactionalOperator,确保R2dbcTransactionManager的正确初始化。经过验证,设置正确的名称后,事务操作恢复正常,数据回滚验证成功。
本文提出了手动验证的方法,并指出了使用日志记录作为辅助工具的快捷途径。通过日志,可以清晰地追踪事务创建与回滚过程,验证操作的有效性。总结而言,在面对新工具和多数据源时,应充分实验、验证结果,面对问题时保持冷静,逐步解决问题。apache select 源码分析如有疑问,欢迎指正与交流。
使用Gateway作为SpringCloud网关
本着能用原生就用原生的原则,我们这里使用SpringGateway来作为云服务的网关
配置
从官网的介绍来看,spring网关拥有的功能有,路由(配置,过滤,重写等),熔断以及流量控制
首先引入包
动态路由
路由的配置比较简单,有两种方法:使用配置文件和代码注入,我们这里简单展示下两种方法
或者使用
路由配置中id、uri、order、predicates.path/host没什么好说的,根据需求配置即可,filters相关参数,这里最好还是参考源码相关部分或者Spring Cloud Gateway比较全面,比如常用的前缀切割
这里我们以常用的两种filter,流量控制和熔断降级举例
流量控制
通常我们需要限流来保证服务的可用性,保护一些不太稳定的服务不会因为高并发的请求而挂掉,这里我们一般在网关层做流量控制,减少实际进入的请求达到平波峰的目的
计数器算法
如果某个服务会在请求中数量达到时候挂掉,请求平均时间为2s,我们给一段时间一个请求量的限制,比如2秒次,每次请求进入就减少计数,每2s开始时重新计数,这样就能保证服务请求中数量在以内。但是对于抢购类接口,可能前ms请求数量就用完了,后面所有请求都被拒绝,即请求突刺现象,这样的用户体验是非常差的所以我们需要尽可能在所有的时间内保证接口的可用性(计数器算法就像DRAM中的集中式刷新一样不太能被接受),而且短时间内大量请求运行在相同代码段是hashmap源码实现原理非常危险的,在设计不好的情况很可能会出现数据库死锁等等问题
漏桶算法
我们需要让请求尽可能地能进行来,就需要平波峰填波谷,就上例而言,2s内最大请求为,也就是每个请求占用的时间比例为ms,我们设计一个容量为的桶(队列)每ms向接口发一个请求,可以让服务中请求数量不超过的情况下,每ms都能接受一个新的请求,这样就缓解了请求突刺现象。但是这里还有一个问题,对于抢购类接口,个容量可能ms就用完了,在第ms可能还会有个请求抢1个位置,个请求会被取消,这样也是相对来说不能被接受的
令牌桶算法
令牌桶算法就是目前spring cloud gateway采用的算法,这里采用的用户时间换用户失败的策略,假设我们认为用户的平均忍耐时间为8秒,接口超过8秒一些用户就要骂街了,减去实际执行的2秒,也就是说我们的可以利用6秒的时间容纳更多的请求。依上文而言每ms去调用这个端口,那么也就是说桶的设计可以更大,在桶里放上令牌,每个请求需要在桶里面拿到令牌才能调用,这里的桶容量就是6s/ms为个。但是我们的执行速度是不变的,也就是结果是,在请求多的情况下用户的执行时间在8秒左右,而在请求少的情况下执行速度在2s左右,这样就缓解了短时间内大量请求导致大量失败的问题了。这里比较重要的参数有两个,第一个是桶请求容量 defaultBurstCapacity,第二个是每秒执行的请求速度(也就是桶的填充速率)defaultReplenishRate
在这个例子中defaultBurstCapacity=而defaultReplenishRate=,这两个参数我们会在下方配置
这里我们需要引用redis包,在线cc攻击源码再说明一下,本站使用的是jdk的版本,其他版本的配置和引用可能会稍有变化,需要调整
覆写KeyResolver的实现类
流量控制,这里同样有代码实现和配置文件实现,由于目前idea对于复杂配置文件的支持不太好,如果使用配置文件方式会疯狂报红,但是如果全部使用代码的话会不方便实现动态路由,因为gateway是先加载配置再处理代码的。所以这里我们路由使用配置,filter之类复杂的使用代码实现,下面是简单示例
这样全服务层面的接口流量控制就完成了,具体的哪些服务使用流量控制,具体控制参数的配置,自行稍作修改即可
测试流量控制的话,可以将令牌回复量和令牌总容量调至比较低的水平,然后再浏览器直接curl接口,比如令牌回复量和容量为1,则单秒内curl即可触发浏览器提示,线上大令牌容量测试能需要多线程curl了,这里参考官方文档给的lua脚本
ip限流
如果我们需要对某个ip进行限流,比如防止脚本抢货,我们这里需要KeyResolver的实现不再使用exchange.getRequest().getURI().getPath() ,而是使用 exchange.getRequest().getRemoteAddress() 。但是这里还有一个问题,我们请求是经过层层转发的,nginx,docker等,所以我们可能并不能拿到原始的请求地址,所以这里我们需要在最外层,比如nginx中将原始地址存到header或者cookie当中,这里给出简单示例
当然还有其他类似X-Forwarded-For的字段不再本文主要探讨范围就不多拓展了,在nginx中配置记录初始远程地址到header后,我们这里需要在程序中取出来,如果你这里使用的标准的X-Real-IP的字段去存储,那么只需要
即可获取真实地址,如果你这里自定义了一个header的key那么需要在exchange.getRequest().getHeaders()里面自己找出来了
最后我们这里给出对同一个接口同时配置两种限流的示例
我在ip限流这里修改了返回的code由改为了,方便测试,这里我们将ip的限流参数设置为(2,2),将path的限流参数设置为(1,)然后不断请求接口就发现一开始返回错误,后续path令牌桶用完后返回错误,即设置成功
补充
如果这里你不希望返回,并且要求返回一个用户可读的带有json信息结果,那么比较好的业务处理方式是前端完成。如果是对外接口的话,那么我们这里就只能重写RateLimiter的实现了,不再使用RedisRateLimiter的类,而是自己去继承RateLimiter接口去实现,
参考 SpringCloudGateway限流后,默认返回的改造:改跳转或增加响应body,这篇文章已经很详细,这里就不赘述了
熔断降级
熔断降级,即某个接口调用失败时使用其他接口代替,来保证整体服务对外的可用性
首先需要引入熔断包
circuitbreaker-reactor-resilience4j 熔断的相关配置分为两个部分,熔断逻辑本身的配置以及在集成到gateway中时候,网关的配置,熔断的重要的配置有,触发熔断的接口,代替接口,熔断超时时间(当然还有其他的,比如自定义熔断HttpStatus等等,详细参数参考 Spring Cloud Circuit Breaker以及resilience4j官网)
这里熔断触发接口和代替接口配置位于gateway中,这里我们使用代码实现,位置参考前述
这里setName的目的是和熔断包中的配置产生对应关系,下方为熔断包的配置,这里定义默认超时时间(也就是没有匹配到name的超时时间)为s,your_breaker_id的超时时间为3s
最后
到这里网关的基本功能就差不多了,自定义的一些业务功能配置,比如header,cookie,以及调用方ip的处理逻辑等等其实都是在网关层处理的,可以参考 Spring Cloud Gateway WebFilter Factories以及Writing Custom Spring Cloud Gateway Filters,但是这种配置基本都没什么坑,这里就不谈了
网关由于不经常作为业务逻辑被重构,所以网络上的资料相对比较少,我这里使用的又是最新的版本还是蛮多和前版本不一样的地方,尤其是webflux的一些东西,很多问题需要看源码才能解决,非常的消耗意志力。这里建议小伙伴们如果是业务使用的这种资料相对较少的架构,最好还是不要使用最新版本的比较好,毕竟万一遇到坑,踩个一两天是很正常的事情,而这种在业务场景可能就没那么容易接受了
Reactor-Netty基本抽象类介绍
概述
之前已经把reactor3看的差不多了,在学会webflux之前还需要了解Reactor-Netty的相关知识,然后才能看懂webflux,然后才能看懂Gateway.
LoopResource首先先学习几个基本的类才能看懂Reactor-Netty在干什么.我们先来看LoopResource类.官方说这个类是一个EventLoopGroup 的 selector并且关联了 Channel的工厂
* An { @link EventLoopGroup} selector with associated* { @link io.netty.channel.Channel} factories.我们来看一下LoopResource提供的一些方法
static LoopResources create(String prefix) { if (Objects.requireNonNull(prefix, "prefix").isEmpty()) { throw new IllegalArgumentException("Cannot use empty prefix"); } return new DefaultLoopResources(prefix, DEFAULT_IO_SELECT_COUNT, DEFAULT_IO_WORKER_COUNT, true);}我们来看看DefaultLoopResource内部实现
其实内部就是缓存了一堆的EventLoopGroup
ChannelPipelineConfigurer这个类的作用就是Channel创建好之后,在读取数据之前的初始化工作,我们看几个实现类 HttpServerChannelInitializer
ChannelGroup官方解释: 一个线程安全的集合,里面装的是打开的Channel,并且提供了很多操作Channel的方法,关闭的Channel会自动被group剔除.一个Channel可以属于多个Group
先来看看唯一一个实现类DefaultChannelGroup的源码
可以看到内部就是两个Map维护服务端和客户端的Channel,然后还有一个监听器.接下来看看添加Channel的方法再来看看是如何自动把过期Channel移除的,channel关闭之后会出发listener,listener会调用remove方法
其实就是很简单的从map中移除数据的逻辑
ConnectionObserver从字面上看就是连接的观察者.是一个Connection的生命周期观察器.核心方法是 onStateChange.子类很多,等看源码的时候看到具体的再看源码.我们先来看ConnectionObserver定义的几个状态
TransportConfig一个配置的抽象类,里面保存了一些属性
我们上面介绍的那些类都被保存在了这个Config里面.来看看其中一些比较重要的子类
ServerTransportConfig可以看到这个子类里面提供了两个ConnectionObserver我们分别来看一看
ServerTransportDoOnconnectionServerTransportDoOn原文:/post/响应式编程入门之 Project Reactor
本文旨在为读者提供对响应式编程及其核心库——Reactor的入门理解。在介绍前,我们先回顾一下非阻塞IO编程的基础,理解为何在Spring MVC中引入了WebFlux以及Reactor。Reactor是基于Java 8函数式API,集成CompletableFuture、Stream和Duration,它提供了Flux和Mono等异步序列API,并实现了Reactive Streams规范,特别适合构建微服务架构中的响应式系统。
在非阻塞IO编程中,比如调用远程服务时,我们通常通过回调函数来处理数据可用情况。然而,当回调逻辑复杂时,代码往往难以阅读。响应式编程通过简化这种逻辑,提供了更简洁的实现方式。它将传统命令式编程抽象为一系列API,更适合非阻塞IO环境。尽管响应式编程在非阻塞IO框架中广泛应用,如Vertx和WebFlux,但这并不意味着非阻塞IO编程只能依赖响应式编程。
Reactor作为响应式编程的基础,实现了Java响应式编程规范,理解其内部工作原理有助于深入掌握其API。Reactor的核心接口展示了其运作机制,包括数据发布和订阅流程。在实际应用中,Publisher和Subscription共同作用,通过调用Subscriber的onNext、onComplete和onError方法来实现数据流转。
响应式编程思想可类比为一条流水线,Publisher定义了数据生产过程,Operators对数据进行解析、校验和转换等操作,最终流转到Subscriber。这种设计使得系统在未被订阅之前保持静默,直至实际使用时才启动。
Reactor中的Operator作为连接上下游的关键组件,实现了数据的转换和处理。例如,map操作符通过改变数据值来实现数据转换。实际实现虽然复杂且严谨,但遵循了相同的设计理念。
学习Reactor的关键在于理解核心接口以及实践API。首先理解响应式编程的基本概念和Reactor如何实现这些概念。接下来,深入阅读官方文档并进行代码实践。追踪源码时,关注subscribe方法和Subscription的作用,以及Subscriber中的onNext、onComplete和onError方法的实现。
总之,通过本文的学习,读者应能对响应式编程和Reactor有初步的了解,并掌握学习Reactor的方法和途径。尽管本文未详细探讨Reactor的每个细节,但它为深入探索提供了基础。欢迎读者通过实践和阅读源码进一步深入理解这一强大且灵活的编程范式。
神器 SpringDoc 横空出世!最适合 SpringBoot 的API文档工具来了
之前在SpringBoot项目中,我一直在使用SpringFox提供的Swagger库。然而,当我浏览其官网时,发现已经有将近两年没有出新版本了。最近,当我升级到SpringBoot 2.6.x版本时,发现这个库的兼容性也越来越差,有些常用注解属性甚至被废弃了,而库中并没有提供替代方案。偶然间,我发现了一款名为SpringDoc的Swagger库,试用后发现效果非常不错,因此推荐给大家。
SpringDoc是一款基于OpenAPI 3的API文档生成工具,可以与SpringBoot结合使用。在Github上,它已经获得了超过1.7K个Star,更新发布也相当频繁,可以说是一款比Swagger库更好用的工具。值得一提的是,SpringDoc不仅支持Spring WebMvc项目,还可以支持Spring WebFlux项目,甚至Spring Rest和Spring Native项目,功能非常强大。下面是一张SpringDoc的架构图。
接下来,我将介绍SpringDoc的使用方法。我将以之前集成SpringFox的mall-tiny-swagger项目为例,将其改造为使用SpringDoc。
首先,我们需要集成SpringDoc。在pom.xml中添加它的依赖即可,开箱即用,无需任何配置。
从SpringFox迁移结合SpringSecurity使用测试常用配置
SpringDoc还有一些常用的配置可以了解,更多配置可以参考官方文档。
总结
在SpringFox的Swagger库好久不出新版的情况下,迁移到SpringDoc确实是一个更好的选择。今天我体验了一把SpringDoc,确实很好用,与之前熟悉的用法相似,学习成本极低。而且SpringDoc能支持WebFlux之类的项目,功能也更加强大,对于使用SpringFox觉得有些卡手的朋友来说,迁移到SpringDoc是一个不错的选择!
参考资料项目源码地址:github.com/macrozheng/m...
来源:mp.weixin.qq.com/s/scit...
2024-11-20 12:57
2024-11-20 11:49
2024-11-20 11:19
2024-11-20 10:53
2024-11-20 10:51