【大联盟 源码】【javacore源码分析】【iris源码大全】spark 内存管理源码_spark内存管理源码

时间:2024-11-14 14:42:49 来源:棋盘游戏php源码 编辑:aworks 源码

1.RDD的内内存cache和persist原理
2.Apache 两个开源项目比较:Flink vs Spark
3.spark有哪些组件
4.Spark原理 | 内存管理
5.SparkShuffle及Spark SQL图解执行流程语法
6.Spark原理详解

spark 内存管理源码_spark内存管理源码

RDD的cache和persist原理

       在Spark数据处理中,为了提升性能,存管通常会利用RDD的理源缓存功能,通过persist()或cache()方法将计算结果存储在内存或磁盘中。管理这不仅避免了重复计算,源码还支持算法迭代和快速交互式使用。内内存大联盟 源码RDD的存管缓存机制具有容错性,数据丢失后会自动重新计算。理源

       Spark提供了多种存储级别,管理如MEMORY_ONLY(仅内存)、源码MEMORY_AND_DISK(内存和磁盘)、内内存MEMORY_ONLY_SER(序列化内存)等,存管用户可以根据需求选择合适的理源级别,以平衡内存使用和CPU效率。管理默认情况下,源码RDD缓存级别为NONE,调用persist()后才会生效。Shuffle操作时,Spark会自动缓存一些数据以提高容错性。

       选择存储级别时,需考虑计算成本和数据访问速度。例如,如果数据量大且计算代价高,选择DISK_ONLY可能更合适;对于快速访问,MEMORY_ONLY或MEMORY_ONLY_SER更为理想。此外,replication选项允许设置数据副本,提供容错,但会增加存储和计算资源消耗。

       persist()函数的实现并不直接进行数据缓存,而是设置RDD的storageLevel,当读取或计算分区时,根据存储级别决定是否进行缓存。这个过程在计算RDD时触发,数据会存储在内存或磁盘,具体步骤包括判断存储级别、从内存或磁盘获取数据,或计算并保存数据。

       通过unpersist()函数,用户可以手动清除缓存,Spark会自动管理LRU缓存。在SparkContext中,一旦设置的storageLevel不可修改,确保数据操作的一致性。

       通过实践操作,如测试不同存储级别的缓存,可以更好地理解RDD缓存和persist的工作原理。总的来说,Spark的缓存功能是提高性能的关键手段,合理选择和管理存储级别至关重要。

