had

阅读 / 问答 / 标签

shawty had them apple bottom jeans (jeans)的中文翻译

low ____flo ridaShawty had them Apple Bottom Jeans [Jeans] Boots with the fur [With the fur] The whole club was lookin at her She hit the flo [She hit the flo] Next thing you know Shawty got low low low low low low low low Them baggy sweat pants And the Reeboks with the straps [With the straps] She turned around and gave that big booty a smack [Ayy] She hit the flo [She hit the flo] Next thing you know Shawty got low low low low low low low low [Verse 1:] I ain"t never seen nuthin that"ll make me go, This crazy all night spendin my dough Had a million dollar vibe and a bottle to go Dem birthday cakes, they stole the show So sexual, she was flexible Professional, drinkin X and ooo Hold up wait a minute, do I see what I think I Whoa Did I think I seen shorty get low Ain"t the same when it"s up that close Make it rain, I"m makin it snow Work the pole, I got the bank roll Imma say that I prefer them no clothes I"m into that, I love women exposed She threw it back at me, I gave her more Cash ain"t a problem, I know where it goes She had them [Chorus:] Apple Bottom Jeans [Jeans] Boots with the fur [With the fur] The whole club was lookin at her She hit the flo [She hit the flo] Next thing you know Shawty got low low low low low low low low Them baggy sweat pants And the Reeboks with the straps [With the straps] She turned around and gave that big booty a smack [Ayy] She hit the flo [She hit the flo] Next thing you know Shawty got low low low low low low low low [Verse 2:] Hey Shawty what I gotta do to get you home My jeans full of gwap And they ready for Shones Cadillacs Maybachs for the sexy grown Patrone on the rocks that"ll make you moan One stack (come on) Two stacks (come on) Three stacks (come on, now that"s three grand) What you think I"m playin baby girl I"m the man, I"ll bend the rubber bands That"s what I told her, her legs on my shoulder I knew it was ova, that Henny and Cola Got me like a Soldier She ready for Rover, I couldn"t control her So lucky oo me, I was just like a clover Shorty was hot like a toaster Sorry but I had to fold her, Like a pornography poster She showed her [Chorus:] Apple Bottom Jeans [Jeans] Boots with the fur [With the fur] The whole club was lookin at her She hit the flo [She hit the flo] Next thing you know Shawty got low low low low low low low low Them baggy sweat pants And the Reeboks with the straps [With the straps] She turned around and gave that big booty a smack [Ayy] She hit the flo [She hit the flo] Next thing you know Shawty got low low low low low low low low [Verse 3:] Whoa Shawty Yea she was worth the money Lil mama took my cash, And I ain"t want it back, The way she bit that rag, Got her them paper stacks, Tattoo Above her crack, I had to handle that, I was on it, sexy woman, let me shownin They be want it two in the mornin I"m zonin in them rosay bottles foamin She wouldn"t stop, made it drop Shorty did that pop and lock, Had to break her off that gwap Gah it was fly just like my glock [Chorus:] Apple Bottom Jeans [Jeans] Boots with the fur [With the fur] The whole club was lookin at her She hit the flo [She hit the flo] Next thing you know Shawty got low low low low low low low low Them baggy sweat pants And the Reeboks with the straps [With the straps] She turned around and gave that big booty a smack [Ayy] She hit the flo [She hit the flo] Next thing you know Shawty got low low low low low low low low

Many agreed that the Prime Minister had in effect resigned dishonorably什么意思?

ManyagreedthatthePrimeMinisterhadineffectresigneddishonorably翻译:许多人都认同首相辞职实际上是很不光彩的/很不体面的/威名扫地的。祝你开心如意!O(∩_∩)O~~

he had resigned

不是同位语从句,同位语从句部分是完整的句子才行,而he has registered不完整,没谓语,所以是定语从句,举个同位语例子:The report that he gives me proves to be right.(句中he gives me主谓宾完整)

shadow gallery叫什么乐队

S全是欧美乐队!!!AQUA[水叮当合唱团]S.Q.ESadeSadistSadusSafri DuoSagaSaltatio MortisSam RobertsSamaelSammath NaurSammy SmooveSamsas TraumSanctuarySandstoneSandy MoucheSangre CavallumSantana[桑塔那]Sarah BrightmanSaraleeSaratogaSaturnusSaul StokesSavage garden[野人花园]SavatageSaves The DaySaxonScandyScar SymmetrySchloss TegalSchwadronScissor SistersScooterscornScorpions[蝎子乐队]SealSecret Garden[神秘园]SeetherSenses FailSentencedSepultura[埋葬]SerpensSeven PinesSeven WitchesSevendustSex PistolsShadow GalleryShadows fallShadowsphereShaggyShana MorrisonShania Twain[仙尼亚.唐恩]Shape Of DespairSharissaShawn MullinsSheryl Crow[希瑞 克劳]Shinjuku ThiefShizuko OverdriveShuvelSiebenburgenSiegSigur RosSilent CivilianSilent CrySilent VoicesSimon Garfunkel[西蒙和加芬克尔]Simple MindsSimple PlanSinamoreSinead O"Connor[西尼德 奥康娜]Sinead O"ConnorSinisterSinphoniaSiouxsie And The BansheesSireniaSirrahSister HazelSitaSix Feet UnderSixpence None The RicherSJKSkid row[穷街]SkinlabSkyfireSladeSlaughterSlayer[杀手]Sleep TerrorSleeping At LastSleeping Dogs WakeSlipknot[活结]SlitherynSlumberSmashSmashing Pumpkins[碎南瓜乐队]Snow in China[中国雪]Snow PatrolSocial DistortionSocialburnSodomSofa SurfersSoilSoilworkSol InvictusSoledad BrothersSon By FourSonata ArcticaSonic Youth[音速青年]Sonne HagalSophie Ellis BextorSopor Aeternus[The Ensemble Of Shadows]SormaSoul Asylum[灵魂庇难所]Soul Whirling Somewhere[灵魂漂流在某处]SoulflySpank RocksparklehorseSpice GirlsSpiritual FrontSpokenSR-71StaindStalingrad[斯大林格勒]StarkweatherStarsailorStat QuoStatic X[静电X乐队]Static-XStatus QuoStephen SimmondsSteppenwolfStereolabStereoPhonicsSteven Curtis ChapmanSting[斯汀]StoaStone AngelStone BreathStoneSourStormlordStrataStratovarius[灵云]StravaganzzaStray Cats[流浪猫乐队]Straylight RunStretch PrincessStrung OutStryper[银河]StylesStyrofoamStyxSubauditionSuede[山羊皮乐队]SuffocationSugababesSugarcultSuidakraSum 41Summoning[召唤]Sunn o)))Super Dj JeanSupergrassSupertrampSurvivorSusperiaSuzanne VegaSven VathSwitchblade SymphonySwitchfootSworn EnemySylverSymphony XSystem of a Down[退步的体制]

