å¦ä½ä½¿ç¨Python为Hadoopç¼åä¸ä¸ªç®åçMapReduceç¨åº
MichaelG.Nollå¨ä»çBlogä¸æå°å¦ä½å¨Hadoopä¸ç¨Pythonç¼åMapReduceç¨åºï¼é©å½çgogamzaå¨å ¶Bolgä¸ä¹æå°å¦ä½ç¨Cç¼åMapReduceç¨åºï¼æç¨å¾®ä¿®æ¹äºä¸ä¸åç¨åº,å 为ä»çMap对åè¯åå使ç¨tabé®ï¼ãæå并ä»ä»¬ä¸¤äººçæç« ï¼ä¹è®©å½å çHadoopç¨æ·è½å¤ä½¿ç¨å«çè¯è¨æ¥ç¼åMapReduceç¨åºãããé¦å æ¨å¾é 好æ¨çHadoopé群ï¼è¿æ¹é¢çä»ç»ç½ä¸æ¯è¾å¤ï¼è¿å¿ç»ä¸ªé¾æ¥ï¼Hadoopå¦ä¹ ç¬è®°äºå®è£ é¨ç½²ï¼ãHadoopStreaming帮å©æ们ç¨éJavaçç¼ç¨è¯è¨ä½¿ç¨MapReduceï¼Streamingç¨STDIN(æ åè¾å ¥)åSTDOUT(æ åè¾åº)æ¥åæ们ç¼åçMapåReduceè¿è¡æ°æ®ç交æ¢æ°æ®ãä»»ä½è½å¤ä½¿ç¨STDINåSTDOUTé½å¯ä»¥ç¨æ¥ç¼åMapReduceç¨åºï¼æ¯å¦æ们ç¨Pythonçsys.stdinåsys.stdoutï¼æè æ¯Cä¸çstdinåstdoutãããæ们è¿æ¯ä½¿ç¨Hadoopçä¾åWordCountæ¥å示èå¦ä½ç¼åMapReduceï¼å¨WordCountçä¾åä¸æ们è¦è§£å³è®¡ç®å¨ä¸æ¹ææ¡£ä¸æ¯ä¸ä¸ªåè¯çåºç°é¢çãé¦å æ们å¨Mapç¨åºä¸ä¼æ¥åå°è¿æ¹ææ¡£æ¯ä¸è¡çæ°æ®ï¼ç¶åæ们ç¼åçMapç¨åºæè¿ä¸è¡æç©ºæ ¼åå¼æä¸ä¸ªæ°ç»ã并对è¿ä¸ªæ°ç»éåæ"1"ç¨æ åçè¾åºè¾åºæ¥ï¼ä»£è¡¨è¿ä¸ªåè¯åºç°äºä¸æ¬¡ãå¨Reduceä¸æ们æ¥ç»è®¡åè¯çåºç°é¢çãããããPythonCodeããMap:mapper.pyãã#!/usr/bin/envpythonimportsys#mapswordstotheircountsword2count={ }#inputcomesfromSTDIN(standardinput)forlineinsys.stdin:#removeleadingandtrailingwhitespaceline=line.strip()#splitthelineintowordswhileremovinganyemptystringswords=filter(lambdaword:word,line.split())#increasecountersforwordinwords:#writetheresultstoSTDOUT(standardoutput);#whatweoutputherewillbetheinputforthe#Reducestep,i.e.theinputforreducer.py##tab-delimited;thetrivialwordcountis1print'%s\t%s'%(word,1)ããReduce:reducer.pyãã#!/usr/bin/envpythonfromoperatorimportitemgetterimportsys#mapswordstotheircountsword2count={ }#inputcomesfromSTDINforlineinsys.stdin:#removeleadingandtrailingwhitespaceline=line.strip()#parsetheinputwegotfrommapper.pyword,count=line.split()#convertcount(currentlyastring)tointtry:count=int(count)word2count[word]=word2count.get(word,0)+countexceptValueError:#countwasnotanumber,sosilently#ignore/discardthislinepass#sortthewordslexigraphically;##thisstepisNOTrequired,wejustdoitsothatour#finaloutputwilllookmoreliketheofficialHadoop#wordcountexamplessorted_word2count=sorted(word2count.items(),key=itemgetter(0))#writetheresultstoSTDOUT(standardoutput)forword,countinsorted_word2count:print'%s\t%s'%(word,count)ããCCodeããMap:Mapper.cãã#include#include#include#include#defineBUF_SIZE#defineDELIM"\n"intmain(intargc,char*argv[]){ charbuffer[BUF_SIZE];while(fgets(buffer,BUF_SIZE-1,stdin)){ intlen=strlen(buffer);if(buffer[len-1]=='\n')buffer[len-1]=0;char*querys=index(buffer,'');char*query=NULL;if(querys==NULL)continue;querys+=1;/*nottoinclude'\t'*/query=strtok(buffer,"");while(query){ printf("%s\t1\n",query);query=strtok(NULL,"");}}return0;}h>h>h>h>ããReduce:Reducer.cãã#include#include#include#include#defineBUFFER_SIZE#defineDELIM"\t"intmain(intargc,char*argv[]){ charstrLastKey[BUFFER_SIZE];charstrLine[BUFFER_SIZE];intcount=0;*strLastKey='\0';*strLine='\0';while(fgets(strLine,BUFFER_SIZE-1,stdin)){ char*strCurrKey=NULL;char*strCurrNum=NULL;strCurrKey=strtok(strLine,DELIM);strCurrNum=strtok(NULL,DELIM);/*necessarytocheckerrorbut.*/if(strLastKey[0]=='\0'){ strcpy(strLastKey,strCurrKey);}if(strcmp(strCurrKey,strLastKey)){ printf("%s\t%d\n",strLastKey,count);count=atoi(strCurrNum);}else{ count+=atoi(strCurrNum);}strcpy(strLastKey,strCurrKey);}printf("%s\t%d\n",strLastKey,count);/*flushthecount*/return0;}h>h>h>h>ããé¦å æ们è°è¯ä¸ä¸æºç ï¼ããchmod+xmapper.pychmod+xreducer.pyecho"foofooquuxlabsfoobarquux"|./mapper.py|./reducer.pybar1foo3labs1quux2g++Mapper.c-oMapperg++Reducer.c-oReducerchmod+xMapperchmod+xReducerecho"foofooquuxlabsfoobarquux"|./Mapper|./Reducerbar1foo2labs1quux1foo1quux1ããä½ å¯è½çå°Cçè¾åºåPythonçä¸ä¸æ ·,å 为Pythonæ¯æä»æ¾å¨è¯å ¸éäº.æ们å¨Hadoopæ¶,ä¼å¯¹è¿è¿è¡æåº,ç¶åç¸åçåè¯ä¼è¿ç»å¨æ åè¾åºä¸è¾åº.ããå¨Hadoopä¸è¿è¡ç¨åºããé¦å æ们è¦ä¸è½½æ们çæµè¯ææ¡£wget页é¢ä¸æä¸çç¨phpç¼åçMapReduceç¨åº,ä¾phpç¨åºååèï¼Map:mapper.phpãã#!/usr/bin/php$word2count=array();//inputcomesfromSTDIN(standardinput)while(($line=fgets(STDIN))!==false){ //removeleadingandtrailingwhitespaceandlowercase$line=strtolower(trim($line));//splitthelineintowordswhileremovinganyemptystring$words=preg_split('/\W/',$line,0,PREG_SPLIT_NO_EMPTY);//increasecountersforeach($wordsas$word){ $word2count[$word]+=1;}}//writetheresultstoSTDOUT(standardoutput)//whatweoutputherewillbetheinputforthe//Reducestep,i.e.theinputforreducer.pyforeach($word2countas$word=>$count){ //tab-delimitedecho$word,chr(9),$count,PHP_EOL;}?>ããReduce:mapper.phpãã#!/usr/bin/php$word2count=array();//inputcomesfromSTDINwhile(($line=fgets(STDIN))!==false){ //removeleadingandtrailingwhitespace$line=trim($line);//parsetheinputwegotfrommapper.phplist($word,$count)=explode(chr(9),$line);//convertcount(currentlyastring)toint$count=intval($count);//sumcountsif($count>0)$word2count[$word]+=$count;}//sortthewordslexigraphically////thissetisNOTrequired,wejustdoitsothatour//finaloutputwilllookmoreliketheofficialHadoop//wordcountexamplesksort($word2count);//writetheresultstoSTDOUT(standardoutput)foreach($word2countas$word=>$count){ echo$word,chr(9),$count,PHP_EOL;}?>ããä½è ï¼é©¬å£«åå表äºï¼--
Rematch 源码系列四、Third-Party plugins
本文深入探讨了rematch的两个常用第三方插件:immer与loading。immer插件旨在简化state的修改过程,通过引入immerjs,允许开发者在reducer中使用mutable状态,带程序源码网站进而生成immutable状态,简化了常规操作。immer插件的实现相对简单,只需将常规reducer包裹一层,使之通过immerjs处理即可。
immer插件的核心在于其对reducer的封装,通过immer.produce方法处理draft状态,简化了mutable状态的管理,避免了复杂的clone和赋值操作。当状态为简单数据类型时,不会使用immer.produce,以保持代码的简洁性。更多关于immer.produce和combineReducers的使用和原理可参考官方文档。
然而,immer插件的设计存在缺陷,即许多reducer配置若不能以数组形式存储,而是被替换,则可能导致插件配置失效。app源码天空rematch v2版本通过引入更细粒度的plugin hooks(如onReducer)解决了这一问题,提升了配置的灵活性。
紧接着是loading插件,专注于管理异步操作的状态,包括网络请求等。其核心在于onModel钩子的使用,定义了全局和模型级别的loading状态,并为特定操作定义了show和hide两个reducer,动态跟踪和控制加载状态。
loading插件的实现通过初始化代码定义了全局和模型级别的loading状态,并使用onModel钩子处理模型操作,对特定的effect动作进行管理,包装原始动作以实现状态控制。两个reducer,show和hide,分别用于增加和减少操作状态的计数,以此实现对加载状态的动态更新。
本文综述了rematch的immer和loading插件的实现原理、使用场景及优化策略,为开发者提供了深入理解这些工具的框架。后续文章将探讨rematch v1升级到v2的设计变化以及TypeScript支持的实现,期待与开发者共同探索rematch的最新进展和优化。
如何用行代码写一个NgRx Store
深入解析 NgRx Store 的deepin 源码编译内部运作机制,通过精简的行代码实现一个基础版本的 StoreService,探索 NgRx Store 如何通过 RxJS 进行状态管理。本文旨在为开发者提供一个简化版的 NgRx Store 实现,以深入理解其核心原理。
通过一个简单的 Angular NgRx-Seed app,我们可以学习 NgRx Store 的基础组件和工作流程。本文章将提供一个超简化的 StoreService,包含 dispatching action、accumulating state、以及使用 selector 订阅更新状态的核心功能。
构建一个与 NgRx 非常相似但高度简化的 StoreService,代码覆盖了基本的 Store 功能,包括创建行为主题、调度 action、以及实现状态的积累与更新。此 StoreService 实现仅供学习和理解 NgRx Store 的内部构造,不可用于实际项目。
关注 queueScheduler 的使用,确保 action 以初始化顺序同步接收,避免因重新进入而导致的内存溢出问题。action$ 和 reducer$ 的融合通过 withLatestFrom 操作符完成,确保了状态更新的正确执行。
reducerFactory 是分栏模板源码 NgRx Store 的复杂部分,通过闭包实现状态的融合。简化版本的 StoreService 中,忽略了对 meta reducers 的处理,使用 combineReducers 作为默认工厂函数,用于创建一个可作为 StoreService 的源的 reducer 融合函数。
在扫描操作符(scan)的作用下,action$ 和 reducer$ 被混合以创建一个具有状态记忆能力的 stream。实现的累计函数 reduceState 实现了状态的更新与累积,以响应 action 和 reducer 的变化。
对于 select 和 createSelector 的实现,本文简化了类型安全功能,直接提供基础的实现,以展示如何从 StoreService 中获取状态。通过一个闭包和 map 操作符,select 函数实现了从 StoreService 获取数据并应用到模板中的逻辑。
StoreService 实现中的 createSelector 提供了一个从所有 selectors 的结果中分离特定 selector 的工具,简化了状态的获取与展示。
在实际应用中,将 StoreService 注入到 Angular app 的组件中,通过 ngOnInit 生命周期钩子获取状态并将其结果显示在模板中。组件中包含 dispatch 功能,实现与 NgRx Store API 类似的操作。
本文源代码已提供,创富源码欢迎阅读与学习。如有任何问题或建议,欢迎直接联系作者。
重读Redux源码的感悟
大道至简的createStore
创造理解的%在createStore.js中体现,剩下%涉及中间件,整体来看软件开发追求高内聚,内耦合,以简洁面世。Redux源码由9个文件构成,包含中间件的代码。整体而言,Redux的深层含义超出了源码大小所能体现,业界常言“Redux是百行代码千行文档”,强调其复杂性。
回到createStore.js,剥离中间件影响,仅留下核心代码骨架。最终返回的对象即store,提供了常用API。通过观察者模式或发布/订阅模式理解此框架,但要认识到Redux并非仅此,它结合现代前端开发与函数式编程,带来限制与便利,如纯函数要求、测试便利性、功能解耦及性能优化。
实现撤销功能(undo)示例,通过高阶reducer存储过往状态值,结合Redux实现撤销与重做。函数式编程的FP特性,使实现变得可能。
combineReducer利用闭包概念,接收多个reducer,生成单个reducer,可遍历执行所有reducer。若两个reducer同时处理相同type的action,它们都会执行更新状态。此特性可能带来冲突,需合理命名以避免问题。
使用CLI工具搭建开发环境可能耗时,codesandbox.io提供多种框架支持及快速加载依赖,适合灵感突发时快速测试代码。
在命名Action时,采用namespace前缀(如/或@)可避免重复,有助于清晰管理状态与减少冲突。
compose方法实现多个方法串联执行,功能强大,易于实现并用于中间件处理。在Redux中,中间件处理Action,与服务器端处理request、response的Koa或Express不同,但核心原理相似,利用compose方法串联功能。
中间件本质为方法代理,通过增强原方法执行前后添加操作,实现AOP。在Redux中,中间件位于store.dispatch之前,通过代理dispatch实现场景扩展与功能增强。理解中间件需关注enhancer参数及createStore方法传递,最后实现store与中间件串联。
以redux-thunk为例,底层参数接收中间件API,只传递store的getState和dispatch方法,遵循特定逻辑处理action,提供方法执行选择与状态管理。中间件使用时需阅读文档,理解其规范与实现细节。
综上,Redux源码展示了现代前端开发与函数式编程的结合,从createStore、combineReducer到中间件,提供了高效状态管理与功能扩展。理解其核心概念与实现机制,有助于深入应用与开发。
å¦ä½å¨MaxComputeä¸è¿è¡HadoopMRä½ä¸
MaxComputeï¼åODPSï¼æä¸å¥èªå·±çMapReduceç¼ç¨æ¨¡ååæ¥å£ï¼ç®å说æ¥ï¼è¿å¥æ¥å£çè¾å ¥è¾åºé½æ¯MaxComputeä¸çTableï¼å¤ççæ°æ®æ¯ä»¥Record为ç»ç»å½¢å¼çï¼å®å¯ä»¥å¾å¥½å°æè¿°Tableä¸çæ°æ®å¤çè¿ç¨ï¼ç¶èä¸ç¤¾åºçHadoopç¸æ¯ï¼ç¼ç¨æ¥å£å·®å¼è¾å¤§ãHadoopç¨æ·å¦æè¦å°åæ¥çHadoop MRä½ä¸è¿ç§»å°MaxComputeçMRæ§è¡ï¼éè¦éåMRç代ç ï¼ä½¿ç¨MaxComputeçæ¥å£è¿è¡ç¼è¯åè°è¯ï¼è¿è¡æ£å¸¸ååææä¸ä¸ªJarå æè½æ¾å°MaxComputeçå¹³å°æ¥è¿è¡ãè¿ä¸ªè¿ç¨ååç¹çï¼éè¦èè´¹å¾å¤çå¼ååæµè¯äººåãå¦æè½å¤å®å ¨ä¸æ¹æè å°éå°ä¿®æ¹åæ¥çHadoop MR代ç å°±è½å¨MaxComputeå¹³å°ä¸è·èµ·æ¥ï¼å°æ¯ä¸ä¸ªæ¯è¾çæ³çæ¹å¼ã
ç°å¨MaxComputeå¹³å°æä¾äºä¸ä¸ªHadoopMRå°MaxCompute MRçéé å·¥å ·ï¼å·²ç»å¨ä¸å®ç¨åº¦ä¸å®ç°äºHadoop MRä½ä¸çäºè¿å¶çº§å«çå ¼å®¹ï¼å³ç¨æ·å¯ä»¥å¨ä¸æ¹ä»£ç çæ åµä¸éè¿æå®ä¸äºé ç½®ï¼å°±è½å°åæ¥å¨Hadoopä¸è¿è¡çMR jarå æ¿è¿æ¥ç´æ¥è·å¨MaxComputeä¸ãç®å该æ件å¤äºæµè¯é¶æ®µï¼ææ¶è¿ä¸è½æ¯æç¨æ·èªå®ä¹comparatoråèªå®ä¹keyç±»åï¼ä¸é¢å°ä»¥WordCountç¨åºä¸ºä¾ï¼ä»ç»ä¸ä¸è¿ä¸ªæ件çåºæ¬ä½¿ç¨æ¹å¼ã
使ç¨è¯¥æ件å¨MaxComputeå¹³å°è·ä¸ä¸ªHadoopMRä½ä¸çåºæ¬æ¥éª¤å¦ä¸ï¼
1. ä¸è½½HadoopMRçæ件
ä¸è½½æ件ï¼å å为hadoop2openmr-1.0.jarï¼æ³¨æï¼è¿ä¸ªjaréé¢å·²ç»å å«hadoop-2.7.2çæ¬çç¸å ³ä¾èµï¼å¨ä½ä¸çjarå ä¸è¯·ä¸è¦æºå¸¦hadoopçä¾èµï¼é¿å çæ¬å²çªã
2. åå¤å¥½HadoopMRçç¨åºjarå
ç¼è¯å¯¼åºWordCountçjarå ï¼wordcount_test.jar ï¼wordcountç¨åºçæºç å¦ä¸:
package com.aliyun.odps.mapred.example.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3. æµè¯æ°æ®åå¤
å建è¾å ¥è¡¨åè¾åºè¡¨
create table if not exists wc_in(line string);
create table if not exists wc_out(key string, cnt bigint);
éè¿tunnelå°æ°æ®å¯¼å ¥è¾å ¥è¡¨ä¸
å¾ å¯¼å ¥ææ¬æ件data.txtçæ°æ®å 容å¦ä¸ï¼
hello maxcompute
hello mapreduce
ä¾å¦å¯ä»¥éè¿å¦ä¸å½ä»¤å°data.txtçæ°æ®å¯¼å ¥wc_inä¸ï¼
tunnel upload data.txt wc_in;
4. åå¤å¥½è¡¨ä¸hdfsæ件路å¾çæ å°å ³ç³»é ç½®
é ç½®æ件å½å为ï¼wordcount-table-res.conf
{
"file:/foo": {
"resolver": {
"resolver": "c.TextFileResolver",
"properties": {
"text.resolver.columns.combine.enable": "true",
"text.resolver.seperator": "\t"
}
},
"tableInfos": [
{
"tblName": "wc_in",
"partSpec": { },
"label": "__default__"
}
],
"matchMode": "exact"
},
"file:/bar": {
"resolver": {
"resolver": "openmr.resolver.BinaryFileResolver",
"properties": {
"binary.resolver.input.key.class" : "org.apache.hadoop.io.Text",
"binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable"
}
},
"tableInfos": [
{
"tblName": "wc_out",
"partSpec": { },
"label": "__default__"
}
],
"matchMode": "fuzzy"
}
}
Redux(4.0.4)源码解析
Redux源码解析 Redux源代码解析旨在清晰展示其核心组件及工作流程,力求用最简洁的语言阐述每个关键部分的功能。Redux提供了一个状态管理库,以管理应用的全局状态。以下是Redux核心组件的主要解析: createStore.js export default function createStore(reducer, preloadedState, enhancer) createStore函数是Redux的核心,负责创建一个状态存储对象。它可以接受三个参数:reducer(减少操作函数)、预加载状态(初始状态)和增强器(可选参数,用于添加额外功能)。 getState 获取当前状态,操作简单直接。 subscribe 向监听列表中添加监听函数,返回取消监听函数。在调用dispatch时订阅或取消订阅,不会影响正在进行的dispatch。下一次dispatch时,将使用订阅列表的最新快照。 dispatch 执行reducer获取最新状态,并依次执行监听队列中的函数。 replaceReducer 替换当前的reducer。执行后,dispatch一次更新状态。一般不常用。 observable 未见实际应用,可能用于特定场景。使用了symbol-observable包,对于熟悉该包的开发者来说,此部分可能有更多探索空间。 utils 包括actionTypes.js、isPlainObject.js、warning.js等辅助函数。actionTypes.js定义了Redux保留的私有操作类型,用于确保操作的正确处理。isPlainObject.js用于判断action对象是否为原生对象。warning.js用于抛出错误,保持代码质量。 applyMiddleware.js 通过createStore(reducer,applyMiddleware(...middleware))执行,返回带有中间件增强的dispatch。精简后,代码更加清晰。 compose.js 实现中间件的串联,依次增强dispatch流程。使用函数式编程技巧,代码简洁高效。 bindActionCreators.js 将单个或多个ActionCreator转化为dispatch(action)的函数集合,简化Action的使用方式。 combineReducers.js 将多个reducer整合为一个,调整state结构,便于管理和操作。 整体而言,Redux的源码解析展示了其如何通过一系列核心组件实现状态管理的流程,从创建store到管理state、执行reducer、中间件串联,直至整合多个reducer,提供了一套高效、模块化的状态管理方案。理解这些组件及其功能是掌握Redux并能灵活应用的关键。2024-11-20 00:25
2024-11-19 23:56
2024-11-19 22:39
2024-11-19 22:09
2024-11-19 22:06