Apache 两个开源项目比较:Flink vs Spark

       时间久远,我对云计算与大数据已感生疏,尤其是Flink的崛起。自动驾驶平台需云计算支撑,包括机器学习、深度学习训练、javacore源码分析高清地图、模拟仿真模块,以及车联网。近日看到一篇Spark与Flink的比较文章,遂转发分享,以便日后重新学习该领域新知识。

       Apache Flink作为新一代通用大数据处理引擎,致力于整合各类数据负载。它似乎与Apache Spark有着相似目标。两者都旨在构建一个单一平台,用于批处理、流媒体、交互式、图形处理、机器学习等。因此,Flink与Spark在理念上并无太大差异。但在实施细节上,它们却存在显著区别。

       以下比较Spark与Flink的不同之处。尽管两者在某些方面存在相似之处,但也有许多不同之处。

       1. 抽象

       在Spark中,批处理采用RDD抽象,而流式传输使用DStream。Flink为批处理数据集提供数据集抽象,为流应用程序提供DataStream。尽管它们听起来与RDD和DStreams相似,但实际上并非如此。

       以下是差异点:

       在Spark中,RDD在运行时表示为Java对象。随着project Tungsten的推出,它略有变化。但在Apache Flink中,数据集被表示为一个逻辑计划。这与Spark中的Dataframe相似,因此在Flink中可以像使用优化器优化的一等公民那样使用API。然而,Spark RDD之间并不进行任何优化。

       Flink的数据集类似Spark的Dataframe API,在执行前进行了优化。

       在Spark 1.6中,数据集API被添加到spark中,可能最终取代RDD抽象。

       在Spark中,所有不同的抽象,如DStream、Dataframe都建立在RDD抽象之上。但在Flink中,Dataset和DataStream是基于顶级通用引擎构建的两个独立抽象。尽管它们模仿了类似的API,但在DStream和RDD的情况下,无法将它们组合在一起。尽管在这方面有一些努力,但最终结果还不够明确。iris源码大全

       无法将DataSet和DataStream组合在一起,如RDD和DStreams。

       因此,尽管Flink和Spark都有类似的抽象,但它们的实现方式不同。

       2. 内存管理

       直到Spark 1.5,Spark使用Java堆来缓存数据。虽然项目开始时更容易,但它导致了内存不足(OOM)问题和垃圾收集(gc)暂停。因此,从1.5开始,Spark进入定制内存管理,称为project tungsten。

       Flink从第一天起就开始定制内存管理。实际上,这是Spark向这个方向发展的灵感之一。不仅Flink将数据存储在它的自定义二进制布局中,它确实直接对二进制数据进行操作。在Spark中,所有数据帧操作都直接在Spark 1.5的project tungsten二进制数据上运行。

       在JVM上执行自定义内存管理可以提高性能并提高资源利用率。

       3. 实施语言

       Spark在Scala中实现。它提供其他语言的API,如Java、Python和R。

       Flink是用Java实现的。它确实提供了Scala API。

       因此,与Flink相比,Spark中的选择语言更好。在Flink的一些scala API中,java抽象也是API的。这会有所改进,因为已经使scala API获得了更多用户。

       4. API

       Spark和Flink都模仿scala集合API。所以从表面来看,两者的API看起来非常相似。

       5. 流

       Apache Spark将流式处理视为快速批处理。Apache Flink将批处理视为流处理的特殊情况。这两种方法都具有令人着迷的含义。

       以下是两种不同方法的差异或含义:

       Apache Flink提供事件级处理,也称为实时流。它与Storm模型非常相似。

       Spark只有不提供事件级粒度的最小批处理(mini-batch)。这种方法被称为近实时。

       Spark流式处理是更快的批处理,Flink批处理是有限的流处理。

       虽然大多数应用程序都可以近乎实时地使用,但很少有应用程序需要事件级实时处理。这些应用程序通常是Storm流而不是Spark流。对于他们来说,Flink将成为一个非常有趣的选择。

       运行流处理作为更快批处理的优点之一是,我们可以在两种情况下使用相同的抽象。Spark非常支持组合批处理和流数据,小说源码会员因为它们都使用RDD抽象。

       在Flink的情况下,批处理和流式传输不共享相同的API抽象。因此,尽管有一些方法可以将基于历史文件的数据与流相结合,但它并不像Spark那样干净。

       在许多应用中,这种能力非常重要。在这些应用程序中,Spark代替Flink流式传输。

       由于最小批处理的性质,Spark现在对窗口的支持非常有限。允许根据处理时间窗口批量处理。

       与其他任何系统相比,Flink提供了非常灵活的窗口系统。Window是Flink流API的主要焦点之一。它允许基于处理时间、数据时间和无记录等的窗口。这种灵活性使Flink流API与Spark相比非常强大。

       6. SQL界面

       截至目前,最活跃的Spark库之一是spark-sql。Spark提供了像Hive一样的查询语言和像DSL这样的Dataframe来查询结构化数据。它是成熟的API并且在批处理中广泛使用,并且很快将在流媒体世界中使用。

       截至目前,Flink Table API仅支持DSL等数据帧,并且仍处于测试阶段。有计划添加sql接口,但不确定何时会落在框架中。

       目前为止,Spark与Flink相比有着不错的SQL故事。

       7. 数据源集成

       Spark数据源API是框架中最好的API之一。数据源API使得所有智能资源如NoSQL数据库、镶嵌木地板、优化行列(Optimized Row Columnar,ORC)成为Spark上的头等公民。此API还提供了在源级执行谓词下推(predicate push down)等高级操作的功能。

       Flink仍然在很大程度上依赖于map / reduce InputFormat来进行数据源集成。虽然它是足够好的提取数据API,但它不能巧妙地利用源能力。因此Flink目前落后于目前的数据源集成技术。

       8. 迭代处理

       Spark最受关注的功能之一就是能够有效地进行机器学习。在内存缓存和其他实现细节中,它是实现机器学习算法的真正强大的平台。

       虽然ML算法是循环数据流,但它表示为Spark内部的直接非循环图。通常,没有分布式处理系统鼓励循环数据流,因为它们变得难以理解。

       但是Flink对其他人采取了一些不同的方法。它们在运行时支持受控循环依赖图(cyclic dependence graph)。这使得它们与DAG表示相比以非常有效的方式表示ML算法。因此,Flink支持本机平台中的迭代,与DAG方法相比,strap源码分析可实现卓越的可扩展性和性能。

       9. 流作为平台与批处理作为平台

       Apache Spark来自Map / Reduce时代,它将整个计算表示为数据作为文件集合的移动。这些文件可能作为磁盘上的阵列或物理文件驻留在内存中。这具有非常好的属性,如容错等。

       但是Flink是一种新型系统,它将整个计算表示为流处理,其中数据有争议地移动而没有任何障碍。这个想法与像akka-streams这样的新的反应流系统非常相似。

       . 成熟

       Flink像批处理这样的部分已经投入生产,但其他部分如流媒体、Table API仍在不断发展。这并不是说在生产中就没人使用Flink流。

