1.Flink Collector Output 接口源码解析
2.Flink深入浅出:JDBC Connector源码分析
3.flink的示灯ParameterTool获取中文乱码问题
4.Flink源码算子
5.Flink mysql-cdc connector 源码解析
6.FLINK 部署(阿里云)、监控 和 源码案例
Flink Collector Output 接口源码解析
Flink Collector Output 接口源码解析
Flink中的源码Collector接口和其扩展Output接口在数据传递中起关键作用。Output接口增加了Watermark功能,码示是示灯数据传输的基石。本文将深入解析collect方法及相关重要实现类,源码帮助理解数据传递的码示轴心点系统源码逻辑和场景划分。Collector和Output接口
Collector接口有2个核心方法,示灯Output接口则增加了4个功能,源码WatermarkGaugeExposingOutput接口则专注于显示Watermark值。码示主要关注collect方法,示灯它是源码数据发送的核心操作,Flink中有多个Output实现类,码示针对不同场景如数据传递、示灯Metrics统计、源码广播和时间戳处理。码示Output实现类分类
Output类可以归类为:同一operatorChain内的数据传递(如ChainingOutput和CopyingChainingOutput)、跨operatorChain间(RecordWriterOutput)、统计Metrics(CountingOutput)、广播(BroadcastingOutputCollector)和时间戳处理(TimestampedCollector)。示例应用与调用链路
通过一个示例,我们了解了Kafka Source与Map算子之间的数据传递使用ChainingOutput,而Map到Process之间的传递则用RecordWriterOutput。在不同Output的选择中,objectReuse配置起着决定性作用,影响性能和安全性。 总结来说,ChainingOutput用于operatorChain内部,RecordWriterOutput处理跨chain,CountingOutput负责Metrics,BroadcastingOutputCollector用于广播,TimestampedCollector则用于设置时间戳。开启objectReuse会影响选择的Output类型。阅读推荐
Flink任务实时监控
Flink on yarn日志收集
Kafka Connector更新
自定义Kafka反序列化
SQL JSON Format源码解析
Yarn远程调试源码
State Processor API状态操作
侧流输出源码
Broadcast流状态源码解析
Flink启动流程分析
Print SQL Connector取样功能
Flink深入浅出:JDBC Connector源码分析
大数据开发中,数据分析与报表制作是日常工作中最常遇到的任务。通常,我们通过读取Hive数据来进行计算,下载cm源码并将结果保存到数据库中,然后通过前端读取数据库来进行报表展示。然而,使用FlinkSQL可以简化这一过程,通过一个SQL语句即可完成整个ETL流程。
在Flink中,读取Hive数据并将数据写入数据库是常见的需求。本文将重点讲解数据如何写入数据库的过程,包括刷写数据库的机制和原理。
以下是本文将讲解的几个部分,以解答在使用过程中可能产生的疑问:
1. 表的定义
2. 定义的表如何找到具体的实现类(如何自定义第三方sink)
3. 写入数据的机制原理
(本篇基于1..0源码整理而成)
1. 表的定义
Flink官网提供了SQL中定义表的示例,以下以oracle为例:
定义好这样的表后,就可以使用insert into student执行插入操作了。接下来,我们将探讨其中的技术细节。
2. 如何找到实现类
实际上,这一过程涉及到之前分享过的SPI(服务提供者接口),即DriverManager去寻找Driver的过程。在Flink SQL执行时,会通过translate方法将SQL语句转换为对应的Operation,例如insert into xxx中的xxx会转换为CatalogSinkModifyOperation。这个操作会获取表的信息,从而得到Table对象。如果这个Table对象是CatalogTable,则会进入TableFactoryService.find()方法找到对应的实现类。
寻找实现类的过程就是SPI的过程。即通过查找路径下所有TableFactory.class的实现类,加载到内存中。这个SPI的定义位于resources下面的META-INFO下,定义接口以及实现类。
加载到内存后,首先判断是否是TableFactory的实现类,然后检查必要的参数是否满足(如果不满足会抛出异常,很多人在第一次使用Flink SQL注册表时,都会遇到NoMatchingTableFactoryException异常,磁力链接 源码其实都是因为配置的属性不全或者Jar报不满足找不到对应的TableFactory实现类造成的)。
找到对应的实现类后,调用对应的createTableSink方法就能创建具体的实现类了。
3. 工厂模式+创建者模式,创建TableSink
JDBCTableSourceSinkFactory是JDBC表的具体实现工厂,它实现了stream的sinkfactory。在1..0版本中,它不能在batch模式下使用,但在1.版本中据说会支持。这个类使用了经典的工厂模式,其中createStreamTableSink负责创建真正的Table,基于创建者模式构建JDBCUpsertTableSink。
创建出TableSink之后,就可以使用Flink API,基于DataStream创建一个Sink,并配置对应的并行度。
4. 消费数据写入数据库
在消费数据的过程中,底层基于PreparedStatement进行批量提交。需要注意的是提交的时机和机制。
控制刷写触发的最大数量 'connector.write.flush.max-rows' = ''
控制定时刷写的时间 'connector.write.flush.interval' = '2s'
这两个条件先到先触发,这两个参数都是可以通过with()属性配置的。
JDBCUpsertFunction很简单,主要的工作是包装对应的Format,执行它的open和invoke方法。其中open负责开启连接,invoke方法负责消费每条数据提交。
接下来,我们来看看关键的format.open()方法:
接下来就是消费数据,执行提交了
AppendWriter很简单,只是对PreparedStatement的封装而已
5. 总结
通过研究代码,我们应该了解了以下关键问题:
1. JDBC Sink执行的机制,比如依赖哪些包?(flink-jdbc.jar,这个包提供了JDBCTableSinkFactory的实现)
2. 如何找到对应的实现?基于SPI服务发现,扫描接口实现类,通过属性过滤,互助系统 源码最终确定对应的实现类。
3. 底层如何提交记录?目前只支持append模式,底层基于PreparedStatement的addbatch+executeBatch批量提交
4. 数据写入数据库的时机和机制?一方面定时任务定时刷新,另一方面数量超过限制也会触发刷新。
更多Flink内容参考:
flink的ParameterTool获取中文乱码问题
在使用ParameterTool获取参数时,可能遇到中文乱码问题。解决方法如下。
首先,检查配置文件内容。
接着,编写测试代码。
运行后,观察输出结果。
深入分析,发现ParameterTool的最终文件读取操作为字节流。
字节流转自中文,容易引发乱码。解决策略是将字节流转换为字符流。
查阅源码,了解InputStreamReader的作用。
使用字符流加载Properties,并通过ParameterTool.fromMap方法创建ParameterTool。
调整后的获取ParameterTool方法。
执行测试,确认问题解决。
Flink源码算子
Flink应用程序的核心组件包括源(source)、转换(transformation)和目的地(sink),它们共同构成有向图,数据流从源开始,流向sink结束。源算子如env.addSource的底层实现涉及监控函数和连续读取文件操作,如env.readTextFile()调用了一系列方法,最终通过add.source添加到流处理环境。
转换算子种类繁多,如map和sum。centos 内核源码map算子通过函数转换,经过层层调用,最终调用transformations.add方法,将算子添加到作业的血缘依赖列表中。print算子作为sink,通过addSink操作生成StreamSink operator,其SinkFunction负责数据处理,如PrintSinkFunction的打印操作。
构建过程中,每次转换都会产生新的数据流,这些StreamTransformation会以隐式链表或图的形式组织起来,input属性记录上下游关系。执行阶段,会生成StreamGraph和JobGraph,然后提交到集群进行调度。
Flink mysql-cdc connector 源码解析
Flink 1. 引入了 CDC功能,用于实时同步数据库变更。Flink CDC Connectors 提供了一组源连接器,支持从MySQL和PostgreSQL直接获取增量数据,如Debezium引擎通过日志抽取实现。以下是Flink CDC源码解析的关键部分:
首先,MySQLTableSourceFactory是实现的核心,它通过DynamicTableSourceFactory接口构建MySQLTableSource对象,获取数据库和表的信息。MySQLTableSource的getScanRuntimeProvider方法负责创建用于读取数据的运行实例,包括DeserializationSchema转换源记录为Flink的RowData类型,并处理update操作时的前后数据。
DebeziumSourceFunction是底层实现,继承了RichSourceFunction和checkpoint接口,确保了Exactly Once语义。open方法初始化单线程线程池以进行单线程读取,run方法中配置DebeziumEngine并监控任务状态。值得注意的是,目前只关注insert, update, delete操作,表结构变更暂不被捕捉。
为了深入了解Flink SQL如何处理列转行、与HiveCatalog的结合、JSON数据解析、DDL属性动态修改以及WindowAssigner源码,可以查阅文章。你的支持是我写作的动力,如果文章对你有帮助,请给予点赞和关注。
本文由文章同步助手协助完成。
FLINK 部署(阿里云)、监控 和 源码案例
FLINK部署、监控与源码实例详解
在实际部署FLINK至阿里云时,POM.xml配置是一个关键步骤。为了减小生产环境的包体积并提高效率,我们通常选择将某些依赖项设置为provided,确保在生产环境中这些jar包已预先存在。而在本地开发环境中,这些依赖需要被包含以支持测试。 核心代码示例中,数据流API的运用尤其引人注目。通过Flink,我们实现了从Kafka到Hologres的高效数据流转。具体步骤如下:Kafka配置:首先,确保Kafka作为数据源的配置正确无误,包括连接参数、主题等,这是整个流程的开端。
Flink处理:Flink的数据流API在此处发挥威力,它可以实时处理Kafka中的数据,执行各种复杂的数据处理操作。
目标存储:数据处理完成后,Flink将结果无缝地发送到Hologres,作为最终的数据存储和分析目的地。
十二、flink源码解析-创建和启动TaskManager二
深入探讨Flink源码中创建与启动TaskManager的过程,我们首先聚焦于内部启动onStart阶段。此阶段核心在于启动TaskExecutorServices服务,具体步骤包括与ResourceManager的连接、注册和资源分配。
当TaskExecutor启动时,首先生成新的注册并创建未完成的future,随后等待注册成功并执行注册操作。这一过程由步骤1至步骤5组成,确保注册与资源连接的无缝集成。一旦注册成功,资源管理器会发送SlotReport报告至TaskExecutor,然后分配slot。
TaskSlotTable开始分配slot,JobTable获取并提供slot至JobManager。这一流程确保资源的有效分配与任务的高效执行。与此同时,ResourceManager侧的TaskExecutor注册流程同样重要,包括连接与注册TaskExecutor。
一旦完成注册与资源分配,ResourceManager会发送SlotReport报告至JobMaster,提供slot以供调度任务。这一步骤标志着slot的分配与JobManager的准备工作就绪,为后续任务部署打下基础。
在ResourceManager侧,slot管理组件注册新的taskManager,根据规则更新slot状态、释放资源或继续执行注册。这一过程确保资源的高效管理与任务的顺利进行。
在JobMaster侧,slot的分配与管理通过slotPool进行,确保待调度任务能够得到所需资源。这一阶段标志着任务调度与执行的准备就绪。
流程的最后,回顾整个创建与启动TaskManager的过程,从资源连接与注册到slot分配与任务调度,各个环节紧密相连,确保Flink系统的高效运行与任务的顺利执行。
经验总结:分享一个Flink checkpoint失败的问题和解决办法
本文分享了Flink作业在执行过程中遇到的checkpoint失败问题及其解决策略。问题背景是Flink作业在执行过程中,多次出现checkpoint失败的情况,导致作业频繁重启,尽管重启后作业通常能恢复正常。最近,同事频繁遇到此问题,因此,本文将深入分析问题原因并提出解决方案。
我们的Flink测试环境配置了三个节点,其中每个节点部署了一个HDFS的DataNode节点,用于Flink的checkpoint和savepoint。日志显示有三个datanode存活,文件副本数量为1,但写文件时出现失败。通过网络搜索相关错误信息,我们尝试了在HDFS上上传和下载文件,结果均正常,这表明HDFS服务没有问题,datanode也处于正常状态。
继续排查过程中,我们注意到namenode日志中出现了一些警告信息,进而怀疑可能与块放置策略有关。按照日志提示,我们开启相应的debug开关并对配置进行了调整。通过追踪日志信息,我们发现存储空间虽有G,但写入块所需的多M空间超出了预留的存储量,导致namenode认为空间不足。
接下来,我们分析了HDFS源码,发现BlockPlacementPolicyDefault等类负责在为块选择datanode时进行检查,包括剩余空间和繁忙程度的评估。在我们的场景中,日志显示存储空间的预留量与实际需求不匹配,导致namenode误判datanode的空间不足。
经过深入分析,我们发现问题的根本原因在于HDFS的块大小设置不当,导致在高并发作业时,短时间内预留了大量存储空间。Flink的checkpoint机制在多个任务线程中频繁写入HDFS,特别是在大量小文件的场景下,短时间内产生的大量小文件(每个文件只有几K大小)导致了datanode的存储空间被大量预占,从而出现空间不足的问题。
为了解决此问题,我们提出了一套配置策略。首先,明确指出块大小不是集群属性,而是文件属性,可以通过客户端配置进行调整。在conf/flink-conf.yaml文件中,我们配置了一个HDFS配置文件路径,与Flink配置文件路径相一致。此外,我们编写了一个hdfs-site.xml文件,其中包含了blockSize的配置,例如设置为1M。配置块大小时,需要根据作业状态文件大小灵活调整,以适应不同的作业需求。
通过上述配置调整,我们成功解决了Flink checkpoint失败的问题,并将其同步至集群自动化部署脚本中,部署时会专门添加blockSize的配置。尽管Flink依赖HDFS的checkpoint方案在轻量级流计算场景中显得较为复杂,但通过优化HDFS的块大小配置,我们有效地避免了空间预占问题,确保了Flink作业的稳定执行。未来,我们期望探索使用其他存储方案,如Elasticsearch,作为checkpoint的分布式存储选项,以进一步优化Flink作业的性能和稳定性。
flink自定义trigger-实现窗口随意输出
之前,我曾简要介绍过flink的窗口以及与Spark Streaming窗口的对比。
关于flink的窗口操作,尤其是基于事件时间的窗口操作,以下三个关键知识点是大家需要掌握的:
flink提供了多种内置的触发器,其中用于基于事件时间的窗口触发器被称为EventTimeTrigger。
若要实现基于事件时间的窗口随意输出,例如每个元素触发一次输出,我们可以通过修改这个触发器来实现。
可能你没有注意到之前提到的触发器的重要性,因为没有触发器的话,在允许事件滞后的情况下,输出时间会延迟较大。而我们需要尽早看到数据,这时就可以自定义窗口触发。
自定义触发器
可以通过修改基于处理时间的触发器来实现,以下是源码:
主要实现逻辑是在onElement函数中,增加了每个元素触发一次计算结果输出的逻辑。
主函数
代码测试已通过。
明天将在知识星球分享一篇干货和代码案例。