1.反应式编程 Reactor 3.x
2.Java响应式编程 第十一篇 WebFlux集成Redis
3.Java异步非阻塞编程的源码几种方式
4.反应式流 Reactive Streams 入门介绍
5.有什么使用了rxjava或rxandroid的开源项目?
6.Reactiveï¼ååºå¼ï¼ç¼ç¨
反应式编程 Reactor 3.x
Reactor 3.x是一个Java库,用于构建反应式应用程序,源码基于Reactive Streams标准,源码可以轻松与RxJava 2及其他反应流库集成。源码它提供了丰富的源码API和工具,如IPC API,源码网站问答系统源码用于网络和非JVM通信。源码
Reactive Streams是源码一种处理异步且无阻塞数据流的模式,提供背压机制,源码确保Publisher发布者不会给Subscriber订阅者带来过多压力,源码同时允许订阅者维护内部缓冲区或避免阻塞。源码
Reactor的源码两个主要类型是Flux和Mono。Flux与RxJava的源码Observable类似,可以发射0或多个事件,源码而Mono仅能发射一次事件,源码等效于RxJava的Single和Maybe。这种简单区别使得API更加直观,易于理解和使用。
Reactor提供了一系列API和工具,如Scheduler、StepVerifier等,用于测试和管理反应式流。例如,您可以创建空的Mono或Flux,或使用工厂方法创建包含特定事件的Mono或Flux。同时,Reactor支持流的转换和合并,以及错误处理。
转换流时,您可以对事件进行同步或异步映射,或使用延迟发布者以避免阻塞。合并流时,您可以将事件交织或串联,以实现特定的事件顺序。错误处理则提供了在发生错误时返回默认值、使用不同流或执行特定操作的能力。
Reactor还支持与同步API的资金翻倍指标源码交互,通过使用Mono或Flux进行阻塞调用。此外,它提供了一种方法将集合转换为Flux,用于高延迟的资源获取,或处理快速的发布者和缓慢的订阅者。
通过Reactor,开发者可以构建高效、响应式且易于维护的应用程序,利用其丰富的功能集和API简化反应式编程。
Java响应式编程 第十一篇 WebFlux集成Redis
在现代的分布式系统中,缓存是提高性能和扩展性的关键因素之一。Redis,作为一个开源的内存数据结构存储系统,不仅可以作为数据库,还可以作为缓存和消息中间件。WebFlux,作为Spring框架提供的响应式编程模型,在处理高并发和大数据量方面表现出色。
本文将探讨如何使用Reactor和WebFlux与Redis集成,利用其响应式特性来执行缓存操作。
首先,我们需要在项目的pom.xml文件中引入Spring WebFlux和Spring Data Redis的依赖项。
然后,在application.properties文件中配置Redis的连接信息。
在配置类中创建一个RedisCacheManager以管理缓存,并在其中使用RedisCacheConfiguration配置缓存的默认过期时间、键和值的序列化方式。
接下来,定义一个Service类来处理缓存操作。使用Spring框架的缓存注解来定义缓存逻辑,如@Cacheable用于读取缓存,@CachePut用于更新缓存,@CacheEvict用于清除缓存。同时,使用ReactiveRedisOperations执行Redis操作。
编写WebFlux控制器以处理请求,physx 开放源码使用@GetMapping、@PostMapping和@DeleteMapping映射URL,并调用UserService中的相应方法处理业务逻辑。
在集成过程中可能会遇到错误或异常,例如无法连接到Redis服务器或Redis命令执行失败。通过使用Spring的全局异常处理器(@ControllerAdvice)或Reactor的操作符(如onErrorResume)来处理异常,可以提高系统的健壮性和可靠性。
根据具体需求和环境,可能还会遇到其他问题。但通过研究和调试,您应该能够成功集成WebFlux和Redis,并实现预期的功能和效果。
本文介绍了如何利用Reactor和WebFlux与Redis集成来处理缓存操作。通过使用ReactiveRedisOperations和Spring框架的缓存注解,我们可以方便地实现响应式的缓存逻辑,提高系统的性能和扩展性,尤其适用于高并发和大数据量的场景。
Java异步非阻塞编程的几种方式
深入探讨Java异步非阻塞编程的几种方式,旨在提升并发性能和优化资源利用,本文将系统地介绍这些方法,以及它们如何解决同步编程中的线程阻塞问题,进而阐述如何通过异步编程提升业务逻辑的响应速度与吞吐量。
首先,以一个简单的HTTP调用为例,同步调用方式在IO等待期间,会导致线程资源无法被充分利用,限制了业务吞吐量。为解决此问题,引入了JDK NIO和Future机制。
在JDK 1.5版本中,JUC提供了Future抽象,允许主线程在不阻塞的情况下发送多个IO请求,并在请求完成后得到结果。通过异步方式,主线程可以执行其他任务,比如发送更多请求,高清影院源码提高了资源利用率。但需要注意,虽然主线程不再等待IO响应,仍需等待Future对象完成,这在一定程度上限制了非阻塞的优势。
接着,使用Callback回调方式进一步优化,允许在发送请求后立即执行其他逻辑,避免了主线程阻塞。对于HTTP请求,可以通过异步Servlet在Servlet 3.1中实现。此方法在非阻塞编程中实现了更高效的线程资源利用,确保了整个过程中没有线程阻塞现象。
然而,回调地狱是异步编程中常见的问题,它发生在回调函数嵌套时。为解决这一问题,引入了CompletableFuture。通过将操作封装为独立的CompletableFuture,并使用compose和whenComplete方法,可以有效避免回调地狱。此方法通过栈结构管理依赖操作,使得异步逻辑的执行仿佛同步进行,简化了代码结构。
Vert.x Future同样提供了解决方案,通过使用Handler概念,实现了异步逻辑的分层管理。Vert.x Future的核心执行逻辑与CompletableFuture相似,但使用了不同的实现方式,同样解决了线程阻塞问题。
引入了统一的抽象概念,如Reactive Streams,以解决异步编程中的问题。Reactive Streams由Publisher、Subscriber、Processor、直播程序app源码Subscription四个接口构成,它们提供了统一的异步编程框架,帮助开发者构建高并发、低延迟的应用。
在JDK 9中,Reactive Streams被封装为Java.util.concurrent.Flow接口。这为开发者提供了一种标准化的方法来实现异步数据流的处理,提高了编程的可读性和可维护性。
以Reactor、Spring 5以及Spring WebFlux为例,展示了Flux和Mono在处理异步数据流时的高效性。Reactor框架提供了一系列工具和库,使得开发者能够轻松地构建和管理异步数据流,而Spring WebFlux则通过集成Reactor,为基于HTTP的异步应用提供了强大的支持。
反应式流 Reactive Streams 入门介绍
在Java中,处理异步任务的机制一直以来都相对较弱,但第三方框架对此有所补充。我最近对这方面的知识进行了探索,并在此分享学习过程。
核心概念是Java中的Reactive Streams,它旨在解决异步编程中的复杂性。尽管名称看似生硬,但它并非新事物,而是反应式编程思想的一种应用,旨在处理未知时间点的数据流变化,通过异步、回调的方式处理问题。
Reactive Streams起源于年,由Netflix、Pivotal和Lightbend的工程师合作推出,目标是为异步数据流处理提供统一的规范,适用于JVM和JavaScript,以及网络协议等环境。它借鉴了Java的API设计,如JPA和JDBC,但提供了处理异步流的标准化接口和操作。
Reactive Streams的主要目标有两个:一是简化异步编程中任务调度和依赖关系的管理,二是引入了回压机制,动态控制数据流速率,避免生产者和消费者之间的不平衡问题。
理解Reactive Streams,关键在于其"reactive"和"stream"两部分。"reactive"表示基于消息驱动的被动响应,而"stream"则强调数据的流动和节点的处理。它类似于流水线生产,通过异步操作,避免了阻塞,提高了性能。
尽管Reactive Streams不是Java 1.8的直接要求,但Java 8的lambda表达式使其优势得以展现。在Java 9中,它已经成为官方API的一部分,与Java 1.9的Flow类内容一致。
目前,有许多实现Reactive Streams的框架,如RxJava、Reactor、Akka Streams、Ratpack和Vert.x,它们各自在不同的应用环境中提供了不同的功能和兼容性。
总结来说,Reactive Streams的出现解决了库间互操作性的问题,使得反应式编程能够广泛应用,如MongoDB驱动程序就支持与Reactive Streams的集成,提升了数据处理的效率和灵活性。
有什么使用了rxjava或rxandroid的开源项目?
在探索使用了 RxJava 或 RxAndroid 的开源项目时,我们首先可以回顾 GitHub 上的官方资源:ReactiveX/RxJava。这个项目作为 RxJava 的源头,提供了核心库和文档,是学习 RxJava 的重要起点。值得一提的是,中国在 RxJava 领域有着优秀的贡献者,如@hi大头鬼hi,他的教程以其精准性和实用性,对众多学习者提供了巨大帮助。国内的开发者常常将翻译或撰写的资料先请大头鬼审校,可见其权威性之高。
接下来,我们聚焦到 Flipoard 的扔物线,他的开源库 MaterialEditText 和对 Dagger 源码的解析,都是深入 Android 开发领域的经典之作。虽然扔物线的教程现在可能不在公开博客中发布,但感兴趣的开发者依然可以通过搜索找到相关信息。
此外,yongjhih 这位台湾开发者同样值得推荐。作为 RxJava 的狂热爱好者,yongjhih 的 GitHub 上积累了丰富的 Examples,为学习者提供了实际操作的参考和灵感。
在寻找使用了 RxJava 或 RxAndroid 的项目时,上述提到的资源和开发者无疑是很好的起点。然而,阅读这些资料仅是学习的开始,更重要的是实践。动手编写 Demo,将 RxJava 与传统 Android 组件(如 Handler、AsyncTask、BroadcastReceiver 等)结合使用,可以显著加深理解。不断练习,相信自己能够掌握,是学习过程中的关键。
在这个领域,持续探索、实践和分享是推动技术进步的重要力量。无论是从官方文档开始,还是追随这些知名开发者的学习路径,最终的目标是将理论知识转化为实际能力,解决实际问题。在这个过程中,不断尝试、总结和反思,将带来最大的成长。通过实践和交流,我们可以更加深入地理解 RxJava 或 RxAndroid 的应用场景,从而在项目中发挥它们的独特优势。
Reactiveï¼ååºå¼ï¼ç¼ç¨
Reactor åRxjavaæ¯Reactive Programmingèä¾çä¸ä¸ªå ·ä½å®ç°ï¼å¯ä»¥æ¦æ¬ä¸ºï¼
ä½ä¸ºååºå¼ç¼ç¨æ¹åç第ä¸æ¥ï¼Microsoftå¨.NETçæç³»ç»ä¸å建äºReactive Extensionsï¼Rxï¼åºãç¶åRxJavaå¨JVMä¸å®ç°äºååºå¼ç¼ç¨ãéçæ¶é´çæ¨ç§»ï¼éè¿Reactive Streamså·¥ä½åºç°äºJavaçæ ååï¼è¿ä¸è§èå®ä¹äºJVMä¸çååºåºçä¸ç»æ¥å£å交äºè§åãå®çæ¥å£å·²ç»å¨ç¶ç±»Flowä¸éæå°Java 9ä¸ã
å¦å¤Java 8è¿å¼å ¥äºStreamï¼å®æ¨å¨ææå°å¤çæ°æ®æµï¼å æ¬åå§ç±»åï¼ï¼è¿äºæ°æ®æµå¯ä»¥å¨æ²¡æ延è¿æå¾å°å»¶è¿çæ åµä¸è®¿é®ãå®æ¯åºäºæçï¼åªè½ä½¿ç¨ä¸æ¬¡ï¼ç¼ºå°ä¸æ¶é´ç¸å ³çæä½ï¼å¹¶ä¸å¯ä»¥æ§è¡å¹¶è¡è®¡ç®ï¼ä½æ æ³æå®è¦ä½¿ç¨ç线ç¨æ± ãä½æ¯å®è¿æ²¡æ设计ç¨äºå¤ç延è¿æä½ï¼ä¾å¦I / Oæä½ãå ¶æä¸æ¯æçç¹æ§å°±æ¯ReactoræRxJavaçReactive APIçç¨æ¦ä¹å°ã
Reactor æ Rxjavaçååºæ§APIä¹æä¾Java 8 Streamçè¿ç®ç¬¦ï¼ä½å®ä»¬æ´éç¨äºä»»ä½æµåºåï¼ä¸ä» ä» æ¯éåï¼ï¼å¹¶å 许å®ä¹ä¸ä¸ªè½¬æ¢æä½ç管éï¼è¯¥ç®¡éå°åºç¨äºéè¿å®çæ°æ®ï¼è¿è¦å½åäºæ¹ä¾¿çæµç APIå使ç¨lambdasãå®ä»¬æ¨å¨å¤çåæ¥æå¼æ¥æä½ï¼å¹¶å 许æ¨ç¼å²ï¼å并ï¼è¿æ¥æ对æ°æ®åºç¨åç§è½¬æ¢ã
é¦å èèä¸ä¸ï¼ä¸ºä»ä¹éè¦è¿æ ·çå¼æ¥ååºå¼ç¼ç¨åºï¼ç°ä»£åºç¨ç¨åºå¯ä»¥æ¯æ大é并åç¨æ·ï¼å³ä½¿ç°ä»£ç¡¬ä»¶çåè½ä¸ææé«ï¼ç°ä»£è½¯ä»¶çæ§è½ä»ç¶æ¯ä¸ä¸ªå ³é®é®é¢ã
人们å¯ä»¥éè¿ä¸¤ç§æ¹å¼æ¥æé«ç³»ç»çè½åï¼
é常ï¼Javaå¼å人å使ç¨é»å¡ä»£ç ç¼åç¨åºãè¿ç§åæ³å¾å¥½ï¼ç´å°åºç°æ§è½ç¶é¢ï¼æ¤æ¶éè¦å¼å ¥é¢å¤ç线ç¨ãä½æ¯ï¼èµæºå©ç¨ççè¿ç§æ©å±ä¼å¾å¿«å¼å ¥äºç¨å并åé®é¢ã
æ´ç³ç³çæ¯ï¼ä¼å¯¼è´æµªè´¹èµæºãä¸æ¦ç¨åºæ¶åä¸äºå»¶è¿ï¼ç¹å«æ¯I / Oï¼ä¾å¦æ°æ®åºè¯·æ±æç½ç»è°ç¨ï¼ï¼èµæºå°±ä¼è¢«æµªè´¹ï¼å 为线ç¨ï¼æ许å¤çº¿ç¨ï¼ç°å¨å¤äºç©ºé²ç¶æï¼çå¾ æ°æ®ã
æ以并è¡åæ¹æ³ä¸æ¯çµä¸¹å¦è¯ï¼è·å¾ç¡¬ä»¶çå ¨é¨åè½æ¯å¿ è¦çã
第äºç§æ¹æ³ï¼å¯»æ±ç°æèµæºçæ´é«ç使ç¨çï¼å¯ä»¥è§£å³èµæºæµªè´¹é®é¢ãéè¿ç¼åå¼æ¥ï¼éé»å¡ä»£ç ï¼æ¨å¯ä»¥ä½¿ç¨ç¸åçåºå±èµæºå°æ§è¡åæ¢å°å¦ä¸ä¸ªæ´»å¨ä»»å¡ï¼ç¶åå¨å¼æ¥å¤çå®æåè¿åå°å½å线ç¨è¿è¡ç»§ç»å¤çã
ä½æ¯å¦ä½å¨JVMä¸çæå¼æ¥ä»£ç ï¼ Javaæä¾äºä¸¤ç§å¼æ¥ç¼ç¨æ¨¡åï¼
ä½æ¯ä¸é¢ä¸¤ç§æ¹æ³é½æå±éæ§ãé¦å å¤ä¸ªcallbacké¾ä»¥ç»åå¨ä¸èµ·ï¼å¾å¿«å¯¼è´ä»£ç é¾ä»¥é 读以åé¾ä»¥ç»´æ¤ï¼ç§°ä¸ºâCallback Hellâï¼:
èèä¸é¢ä¸ä¸ªä¾åï¼å¨ç¨æ·çUIä¸å±ç¤ºç¨æ·å欢çtop 5个ååç详ç»ä¿¡æ¯ï¼å¦æä¸åå¨çè¯åè°ç¨æ¨èæå¡è·å5个ï¼è¿ä¸ªåè½çå®ç°éè¦ä¸ä¸ªæå¡æ¯æï¼ä¸ä¸ªæ¯è·åç¨æ·å欢çååçIDçæ¥å£ï¼userService.getFavoritesï¼ï¼ç¬¬äºä¸ªæ¯è·ååå详æ ä¿¡æ¯æ¥å£ï¼favoriteService.getDetailsï¼ï¼ç¬¬ä¸ä¸ªæ¯æ¨èååä¸åå详æ çæå¡ï¼suggestionService.getSuggestionsï¼ï¼åºäºcallback模å¼å®ç°ä¸é¢åè½ä»£ç å¦ä¸ï¼
å¦ä¸ä¸ºäºå®ç°è¯¥åè½ï¼æ们åäºå¾å¤ä»£ç ï¼ä½¿ç¨äºå¤§écallback,è¿äºä»£ç æ¯è¾æ¦æ¶©é¾æ,并ä¸åå¨ä»£ç éå¤ï¼ä¸é¢æ们使ç¨Reactoræ¥å®ç°çä»·çåè½ï¼
futureç¸æ¯callbackè¦å¥½ä¸äºï¼ä½å°½ç®¡CompletableFutureå¨Java 8ä¸è¿è¡äºæ¹è¿ï¼ä½å®ä»¬ä»ç¶è¡¨ç°ä¸ä½³ãä¸èµ·ç¼æå¤ä¸ªfutureæ¯å¯è¡ä½æ¯ä¸å®¹æçï¼å®ä»¬ä¸æ¯æ延è¿è®¡ç®ï¼æ¯å¦rxjavaä¸çdeferæä½ï¼åé«çº§é误å¤çï¼ä¾å¦ä¸é¢ä¾åãèèå¦å¤ä¸ä¸ªä¾åï¼é¦å æ们è·åä¸ä¸ªidå表ï¼ç¶åæ ¹æ®idåå«è·å对åºçnameåç»è®¡æ°æ®ï¼ç¶åç»åæ¯ä¸ªid对åºçnameåç»è®¡æ°æ®ä¸ºä¸ä¸ªæ°çæ°æ®ï¼æåè¾åºææç»å对çå¼ï¼ä¸é¢æ们使ç¨CompletableFutureæ¥å®ç°è¿ä¸ªåè½ï¼ä»¥ä¾¿ä¿è¯æ´ä¸ªè¿ç¨æ¯å¼æ¥çï¼å¹¶ä¸æ¯ä¸ªid对åºçå¤çæ¯å¹¶åçï¼
Reactoræ¬èº«æä¾äºæ´å¤çå¼ç®±å³ç¨çæä½ç¬¦ï¼ä½¿ç¨Reactoræ¥å®ç°ä¸é¢åè½ä»£ç å¦ä¸:
å¦ä¸ä»£ç 使ç¨reactoræ¹å¼ç¼åç代ç ç¸æ¯ä½¿ç¨CompletableFutureå®ç°ç¸ååè½æ¥è¯´ï¼æ´ç®æ´ï¼æ´éä¿ææã
å¯ç»åæ§ï¼æçæ¯ç¼æå¤ä¸ªå¼æ¥ä»»å¡çè½åï¼ä½¿ç¨å åä»»å¡çç»æä½ä¸ºåç»ä»»å¡çè¾å ¥æ以fork-joinæ¹å¼æ§è¡å¤ä¸ªä»»å¡ã
ç¼æä»»å¡çè½åä¸ä»£ç çå¯è¯»æ§åå¯ç»´æ¤æ§ç´§å¯ç¸å ³ãéçå¼æ¥è¿ç¨å±æ°éåå¤ææ§çå¢å ï¼è½å¤ç¼åå读å代ç åå¾è¶æ¥è¶å°é¾ãæ£å¦æ们æçå°çï¼callback模åå¾ç®åï¼ä½å ¶ä¸»è¦ç¼ºç¹ä¹ä¸æ¯ï¼å¯¹äºå¤æçå¤çï¼æ¨éè¦ä»åè°æ§è¡åè°ï¼æ¬èº«åµå¥å¨å¦ä¸ä¸ªåè°ä¸ï¼ä¾æ¤ç±»æ¨ãé£ä¸ªæ··ä¹±è¢«ç§°ä¸ºCallback Hellï¼æ£å¦ä½ å¯ä»¥çå°çï¼æè ä»ç»éªä¸å¾ç¥ï¼ï¼è¿æ ·ç代ç å¾é¾åå½å¹¶æ¨çã
Reactoræä¾äºä¸°å¯çç»åé项ï¼å ¶ä¸ä»£ç åæ äºæ½è±¡è¿ç¨çç»ç»ï¼å¹¶ä¸ææå 容é常é½ä¿æå¨åä¸çº§å«ï¼åµå¥æå°åï¼ã
åææå¯ä»¥ç»ååç§è½¬æ¢åå ¶ä»ä¸é´æ¥éª¤ï¼æè æ¯å°ä¸é´å ç´ èéå¨ä¸èµ·å½¢æè¾å¤§è£ é 线çä¸é¨åãå¦æå¨è£ é 线ä¸æä¸ç¹åºç°å µå¡ï¼åå½±åçå·¥ä½ç«å¯åä¸æ¸¸ååºä¿¡å·ä»¥éå¶åææçåä¸æµå¨ã
è½ç¶Reactive Streamsè§èæ ¹æ¬æ²¡ææå®è¿ç®ç¬¦ï¼ä½Reactoræè rxjavaçååºåºçæä½³éå å¼ä¹ä¸æ¯å®ä»¬æä¾ç丰å¯çè¿ç®ç¬¦ãè¿äºæ¶åå¾å¤æ¹é¢ï¼ä»ç®åç转æ¢åè¿æ»¤å°å¤æçç¼æåé误å¤çã
å¨Reactorä¸ï¼å½æ¨ç¼åPublisheré¾æ¶ï¼é»è®¤æ åµä¸æ°æ®ä¸ä¼å¯å¨ãç¸åï¼æ¨å¯ä»¥å建å¼æ¥è¿ç¨çæ½è±¡æè¿°ï¼è¿å¯ä»¥å¸®å©éç¨åç»åï¼ã
ä¸æ¸¸ä¼ æä¿¡å·ä¹ç¨äºå®ç°èåï¼æ们å¨è£ é 线ä¸å°å ¶æ述为å½å·¥ä½ç«æ¯ä¸æ¸¸å·¥ä½ç«å¤çéåº¦æ ¢æ¶åä¸æ¸¸çº¿è·¯åéçåé¦ä¿¡å·ã
è¿å°æ¨æ¨¡å转æ¢ä¸ºæ¨æå¼æ··å模å¼ï¼å¦æä¸æ¸¸ç产äºå¾å¤å ç´ ï¼åä¸æ¸¸å¯ä»¥ä»ä¸æ¸¸æåºn个å ç´ ãä½æ¯å¦æå ç´ æ²¡æåå¤å¥½ï¼å°±ä¼å¨ä¸æ¸¸ç产åºå ç´ åæ¨æ°æ®å°ä¸æ¸¸ã
reactive-native项目框架搭建步骤
搭建一个 React Native 项目通常包括以下几个步骤,让我们逐一过一遍。
首先,你需要创建一个 React Native 项目。完成创建后,运行项目,确保一切正常。
成功创建项目后,使用 Git 初始化并提交代码,以便于版本管理。
接着,添加 Normalize 库。这一步是为了手机进行适配,以确保应用在不同设备上显示一致。你可以从 react-native-normalize 网站获取。
为了更方便地处理文件路径,添加文件路径别名插件。在 babel.config.js 中配置别名,例如使用 @icons、@components 等别名,简化代码结构。
引入 styled-components,这将帮助你以更高效的方式处理样式。详细教程可参考 Marno 的《React Native 高效开发》。
对项目进行整理,将需要的文件夹组织好,保持代码结构清晰。
添加导航库基础,访问 reactnavigation.org 获取详细配置步骤。在安卓代码中进行相应配置,确保 react-native-screens package 正常工作,需要在 MainActivity.java 文件中添加特定代码。
根据需要,可以添加底部导航栏,参考 reactnavigation.org 的文档获取更多信息。完成配置后,根据 UI 设计调整样式。
为了实现简易的提示信息展示,可以使用 GitHub 的 react-native-toast-message 库。如果需要自定义样式,直接在网站上查看并调整即可。
Reactive系统的反压机制
一月份中旬,我在一个Kotlin的聚会上分享了我基于迁移到Reactive的必要条件Spring Boot应用的文章[1],并展示了如何使用Kotlin代码进行展示,同时还介绍了将代码库迁移到协程的步骤。
在问答环节,有人问到是否协程实现了反压。我承认我也不确定,所以我做了一点研究。
本文提供了关于反压的概要信息,并介绍了如何使用Rxjava(v3)、Project Reactor和Kotlin的协程Coroutines处理反压。
什么是反压?反压是指对管道中流体的抵御或反向作用力,导致丧失摩擦力和压力降低。在软件中,反压与这有点关系,但也有不同的含义:假设有一个很快的数据发送方和一个比较慢的数据接收方,反压是指一种机制可以反向推动发送方不要把接收方压垮。
无论是reactivestreams.org还是java.until.concurrent.Flow,反应流都提供以下四个构建块:
•Publisher发送元素
•Subscriber对收到的元素产生反应
•一个Subscription来绑定Publisher和Subscriber
•一个Processor
这是类图:
Subscription的request()方法是反压的顶层。规范很直白:Subscriber必须通过Subscription.request(long n)来发送需求信号后接收onNext信号。这里隐含的规则就是由Subscriber决定什么时候和有多少元素需要被接收。为了避免可重入Subscription方法引起的信号重排序,强烈推荐Subscriber方法的实现在调用Subscription方法的最后对任何信号处理都是用同步的方式。推荐Subscriber请求它们可以处理的上限,因为一次只请求一个元素会导致低效的“停止和等待”协议。
响应流的规范很标准。它们也有基于Java的TCK。
但要定义如何管理producer发送下游无法处理的元素就超出这个规范的范围了。问题比较简单,解决方法也多。每种Reactive框架都有提供方案,我们来看下。
RxJava3的反压提供了以下基础类:
在这些类中,Flowable是唯一实现了Reactive流-反压的流。因此,提供反压不是唯一的问题。RxJava wiki指出:反压并没有解决Observable过度生成或Subscriber过度消费。它只是将这个问题从处理的链条中移动到了一个比较好处理的地方。
为了解决这个,RxJava提供处理“过度生产“元素的两个主要策略:
•将元素存储到一个缓存里,如果没有足够的缓存,可能会产生OutOfMemoryError。
•丢掉数据
Project Reactor中提供的策略与RxJava类似。
API有点不一样。比如,如果生产者溢出Project Reactor提供一个方便的方法来抛异常:
var stream = Stream.generate(Math::random); // RxJava Flowable.fromStream(stream) // 1 .onBackpressureBuffer(0); // 2 // Project Reactor Flux.fromStream(stream) // 1 .onBackpressureError(); // 2
•创建Reactive流
•如果生产者溢出抛异常
下面是高亮了反压能力的Flux类图:
与其他框架相比,Project Reactor提供设置缓存TTL的方法来防止溢出。
协程提供同样的缓存和失效能力。协程的基础类是Flow。
你可以这样使用:
flow { // 1 while (true) emit(Math.random()) // 2 }.buffer()
•建一个Flow类,由下面定义content
•定义Flow的内容
•设置缓存容量为
RxJava,Project Reactor,Kotlin协程都提供反压能力。在生产者比消费者更快时提供两种策略:缓存数据或抛弃数据。