spark有哪些组件

       Spark的组件主要包括以下几个部分:

       一、Spark Core(Spark核心组件)

       Spark Core是Spark框架的核心,它提供了Spark集群的运行环境以及任务调度、内存管理、错误检测等功能。Spark Core是整个Spark应用程序的起点和中心,负责管理和协调其他组件的工作。

       二、Spark SQL(Spark SQL组件)

       Spark SQL是Spark用于处理结构化数据的工具,它允许用户使用SQL语言来查询和分析数据。通过Spark SQL,用户可以更方便地处理大数据集并获取结果。

       三、Spark Streaming(Spark流处理组件)

       Spark Streaming是Spark中用于处理实时数据的组件。它可以接收来自各种源(如Kafka、Twitter等)的实时数据,并将其转换为DStream(离散数据流),然后进行处理和分析。这对于需要实时分析大数据的应用程序非常有用。

       四、Spark MLlib(Spark机器学习库)

       Spark MLlib是Spark中用于数据分析和机器学习的库。它提供了许多常用的算法和工具,如分类、聚类、回归等。通过Spark MLlib,用户可以在Spark集群上进行大规模的数据分析和机器学习任务。

       除了上述主要组件外,Spark还有其他一些辅助组件,如GraphX(用于图计算)、PySpark(Python接口的Spark)等。这些组件都是为了让用户在处理和分析大数据时更加方便和高效而设计的。通过结合使用这些组件,用户可以在Spark上构建出强大的大数据处理和分析应用程序。

       总体来说,Apache Spark是一个集成了多种组件的大规模数据处理框架。这些组件协同工作,使得在分布式环境中进行高效、快速的数据处理和分析成为可能。

