皮皮网
皮皮网

【张柏芝艳照源码】【旁路广告源码】【cs网站源码】wordcount源码

来源:补单神器源码 发表时间:2024-12-27 12:57:18

1.七爪源码:C# 中的扩展方法
2.如何在MaxCompute上运行HadoopMR作业
3.3、MapReduce详解与源码分析

wordcount源码

七爪源码:C# 中的扩展方法

       扩展方法在C#中允许将方法“添加”到现有类型,无需创建新的派生类型或修改原始类型。扩展方法实质上是静态方法,但其调用方式如同实例方法,这在C#、张柏芝艳照源码F#和Visual Basic的客户端代码中没有明显区别。常见的扩展方法包括向System.Collections.IEnumerable和System.Collections.Generic.IEnumerable添加查询功能的LINQ标准查询运算符。

       例如,使用System.Linq指令将标准查询运算符引入范围后,可以对整数数组调用OrderBy方法进行排序。扩展方法定义为静态方法,旁路广告源码使用实例方法语法调用,第一个参数指定方法操作的类型,并带有this修饰符。扩展方法的范围取决于是否使用using指令显式导入命名空间。

       以下示例展示了为System.String类定义的扩展方法,WordCount方法,定义在非嵌套、非泛型的静态类中。使用using指令即可进入范围并调用该方法。调用扩展方法时使用实例方法语法,编译器将生成中间语言(IL)以对静态方法进行调用。cs网站源码

       扩展方法允许在代码中调用,MyExtensions类和WordCount方法都是静态的,可以通过其他静态成员访问。WordCount方法可以像其他静态方法一样被调用。

       扩展方法的调用在编译时进行。当编译器遇到方法调用时,首先在类型的实例方法中查找匹配项,如果没有找到,则搜索该类型定义的任何扩展方法,并绑定到第一个找到的扩展方法。如果一个类型有一个名为Process(int i)的physx源码分析方法,并且有一个具有相同签名的扩展方法,则编译器将始终绑定到实例方法。

       使用扩展方法的常见模式包括:

       1. 收集功能:过去,为给定类型创建实现System.Collections.Generic.IEnumerable接口并包含该类型集合功能的“集合类”是常见的做法。然而,通过使用System.Collections.Generic.IEnumerable上的扩展,可以实现相同功能,提供从任何集合调用功能的灵活性,如System.Array或System.Collections.Generic.List上的实现。

       2. 特定层的功能:在使用洋葱架构或其他分层应用程序设计时,域实体或数据传输对象通常不包含功能或仅包含适用于所有层的VB源码好找最小功能。扩展方法可用于为每个应用程序层添加特定功能,无需引入其他层不需要或不需要的方法。