Hadoop,context输出

首先说下你的代码,MACHINEGOTFAILURE.equals(null) 这里真心奇葩。如果你确定System.out.println(MACHINEGOTFAILURE);这个能输出内容的话,就说明reduce有输出,这样的话问题就在OutputFormat的RecordWriter的write()方法了,你使用的是自定义的OutputFormat还是内置的? 具体是哪个?

如何查看hadoop mapreduce 性能

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。

hadoop的mapreduce常见算法案例有几种

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。

Hadoop到底是什么玩意

Hadoop释义:Hadoop:分布式系统基础架构,分布式存储和计算平台。狭义上,Hadoop就是单独指代Hadoop这个软件;广义上,Hadoop指代大数据的一个生态圈,包括很多其他的软件。网页链接Hadoop这个名字是Hadoop项目创建者Doug Cutting 的儿子的一只玩具的名字。他的儿子一直称呼一只黄色的大象玩具为 Hadoop 。这刚好满足Cutting 的命名需求,简短,容易拼写和发音,毫无意义,不会在别处使用。于是 Hadoop 就诞生了。

请简要描述Hadoop计算框架MapReduce的工作原理

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。

hadoop并行过程,是由什么机制来进行控制

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。

Hadoop集群执行MapReduce时报错:

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。

hadoop和mapreduce是一种什么关系?

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。

Hadoop的组件MapReduce和HDFS分别是做什么的?

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。

Hadoop,Combiner有什么用?

Combiner,Combiner号称本地的Reduce,Reduce最终的输入,是Combiner的输出。Combiner是用reducer来定义的,多数的情况下Combiner和reduce处理的是同一种逻辑,所以job.setCombinerClass()的参数可以直接使用定义的reduce。当然也可以单独去定义一个有别于reduce的Combiner,继承Reducer,写法基本上定义reduce一样。

如何使用Python为Hadoop编写一个简单的MapReduce程序

看视频真的会成为高手吗?视频教学真的好吗,不会让人感到烦躁、困倦?大讲台(百度搜索即可)采用任务驱动的学习模式,提倡自适应的学习,学习者根据个人自身情况,制定学习步骤和学习任务;教学过程以文字为主要内容载体,期间穿插视频,回归教育以学员为主体的核心,重在学习效果,学习体验,欢迎有兴趣的童鞋免费体验学习,体验不一样的学习方式!对了现在正是活动期间,全部课程最低仅需700元哦!!!

hadoop MapReduce Job失效模型

  hadoop设计的初衷就是容错 计算任务(MapReduce task)能够在节点宕机或其它随机错误下自行恢复   但是hadoop并不完美 在实际运营中 我发现MapReduce Job仍然经常会因为一些偶发性错误而   运行失败 所以我决定深入探究一下各种不同因素是如何导致job失败的   如果一个hadoop job的某个给定task在失败预定次(默认是 )后 整个job就会失败   这可以通过 mapred map max attempts 和 mapred reduce max attempts 属性来设置   一个task可能由于各种偶发原因而失败 比如我发现的情况就有磁盘满 hadoop本身的bug 或者硬件失效(e g : 磁盘只读)   下面是针对job失败的概率总结的一个大致公式:   P[个别task失败的最大次数] = P[task失败] ^ (task总失败次数)   P[task成功] = P[个别task失败的最大次数]   P[job成功] = P[task成功] ^ (task数量)   P[job失败] = P[job成功]   P[job失败] = ( P[task失败] ^ (task总失败次数) ) ^ (task数量)   task失败的最大次数通过mapred max max tracker failures设置(默认为 )   我们来分析一个负载为 个map task的job:   task数量  最大失败数  P[task失败]  P[job失败]                                                                                             如果task失败概率低于 %的话 job失败概率几乎可以不计 重点就是保证集群稳定 保持较小失败概率     我们同样可以看到 mapred max tracker failures 参数的重要性 如果其取值小于 时 job失败的概率明显上升 就算task失败概率降低到 %     相较mapper而言 reducer运行的时间更长 这意味着其更容易遭受意外事故 也就是说 我们可以肯定reducer的失败概率比mapper要大很多 但是从另一方面来说 通常reducer task的数量要小于mapper数量 这个又作了一定补偿   下面我们来看看一组基于reducer的失效概率分析:   task数量  最大失败数  P[task失败]  P[job失败]                                                                                            从上述数据中可以发现 只有当reducer失败的概率超过 %时才会导致一定的job失败几率 (同样可发现 task最大失败数低于 时 job失败率显著上升)   坏节点(有故障的机器节点)   在整个失效模型中还有一个很重要的因子需要考虑 那就是失效节点 通常若出现整个节点失效 那么在此节点上运行的所有task都会失败 失效原因可能是因为磁盘损坏(通常的症状是出现 磁盘只读   或 盘符丢失   ) 磁盘写满等 一旦出现坏节点 你会发现在此节点被列入黑名单之前(被job列入黑名单的节点不会被job再次分配其任务) 会有一大堆 map/reduce task失败 为了简化我们的分析 我假设给定坏节点会导致固定数量的task失败 另外 我假设给定task只会在给定坏节点上中招一次 因为节点会在不久后被列入黑名单 我们用 b tasks 来标记在坏节点上失败过的task 其它task标记为 n tasks b task 会在坏节点上遭受一次失败 所以后续如果job再出现 最大task失败数 次失败task就会导致job失败 在我们的集群中 我曾发现一个坏节点引发 个task失败 那么我就以此为据 给出reduce阶段失效概率的公式:   b tasks数量 = 坏节点数量 *   P[所有b task都成功] = ( P[task失败概率] ^ (最大task失败数 )) ^ (b tasks数量)   P[所有n task都成功] = ( P[task失败] ^ (最大task失败数)) ^ (task数量 b task数量)   P[job成功] = P[所有b task成功] * P[所有n task成功]   P[job成功] = ( P[task失败]^(最大task失败数 ))^(b task数量) * ( P[task失败]^(最大task失败数))^(最大task数 b task数)   P[job失败] = P[job成功]   因为mapper数量通常较多 所以少数坏节点对于以上公式计算的结果并没有太大的出入 但对reducer而言 其数量较少 所以以上公式计算出   的结果就有比较明显的变化:   task数量  最大失败数  P[task失败]  坏节点数量  P[job失败]                                                                                                   值得庆幸的是结果并没有发生戏剧性的变化 在 个坏节点的情况下 只导致失败率提高到了排除坏节点条件下的 ~ 倍 lishixinzhi/Article/program/Java/hx/201311/25960