Spark原理 | 内存管理

       Spark作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色。

        在执行Spark的应用程序时,Spark集群会启动Driver和Executor两种JVM进程:

        Spark管理的内存主要划分为4个区域:

        Executor作为一个JVM进程,它的内存管理建立在JVM的内存管理之上,Spark对JVM的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。

        堆内内存的大小,由 Spark 应用程序启动时的 executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。不同的管理模式下,这三部分占用的空间大小各不相同。

        Spark 对堆内内存的管理是一种逻辑上的"规划式"的管理,因为对象实例占用内存的申请和释放都由 JVM 完成,Spark 只能在申请后和释放前记录这些内存,我们来看其具体流程:

        为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。利用 JDK Unsafe API(从 Spark 2.0 开始),在管理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现,Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。

        在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。

        Spark 1.6 之后默认为统一管理(UnifiedMemoryManager)方式,1.6 之前采用的静态管理(StaticMemoryManager)方式仍被保留,可通过配置 spark.memory.useLegacyMode=true 参数启用静态内存管理方式。下面我们介绍下两种内存管理模型的进化。

        在 Spark 最初采用的静态内存管理机制下,存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置,堆内内存的分配如下所示:

        Spark 1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域。如下图所示:

        其中最重要的优化在于动态占用机制,其规则如下:

        新的版本引入了新的配置项:

        凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。譬如,所以如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能,因为缓存的 RDD 数据通常都是长期驻留内存的。所以要想充分发挥 Spark 的性能,需要开发者进一步了解存储内存和执行内存各自的管理方式和实现原理。

SparkShuffle及Spark SQL图解执行流程语法

       SparkShuffle是Apache Spark中的一个核心概念,主要涉及数据分片、聚合与分发的过程。在使用reduceByKey等操作时,数据会被划分到不同的partition中,但每个key可能分布在不同的节点上。为了解决这一问题,Spark引入了Shuffle机制,主要分为两种类型:HashShuffleManager与SortShuffleManager。

       HashShuffleManager在Spark 1.2之前是默认选项,它通过分区器(默认是hashPartitioner)决定数据写入的磁盘小文件。在Shuffle Write阶段,每个map task将结果写入到不同的文件中。Shuffle Read阶段,reduce task从所有map task所在的机器上寻找属于自己的文件,确保了数据的聚合。然而,这种方法会产生大量的磁盘小文件,导致频繁的磁盘I/O操作、内存对象过多、频繁的垃圾回收(GC)以及网络通信故障,从而影响性能。

       SortShuffleManager在Spark 1.2引入,它改进了数据的处理流程。在Shuffle阶段,数据写入内存结构,当内存结构达到一定大小时(默认5M),内存结构会自动进行排序分区并溢写磁盘。这种方式在Shuffle阶段减少了磁盘小文件的数量,同时在Shuffle Read阶段通过解析索引文件来拉取数据,提高了数据读取的效率。

       Spark内存管理分为静态内存管理和统一内存管理。静态内存管理中内存大小在应用运行期间固定,统一内存管理则允许内存空间共享,提高了资源的利用率。Spark1.6版本默认采用统一内存管理,可通过配置参数spark.memory.useLegacyMode来切换。

       Shuffle优化涉及多个参数的调整。例如,`spark.shuffle.file.buffer`参数用于设置缓冲区大小,适当增加此值可以减少磁盘溢写次数。`spark.reducer.maxSizeInFlight`参数则影响数据拉取的次数,增加此值可以减少网络传输,提升性能。`spark.shuffle.io.maxRetries`参数控制重试次数,增加重试次数可以提高稳定性。

       Shark是一个基于Spark的SQL执行引擎,兼容Hive语法,性能显著优于MapReduce的Hive。Shark支持交互式查询应用服务,其设计架构对Hive的依赖性强,限制了其长期发展,但提供了与Spark其他组件更好的集成性。SparkSQL则是Spark平台的SQL接口,支持查询原生的RDD和执行Hive语句,提供了Scala中写SQL的能力。

       DataFrame作为Spark中的分布式数据容器,类似于传统数据库的二维表格,不仅存储数据,还包含数据结构信息(schema)。DataFrame支持嵌套数据类型,提供了一套更加用户友好的API,简化了数据处理的复杂性。通过注册为临时表,DataFrame的列默认按ASCII顺序显示。

       SparkSQL的数据源丰富,包括JSON、JDBC、Parquet、HDFS等。其底层架构包括解析、分析、优化、生成物理计划以及任务执行。谓词下推(predicate Pushdown)是优化策略之一,能够提前执行条件过滤,减少数据的处理量。

       创建DataFrame的方式多样,可以从JSON、非JSON格式的RDD、Parquet文件以及JDBC中的数据导入。DataFrame的转换与操作提供了灵活性和效率,支持通过反射方式转换非JSON格式的RDD,但不推荐使用。动态创建Schema是将非JSON格式的RDD转换成DataFrame的一种方法。读取Parquet文件和Hive中的数据均支持DataFrame的创建和数据的持久化存储。

       总之,SparkShuffle及Spark SQL通过高效的内存管理、优化的Shuffle机制以及灵活的数据源支持,为大数据处理提供了强大而高效的能力。通过合理配置参数和优化流程,能够显著提升Spark应用程序的性能。

