本站提供最佳软件源码跑码服务,欢迎转载和分享。

【升突源码】【cvv支付源码】【aspnetcore源码解读】flinkhadoop源码

2025-01-28 00:57:29 来源:react源码问题 分类:焦点

1.下面哪些是flink架构的组成部分
2.flink写入hdfs
3.大数据开发_Flink_概述,部署,运行架构,流处理API,Window,WaterMark,ProcessFunction,状态编程
4.docker部署大数据平台(hadoop生态及flink)
5.Flink系列十九Flink 作业Hadoop 依赖冲突解决NoSuchMethodError
6.flink1.10.0连接apache-hive-2.3.7(Java程序)

flinkhadoop源码

下面哪些是flink架构的组成部分

       下面哪些是flink架构的组成部分

       Flink 是一个开源的分布式流处理框架,它由以下几个组成部分:

       Flink 运行时:负责管理 Flink 应用程序的执行,包括任务调度、资源管理、容错等。

       Flink 库:提供各种功能,升突源码如数据流处理、批处理、图算法、机器学习等。

       Flink SQL:一种用于数据仓库和流处理查询的查询语言,支持将 SQL 查询转换为 Flink 应用程序。

       Flink Streaming:一种用于实时数据处理的高级流处理 API,支持事件驱动的cvv支付源码流式应用程序。

       Flink DataSet API:一种用于批处理和流处理的数据集 API,支持迭代式和批处理式的数据处理。

       Flink YARN:一种用于在 YARN 上运行 Flink 应用程序的组件,支持在 Hadoop 生态系统中进行数据分析和处理。

       这些组件共同构成了 Flink 框架,使开发人员能够构建高效、可扩展的实时数据处理应用程序。

flink写入hdfs

       /

**

        * 该策略在以下三种情况下滚动处于 In-progress 状态的部分文件(part file):

       

*

        * 它至少包含 分钟的数据

        * 最近 5 分钟没有收到新的记录

        * 文件大小达到 1GB (写入最后一条记录后)

        * 部分文件(part file)可以处于以下三种状态之一:

       

*

        * In-progress :当前文件正在写入中

        * Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态

        * Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态

        */

        DefaultRollingPolicy rollingPolicy = DefaultRollingPolicy

        .builder()

        .withMaxPartSize(**)// 设置每个文件的最大大小 ,默认是M。这里设置为1G

                .withRolloverInterval(TimeUnit.MINUTES.toMillis(5))// 至少包含分钟的数据

                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))// s空闲,就滚动写入新的文件 近 5 分钟没有收到新的记录

                .build();

        /

**

        * 输出配置

        */

        OutputFileConfig config = OutputFileConfig

        .builder()

        .withPartPrefix("kefu_client_")//前缀

                .withPartSuffix(".log")//后缀

                .build();

        StreamingFileSink sink = StreamingFileSink

        .forRowFormat(new Path(properties.getProperty("etl_hadoop_url")

        + properties.getProperty("etl_storm_kefu_basepath")),new SimpleStringEncoder("UTF-8"))

        .withRollingPolicy(rollingPolicy)

        .withBucketAssigner(new BasePathBucketAssigner())

        .withBucketCheckInterval(L)// 桶检查间隔,这里设置为1s

                .withOutputFileConfig(config)

        .build();

大数据开发_Flink_概述,部署,运行架构,流处理API,Window,WaterMark,ProcessFunction,状态编程

       Apache Flink是一个处理框架,专为实时和离线数据流的复杂状态计算设计,旨在提供低延迟、高吞吐量、准确性和容错性的处理能力。

       批处理作为其特殊类型,aspnetcore源码解读Flink旨在通过并行处理和分布式架构来优化性能。

       快速上手Flink,可选择在Standalone模式部署,通过slot(资源分配的基本单位)来分配资源,或者在生产环境中利用YARN(容器化资源管理)和Hadoop。session-cluster模式适用于小规模短时作业,而per-job-cluster模式则适合大型长期作业,甚至在Kubernetes上运行,以简化运维。

       Flink的运行架构包括客户端提交任务,通过HDFS和YARN进行资源管理。任务首先由JobManager调度至TaskManager,TaskManager之间通过流式通信。11101000的源码客户端负责数据流的准备和提交,而JobManager和TaskManager作为独立的JVM进程运行。

       Flink的流处理API基于数据流的链式结构,包括数据源、转换和sink。算子的并行度决定了子任务的数量。对于数据处理,Flink支持多种数据源,如Kafka、Redis、Elasticsearch和自定义JDBC sink。窗口功能将无限流分割为有限流,便于分析。hook时间源码

       EventTime和Watermark机制在处理乱序数据时至关重要,通过设置Watermark的延迟,Flink确保数据的准确处理和迟到数据的处理。ProcessFunction API允许开发者访问时间戳、Watermark,以及创建自定义事件驱动应用和业务逻辑。

       Flink的核心容错机制是一致性检查点,通过保存任务处理状态实现故障恢复。除了检查点,用户还可以利用保存点进行备份、更新或迁移应用。状态一致性保证了流处理结果的准确性,而端到端的数据保证则确保了整个处理过程的可靠性。