hadoop mapreduce的整个map/reduce过程里面map和reduce分别是在master上还是在slaver上执行?

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。

如何使用Hadoop的ChainMapper和ChainReducer

hadoop的mapreduce任务是在集群环境下跑的,所以调试存在一定的复杂度,但是貌似还是可以使用debug的,但是具体方式我没有实现,只是看到什么资料都有介绍。 如果只是想调试mapper和reducer的输入输出是否正确可以使用mrunit进行调试

Hadoop:是什么,如何工作,可以用来做什么

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。

Hadoop,MapReduce,YARN和Spark的区别与联系

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。

如何在hadoop环境下执行mapreduce任务

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。

如何在Windows下面运行hadoop的MapReduce程序

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。

hadoop使用reducejoin更新图书表的作者和编辑

1. 首先,需要将图书表和作者表以及编辑表按照作者和编辑的名称进行分组,生成三个不同的MapReduce任务。2. 在每个Map阶段,需要将图书表中的作者和编辑名称作为键,将作者表和编辑表中的相同名称作为值,将这些键值对写入输出。3. 在Reduce阶段中,需要将每个键对应的值进行合并,并将合并后的结果写入输出,这样就得到了每个作者和每个编辑所对应的所有图书。4. 接下来,需要进行ReduceJoin操作,将作者和编辑的信息与图书表中的相关信息进行关联。这可以通过使用MapReduce任务来实现。

hadoop中存储文件系统hdfs的冗余机制是怎么进行的?有什么特点?

避免节点挂掉。

如何使用Hadoop的ChainMapper和ChainReducer

hadoop的mapreduce任务是在集群环境下跑的,所以调试存在一定的复杂度,但是貌似还是可以使用debug的,但是具体方式我没有实现,只是看到什么资料都有介绍。 如果只是想调试mapper和reducer的输入输出是否正确可以使用mrunit进行调试

hadoop mapper,reducer的value设置为job.setOutputValueClass(ArrayWritable.class); 时运行卡住

ArrayWritable不能直接作为mapreduce的输入输出类型。程序不是卡住了,而是报错了。估计是这个错误java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.hadoop.io.ArrayWritable.<init>()这个错误是ArrayWritable初始化异常,要自己实现一个ArrayWritable的派生类如果是text类型的数组 For example: public class TextArrayWritable extends ArrayWritable { public TextArrayWritable() { super(Text.class); } }即可,在maprecude中用TextArrayWritable 替换掉ArrayWritable A Writable for arrays containing instances of a class. The elements of this writable must all be instances of the same class. If this writable will be the input for a Reducer, you will need to create a subclass that sets the value to be of the proper type. For example: public class IntArrayWritable extends ArrayWritable { public IntArrayWritable() { super(IntWritable.class); } }==========================参考:http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/ArrayWritable.html

一个hadoop程序可以有2个reducer吗

1、2个reduce类是无法使用的,但是可以通过Combiner来提前运行reduce代码,因Combiner是在多个台机器上运行,某些场景下效率高于reduce,例如对大量数字求和,用Combiner预执行reduce代码比最终一次执行要快的多。2、设置多个reduce在分布式下运行,是可以的,通过job.setNumReduceTasks来设置reduce数量,设置多个reduce会出现多输出结果的情况(多文件)。

shadowofthesun歌寓意

shadowofthesun歌寓意是美好即将开始,一扫阴霾。

When she got back from the South,Susan had her car (wash)----throughtly

准确翻译:当她回到南方,苏珊把她的车(洗)----彻底。望采纳!谢谢!

hadoop出现了kettle还有用吗

determine the proper shim for hadoop Distro and version 大概意思是 为hadoop版本选择合适的套件。表格上面的一行:apache、cloudera、hortonworks、intel、mapr指的是发行方。点击他们来选择你 想连接的hadoop的发行方 。上图 以apache hadoop为例:Version 指版hadoop版本号 ,shim 指kettle提供给该hadoop套件的名称,Download 里面的 included in 5.0,5.1 指kettle的5.0、5.1版本安装包里面已经有内置的插件,一句话来讲 就是kettle5.1及5.0版本已有插件提供支持apache hadoop版本0.20.x 。不需要额外下载。NS 是不支持的意思 图片下面也有解释。

She had lived in the flat for thirty years and was a veritable magpie at hoarding.

magpie at hoarding解释为收藏家magpie喜鹊,有喜欢收集东西的习惯hoarding贮藏这里连起来翻译就可以了,意思肯定没错

There was once a farmer who had a fine orchard (果园). He worked very hard all his life and the o.

小题1:A小题2:D小题3:C小题4:D         

i remember the whole thing as if happened yesterday 为什么不是had

后面有过去时的标志词yesterday

astarhades是什么档次