Spark原理详解

       Spark原理详解:

       Spark是一个专为大规模数据处理设计的内存计算框架,其高效得益于其核心组件——弹性数据分布集RDD。RDD是Spark的数据结构,它将数据存储在分布式内存中,通过逻辑上的集中管理和物理上的分布式存储,提供了高效并行计算的能力。

       RDD的五个关键特性如下:

       每个RDD由多个partition组成,用户可以指定分区数量,默认为CPU核心数。每个partition独立处理,便于并行计算。

       Spark的计算基于partition,算子作用于partition上,无需保存中间结果,提高效率。

       RDD之间有依赖性,数据丢失时仅重新计算丢失分区,避免全量重算。

       对于key-value格式的RDD,有Partitioner决定分片和数据分布,优化数据处理的本地化。

       Spark根据数据位置调度任务,实现“移动计算”而非数据。

       Spark区分窄依赖(一对一)和宽依赖(一对多),前者不涉及shuffle,后者则会根据key进行数据切分。

       Spark的执行流程包括用户提交任务、生成DAG、划分stage和task、在worker节点执行计算等步骤。创建RDD的方式多样,包括程序中的集合、本地文件、HDFS、数据库、NoSQL和数据流等。

       技术栈方面,Spark与HDFS、YARN、MR、Hive等紧密集成,提供SparkCore、SparkSQL、SparkStreaming等扩展功能。

       在编写Spark代码时,首先创建SparkConf和SparkContext,然后操作RDD进行转换和应用Action,最后关闭SparkContext。理解底层机制有助于优化资源使用,如HDFS文件的split与partition关系。

       搭建Spark集群涉及上传、配置worker和master信息,以及启动和访问。内存管理则需注意Executor的off-heap和heap,以及Spark内存的分配和使用。

