详解 Flink 容器化环境下的 OOM Killed
作者:林小铂 在生产环境中,Flink 通常部署在 YARN 或 k8s 等资源管理系统之上,译源源码以容器化(YARN 容器或 docker 等容器)形式运行。译源源码资源管理系统的译源源码严格限制与 JVM 复杂且可控性较弱的内存模型结合,容易导致 Flink 进程因资源使用超标被系统杀死。译源源码Flink 在 1. 版本的译源源码文华8源码输入内存管理重构,设计了全新的译源源码内存参数,以屏蔽复杂内存结构,译源源码但问题排查和修复仍需领域知识,译源源码对于普通用户较为困难。译源源码 本文将深入解析 JVM 和 Flink 的译源源码内存模型,并总结常见导致 Flink 内存使用超出容器限制的译源源码原因。主要讨论在 YARN 部署、译源源码使用 Oracle JDK/OpenJDK 8 及 Flink 1.+ 情况下的译源源码问题。特别感谢社区成员 @宋辛童 和 @唐云 的译源源码贡献,使笔者受益匪浅。JVM 内存分区
对于大多数 Java 用户,日常开发中与 JVM Heap 打交道的频率远高于其他内存分区。而对于 Flink 来说,内存超标问题通常来自 Off-Heap 内存。理解 JVM 内存模型对 Flink 内存管理至关重要。 JVM 8 规定的内存分区包括:Heap、Class、Thread、Compiler、Code Cache、GC、Symbol、Arena Chunk 和 NMT、Internal、Unknown。其中,Heap 区域主要用于存储 new 操作符创建的对象,由 GC 管理,家教php源码可被用户代码或 JVM 本身使用。Class 区域包含类的元数据,对应 Method Area(不含 Constant Pool),在 Java 8 中称为 Metaspace。Thread 区域是线程级别的内存,包含 PC Register、Stack 和 Native Stack 的总和。Compiler 区域用于存储 JIT 编译器使用的内存。Code Cache 存储 JIT 编译器生成的代码缓存。GC 区域用于垃圾回收器的内存。Symbol 区域存储 Symbol(字段名、方法签名、Interned String),对应 Constant Pool。Arena Chunk 是 JVM 申请操作系统内存的临时缓存区。NMT 用于 NMT 自身的内存消耗。Internal 区域是其他不符合分类的内存,包括用户代码申请的 Native/Direct 内存。Unknown 区域是无法分类的内存。 理想情况下,通过严格控制各分区的内存上限,可以保证进程总体内存在容器限额之内。然而,过于严格的管理带来额外使用成本且缺乏灵活性。因此,JVM 通常只对几个暴露给用户使用的分区提供了硬性上限,其他分区则被视为 JVM 内部消耗。Flink TaskManager 内存模型
Flink 1.+ 的 TaskManager 内存模型包括 JVM 管理的 Heap 内存,以及 Flink 自管理的 Off-Heap(Native 和 Direct)内存。Flink 内存管理策略分为硬限制、软限制和预留三种。 硬限制的内存分区是 Self-Contained 的,Flink 会确保其用量不超过设置的oa页面源码阈值,否则抛出类似 OOM 的异常。软限制意味着内存使用长期在阈值以下,但可能短暂超过配置阈值。预留意味着 Flink 不限制分区内存使用,但在规划内存时预留一部分空间,不能保证实际使用不超额。 Flink 内存分区与 JVM 分区间的关系如下:硬限制分区导致 Flink 报内存不足;JVM 管理的分区内存耗尽时,JVM 会报其所属的 JVM 分区 OOM(如 java.lang.OutOfMemoryError: Java heap space);持续内存溢出导致进程总体内存超出容器限制,资源管理器(如 YARN 或 k8s)会杀死该进程。OOM Killed 常见原因
实践中导致 OOM Killed 的常见原因主要源于 Native 内存的泄漏或过度使用。虚拟内存的 OOM 问题通常容易避免且影响不大,所以下文主要讨论物理内存的 OOM 问题。RocksDB Native 内存的不确定性
RocksDB 直接通过 JNI 申请 Native 内存,不受 Flink 控制,Flink 通过设置 RocksDB 内存参数间接影响其使用。由于内存估算不精确,存在部分内存难以准确计算的问题,如 Block Cache、Indexes and filter blocks、Memtable 和 Blocks pinned by Iterator。此外,RocksDB 的内存占用存在不确定性,如 Block Cache 的 bug 可能导致无法严格控制大小。glibc Thread Arena 问题
glibc 的 MB 问题可能导致 JVM 进程内存使用大幅增长,最终被 YARN 杀死。glibc 维护的 Arena 内存池,当线程需要内存但 Main Arena 被其他线程加锁时,glibc 分配大约 MB 的 Thread Arena。这些 Arena 对 JVM 透明,被计入 VIRT 和 RSS 内存,可能导致意外的内存使用。JDK8 Native 内存泄漏
Oracle JDK8u 之前的版本存在 Native 内存泄漏 bug,导致 JVM Internal 内存分区持续增长。appcloud例子源码具体是由于 JVM 缓存字符串符号(Symbol)到方法、成员变量的映射对,导致过时的 MemberName 不会被 GC 自动清理,形成内存泄漏。YARN mmap 内存算法
YARN 根据 /proc/${ pid} 计算进程总体内存时,存在 mmap 共享内存被重复计算的问题,可能导致 Flink 进程被误杀。为了解决这个问题,YARN 提供了配置选项 yarn.nodemanager.container-monitor.procfs-tree.smaps-based-rss.enabled,启用后 YARN 将更准确地计算内存占用。总结
本文深入探讨了 JVM 内存模型、Flink TaskManager 内存模型及常见 OOM Killed 原因。通过分析内存分区、Flink 内存管理策略和常见问题,本文旨在帮助用户理解和解决 Flink 容器化环境下的内存问题。若有不同意见,欢迎留言探讨。flink 1. 1.åºå«
flink 1. 1.åºå«å¨äºFlink 1. æ¯æäº Flink SQL Kafka upsert connector ãå ä¸ºå¨ Flink 1. ä¸ï¼å½åè¿ç±»ä»»å¡å¼å对äºç¨æ·æ¥è¯´ï¼è¿æ¯ä¸å¤å好ï¼éè¦å¾å¤ä»£ç ï¼åæ¶ä¹ä¼é æ Flink SQL åé¿ã
Flink 1. SQL Connector æ¯æ Kafka Upsert Connectorï¼è¿ä¹æ¯æä»¬å ¬å¸å é¨ä¸å¡æ¹å¯¹å®æ¶å¹³å°æåºçéæ±ã
æ¶çï¼ä¾¿å©ç¨æ·æè¿ç§éè¦ä» kafka åææ°è®°å½æä½çå®æ¶ä»»å¡å¼åï¼æ¯å¦è¿ç§ binlog -> kafkaï¼ç¶åç¨æ·èåæä½ï¼è¿ç§åºæ¯è¿æ¯é常å¤çï¼è¿è½æåå®æ¶ä½ä¸å¼åæçï¼åæ¶ 1. åäºä¼åï¼æ§è½ä¼æ¯å纯ç last_value æ§è½è¦å¥½ã
Flink Yarn ä½ä¸ On k8s çç产级å«è½åæ¯ï¼
Flink Jar ä½ä¸å·²ç»å ¨é¨ K8s åï¼Flink SQL ä½ä¸ç±äºæ¯æ¨å¹¿åæï¼è¿æ¯å¨ Yarn ä¸é¢è¿è¡è¿è¡ï¼ä¸ºäºå°å®æ¶è®¡ç® Flink å ¨é¨K8såã
æ以æ们 Flink SQL ä½ä¸ä¹éè¦è¿ç§»å° K8sï¼ç®å Flink 1. å·²ç»æ»¡è¶³ç产级å«ç Flink k8s åè½ï¼æ以 Flink SQL K8s åï¼æç®ç´æ¥ä½¿ç¨ç¤¾åºç On k8s è½åã
é£é©ï¼è½ç¶å社åºç人æ²éï¼Flink 1. on k8s 没æä»ä¹é®é¢ï¼ä½æ¯å ·ä½åè½è¿æ¯éè¦å POC éªè¯ä¸ä¸ï¼åæ¶å¯è½ç¤¾åº Flink on k8s çè½åã
å¯è½ä¼éå¶æ们è¿è¾¹ä¸äº k8s åè½ä½¿ç¨ï¼æ¯å¦ hostpath volome 以å Ingress ç使ç¨ï¼è¿éå¯è½éè¦æ¹åºå±æºç æ¥è¿è¡å¿«éæ¯æï¼ç¤¾åºæç¸å ³ JIRA è¦åï¼ã
Flink Connector 详解
Flink Connector 详解
连接器 Connector 是 Flink 与外部系统间沟通的桥梁,它作为 Flink 数据的重要来源和去向,发挥着至关重要的作用。例如,从 Kafka 读取数据,经过 Flink 处理后再写回到 HIVE、Elasticsearch 等外部系统。
在处理流程中,事件控制、负载均衡、数据解析与序列化等方面,连接器扮演着关键角色。事件处理水印(watermark)和检查点对齐记录,以及数据在不同格式间的转换,都是连接器需要处理的问题。
Source API 作为 Flink 数据的service项目源码入口,经历了接口演进,从 Flink 1. 版本之前的 SourceFunction API 和 InputFormat API,到引入的新 Source API。新 API 的特点是批流统一,实现简单,简化了开发者开发工作。
核心抽象包括记录分片(Split)、记录分片枚举器(Split Enumerator)和 Source 读取器(Source Reader)。记录分片是带有编号的记录集合,记录分片枚举器负责发现和分配记录分片,Source 读取器负责从记录分片读取数据、处理事件时间水印和数据解析。
枚举器-读取器架构中,枚举器运行在 Job Master 上,负责协调和分配任务;读取器运行在 Task Executor 上,负责执行任务。他们的检查点存储各自分开,但存在通信。
Source 读取器设计简化了开发者的开发工作,提供了 SourceReaderBase 类供开发者继承,从而减少开发工作量。
Sink API 作为 Flink 数据的出口,实现了精确一次的语义,通过二阶段提交机制保障数据的完整性和一致性。
Sink 模型未来发展包括连接器测试框架的引入,为连接器提供统一的测试标准,简化开发者的测试工作,降低开发门槛,吸引更多开发者参与 Flink 生态建设。
记一次kfd(kafka+flink+doris)的实时操作
在一次关于kafka+flink+doris的实时操作项目中,我们的目标是为某市医院构建一个统一的数据中心主索引,同时处理历史离线数据和增量实时数据,确保数据的实时性和准确性。我们的技术栈包括centos 7.2作为操作系统,mysql 5.7.用于数据存储,kafka 2.负责数据传输,flink 1..1作为流处理引擎,以及doris 0..0来高效地进行数据存储和分析。
离线处理策略是利用doris的强大能力,通过创建外部映射表并加载到ods库,生成包含患者身份证号的主索引。这个过程是关键的初始化步骤,为后续实时处理奠定了基础。
在增量处理方面,我们采取了高效的策略:首先,canal监控mysql的binlog,实时捕获数据库的更改,然后将增量数据推送到kafka。flink作为实时数据处理引擎,从kafka中读取这些变更,通过Redis缓存进行实时检查。当新数据的身份证号、档案号和医院代码匹配主索引时,flink会进行数据验证和处理,确保数据一致性,不存在时则生成新的索引,并将更新同步到Redis和doris。这一步骤对于实时索引的维护至关重要,避免了重复数据的插入。
项目依赖的库包括flink的kafka连接器、scala编程语言,以及log4j用于日志管理和Redis缓存。以下是一些关键依赖的配置:
org.apache.flink
flink-connector-kafka_${ scala.binary.version}
${ flink.version}
mysql
mysql-connector-java
5.1.
redis.clients
jedis
${ redis.version}
dorisdb-maven-releases
/repository/maven-releases/
在实施过程中,我们遇到了一些挑战,如FE资源限制和数据格式的兼容性问题。通过不断优化,我们最终选择使用HTTP PUT方法进行数据写入,并采取每条数据或3秒一次的频率,确保了数据的高效处理。整个流程包括mysql本地数据的写入、canal的实时同步、kafka的数据推送,以及flink和doris之间的无缝对接,实现了实时数据的实时处理和更新。
通过这次实践,我们不仅提升了数据处理的实时性,还锻炼了团队在分布式系统中的协作和问题解决能力。这次kafka+flink+doris的结合,为医院的数据中心管理带来了显著的效率提升和数据一致性保障。
Flink SQL 性能优化:multiple input 详解
在 Flink 1. 中,Flink 优化了作业执行效率,推出了 multiple input operator 和 source chaining,以处理 operator chaining 无法优化的场景,如多输入算子间的数据shuffle。这些优化旨在消除冗余数据传输,显著提高批作业性能。通过实例,如TPC-DS q的订单量统计,我们发现Flink 1.在执行计划中移除了不必要的forward shuffle,如图1中的hash join中,store_sales表的广播shuffle被优化。
冗余shuffle往往源于算子间的数据整理需求,如hash join的join key分布要求。Flink早期通过operator chaining将相邻单输入算子整合到一个task,但join等多输入算子仍存在shuffle问题。在1.版本中,引入的multiple input operator允许将多输入算子合并到一个task中,同时处理输入优先级,如hash join和nested loop join的顺序要求。
尽管operator chaining仅适用于下游算子入度为1的场景,但source chaining则针对source节点的shuffle进行了优化,将不被shuffle阻隔的source与operator chaining合并,进一步减少数据传输。通过TPC-DS测试,Flink 1.相比1.总用时缩短了近%,显示出source chaining + multiple input的显著效果。
未来,Flink团队计划支持更多批算子和更精细的推导算法,但流作业上的优化仍需解决一些技术挑战,如流算子的优化逻辑和数据格式的要求。随着社区的共同努力,我们期待尽早将这些性能优化应用于更多的作业场景。
Flink深入浅出:JDBC Connector源码分析
大数据开发中,数据分析与报表制作是日常工作中最常遇到的任务。通常,我们通过读取Hive数据来进行计算,并将结果保存到数据库中,然后通过前端读取数据库来进行报表展示。然而,使用FlinkSQL可以简化这一过程,通过一个SQL语句即可完成整个ETL流程。
在Flink中,读取Hive数据并将数据写入数据库是常见的需求。本文将重点讲解数据如何写入数据库的过程,包括刷写数据库的机制和原理。
以下是本文将讲解的几个部分,以解答在使用过程中可能产生的疑问:
1. 表的定义
2. 定义的表如何找到具体的实现类(如何自定义第三方sink)
3. 写入数据的机制原理
(本篇基于1..0源码整理而成)
1. 表的定义
Flink官网提供了SQL中定义表的示例,以下以oracle为例:
定义好这样的表后,就可以使用insert into student执行插入操作了。接下来,我们将探讨其中的技术细节。
2. 如何找到实现类
实际上,这一过程涉及到之前分享过的SPI(服务提供者接口),即DriverManager去寻找Driver的过程。在Flink SQL执行时,会通过translate方法将SQL语句转换为对应的Operation,例如insert into xxx中的xxx会转换为CatalogSinkModifyOperation。这个操作会获取表的信息,从而得到Table对象。如果这个Table对象是CatalogTable,则会进入TableFactoryService.find()方法找到对应的实现类。
寻找实现类的过程就是SPI的过程。即通过查找路径下所有TableFactory.class的实现类,加载到内存中。这个SPI的定义位于resources下面的META-INFO下,定义接口以及实现类。
加载到内存后,首先判断是否是TableFactory的实现类,然后检查必要的参数是否满足(如果不满足会抛出异常,很多人在第一次使用Flink SQL注册表时,都会遇到NoMatchingTableFactoryException异常,其实都是因为配置的属性不全或者Jar报不满足找不到对应的TableFactory实现类造成的)。
找到对应的实现类后,调用对应的createTableSink方法就能创建具体的实现类了。
3. 工厂模式+创建者模式,创建TableSink
JDBCTableSourceSinkFactory是JDBC表的具体实现工厂,它实现了stream的sinkfactory。在1..0版本中,它不能在batch模式下使用,但在1.版本中据说会支持。这个类使用了经典的工厂模式,其中createStreamTableSink负责创建真正的Table,基于创建者模式构建JDBCUpsertTableSink。
创建出TableSink之后,就可以使用Flink API,基于DataStream创建一个Sink,并配置对应的并行度。
4. 消费数据写入数据库
在消费数据的过程中,底层基于PreparedStatement进行批量提交。需要注意的是提交的时机和机制。
控制刷写触发的最大数量 'connector.write.flush.max-rows' = ''
控制定时刷写的时间 'connector.write.flush.interval' = '2s'
这两个条件先到先触发,这两个参数都是可以通过with()属性配置的。
JDBCUpsertFunction很简单,主要的工作是包装对应的Format,执行它的open和invoke方法。其中open负责开启连接,invoke方法负责消费每条数据提交。
接下来,我们来看看关键的format.open()方法:
接下来就是消费数据,执行提交了
AppendWriter很简单,只是对PreparedStatement的封装而已
5. 总结
通过研究代码,我们应该了解了以下关键问题:
1. JDBC Sink执行的机制,比如依赖哪些包?(flink-jdbc.jar,这个包提供了JDBCTableSinkFactory的实现)
2. 如何找到对应的实现?基于SPI服务发现,扫描接口实现类,通过属性过滤,最终确定对应的实现类。
3. 底层如何提交记录?目前只支持append模式,底层基于PreparedStatement的addbatch+executeBatch批量提交
4. 数据写入数据库的时机和机制?一方面定时任务定时刷新,另一方面数量超过限制也会触发刷新。
更多Flink内容参考:
2024-11-15 00:44
2024-11-14 23:54
2024-11-14 23:24
2024-11-14 22:59
2024-11-14 22:50