三线品牌。以精彩,来自时间的见证作为品牌理念。坚持诚信、创新、服务、品质的核心经营理念,为消费者提供体验。astarhades翻译为阿斯塔哈德。

had.made,back,can,make,sad,ran,same,cane,分别怎么读

英文原文:had.made,back,can,make,sad,ran,same,cane英式音标:[hæd] . [meɪd] , [bæk] , [kæn] , [meɪk] , [sæd] , [ræn] , [seɪm] , [keɪn] 美式音标:[hæd] . [med] , [bæk] , [kæn] , [mek] , [sæd] , [ræn] , [sem] , [ken]

John Legend----tonight(best you ever had) 准确翻译 勉强敷衍的肯定拖出去毙了

我也觉得这歌特好听、其实我也在找这首歌的QQ空间背景音乐链接、斗胆想问一句、请问你有吗?有的话请发给我好吗?拜托~

BuddhaDharma是什么东西? 谁会翻译.?

佛法。

adult,shade,oxygen,nap,sip分别是什么意思?

adult 英[ˈædʌlt] 美[əˈdʌlt, ˈædʌlt] adj. 成年的; 成年人的; 成熟的; (智力、思想、行为) 成熟的; n. 成年的人或动物; [网络] 成虫; 成体; 成年人; [例句]Children under 14 must be accompanied by an adult.14岁以下的儿童必须由成人陪同。[其他] 复数:adults 形近词: exult indult shade 英[ʃeɪd] 美[ʃed] n. 遮阳,遮棚; 挡风物; 玻璃罩; (画的) 阴暗部分; vt. 遮蔽; 险胜; 加灯罩; 画阴影于…之上; vi. 逐渐变化; [例句]You sold wrapping paper for paint, T shirts for new furniture, and magazine subscriptions for shade trees in the school playground.你们为重新粉刷学校兜售包装纸,为购置新家具兜售体恤衫,为在学校操场上种植遮阳树劝人订阅各种杂志。[其他] 第三人称单数:shades 复数:shades 现在分词:shading过去式:shaded 过去分词:shaded oxygen 英[ˈɒksɪdʒən] 美[ˈɑ:ksɪdʒən] n. [化] 氧,氧气; [网络] [化]氧; 氧气; [例句]Ozone is a highly reactive form of oxygen gas.臭氧是一种非常活跃的氧气形态。[其他] 形近词: oxygon ocygen nap 英[næp] 美[næp] n. 绒毛; 小睡,打盹; 一种牌戏; 孤注一掷; vi. 打瞌睡; 疏忽; vt. 使起毛; [例句]The cloth from which such a coat is made, usually of wool, often with a heavy nap.制成麦基诺厚大衣的布料,通常是毛的,表面覆有很厚的绒毛。[其他] 第三人称单数:naps 复数:naps 现在分词:napping 过去式:napped过去分词:napped SIP abbr. solvent-in-pulp 溶剂矿浆萃取; [网络] 会话发起协议; 矽智财; [例句]It will provide methods that expose message interactions with the SIPinfrastructure including subscribe, unsubscribe, and sendNotifications.它将提供方法来公开与SIP基础架构的消息交互,这些方法包括subscribe、unsubscribe和sendNotifications。SIP服务器SIP(Session Initiation Protocol,会话初始协议)的开发目的是用来帮助提供跨越因特网的高级电话业务。因特网电话(IP电话)正在向一种正式的商业电话模式演进,SIP就是用来确保这种演进实现而需要的NGN(下一代网络)系列协议中重要的一员。支持H.264协议。

Pig和Hive有什么不同啊?hadoop

网页链接

if only we had nwe had not made any mi

如果我们之间只是一场美丽的邂逅,而不能与对方天长地久地在一起,那我宁愿我们不曾相遇.

had的音标

had /haed /.

have与had的用法归纳高中