Spark内存管理详解(下)——内存管理

        弹性分布式数据集(RDD)作为Spark最根本的数据抽象,是只读的分区记录(Partition)的集合,只能基于在稳定物理存储中的数据集上创建,或者在其他已有的RDD上执行转换(Transformation)操作产生一个新的RDD。转换后的RDD与原始的RDD之间产生的依赖关系,构成了血统(Lineage)。凭借血统,Spark保证了每一个RDD都可以被重新恢复。但RDD的所有转换都是惰性的,即只有当一个返回结果给Driver的行动(Action)发生时,Spark才会创建任务读取RDD,然后真正触发转换的执行。

        Task在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查Checkpoint或按照血统重新计算。所以如果一个RDD上要执行多次行动,可以在第一次行动中使用persist或cache方法,在内存或磁盘中持久化或缓存这个RDD,从而在后面的行动时提升计算速度。事实上,cache方法是使用默认的MEMORY_ONLY的存储级别将RDD持久化到内存,故缓存是一种特殊的持久化。堆内和堆外存储内存的设计,便可以对缓存RDD时使用的内存做统一的规划和管理(存储内存的其他应用场景,如缓存broadcast数据,暂时不在本文的讨论范围之内)。

        RDD的持久化由Spark的Storage模块 [1] 负责,实现了RDD与物理存储的解耦合。Storage模块负责管理Spark在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。在具体实现时Driver端和Executor端的Storage模块构成了主从式的架构,即Driver端的BlockManager为Master,Executor端的BlockManager为Slave。Storage模块在逻辑上以Block为基本存储单位,RDD的每个Partition经过处理后唯一对应一个Block(BlockId的格式为 rdd_RDD-ID_PARTITION-ID )。Master负责整个Spark应用程序的Block的元数据信息的管理和维护,而Slave需要将Block的更新等状态上报到Master,同时接收Master的命令,例如新增或删除一个RDD。

        在对RDD持久化时,Spark规定了MEMORY_ONLY、MEMORY_AND_DISK等7种不同的 存储级别 ,而存储级别是以下5个变量的组合 [2] :

        通过对数据结构的分析,可以看出存储级别从三个维度定义了RDD的Partition(同时也就是Block)的存储方式:

        RDD在缓存到存储内存之前,Partition中的数据一般以迭代器( Iterator )的数据结构来访问,这是Scala语言中一种遍历数据集合的方法。通过Iterator可以获取分区中每一条序列化或者非序列化的数据项(Record),这些Record的对象实例在逻辑上占用了JVM堆内内存的other部分的空间,同一Partition的不同Record的空间并不连续。

        RDD在缓存到存储内存之后,Partition被转换成Block,Record在堆内或堆外存储内存中占用一块连续的空间。将Partition由不连续的存储空间转换为连续存储空间的过程,Spark称之为“展开”(Unroll)。Block有序列化和非序列化两种存储格式,具体以哪种方式取决于该RDD的存储级别。非序列化的Block以一种DeserializedMemoryEntry的数据结构定义,用一个数组存储所有的Java对象,序列化的Block则以SerializedMemoryEntry的数据结构定义,用字节缓冲区(ByteBuffer)来存储二进制数据。每个Executor的Storage模块用一个链式Map结构(LinkedHashMap)来管理堆内和堆外存储内存中所有的Block对象的实例 [6] ,对这个LinkedHashMap新增和删除间接记录了内存的申请和释放。

        因为不能保证存储空间可以一次容纳Iterator中的所有数据,当前的计算任务在Unroll时要向MemoryManager申请足够的Unroll空间来临时占位,空间不足则Unroll失败,空间足够时可以继续进行。对于序列化的Partition,其所需的Unroll空间可以直接累加计算,一次申请。而非序列化的Partition则要在遍历Record的过程中依次申请,即每读取一条Record,采样估算其所需的Unroll空间并进行申请,空间不足时可以中断,释放已占用的Unroll空间。如果最终Unroll成功,当前Partition所占用的Unroll空间被转换为正常的缓存RDD的存储空间,如下图2所示。

        在 《Spark内存管理详解(上)——内存分配》 的图3和图5中可以看到,在静态内存管理时,Spark在存储内存中专门划分了一块Unroll空间,其大小是固定的,统一内存管理时则没有对Unroll空间进行特别区分,当存储空间不足是会根据动态占用机制进行处理。

        由于同一个Executor的所有的计算任务共享有限的存储内存空间,当有新的Block需要缓存但是剩余空间不足且无法动态占用时,就要对LinkedHashMap中的旧Block进行淘汰(Eviction),而被淘汰的Block如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘(Drop),否则直接删除该Block。

        存储内存的淘汰规则为:

        落盘的流程则比较简单,如果其存储级别符合 _useDisk 为true的条件,再根据其 _deserialized 判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在Storage模块中更新其信息。

        Executor内运行的任务同样共享执行内存,Spark用一个HashMap结构保存了任务到内存耗费的映射。每个任务可占用的执行内存大小的范围为 1/2N ~ 1/N ,其中N为当前Executor内正在运行的任务的个数。每个任务在启动之时,要向MemoryManager请求申请最少为1/2N的执行内存,如果不能被满足要求则该任务被阻塞,直到有其他任务释放了足够的执行内存,该任务才可以被唤醒。

        执行内存主要用来存储任务在执行Shuffle时占用的内存,Shuffle是按照一定规则对RDD数据重新分区的过程,我们来看Shuffle的Write和Read两阶段对执行内存的使用:

        在ExternalSorter和Aggregator中,Spark会使用一种叫AppendOnlyMap的哈希表在堆内执行内存中存储数据,但在Shuffle过程中所有数据并不能都保存到该哈希表中,当这个哈希表占用的内存会进行周期性地采样估算,当其大到一定程度,无法再从MemoryManager申请到新的执行内存时,Spark就会将其全部内容存储到磁盘文件中,这个过程被称为溢存(Spill),溢存到磁盘的文件最后会被归并(Merge)。

        Shuffle Write阶段中用到的Tungsten是Databricks公司提出的对Spark优化内存和CPU使用的计划 [4] ,解决了一些JVM在性能上的限制和弊端。Spark会根据Shuffle的情况来自动选择是否采用Tungsten排序。Tungsten采用的页式内存管理机制建立在MemoryManager之上,即Tungsten对执行内存的使用进行了一步的抽象,这样在Shuffle过程中无需关心数据具体存储在堆内还是堆外。每个内存页用一个MemoryBlock来定义,并用 Object obj 和 long offset 这两个变量统一标识一个内存页在系统内存中的地址。堆内的MemoryBlock是以long型数组的形式分配的内存,其 obj 的值为是这个数组的对象引用, offset 是long型数组的在JVM中的初始偏移地址,两者配合使用可以定位这个数组在堆内的绝对地址;堆外的MemoryBlock是直接申请到的内存块,其 obj 为null, offset 是这个内存块在系统内存中的位绝对地址。Spark用MemoryBlock巧妙地将堆内和堆外内存页统一抽象封装,并用页表(pageTable)管理每个Task申请到的内存页。

        Tungsten页式管理下的所有内存用位的逻辑地址表示,由页号和页内偏移量组成:

        有了统一的寻址方式,Spark可以用位逻辑地址的指针定位到堆内或堆外的内存,整个Shuffle Write排序的过程只需要对指针进行排序,并且无需反序列化,整个过程非常高效,对于内存访问效率和CPU使用效率带来了明显的提升 [5] 。

        Spark的存储内存和执行内存有着截然不同的管理方式:对于存储内存来说,Spark用一个LinkedHashMap来集中管理所有的Block,Block由需要缓存的RDD的Partition转化而成;而对于执行内存,Spark用AppendOnlyMap来存储Shuffle过程中的数据,在Tungsten排序中甚至抽象成为页式内存管理,开辟了全新的JVM内存管理机制。

        Spark的内存管理是一套复杂的机制,且Spark的版本更新比较快,笔者水平有限,难免有叙述不清、错误的地方,若读者有好的建议和更深的理解,还望不吝赐教。