如何在MaxCompute上运行HadoopMR作业

       MaxCompute(原ODPS)有一套自己的MapReduce编程模型和接口,简单说来,这套接口的输入输出都是MaxCompute中的Table,处理的数据是以Record为组织形式的,它可以很好地描述Table中的数据处理过程,然而与社区的Hadoop相比,编程接口差异较大。Hadoop用户如果要将原来的Hadoop MR作业迁移到MaxCompute的MR执行,需要重写MR的代码,使用MaxCompute的接口进行编译和调试,运行正常后再打成一个Jar包才能放到MaxCompute的平台来运行。这个过程十分繁琐,需要耗费很多的开发和测试人力。如果能够完全不改或者少量地修改原来的Hadoop MR代码就能在MaxCompute平台上跑起来,将是一个比较理想的方式。

       çŽ°åœ¨MaxCompute平台提供了一个HadoopMR到MaxCompute MR的适配工具,已经在一定程度上实现了Hadoop MR作业的二进制级别的兼容,即用户可以在不改代码的情况下通过指定一些配置,就能将原来在Hadoop上运行的MR jar包拿过来直接跑在MaxCompute上。目前该插件处于测试阶段,暂时还不能支持用户自定义comparator和自定义key类型,下面将以WordCount程序为例,介绍一下这个插件的基本使用方式。

       ä½¿ç”¨è¯¥æ’件在MaxCompute平台跑一个HadoopMR作业的基本步骤如下:

       1. 下载HadoopMR的插件

       ä¸‹è½½æ’件,包名为hadoop2openmr-1.0.jar,注意,这个jar里面已经包含hadoop-2.7.2版本的相关依赖,在作业的jar包中请不要携带hadoop的依赖,避免版本冲突。

       2. 准备好HadoopMR的程序jar包

       ç¼–译导出WordCount的jar包:wordcount_test.jar ,wordcount程序的源码如下:

       package com.aliyun.odps.mapred.example.hadoop;

       import org.apache.hadoop.conf.Configuration;

       import org.apache.hadoop.fs.Path;

       import org.apache.hadoop.io.IntWritable;

       import org.apache.hadoop.io.Text;

       import org.apache.hadoop.mapreduce.Job;

       import org.apache.hadoop.mapreduce.Mapper;

       import org.apache.hadoop.mapreduce.Reducer;

       import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

       import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

       import java.io.IOException;

       import java.util.StringTokenizer;

       public class WordCount {

       public static class TokenizerMapper

       extends Mapper<Object, Text, Text, IntWritable>{

       private final static IntWritable one = new IntWritable(1);

       private Text word = new Text();

       public void map(Object key, Text value, Context context

       ) throws IOException, InterruptedException {

       StringTokenizer itr = new StringTokenizer(value.toString());

       while (itr.hasMoreTokens()) {

       word.set(itr.nextToken());

       context.write(word, one);

       }

       }

       }

       public static class IntSumReducer

       extends Reducer<Text,IntWritable,Text,IntWritable> {

       private IntWritable result = new IntWritable();

       public void reduce(Text key, Iterable<IntWritable> values,

       Context context

       ) throws IOException, InterruptedException {

       int sum = 0;

       for (IntWritable val : values) {

       sum += val.get();

       }

       result.set(sum);

       context.write(key, result);

       }

       }

       public static void main(String[] args) throws Exception {

       Configuration conf = new Configuration();

       Job job = Job.getInstance(conf, "word count");

       job.setJarByClass(WordCount.class);

       job.setMapperClass(TokenizerMapper.class);

       job.setCombinerClass(IntSumReducer.class);

       job.setReducerClass(IntSumReducer.class);

       job.setOutputKeyClass(Text.class);

       job.setOutputValueClass(IntWritable.class);

       FileInputFormat.addInputPath(job, new Path(args[0]));

       FileOutputFormat.setOutputPath(job, new Path(args[1]));

       System.exit(job.waitForCompletion(true) ? 0 : 1);

       }

       }

       3. 测试数据准备

       åˆ›å»ºè¾“入表和输出表

       create table if not exists wc_in(line string);

       create table if not exists wc_out(key string, cnt bigint);

       é€šè¿‡tunnel将数据导入输入表中

       å¾…导入文本文件data.txt的数据内容如下:

       hello maxcompute

       hello mapreduce

       ä¾‹å¦‚可以通过如下命令将data.txt的数据导入wc_in中,

       tunnel upload data.txt wc_in;

       4. 准备好表与hdfs文件路径的映射关系配置

       é…ç½®æ–‡ä»¶å‘½åä¸ºï¼šwordcount-table-res.conf

       {

       "file:/foo": {

       "resolver": {

       "resolver": "c.TextFileResolver",

       "properties": {

       "text.resolver.columns.combine.enable": "true",

       "text.resolver.seperator": "\t"

       }

       },

       "tableInfos": [

       {

       "tblName": "wc_in",

       "partSpec": { },

       "label": "__default__"

       }

       ],

       "matchMode": "exact"

       },

       "file:/bar": {

       "resolver": {

       "resolver": "openmr.resolver.BinaryFileResolver",

       "properties": {

       "binary.resolver.input.key.class" : "org.apache.hadoop.io.Text",

       "binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable"

       }

       },

       "tableInfos": [

       {

       "tblName": "wc_out",

       "partSpec": { },

       "label": "__default__"

       }

       ],

       "matchMode": "fuzzy"

       }

       }

3、MapReduce详解与源码分析

       文章目录

       1

       Split阶段

       在MapReduce的流程中,Split阶段是将输入文件根据指定大小(默认MB)切割成多个部分,每个部分称为一个split。split的大小由minSize、maxSize、blocksize决定。以wordcount代码为例,split数量由FileInputFormat的getSplits方法确定,返回值即为mapper的数量。默认情况下,mapper的数量是文件大小除以block大小。此步骤由FileInputFormat的子类TextInputFormat完成,它负责将输入文件分割为InputSplit,从而决定mapper的数量。

       2

       Map阶段

       每个map task在执行过程中,会有内存缓冲区用于存储处理结果,缓冲区大小默认为MB,超过MB阈值时,数据将被写入磁盘作为临时文件,最后将所有临时文件合并为最终输出。在写入过程中,数据将被分区、排序、并执行combine操作,以优化数据处理效率。

       2.1

       分区

       MapReduce自带的分区器HashPartitioner将数据按照key值进行分区,确保数据均匀分布在reduce task之间。

       2.2

       排序

       在完成分区后,数据会按照key值进行排序,以便后续的Shuffle阶段能够高效地将相同key值的数据汇聚到一起。

       3

       Shuffle阶段

       Shuffle阶段是MapReduce的核心,负责数据从map task输出到reduce task输入的过程。reduce task会根据自己的分区号从各个map task中获取相应数据分区,之后会对这些文件进行合并(归并排序),将相同key值的数据汇聚到一起,为reduce阶段做好准备。

       4

       Reduce阶段

       Reduce阶段分为抓取、合并、排序三个步骤。reduce task创建并行抓取线程,通过HTTP协议从完成的map task中获取结果文件。抓取的数据先保存在内存中,超过内存大小时,数据将被溢写到磁盘。合并后的数据将按照key值排序,最终交给reduce函数进行计算,形成有序的计算结果。

       调节Reduce任务数量

       在处理大数据量时,调节Reduce任务数量是优化MapReduce性能的关键。如果设置过低,会导致节点资源闲置,效率低下。通常情况下,将Reduce任务设置为一个较大的值(最大值为),以充分利用资源。调节方法在于合理设置reduce task的数量,避免资源浪费,同时保证计算的高效性。

相关栏目:热点