区别在于不同时态,两个单词其实表达为一个意思。如果是一般现在时,第三人称单数的时候用has,其它用have。如果是过去,全部都用had。has/have been是现在完成时,has用于第三人称单数,had been过去完成时,跟前者相比,比前者更为过去。 扩展资料   have的用法总结   (1)have作“有”讲,强调“所属关系”,含有“拥有”之意。其主语一般是人,有时也可以是物。   They have many new books. 他们有许多新书。   (2)have可以作“买”讲。   I want to have a kilo of beef. 我想买一公斤牛肉。   (3)have作“用、使用”讲。   Excuse me, may I have your bike, please? 打扰了,我可以用(借用)你的自行车吗?   (4)have+表示动作的名词(这类名词常由同形的`动词转化而来),意为“做(某事)”(=do sth. )   have a drink (of...) 喝一点(……)   have a look(at...) (朝……)看一眼   have a rest 休息一下   have a swim 游泳   这些短语常用于口语,这里的have可以用take替换。   (5)have+表示一日三餐的名词,意为“用餐”。   have breakfast/lunch/supper 吃早饭/午饭/晚饭   (6)have+表示食品、饮料等的名词,意为“吃;喝”(=eat, drink)。如:   have(some) bread 吃面包   have(a cup of )tea 喝(一杯)茶   这里的have也可以用take替换。   (7)have+表示某种活动的名词,意为“进行;举行”。如:   have a class (学生)上课   have a sports meeting 开运动会   (8)have+表示疾病的名词,意为“患(病)”,有时也用have got。   have a bad cold 患重感冒   have (got) a cough 咳嗽   (9)have+表示一段时间的名词,表示“经历;度过”。   have a good time 度过快乐的时光,玩得愉快   (10)have意为“邀请;招待”。   Thank you for having me. 感谢你们邀请我。

had的音标是什么

[h�0�3d]

had是什么时态的标志词

had是过去时。 例句: They had had their pet dog stuffed. 他们请人把他们的宠物狗制成了标本。 She had repented of what she had done. 她对自己所做的事深感懊悔。 扩展资料   Everything had gone.   一切都过去了。   The car had had it.   这辆车无法修复了。   We had walked further than I had realized.   在我不知不觉中我们已走得很远。

have has had那个是单数那个是复数

have是复数意义。如,Theyhavefinishedtheirwork.has是单数意义。如:Hehasfinishedhiswork.had是have的过去式,其前的句子主语单复数够可以的。仅供参考!

had作为一个单词可以单独使用对吧?

had是have的过去式,它能否单独使用要看具体情况:一、作为实义动词,had的意思是“吃,玩,上(课),有”等,用在一般过去时的句子中作谓语。例如,I had an English class yesterday.昨天我上英语课了。She had breakfast.她吃过早饭了。二、如果had为助动词,它没有具体的意思,一般不能单独使用,必须与其他动词的过去分词连用,在过去完成时的句子中。例如,I had had breakfast before he came back.他回来之前,我就吃完饭了。(不知题主是否学过现在完成时态,如果没学过,了解一下即可。)三、在某些固定短语在had不能单独有,否则就失去了短语的原意了。如,had better“最好…”,如果拆开,就不能表达“最好…”的意思了。总之,had一词根据词性不同而有不同用法,多总结吧!希望对你有帮助哦。

has和had是什么时态

  has和had这两个单词我们都很熟悉,那么你知道has和had是什么时态吗?下面是我为你整理的has、had的时态的相关资料,希望大家喜欢!   has、had的时态   has   一般现在时   v. 有,吃,得到,从事,允许,雇用,享有(have的第三人称单数)   had   一般过去式和过去完成时   v.有( have的过去式和过去分词 ); (亲属关系中)接受; 拿; 买到   has的造句   1. Beauty is an attitude. It has nothing to do with age.   美是一种态度,与年龄无关。   2. English has hurt me a thousand times, but I still regard it as my first love.   英语伤我千百遍,我待英语如初恋。   3. A changing world has put pressures on the corporation.   日新月异的世界使这家公司感到了压力。   4. There has been a busy start to polling in today"s local elections.   今天地方选举的投票一开始就人头攒动。   5. The Estonian parliament has passed a resolution declaring the republic fully independent.   爱沙尼亚议会已经通过了宣布共和国完全独立的决议。   6. The Politburo has been meeting in Peking to discuss the situation.   政治局已在北京召开会议讨论形势。   7. The strike has taken on overtones of a civil rights campaign.   罢工带上了民权运动的意味。   8. He has received extensive corrective surgery to his skull.   他的头骨做过大面积的矫形手术。   9. There has been a very mixed reaction to the decision.   对于这个决定的反应非常不一致。   had的用法   1. The house seemed muted, hushed as if it had been deserted.   房子里似乎悄然无声,安静得好像已经没人在住一样。   2. I had obtained the authentic details about the birth of the organization.   我已经掌握了有关该组织诞生的可靠的详细资料。   3. Barry had his nose put out of joint by Lucy"s aloof sophistication.   露西的冷淡与世故使得巴里十分不快。   4. I distinctly remember wishing I had not got involved.   我清楚地记得希望自己没有被牵扯进去。   5. It had once been the home of a wealthy nobleman.   这里曾是一个有钱贵族的宅邸。   6. The opportunity had gone. His mind scrabbled for alternatives.   机会已经失去。他苦苦思索别的方案。   7. I try to remember all the good times I"ve had here.   我试着回忆在这里度过的所有美好时光。   8. The commander and some of the men had been released.   指挥官和一些士兵已经获释。   9. They had a snobbish dislike for their intellectual and social inferiors.   他们非常势利,不喜欢智力和社会地位不如自己的人。   10. Mark had for some time been making advances towards her.   马克追她已经有一段时间了。

had是什么意思

have 的过去式, 有

英语题 had+过去式 have/has+过去分词有什么区别,都是什么时态?

前一个是过去完成时,发生在过去的过去,对现在造成的影响,后一个是现在完成时,过去发生的事对现在造成的影响

Had 和Has怎么区别呀?什么情况下使用哪一种?

has是have的单数形式、had是have的过去式。本质上来说,两个单词所处的语境是不同的.has一般用在一般时态,比如一般现在时,还有就是说明一般的普遍或者经常性的情况.而had则用在过去时,比如一般过去时,过去完成时.两个单词在主语人称不同时也有区别.has只在单数第三人称(he,she,it)或者主语是单个物体时使用.而had的使用范围比较广泛.在过去时中,任意人称都是had.这是has和had在作联系动词时的区别,在作情态动词的区别也是这样的.需要的是特殊语法中的多个物体也有用has的时候.有点长

have、had、has的区别与用法是什么

havehadhas这三个单词中文意思都是“拥有(表示一种状态)”的意思,但英文上用法有些区别,具体如下:1.在英语句子中,若句子的时态是一般现在时,则have/has适用于这种情况下,其中当主语是第一人称,第二人称,第三人称复数(I,They,have)时,使用have,当主语时第三人称单数(she/he),则使用has,如下列例句:I/You/Theyhaveabigapple.我/你/他们有一个大苹果。She/Hehasasmallapple.她/他有一个小苹果。2.若句子的时态时一般过去式,则无论人称是哪一种,都是用had。如下列例句:I/You/They/She/Hehadacar.我/你/他们/她/他过去有辆车。3.若句子是完成式的形式。则has/havebeen是表示现在完成时,has用于第三人称单数,hadbeen则用于过去完成时。如例句:IhavebeenstudyingEnglishfortenyears.我一直在学英语,已达十年之久。扩展资料:1.have英[həv] 美[hæv] 2.aux.用以构成完成式及完成式的不定式,表示已经…3.vt.有,具有;拿,取得;从事;必须,不得不4.n.〈口〉有产者,有钱人;富国;〈英俚〉欺骗,诈骗5.第三人称单数:has现在分词:having过去式:had过去分词:had参考资料:have-百度翻译

havehad放在一起什么意思?

表示的意思是在过去的某一时间之前某事已经被做。have had是现在完成时。现在完成时的结构是:have/has+V-ed,V-ed是动词的过去分词。比如:We have had dinner an hour ago.我们一个小时前已经吃过晚饭了。这里的have是现在完成时中的have,had是吃晚饭have dinner中have的过去分词。扩展资料have和had的区别区别在于不同时态,两个单词其实表达为一个意思。如果是一般现在时,第三人称单数的时候用has,其它用have。如果是过去,全部都用had。has/have been是现在完成时,has用于第三人称单数,had been过去完成时,跟前者相比,比前者更为过去。had通常作动词have的过去式和过去分词,表示得到,也可用于过去完成时和过去完成进行时,表示已经。

have,had,has的用法和区别

have、had、has的用法和区别:1、根据主语形式不同变形不同。如果是一般现在时,第三人称单数的时候用has,其它(如第一人称和第二人称)用have。2、主语是过去式时形式不同。如果是一般过去时,全部都用had。3、主语是完成式时形式不同。has/have been是现在完成时,has用于第三人称单数,had been过去完成时扩展资料:have英 [həv]   美 [hæv]  aux.用以构成完成式及完成式的不定式,表示已经…vt.有,具有;拿,取得;从事;必须,不得不n.〈口〉有产者,有钱人;富国;〈英俚〉欺骗,诈骗第三人称单数: has 现在分词: having 过去式: had 过去分词: had例句:When I met her, she had just returned from a job interview 我遇见她时,她刚参加完一场求职面试回来。You haven"t sent her away, have you? 你还没有把她送走,是吗?

had怎么读 had如何读

1、had英[həd]、美[həd] 2、v.有; 持有; 占有; 由…组成; 显示出(性质、特征); 3、aux.had 有时代替 if 用于从句句首,表示某种情况可能发生但并未发生; 4、[词典]have的过去分词和过去式; 5、[例句]This is too big ─ have you got a small one?这个太大——有没有小的? 6、[其他]原型: have

had的原形为什么不是has

因为“had”是“have”的过去式,所以“had”的原型为“have”。“has”是“have”的单三形式,所以“has”的原型也是“have”。综上所述:“had”和“has”的原型均是“have”。have用作助动词时,可与动词的过去分词或“been+现在分词”连用,构成动词的各种完成时态。have的过去分词had还可与主语倒置,构成虚拟条件状语从句。have (got) to作“不得不”解,强调客观上的必要,或由环境、习惯、协约等迫使而不得不做某事。have (got) to用于疑问句或否定句时,一般要借助do,在英式英语也可不借助do。词汇搭配:have abilities 有能力。have English at one"s finger-ends 熟练掌握英语。have a good understanding 有很好的理解力。have a house 有一所房子。have a liking 喜爱。have an objection 表示反对。have patience 有耐心。have a quick eye 有敏锐的眼光。have a sense of shame 有廉耻之心。have talents 有才智。have a taste 有鉴赏力。have a bath 洗个澡。

had是什么时态的标志

可用于一般过去时。 had v. 拥有(have 的过去式和过去分词);患病(have 的过去式和过去分词) aux. 已经(用于过去完成时和过去完成进行时) 例句: Everything had gone. 一切都过去了。 The car had had it. 这辆车无法修复了。 扩展资料   We had walked further than I had realized.   在我不知不觉中我们已走得很远。   He had heard that the trophy had been sold.   他听说那个奖杯已经被卖掉了。   We"ve had words.   我们吵过架。

had的过去式是什么

过去完成时的基本结构:由 had 加动词的过去分词构成。过去完成时的基本用法,有以下几种:(1)表示在过去某一时刻或动作以前完成了的动作,即“过去的过去”。可以用by,before等介词短语或一个时间状语从句来表示,也可以用一个表示过去的动作来表示,还可能通过上下文来表示。例如:By nine o"clock last night, we had got 200 pictures from the spaceship.到昨晚9点钟,我们已经收到200 张飞船发来的图片。(2)表示由过去的某一时刻开始,一直延续到过去另一时间的动作或状态,常和for, since构成的时间状语连用。例如:I had been at the bus stop for 20 minutes when a bus finally came.当车来的时候,我在车站已等了20分钟。(3)叙述过去发生的事情,在已叙述了过去发生的事情后,反过来追述或补述以前发生的动作时,常使用过去完成时。例如:Mr. Smith died yesterday. He had been a good friend of mine.史密斯先生昨天去世了。他以前是我的好友。扩展资料结构主语+had+动词过去分词①肯定句:主语+had+动词过去分词+其他.②否定句:主语+had+not+动词过去分词+其他.③一般疑问句:Had+主语+动词过去分词+其他?肯定回答:Yes,主语+had.否定回答:No,主语+hadn"t.④特殊疑问句:特殊疑问词或词组+had+主语+过去分词+其他?⑤被动语态:主语+had(not) +been+动词过去分词+其他.

have had 是什么时态跟句型啊,为什么要用两个have ,求解

have 是现在完成时 had是过去完成时 句子里的have had是现在完成时的have done结构

had(原形形式)

have(原形)-has(三单[第三人称单数])-had(过去式)-having(进行时)

hadhad怎么用?

hadhad是过去完成时态的用法。过去完成时由“助动词had+过去分词”构成,其中前一个had通用于各种人称,后一个had是have的过去分词,表示已经吃、已经有或其它意思。过去完成时表示在过去某一时间或动作之前已经发生或完成了的动作,即“过去的过去。如:1.Theyhadalreadyhadbreakfastbeforetheyarrivedatthehotel.2.Shehadfinishedwritingthecompositionby10:00thismorning. 另外还有如:havesthdone 请或让某人(别人)做某事 例:Thekinghadhadhistombbuiltbeforehedied.

英语had造句

I had lunch at home我在家吃午饭

had怎么读

had[英][hæd] [美][hæd] 生词本简明释义v.有( have的过去式和过去分词 );(亲属关系中)接受;拿;买到易混淆的单词:HadHAD以下结果由 金山词霸 提供柯林斯高阶英汉词典 网络释义 百科释义The auxiliary verb is pronounced /həd, strong 强读 hæd/. For the main verb, and for the meanings 2 to 5, the pronunciation is /"hæd/. 助动词读作/həd, strong 强读 hæd/。实义动词及义项2至义项5在句中作重读。1.Had is the past tense and past participle of have .2.AUX(had 有时代替 if 用于从句句首,表示某种情况可能发生但并未发生)Had is sometimes used instead of "if" to begin a clause which refers to a situation that might have happened but did not. For example, the clause "had he been elected" means the same as "if he had been elected".

had属于什么词徃

动词had 基本词汇 英 [hæd]     美 [hæd]    v.有;吃;得到(动词have的过去式和过去分词)aux.已经(用于过去完成时和过去完成进行时)

has had的区别是什么?

has had意思是:有;已经发生什么变化has had,是现在完成时的时态,然后第一个has是助动词,是have的主语第三人称;第二个had是动词have 的过去分词,它才是真正表达动作的谓语。如:Not every one has had a friend.  并不是所有的人都有过一个朋友。词汇解析:have读法:英 [həv;hæv]  美 [həv;hæv] 释义:v. 有;让;拿;从事;允许aux. 已经短语:have some 有一些;吃一点吧have to be 必定是,无疑是have to do 不得不做某事变形词:第三人称单数: has 现在分词: having 过去式: had 过去分词: had扩展资料词语用法:1、have用作助动词时,可与动词的过去分词或“been+现在分词”连用,构成动词的各种完成时态。2、have的过去分词had还可与主语倒置,构成虚拟条件状语从句。3、have (got) to作“不得不”解,强调客观上的必要,或由环境、习惯、协约等迫使而不得不做某事。4、have (got) to用于疑问句或否定句时,一般要借助do,在英式英语也可不借助do。5、have的基本意思是“有”,指物质上的所有、身心上的具有或构成上的含有。引申可作“享有”“容许”“招致”等解。

have、 had、 has的区别是什么?

have、had、has的用法和区别:1、have、had、has区别在于不同时态和不同人称之间,三个单词其实表达为一个意思。2、如果是一般现在时,第三人称单数的时候用has,其它(如第一人称和第二人称)用have。3、如果是一般过去时,全部都用had。4、has/have been是现在完成时,has用于第三人称单数,had been过去完成时,跟前者相比,比前者更为过去。拓展资料have1、Alex has already gone 亚历克斯已经走了。2、When I met her, she had just returned from a job interview 我遇见她时,她刚参加完一场求职面试回来。3、You haven"t sent her away, have you? 你还没有把她送走,是吗?4、He arrived in San Francisco, having left New Jersey on January 19th 1月19日离开新泽西后,他到了旧金山。5、I went out and had a walk around 我出去在周围散了一会儿步。had1、Had he succeeded, he would have acquired a monopoly 要是他当时成功了,他就会取得垄断地位。2、If your customer thinks he"s been had, you have to make him happy. 要是顾客感觉自己当了冤大头,你就得设法让他消气。3、Unless she loses some weight, she"s had it 除非她减点儿肥,否则她就没戏了。4、I"ve had it. Let"s call it a day 我受不了了。今天就到这儿吧。5、Had I known what the problem was, we could have addressed it. 如果我当时知道问题出在哪里,我们就可以设法解决了。

have与had的用法

have的用法:1、have +过去分词,构成完成时态。例句:He has left for Japan.。他已去了日本。2、have + been +现在分词,构成完成进行时。例句:I have been studying English for 8 years。我学英语已达八年了。3、have+been +过去分词,构成完成式被动语态。例句:English has been taught in China for many years。中国教英语已经多年。had的用法:1、后接过去分词,构成现在和过去完成、完成进行时。例句:He has gone to Beijing。他已经到北京去了。2、had也可以作为一般动词,意为“拥有”“进行”“吃”“使得”“让”等.如果后接不定式,意为“不得不”。例句:I had my hair cut yesterday. 我昨天理了发。扩展资料:have美 [həv] 英 [həv]v.有;吃;让;得到n.有钱人;(天然资源多的)富国;诈骗auxv.与过去分词连用构成完成时网络拥有;具有;现在完成时第三人称单数:has  现在分词:having  过去分词:had  反义词:lack,lose,abstain,seek,encouragehad美 [hæd] 英 [hæd]v.have(有)的过去式和过去分词网络有空穴储存层的光敏二极管(Hole Accumulation Diode);吃;have的过去式同义词:must,make sure,think of,suffer from,organize

had是什么时态

  had可以用于过去时,此时是动词,如Ihadmydinnerat8.p.m.aloneinthecafeteria。再就是作为非谓语,可以是过去完成时haddone。如Ihadlostmybook。过去完成进行时hadbeendoing。如Ihadbeenplayingfootballsince7yearsold。

have had什么意思?

“have had”即“have done”表示现在完成时,had是have的过去分词形式,这里只不过刚好那个done又是have而已。例句中植物的基因到现在时刻已经得到了改变,基因得到改变这个动作已经到现在完成了,所以用现在完成时态。现在完成时的用法:主语+have(has)+过去分词(done) 1、现在完成时用来表示现在之前已发生过或完成的动作或状态,但其结果却和现在有联系,也就是说,动作或状态发生在过去但它的影响现在还存在。I have lost my wallet.(含义是:现在我没有钱花了。)2. 现在完成时可以用来表示发生在过去某一时刻的,持续到现在的情况,常与for,since连用。Mary has been ill for three days。3、现在完成时往往同表示不确定的过去时间状语连用,如already, yet, just, before, recently,still, lately等:He has already obtained a scholarship。

has和had的用法区别

has具有吃、有、得到、从事、允许等含义;had通常作动词have的过去式和过去分词,表示得到,也可用于过去完成时和过去完成进行时,表示已经。1、has用于一般现在时,还有就是说明一般的普遍或者经常性的情况。 She has become a lot more tolerant and communicative. 她变得宽容多了,也爱说话了。 He has fooled a lot of people into believing he is a rich man. 他骗了许多人,让人相信他是个富翁。 2、has只在单数第三人称(he,she,it)或者主语是单个物体时使用。 Beauty is an attitude. It has nothing to do with age. 美是一种态度,与年龄无关。 My husband has several shirts of different colors. 我丈夫有好几件颜色不同的衬衫。 3、had则用在一般过去时,过去完成时。 He had an operation last year to widen a heart artery 去年他接受了一个扩张心脏动脉的手术。 He had our friend"s cap! 他拿了我们朋友的帽子! 4、had的使用范围比较广泛,在过去时中,任意人称都是had。 I had a coke in the mini-bar. 我拿了一瓶小吧柜中的可乐。 Have you had some training related to telesales in your past years? 在过去的几年中,你接受过电话销售的相关培训吗?

had的时态是什么

“have had”即“have done”表示现在完成时,had是have的过去分词形式,这里只不过刚好那个done又是have而已。例句中植物的基因到现在时刻已经得到了改变,基因得到改变这个动作已经到现在完成了,所以用现在完成时态。 扩展资料   现在完成时的用法   主语+have(has)+过去分词(done)   1.现在完成时用来表示现在之前已发生过或完成的动作或状态,但其结果却和现在有联系,也就是说,动作或状态发生在过去但它的影响现在还存在.   I have lost my wallet.(含义是:现在我没有钱花了.)   2. 现在完成时可以用来表示发生在过去某一时刻的.,持续到现在的情况,常与for,since连用.   Mary has been ill for three days.   3. 现在完成时往往同表示不确定的过去时间状语连用,如already, yet, just, before, recently,still, lately等:   He has already obtained a scholarship.

has had连用什么意思

已经

had怎么读

1、had英[h?d]、美[h?d]2、v.有; 持有; 占有; 由…组成; 显示出(性质、特征);3、aux.had 有时代替 if 用于从句句首,表示某种情况可能发生但并未发生;4、[词典]have的过去分词和过去式;5、[例句]This is too big ─ have you got a small one?这个太大——有没有小的?6、[其他]原型: have

Had的过去式?

你好,had是have的过去式。have v. 有;吃;得到(动词have的过去式和过去分词)aux. 已经(用于过去完成时和过去完成进行时)所以had没有过去式。望采纳。

had是什么时态

had可以用于过去时,此时是动词,如Ihadmydinnerat8.p.m.aloneinthecafeteria。再就是作为非谓语,可以是过去完成时haddone。如Ihadlostmybook。过去完成进行时hadbeendoing。如Ihadbeenplayingfootballsince7yearsold。

have had的区别在哪里?

“have had”即“have done”表示现在完成时,had是have的过去分词形式,这里只不过刚好那个done又是have而已。例句中植物的基因到现在时刻已经得到了改变,基因得到改变这个动作已经到现在完成了,所以用现在完成时态。现在完成时的用法 主语+have(has)+过去分词(done) 1.现在完成时用来表示现在之前已发生过或完成的动作或状态,但其结果却和现在有联系,也就是说,动作或状态发生在过去但它的影响现在还存在. I have lost my wallet.(含义是:现在我没有钱花了.)2. 现在完成时可以用来表示发生在过去某一时刻的,持续到现在的情况,常与for,since连用. Mary has been ill for three days. 3. 现在完成时往往同表示不确定的过去时间状语连用,如already, yet, just, before, recently,still, lately等: He has already obtained a scholarship. have 是现在完成时 had是过去完成时 句子里的have had是现在完成时的have done结构。have done为现在完成时态,后面的had是have(有)的过去分词形式。

had中文什么意思

  had 英[həd] 美[hæd]  v. 有( have的过去式和过去分词 ); (亲属关系中) 接受; 拿; 买到;  [例句]Had he succeeded, he would have acquired a monopoly  要是他当时成功了,他就会取得垄断地位。  [其他] 原型: have

have, has, had的区别是什么?

have had has这三个单词中文意思都是“拥有(表示一种状态)”的意思,但英文上用法有些区别,具体如下:1.在英语句子中,若句子的时态是一般现在时,则have/has适用于这种情况下,其中当主语是第一人称,第二人称,第三人称复数(I,They,have)时,使用have,当主语时第三人称单数(she/he),则使用has,如下列例句:I/You/They have a big apple.我/你/他们 有一个大苹果。She/He has a small apple.她/他有一个小苹果。2.若句子的时态时一般过去式,则无论人称是哪一种,都是用had。如下列例句:I/You/They/She/He had a car. 我/你/他们/她/他过去有辆车。3.若句子是完成式的形式。则has/have been是表示现在完成时,has用于第三人称单数,had been则用于过去完成时。如例句:I have been studying English for ten years.我一直在学英语,已达十年之久。扩展资料:1.have英 [həv]   美 [hæv]  2.aux.用以构成完成式及完成式的不定式,表示已经…3.vt.有,具有;拿,取得;从事;必须,不得不4.n.〈口〉有产者,有钱人;富国;〈英俚〉欺骗,诈骗5.第三人称单数: has 现在分词: having 过去式: had 过去分词: had参考资料:have-百度翻译

had had怎么用?

had had是过去完成时态的用法。过去完成时由“助动词 had + 过去分词”构成,其中 前一个had 通用于各种人称,后一个had是have的过去分词,表示已经吃、已经有或其它意思。 过去完成时表示在过去某一时间或动作之前已经发生或完成了的动作,即“过去的过去。如:1.They had already had breakfast before they arrived at the hotel. 2.She had finished writing the composition by 10 :00 this morning. 另外还有如:have sth done 请或让某人(别人)做某事 例: The king had had his tomb built before he died.希望对你有帮助 希望采纳

have,has,had 用法告诉下吧...

原形三单过去时(完成时)

had 是什么时态

had是过去时态。 had: v.有;持有;占有;由…组成;显示出(性质、特征)。 aux.had 有时代替 if 用于从句句首,表示某种情况可能发生但并未发生。 have的过去分词和过去式。 扩展资料   Had I known that I would never have come.   要是早知道,我绝不会来的。   He had to go to hospital for treatment.   他不得不去医院接受治疗。   I"ve had to put my plans into cold storage.   我不得不把我的"计划暂时搁置起来。   I had a long conversation with her the other day.   前几天我与她作了一次长谈。   We had no say in the decision to sell the company.   在决定出售公司的问题上,我们没有发言权。
 1 2  下一页  尾页