大数据面试题-Spark的内存模型

       面试题来源:可回答:1)Spark内存管理的结构;2)Spark的Executor内存分布(参考“内存空间分配”)

       1、堆内和堆外内存规划

       作为一个JVM 进程,Executor 的内存管理建立在JVM的内存管理之上,Spark对JVM的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。

       堆内内存受到JVM统一管理,堆外内存是直接向操作系统进行内存的申请和释放。

       默认情况下,Spark 仅仅使用了堆内内存。Executor 端的堆内内存区域大致可以分为以下四大块:堆内内存的大小,由Spark应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划。

       Spark对堆内内存的管理是一种逻辑上的”规划式”的管理。不同管理模式下,这三部分占用的空间大小各不相同。

       堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。

       利用JDK Unsafe API(从Spark 2.0开始),Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。

       在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。

       2、内存空间分配

       静态内存管理与统一内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域。

       统一内存管理的堆内内存结构如图所示:其中最重要的优化在于动态占用机制。统一内存管理的堆外内存结构如下图所示。

       凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能。

       3、存储内存管理

       RDD的持久化机制

       弹性分布式数据集(RDD)作为 Spark 最根本的数据抽象,是只读的分区记录(Partition)的集合。RDD的持久化由 Spark的Storage模块负责,实现了RDD与物理存储的解耦合。Storage模块负责管理Spark在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。在具体实现时Driver端和 Executor 端的Storage模块构成了主从式的架构。

       在对RDD持久化时,Spark规定了MEMORY_ONLY、MEMORY_AND_DISK 等7种不同的存储级别,而存储级别是以下5个变量的组合。

       通过对数据结构的分析,可以看出存储级别从三个维度定义了RDD的 Partition(同时也就是Block)的存储方式。

       4、执行内存管理

       执行内存主要用来存储任务在执行Shuffle时占用的内存。

       若在map端选择普通的排序方式,会采用ExternalSorter进行外排,在内存中存储数据时主要占用堆内执行空间。

       若在map端选择 Tungsten 的排序方式,则采用ShuffleExternalSorter直接对以序列化形式存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。

       在Shuffle Write 阶段中用到的Tungsten是Databricks公司提出的对Spark优化内存和CPU使用的计划。在Shuffle过程中,Spark会根据Shuffle的情况来自动选择是否采用Tungsten排序。

       Tungsten 采用的页式内存管理机制建立在MemoryManager之上,即 Tungsten 对执行内存的使用进行了一步的抽象,这样在 Shuffle 过程中无需关心数据具体存储在堆内还是堆外。每个内存页用一个MemoryBlock来定义,并用 Object obj 和 long offset 这两个变量统一标识一个内存页在系统内存中的地址。