docker部署大数据平台(hadoop生态及flink)

       使用docker部署大数据平台(如Hadoop生态及Flink)能够显著缩短部署时间,提高数据开发效率。以下为搭建过程的概要:

       主流的大数据平台架构包括数据采集(Flume或Beats)、数据存储(HDFS、Hive、ES、HBase)、实时分析(Flink)、数据查询(Presto、Clickhouse)等组件。

       通过docker-compose一键部署,实现大数据平台快速搭建。组件版本如下:Apache Hadoop 3.2、Prestodb 0.、Kafka 2.0+、Hbase 2.2、Hive 3.1.2、ELK 7.9.1、Flink on yarn 1..3。

       部署步骤如下:

       1. 安装docker,确保系统兼容性。

       2. 安装docker-compose工具。

       3. 通过git clone获得docker compose文件。

       4. 切换至部署目录,运行docker-compose up -d命令启动服务。

       当前,各组件的dockerfile文件暂未开源,但所有组件基于Apache开源版本,开发时可安心使用。后续计划整合相关测试工具并开源。

Flink系列十九Flink 作业Hadoop 依赖冲突解决NoSuchMethodError

       Flink提交作业时,可能会遇到NoSuchMethodError的异常,这通常与Hadoop依赖冲突有关。查看源码后发现,错误源于2.6.0-cdh5..1版本的FsTracer通过hadoop-common加载了TraceUtils,但实际加载的是2.7.x版本的TraceUtils。因此,问题出在版本兼容性上。有以下两种解决方案:

       第一类解决方案是手动从jar包中排除冲突依赖。这需要识别冲突的库,并在Flink构建过程中排除它们,确保加载的库版本与期望一致。

       第二类解决方案是通过打包工具精确排除字节码。这可以更细致地控制类加载过程,避免不兼容版本的类被加载。

       深入理解这一问题,有助于我们意识到在使用Flink与外部系统集成时,版本兼容性是一个不容忽视的挑战。为避免此类问题,需要仔细管理依赖库的版本,确保它们之间无冲突。

       解决此类问题的最新方法(适用于所有Flink版本)在上一篇文章中已有详细描述,参见Flink系列十八HDFS_DELEGATION_TOKEN过期的问题解决汇总。

flink1..0连接apache-hive-2.3.7(Java程序)

       通过Java程序连接Apache Flink 1..0与Apache Hive 2.3.7,主要步骤如下:

       第一步:添加依赖,包括以下库:

       org.apache.flink flink-table-planner-blink_2. 1..0

       org.apache.flink flink-connector-hive_2. 1..0

       org.apache.flink flink-table-api-java-bridge_2. 1..0

       org.apache.hive hive-exec ${ hive.version} provided

       org.apache.hadoop hadoop-mapreduce-client-core 2.7.3

       org.apache.hadoop hadoop-common 2.7.3

       org.apache.hadoop hadoop-mapreduce-client-common 2.7.3

       org.apache.hadoop hadoop-mapreduce-client-jobclient 2.7.3

       org.apache.flink flink-shaded-hadoop-2-uber 2.7.5-8.0 provided

       org.datanucleus datanucleus-api-jdo 4.2.4

       org.datanucleus datanucleus-core 4.1.

       org.datanucleus datanucleus-rdbms 4.1.

       mysql mysql-connector-java 8.0.

       org.datanucleus javax.jdo 3.2.0-m3

       第二步:编写程序,加载hive-site.xml,并执行如下代码:

       java

       import org.apache.flink.table.api.EnvironmentSettings;

       import org.apache.flink.table.api.Table;

       import org.apache.flink.table.api.TableEnvironment;

       import org.apache.flink.table.catalog.hive.HiveCatalog;

       public class HiveConnect {

        public static void main(String[] args) {

        EnvironmentSettings settings = EnvironmentSettings

        .newInstance()

        .useBlinkPlanner()

        .inBatchMode()

        .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "myhive";

        String defaultDatabase = "ydj";

        String hiveConfDir = "D:\\OfficeWork\\WorkSpace\\GitLab\\big-data-extension\\achilles-algorithm-platform\\src\\main\\resources";

        String version = "2.3.4";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);

        tableEnv.registerCatalog("myhive", hive);

        tableEnv.useCatalog("myhive");

        String createDbSql = "select * from ydj.center";

        Table table = tableEnv.sqlQuery(createDbSql);

        System.out.println(table);

        }

       }

【本文网址:http://8o.net.cn/news/19d145498526.html 欢迎转载】

copyright © 2016 powered by 皮皮网   sitemap