spark sql源码系列 | with as 语句真的会把查询的数据存内存嘛?

       在探讨 Spark SQL 中 with...as 语句是否真的会把查询的数据存入内存之前,我们需要理清几个关键点。首先,网上诸多博客常常提及 with...as 语句会将数据存放于内存中,来提升性能。那么,实际情况究竟如何呢?

       让我们以 hive-sql 的视角来解答这一问题。在 hive 中,有一个名为 `hive.optimize.cte.materialize.threshold` 的参数。默认情况下,其值为 -1,代表关闭。当值大于 0 时(如设置为 2),with...as 语句生成的表将在被引用次数达到设定值后物化,从而确保 with...as 语句仅执行一次,进而提高效率。

       接下来,我们通过具体测试来验证上述结论。在不调整该参数的情况下,执行计划显示 test 表被读取了两次。此时,我们将参数调整为 `set hive.optimize.cte.materialize.threshold=1`,执行计划显示了 test 表被物化的情况,表明查询结果已被缓存。

       转而观察 Spark SQL 端,我们并未发现相关优化参数。Spark 对 with...as 的操作相对较少,在源码层面,通过获取元数据时所做的参数判断(如阈值与 cte 引用次数),我们可以发现 Spark 在这个逻辑上并未提供明确的优化机制,来专门针对 with...as 语句进行高效管理。

       综上所述,通过与 hive-sql 的对比以及深入源码分析,我们得出了 with...as 语句在 Spark SQL 中是否把数据存入内存的结论,答案并不是绝对的。关键在于是否通过参数调整来物化结果,以及 Spark 在自身框架层面并未提供特定优化策略来针对 with...as 语句进行内存管理。因此,正确使用 with...as 语句并结合具体业务场景,灵活调整优化参数策略,是实现性能提升的关键。

copyright © 2016 powered by 皮皮网   sitemap