barriers / 阅读 / 详情

Javascript中内建函数reduce的应用详解

2023-05-31 17:01:13
TAG: cri as ri
共1条回复
里论外几

前言

一般而言,可以通过reduce方法实现的逻辑都可以通过forEach方法来变相的实现,虽然不清楚浏览器的js引擎是如何在C++层面实现这两个方法,但是可以肯定的是reduce方法肯定也存在数组的遍历,在具体实现细节上是否针对数组项的操作和存储做了什么优化,则不得而知。

数组的reduce方法的应用

reduce方法有两个参数,第一个参数是一个callback,用于针对数组项的操作;第二个参数则是传入的初始值,这个初始值用于单个数组项的操作。需要注意的是,reduce方法返回值并不是数组,而是形如初始值的经过叠加处理后的操作。

reduce方法最常见的场景就是叠加。

var

items

=

[10,

120,

1000];

//

our

reducer

function

var

reducer

=

function

add(sumSoFar,

item)

{

return

sumSoFar

+

item;

};

//

do

the

job

var

total

=

items.reduce(reducer,

0);

console.log(total);

//

1130

可以看出,reduce函数根据初始值0,不断的进行叠加,完成最简单的总和的实现。

前文中也提到,reduce函数的返回结果类型和传入的初始值相同,上个实例中初始值为number类型,同理,初始值也可为object类型。

var

items

=

[10,

120,

1000];

//

our

reducer

function

var

reducer

=

function

add(sumSoFar,

item)

{

sumSoFar.sum

=

sumSoFar.sum

+

item;

return

sumSoFar;

};

//

do

the

job

var

total

=

items.reduce(reducer,

{sum:

0});

console.log(total);

//

{sum:1130}

多重叠加

使用reduce方法可以完成多维度的数据叠加。如上例中的初始值{sum:

0}

,这仅仅是一个维度的操作,如果涉及到了多个属性的叠加,如{sum:

0,totalInEuros:

0,totalInYen:

0}

,则需要相应的逻辑进行处理。

在下面的方法中,采用分而治之的方法,即将reduce函数第一个参数callback封装为一个数组,由数组中的每一个函数单独进行叠加并完成reduce操作。所有的一切通过一个manager函数来管理流程和传递初始参数。

var

manageReducers

=

function(reducers)

{

return

function(state,

item)

{

return

Object.keys(reducers).reduce(

function(nextState,

key)

{

reducers[key](state,

item);

return

state;

},

{}

);

}

};

上面就是manager函数的实现,它需要reducers对象作为参数,并返回一个callback类型的函数,作为reduce的第一个参数。在该函数内部,则执行多维的叠加工作(

Object.keys()

)。

通过这种分治的思想,可以完成目标对象多个属性的同时叠加,完整代码如下:

var

reducers

=

{

totalInEuros

:

function(state,

item)

{

return

state.euros

+=

item.price

*

0.897424392;

},

totalInYen

:

function(state,

item)

{

return

state.yens

+=

item.price

*

113.852;

}

};

var

manageReducers

=

function(reducers)

{

return

function(state,

item)

{

return

Object.keys(reducers).reduce(

function(nextState,

key)

{

reducers[key](state,

item);

return

state;

},

{}

);

}

};

var

bigTotalPriceReducer

=

manageReducers(reducers);

var

initialState

=

{euros:0,

yens:

0};

var

items

=

[{price:

10},

{price:

120},

{price:

1000}];

var

totals

=

items.reduce(bigTotalPriceReducer,

initialState);

console.log(totals);

总结

以上就是Javascript中内建函数reduce应用的全部内容,希望本文的内容对大家的学习或者工作能有所帮助,如果有疑问大家可以留言交流。

相关推荐

reducer是什么意思

reducern.缩减者,减压器,还原剂; 减速器; [英][rɪ"dju:sə][美][rɪ"dju:sə]
2023-05-31 14:25:133

管件reducer是什么意思

管件中reducer是指大小头,如:Reducer 大小头 Concentric reducer 同心大小头 Eccentric reducer 偏心大小头
2023-05-31 14:25:201

减用英语怎么说

1、“减”的英文单词是reduce。2、单词14世纪晚期进入英语,直接源自古法语的reducer;最初源自拉丁语的reducere:re(回)、ducere(带,领),意为带回,恢复,撤退。3、相关例句:Weneedtoreduceourdependenceonoilasasourceofenergy。在能源方面,我们需减少对石油的依赖。
2023-05-31 14:25:291

管道管件中的reducer是什么意思

管道管件中这个单词是产品:变径管和大小头的意思。如果这个单词和管件三通一起用,就是变径三通的意思。
2023-05-31 14:25:381

节流阀中的reducer是指什么意思?

reduce 是减少缩小的意思,加上R就是相关名词,专业术语可以引申下。减速器啦,减径管之类。
2023-05-31 14:25:451

reducer如何翻译?

减压器
2023-05-31 14:25:523

reducer c*c中文是什么意思

reducer 英[rɪ"dju:sə] 美[rɪ"dju:sə] n. 减速器; 缩减者,减压器,还原剂; [例句]Then in accordance with the design criteria and design theory designed toroidal worm reducer.然后按照设计准则和设计理论设计了环面蜗轮蜗杆减速器。[其他] 形近词: seducer producer exducer
2023-05-31 14:26:011

reduce词性+解释+用法

reduce [rɪ"djuːs] vt. 减少;降低;使处于;把…分解vi. 减少;缩小;归纳为[ 过去式reduced 过去分词reduced 现在分词reducing ] 词组短语:reduce pollution 降低污染 reduce weight 整形美容减肥 reduce waste 减少浪费 reduce by 减少了 vt.减少;降低;使处于;把…分解shorten, weakenvi.减少;缩小;归纳为to cut down, fall off词根:reduceadj. reduced 减少的;[数] 简化的;缩减的reductive 还原的;减少的reducible [数] 可约的,可化简的;可还原的;可缩小的n. reduction 减少;下降;缩小reducer [助剂] 还原剂;减径管reducing 减低;减轻体重法,减肥法reductive [助剂] 还原剂v. reducing 减少(reduce的ing形式) reduce [ri"dju:s; -"du:s] vt.1. 减少;缩小;削减,缩减;精简: They have to reduce expenses this year.他们今年必须削减开支。2. 降低;减轻: She tried to reduce her weight.她设法降低她的体重。3. 将…归纳;简化: We can reduce his speech to three sentences.我们可以把他的讲演归纳成三句话。4. 换算;折合: If you reduce 3 lbs. to ounces, you have 48 ounces.如果将3磅换算成盎司,可得48盎司。5. 使变为: The heavy artillery bombardment reduced the village to ruins.猛烈的炮火使这个村庄变成一片废墟。His words reduced her to silence.他的话使她哑口无言。6. 把…弄碎(或熔化等);捣碎: Their daily work is to reduce the rocks to dust.他们每天的工作就是将岩石捣碎成粉末。7. 使降级;使降职: He was reduced to ordinary soldier due to a serious dereliction of duty.他因严重失职被降为普通士兵。8. 降服(使有秩序等);攻陷(城市等): The teacher soon reduced the noisy class to order.教师很快就使这个乱哄哄的班恢复了秩序。The fortress at the entrance to the village was reduced by a sudden attack.村口的堡垒遭到突然袭击而被攻陷。9. 使处境困难(或悲惨等);迫使,逼迫[常用被动语态]: The bad harvests in the last few years reduced the peasants to extreme poverty.过去几年的庄稼歉收使农民变得极为贫困。Poverty reduced them to begging for a living.贫困迫使他们乞讨为生。10. 使变弱;使变瘦;使衰退: He is reduced to skin and bones.他变得骨瘦如柴。11. 约束;限制: Business was reduced to local buying and selling when the war broke out and international trading ceased.战争爆发国际贸易停止后,商业就只限于地方性的买卖了。12. 【数学】约简: The last problem is to reduce an equation to its simplest form.最后一道题是将一方程式化简为最简单形式。13. 使…浓缩变稠: The last step is to add some salt and reduce the soup for three minutes.最后一个步骤是加些盐然后使汤浓缩三分钟。14. (用油)调稀(或冲淡): We can use this special oil to reduce the thick paint to a liquid thin enough to spread easily .我们可以使用这种特殊的油将浓油漆稀释一下,使它刷起来比较省力。15. 【化学】还原;分解(化合物): Water can be reduced to oxygen and hydrogen by electrolysis.水通过电解可以分解为氧和氢。16. 【外科学】使…复位(或复原): A doctor can reduce a fracture or dislocation.医生能使骨折或脱臼复原。17. 【语音学】使(重读元音)弱化: Some English vowel sounds which are not given weight in speech become reduced to a rather formless central vowel.英语中有些元音在讲话时没有重读,结果成了相当不定型的中央元音了。18. 【摄影术】减低(底片的)强度19. 【生理学】使(细胞)发生减数分裂vi.1. 减少;缩小;缩减;减弱2. (节食)减肥: She has been reducing for the last few weeks.她在最近几周一直在节食减肥。3. 【化学】还原: Ferrous iron reduces to ferric iron.二价铁能还原成三价铁。4. (液体)浓缩变稠: The soup will reduce by adding some cooking starch.汤中加些淀粉将变稠。5. (油漆、颜料等)稀释: Most paints reduce with turpentine.许多油漆可用松节油稀释。6. 【生物学】(细胞)减数分裂
2023-05-31 14:26:081

reducer是什么管件

大小头
2023-05-31 14:26:184

mr输出结果文件数量和reducer

MapReduce也有相应的输出格式。默认情况下只有一个 Reduce,输出只有一个文件,默认文件名为 part-r-00000,输出文件的个数与 Reduce 的个数一致。 如果有两个Reduce,输出结果就有两个文件,第一个为part-r-00000,第二个为part-r-00001,依次类推MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
2023-05-31 14:26:421

reducer 3×2 sch 40 是什么意思

1 Pipe sch 40, 20" dia ASTM 106 GR-B OR API 5L GR-B, Seamless Beveled End.20ft Long 8 lg2 Pipe sch 40, 12" dia ASTM 106 GR-B OR API 5L GR-B, Seamless Beveled End.20ft Long 2 lg3 Pipe sch 40, 10" dia ASTM 106 GR-B OR API 5L GR-B, Seamless Beveled End.20ft Long 2 lg 4 90 Deg. L.R. Elbow 20" dia sch 40 ASTM A-234 10 ea5 90 Deg. L.R. Elbow 12 " dia sch 40 ASTM A-234 4 ea6 90 Deg. L.R. Elbow 10" dia sch 40 ASTM A-234 3 ea 7 45 Deg. Elbow 20" dia sch 40 ASTM A-234 3 ea8 WNRF Flange sch 40, 20" dia ANSI 150 ASTM A 105 31 ea9 WNRF Flange sch 40, 24" dia ANSI 150 ASTM A 105 4 ea10 WNRF Flange sch 40, 16" dia ANSI 150 ASTM A 105 1 ea。
2023-05-31 14:27:042

前端知识 | Redux的使用

什么是 Redux? Redux 是整个项目的状态管理中心,数据存储仓库,集中式的存储和管理所有的组件的状态,并且可以让组件的状态以一种可预测的方式变化。 什么情况下使用 Redux? Redux 主要作为一个状态树的存在,主要作用可以用来集中管理共享数据,如果你想取某个数据,你就直接从状态树(store)上拿,你修改数据,其他页面上从状态树上取到的数据也会发生改变(如果你用了 subscribe 监听函数或者使用了 react-redux 类似的库帮你监听,则其他页面取到的数据会自动更新),Redux 不是必须的,它的使用场景是当你觉得项目内的组件通信太过于繁琐的时候使用,比如你有很多页面,很多组件,他们之间的通信很麻烦,或者说有些数据你需要保存起来供所有组件使用,这时候 Redux 的作用就体现出来了。 正如 Redux 的作者所说: Flux 架构就像眼镜:您自会知道什么时候需要它。 核心概念 action action 是一个对象,它包含了引起 store 状态变化的行为,他是将数据运输至 store 里的唯一手段。它通常包含一个 type 属性和一个需要传入 store 的数据,数据类型可以自定义。比如我们在做用户登录的时候经常需要将用户信息保存到 Redux,这时候 action 就可以是: reducer action 仅仅申明了我们想要改变 store 以及附带的数据,那么我们到底怎么去改变数据呢,比如说有一天用户的 money 变为2000,那么传入的 action 是: 那么这时候我们要怎么取改变 store 里面的值呢?这时候就要用到 reducer 简单来说,reducer 就是根据传入 actioon 类型描述如何去更改 store 中的状态。 store 单一状态树 action 描述了更改数据行为的发生,reducer 描述了如何去更改数据,那么我们数据还要有一个归宿就是 store,Redux 的核心就是一个 store 对象,它里面包含着我们所储存的所有状态,它类似一个物流中心,我们可以往里面存放数据,也可以从里面取出数据。它提供的方法包括: 1、getState() 获取当前的的 state 状态值 2、dispatch(action) 派发一个 action 行为更新 state 3、subscribe(listener) 注册一个监听器(当 state 跟新完了之后会自动执行) 这样这三个东西就串联在一起了。 store.dispatch(action) 用来接收不同的 action,表明要更新 state 的 type 类型以及更新需要的数据,再通过 reducer 函数计算到底怎么去更改 state,加还是减。 说了这么多,我们最后上个例子吧: app.jsaction.jsreducer.jsstore.jsPS:这是最原始的 redux 使用方法,在实际开发中,通常还会结合 create-redux 等插件一起使用。-END-
2023-05-31 14:27:111

redux的三个概念与三大核心

1、什么是redux? 一个组件里可能会有很多的状态,比如控制某个内容显示的flag,从后端获取的展示数据,那么这些状态可以在自己的单个页面进行管理,也可以选择别的管理方式,redux就是是一种状态管理的方式。 2、为什么要用redux? (1) 数据共享,当我们的很多页面都要用到同一数据时,就可以把数据放到redux中,达到状态共享的目的。 (2) 合并管理状态,业务当中可能会有很多的状态需要维护,且各个状态之间可能还有相互依赖的关系,不统一管理的话很难追踪状态的变化。 3、redux的基础概念 (1) store store是一个仓库,用来存储数据,它可以获取数据,也可以派发数据,还能监听到数据的变化。 (2) action action理解为动作,action的值一般为一个对象,格式如 { type: "", data: "" },type是必须要的,因为reducer处理数据的时候要根据不同的type来进行不同的操作。 (3) reducer reducer是初始化以及处理派发的action的纯函数。 4、如何使用redux? 首先安装redux的依赖,npm i redux -D (1) 定义action (2) 定义处理的action的reducer (3) 创建store 到这里,store就创建完成了,在组件里可以直接引入store和action,进行派发action的操作,此时有一个Home的组件,我们要在这里更改state中的数据。 5、redux的三大核心 (1) 单一数据源 当我们有多个数据需要放到redux中管理时,是放在一个对象里,这个对象放在store中管理,虽然redux并没有强制只能创建一个store,但是多个数据源的话不那么容易管理,单一的数据源可以更好的追踪状态的变化。 (2) state是只读的 想要改变state,无法在组件上直接手动修改state的值,这样可以保证状态不会被随意改变,唯一的方式就是派发action,而是通过集中管理的形式去改变state。 (3) reducer是纯函数 纯函数指的是有相同的输入必定有相同的输出,在这种情况下,不可以修改入参,也不能发送网络请求,也不能进行获取随机数这样的操作,通过reducer将上一个state的状态和当前派发的action连接起来,返回一个新的状态。 6、redux如何进行异步操作? redux中派发的action默认是只能进行同步的操作的,action被规定为一个对象,那如果想要在redux中进行异步操作,比如发送网络请求该怎么做? 这个时候需要用到中间件,常用的中间件有redux-thunk和redux-saga,需要安装依赖 npm i redux-thunk -D/ npm i redux-sage -D redux-thunk允许派发的action为一个函数,可以在这个函数中进行异步请求,请求执行完成之后再派发一个同步的action,用于修改store中的数据。 redux-saga派发的action仍然为一个对象,但是saga在外侧拦截action,使用生成器函数来监听action,当派发的action中的type为监听的type时,再进行网络请求的发送,以及改变store中的数据。 这里演示一下redux-thunk,在定义store时,需要将中间件传入 action就可以写成函数的形式了 7、拆分reducer 当reducer需要处理的逻辑比较多时,一个reducer需要进行非常多的switch case的判断,其中有获取异步请求数据的、有全局保存的状态,这时候逻辑就会比较杂乱,此时可以将reducer拆分,然后再进行合并。 假设此时有两个reducer,分别为 countReducer、userInfoReducer,分别的状态保存在store的countInfo和userInfo,此时可以使用 combineReducer 这个方法来合并 此时返回的reducer仍然是一个纯函数,combineReducer这个函数就是依次执行传入的reducers,如果store里储存的值发生了变化,就返回新的state,如果没有变化,就返回原来的state。
2023-05-31 14:27:171

Redux数据流

目前redux流行的解决方法包括了redux、react-redux、redux-thunk等等。以后会总体来说,现在先说一下,纯redux数据流是如何实现的。理解了这个数据流,就基本理解原生redux是如何实现的了,内部流程是什么<p> 下面这个图是redux的内部流程,我结合这个图说一下。(个人理解,说错了,欢迎交流)<p> action是一个单纯包含了{type}的对象,type是一个常量用来表示动作类型的。Action需要<code>store.dispath()</code>来来发送信息。例如下面这个例子:但是通常使用action creator函数来创建action对象,这样具有灵活性,可以提供更多种的配置。触发一个动作只需要调用<code>dispath (action_text(text))</code> reducer的作用是用来根据具体情况来更改action对应的state树的。reducer会接受两个参数,第一个是<code>state</code>,初始化的<code>state</code>;第二个是<code>action</code>。返回的是一个新的state,这个state可以通过监听来重新渲染整个UI,这部分在后面会讲到。<code>(oldState,action)=>newState</code> 现在有了 Action 和 Reducer,Store 的作用就是连接这两者,Store 的作用有这么几个: <ul> <li>Hold 住整个应用的 State 状态树 <li>提供一个 getState() 方法获取 State <li>提供一个 dispatch() 方法发送 action 更改 State <li>提供一个 subscribe() 方法注册回调函数监听 State 的更改 </ul> 具体案例以后具体分析。 看上面的图,可以看到数据流发生改变的时候,数据是如何流动的。 单向数据流调用<code> store.dispatch(action) -> reducer(state, action) -> store.getState()</code> (1)调用sote.dispath(action)(这里的action和上面提到的action不是一个概念,这个是一个纯的对象,上面是一个action creator)你可以在任何地方调用 store.dispatch(action),比如组件内部,Ajax 回调函数里面等等。 (2)Action 会触发给 Store 指定的 rootreducer rootreducer会返回一个完整的状态树,state状态树上的各个值都可以由对应的reducer来更新。 (3)store会保存状态树 更新完state后,新的 State 会替代旧的 State。然后可以添加监听函数<code>store.subscribe(listener)</code>在回调函数里面可以通过<code>store.getState()</code>来获取新的state。这样就能更新整个UI。只要调用<code>dispath</code>,state就会根据<code>reducer</code>对应更新,进而触发监听函数<code>subscribe</code>,然后触发回调函数渲染UI。
2023-05-31 14:27:241

一个hadoop程序可以有2个reducer吗

1、2个reduce类是无法使用的,但是可以通过Combiner来提前运行reduce代码,因Combiner是在多个台机器上运行,某些场景下效率高于reduce,例如对大量数字求和,用Combiner预执行reduce代码比最终一次执行要快的多。2、设置多个reduce在分布式下运行,是可以的,通过job.setNumReduceTasks来设置reduce数量,设置多个reduce会出现多输出结果的情况(多文件)。
2023-05-31 14:27:311

如何在redux中的reducer方法中调用其他reducer

在todo例子中,按照官方文档把state拆分成todos和visibility,现在我在另一个reducer中需要用到todos,比如设置所有的todos的completed为true(false),这个具体要怎么做? 以下是代码 function todos(state = initialState, action) { switch (
2023-05-31 14:27:371

【前端100问】Q37:为什么 Vuex 的 mutation 和 Redux 的 reducer 中不能做异步操作

因为异步操作是成功还是失败不可预测,什么时候进行异步操作也不可预测;当异步操作成功或失败时,如果不 commit(mutation) 或者 dispatch(action),Vuex 和 Redux 就不能捕获到异步的结果从而进行相应的操作 更改 state 的函数必须是纯函数,纯函数既是统一输入就会统一输出,没有任何副作用;如果是异步则会引入额外的副作用,导致更改后的 state 不可预测; 区分 actions 和 mutations 并不是为了解决竞态问题,而是为了能用 devtools 追踪状态变化。 事实上在 vuex 里面 actions 只是一个架构性的概念,并不是必须的,说到底只是一个函数,你在里面想干嘛都可以,只要最后触发 mutation 就行。 异步竞态怎么处理那是用户自己的事情。vuex 真正限制你的只有 mutation 必须是同步的这一点(在 redux 里面就好像 reducer 必须同步返回下一个状态一样)。 同步的意义在于这样每一个 mutation 执行完成后都可以对应到一个新的状态(和 reducer 一样),这样 devtools 就可以打个 snapshot 存下来,然后就可以随便 time-travel 了。 如果你开着 devtool 调用一个异步的 action,你可以清楚地看到它所调用的 mutation 是何时被记录下来的,并且可以立刻查看它们对应的状态。 其实我有个点子一直没时间做,那就是把记录下来的 mutations 做成类似 rx-marble 那样的时间线图,对于理解应用的异步状态变化很有帮助。
2023-05-31 14:27:441

hive执行作业时reduce任务个数设置为多少合适

reducer个数的设定极大影响执行效率 1. hive.exec.reducers.bytes.per.reducer(默认为1000^3) 2. hive.exec.reducers.max(默认为999) 计算reducer数的公式很简单: N=min(参数2,总输入数据量/参数1) 通常情况下,有必要手动指定reducer个数。考虑到map阶段的输出数据量通常会比输入有大幅减少,因此即使不设定reducer个数,重设参数2还是必要的。依据Hadoop的经验,可以将参数2设定为0.95*(集群中TaskTracker个数)。 正确的reduce任务的 个数应该是 0.95或者1.75 ×(节点数 ×mapred.tasktracker.tasks.maximum参数值)
2023-05-31 14:27:511

弯头三通 异径管 大弯 管帽用英语怎么说

弯头三通:bendthree-waypipe异径管reducer;大弯elbow;管帽(casing)cap
2023-05-31 14:27:583

hadoop mapper,reducer的value设置为job.setOutputValueClass(ArrayWritable.class); 时运行卡住

ArrayWritable不能直接作为mapreduce的输入输出类型。程序不是卡住了,而是报错了。估计是这个错误java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.hadoop.io.ArrayWritable.<init>()这个错误是ArrayWritable初始化异常,要自己实现一个ArrayWritable的派生类如果是text类型的数组 For example: public class TextArrayWritable extends ArrayWritable { public TextArrayWritable() { super(Text.class); } }即可,在maprecude中用TextArrayWritable 替换掉ArrayWritable A Writable for arrays containing instances of a class. The elements of this writable must all be instances of the same class. If this writable will be the input for a Reducer, you will need to create a subclass that sets the value to be of the proper type. For example: public class IntArrayWritable extends ArrayWritable { public IntArrayWritable() { super(IntWritable.class); } }==========================参考:http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/ArrayWritable.html
2023-05-31 14:28:051

hadoop中存储文件系统hdfs的冗余机制是怎么进行的?有什么特点?

避免节点挂掉。
2023-05-31 14:28:253

如何使用Hadoop的ChainMapper和ChainReducer

hadoop的mapreduce任务是在集群环境下跑的,所以调试存在一定的复杂度,但是貌似还是可以使用debug的,但是具体方式我没有实现,只是看到什么资料都有介绍。 如果只是想调试mapper和reducer的输入输出是否正确可以使用mrunit进行调试
2023-05-31 14:28:381

状态管理(1)- Redux

应用的整体全局状态以对象树的方式存放于单个store。唯一改变状态树(state tree)的方法是创建action, 一个描述发生了什么的对象,并将其dispatch给store。要指定状态树如何响应action来进行更新,你可以编写reducer函数,这些函数根据旧的action来计算新state。 上面这段摘抄自官网,个人认为是比较通俗的概括了redux的使用方法,你或许还没有很理解它,继续往下看,相信你会有所收获 一个简单的例子,带你熟悉redux ‘默认值" 存在state里,我们点击按钮之后,改变改值 使用脚手架创建应用,安装依赖 安装redux 编写ui 添加home组件 创建action: 在根目录下创建action文件夹,在action文件下创建index.js 这里我们定义一个action的构建函数,让他返回action对象 action 可以理解为表示即将放生的变化类别,它是一个普通的对象,必须包含type属性,这里我们添加我们需要的value属性,作为传递的数据 创建Reducer: 在根目录下新建reducer文件夹,在reducer文件夹下新建index.js 文件 本质就是函数,用于响应发送过来的action,函数接受两个参数,第一个是初始化state,第二个是发送过来的action 这里我们将默认的state也放到reducer 文件中Object.assign({},state,action); 如果action的type为send_type 则返回, 不要忘记将reducer暴露出去 创建store store是用来把action和reducer关联到一起的, 我们通过createStore来构建store,通过subscribe来注册监,通过dispatch来发送action。 store.dispatch() 并传入一个action对象,store将执行所有reducer函数并计算出更新后的state,调用getState()可以获取新state dispatch一个action可以形象的理解为“出发一个事件”,发生了一些事情,我们希望store知道这件事。Reducer就像事件监听器一样,当它们收到关注的action后,它就会更新state作为响应 这里是最难理解的地方,你可以认为store的dispatch方法才是最终告知要执行action的动作了,你这个动作具体做了什么需要reducer来处理 根目录下创建store文件夹,在store文件夹下创建index.js 同样的将store暴露出去 action、reducer、store 都写好了,现在我们将这个redux用到组件中 我们通过store的subscribe来注册监,通过dispatch来发送action 组件一加载完成就注册监听 Home组件 store.getState()获取当前state
2023-05-31 14:28:441

reduce派生词

reduce派生词供参考:reducerreductant
2023-05-31 14:28:531

mapreduce 怎么查看每个reducer处理的数据量

您好,第一种方法是用Mapper读取文本文件用StringTokenizer对读取文件内的每一行的数字(Hadoop处理文本文件时,处理时是一行一行记取的)进行分隔,获取每一个数字,然后求和,再将求得的值按Key/Value格式写入Context,最后用Reducer对求得中间值进行汇总求和,得出整个文件所有数字的和。 第二种方法是用Mapper读取文本文件用StringTokenizer对文件内的数字进行分隔,获取每一个数字,并救出文件中该数字有多少个,在合并过程中,求出每个数字在文件中的和,最后用Reducer对求得每个数字求得的和进行汇总求和,得出整个文件所有数字的和。package com.metarnet.hadoop;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class NumberSum {//对每一行数据进行分隔,并求和 public static class SumMapper extends Mapper<Object, Text, Text, LongWritable> { private Text word = new Text("sum"); private static LongWritable numValue = new LongWritable(1); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); long sum = 0; while (itr.hasMoreTokens()) { String s = itr.nextToken(); long val = Long.parseLong(s); sum += val; } numValue.set(sum); context.write(word, numValue); } } // 汇总求和,输出 public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> { private LongWritable result = new LongWritable(); private Text k = new Text("sum"); public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable val : values) { long v = val.get(); sum += v; } result.set(sum); context.write(k, result); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: numbersum <in> <out>"); System.exit(2); } Job job = new Job(conf, "number sum"); job.setJarByClass(NumberSum.class); job.setMapperClass(SumMapper.class); job.setReducerClass(SumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); System.out.println("ok"); }}第一种实现的方法相对简单,第二种实现方法用到了Combiner类,Hadoop 在 对中间求的的数值进行Combiner时,是通过递归的方式不停地对 Combiner方法进行调用,进而合并数据的。从两种方法中,我们可以看出Map/Reduce的核心是在怎样对输入的数据按照何种方式是进行Key/Value分对的,分的合理对整个结果输出及算法实现有很大的影响。
2023-05-31 14:29:001

如何用数组的reduce实现数组的map函数

假定我们需要计算大文本中每一行的长度,并且报告每个长度的行数。在HadoopMapReduce中,我们首先使用一个Mapper,生成为以行的长度作为key,1作为value的键值对。public class LineLengthMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> { @Override protected void map(LongWritable lineNumber, Text line, Context context) throws IOException, InterruptedException { context.write(new IntWritable(line.getLength()), new IntWritable(1)); }}值得注意的是Mappers和Reducers只对键值对进行操作。所以由TextInputFormat提供输入给LineLengthMapper,实际上也是以文本中位置为key(很少这么用,但是总是需要有东西作为Key),文本行为值的键值对。与之对应的Spark实现:lines.map(line => (line.length, 1))Spark中,输入只是String构成的RDD,而不是key-value键值对。Spark中对key-value键值对的表示是一个Scala的元组,用(A,B)这样的语法来创建。上面的map操作的结果是(Int,Int)元组的RDD。当一个RDD包含很多元组,它获得了多个方法,如reduceByKey,这对再现MapReduce行为将是至关重要的。Reducereduce()与reduceBykey()统计行的长度的键值对,需要在Reducer中对每种长度作为key,计算其行数的总和作为value。public class LineLengthReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { @Override protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable count : counts) { sum += count.get(); } context.write(length, new IntWritable(sum)); }}Spark中与上述Mapper,Reducer对应的实现只要一行代码:val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)Spark的RDD API有个reduce方法,但是它会将所有key-value键值对reduce为单个value。这并不是Hadoop MapReduce的行为,Spark中与之对应的是ReduceByKey。另外,Reducer的Reduce方法接收多值流,并产生0,1或多个结果。而reduceByKey,它接受的是一个将两个值转化为一个值的函数,在这里,就是把两个数字映射到它们的和的简单加法函数。此关联函数可以被调用者用来reduce多个值到一个值。与Reducer方法相比,他是一个根据Key来Reduce Value的更简单而更精确的API。Mappermap() 与 flatMap()现在,考虑一个统计以大写字母开头的单词的个数的算法。对于每行输入文本,Mapper可能产生0个,1个或多个键值对。public class CountUppercaseMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable lineNumber, Text line, Context context) throws IOException, InterruptedException { for (String word : line.toString().split(" ")) { if (Character.isUpperCase(word.charAt(0))) { context.write(new Text(word), new IntWritable(1)); } } }}Spark对应的写法:lines.flatMap(_.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1)))简单的Spark map函数不适用于这种场景,因为map对于每个输入只能产生单个输出,但这个例子中一行需要产生多个输出。所以,和MapperAPI支持的相比,Spark的map函数语义更简单,应用范围更窄。Spark的解决方案是首先将每行映射为一组输出值,这组值可能为空值或多值。随后会通过flatMap函数被扁平化。数组中的词会被过滤并被转化为函数中的元组。这个例子中,真正模仿Mapper行为的是flatMap,而不是map。groupByKey()写一个统计次数的reducer是简单的,在Spark中,reduceByKey可以被用来统计每个单词的总数。比如出于某种原因要求输出文件中每个单词都要显示为大写字母和其数量,在MapReduce中,实现如下:public class CountUppercaseReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text word, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable count : counts) { sum += count.get(); } context .write(new Text(word.toString().toUpperCase()), new IntWritable(sum)); }}但是redeceByKey不能单独在Spark中工作,因为他保留了原来的key。为了在Spark中模拟,我们需要一些更像Reducer API的操作。我们知道Reducer的reduce方法接受一个key和一组值,然后完成一组转换。groupByKey和一个连续的map操作能够达到这样的目标:groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }groupByKey只是将某一个key的所有值收集在一起,并且不提供reduce功能。以此为基础,任何转换都可以作用在key和一系列值上。此处,将key转变为大写字母,将values直接求和。
2023-05-31 14:29:071

redux 为什么页面刷新后 store 被重置了

逻辑反了,首先得触发action,才能去更新store,而不是store更新再被动触发action。 情况1: 在异步ajax的的时候,组件的某个事件dispatch一个对应的异步action,通过这个action去访问远程数据,当返回成功的时候,dispatch一个更新action的function,这个function指定了一个type类型,匹配了reducer所对应的type,将返回的数据传到reducer去做更新替换,当state更新之后,就会刷新整个store。 1、component:class Test extends Component {    handleClick() {        dispatch(action)    }    render() {        return (            <a onClick={() => this.handleClick()}>发起异步请求</a>        )    }}2、action:const function actionType(response) {    return {        type: "GET_DATA",        json: response.data    }}export function action() {    return async (dispatch) => {        try {            await getData(`/test`)                .then(response => {                    dispatch(actionType(response))                })        }catch(error) {            console.log("error: ", error)        }    }}3、reducer:let initData = {    data: {}}export function test(state = initData, action) {    switch(action.type) {        case "GET_DATA":            return {...state,data: action.json}        default:            return {...state}    }} redux的机制是,只要store的某个state发生了改变,就会自动重新渲染,整个流程大概就是这样。
2023-05-31 14:29:141

如何实现mapreduce计算框架以有效实现迭代

  MapReduce从出现以来,已经成为Apache Hadoop计算范式的扛鼎之作。它对于符合其设计的各项工作堪称完美:大规模日志处理,ETL批处理操作等。  随着Hadoop使用范围的不断扩大,人们已经清楚知道MapReduce不是所有计算的最佳框架。Hadoop 2将资源管理器YARN作为自己的顶级组件,为其他计算引擎的接入提供了可能性。如Impala等非MapReduce架构的引入,使平台具备了支持交互式SQL的能力。  今天,Apache Spark是另一种这样的替代,并且被称为是超越MapReduce的通用计算范例。也许您会好奇:MapReduce一直以来已经这么有用了,怎么能突然被取代?毕竟,还有很多ETL这样的工作需要在Hadoop上进行,即使该平台目前也已经拥有其他实时功能。  值得庆幸的是,在Spark上重新实现MapReduce一样的计算是完全可能的。它们可以被更简单的维护,而且在某些情况下更快速,这要归功于Spark优化了刷写数据到磁盘的过程。Spark重新实现MapReduce编程范式不过是回归本源。Spark模仿了Scala的函数式编程风格和API。而MapReduce的想法来自于函数式编程语言LISP。  尽管Spark的主要抽象是RDD(弹性分布式数据集),实现了Map,reduce等操作,但这些都不是Hadoop的Mapper或Reducer API的直接模拟。这些转变也往往成为开发者从Mapper和Reducer类平行迁移到Spark的绊脚石。  与Scala或Spark中经典函数语言实现的map和reduce函数相比,原有Hadoop提供的Mapper和Reducer API 更灵活也更复杂。这些区别对于习惯了MapReduce的开发者而言也许并不明显,下列行为是针对Hadoop的实现而不是MapReduce的抽象概念:  · Mapper和Reducer总是使用键值对作为输入输出。  · 每个Reducer按照Key对Value进行reduce。  · 每个Mapper和Reducer对于每组输入可能产生0个,1个或多个键值对。  · Mapper和Reducer可能产生任意的keys和values,而不局限于输入的子集和变换。  Mapper和Reducer对象的生命周期可能横跨多个map和reduce操作。它们支持setup和cleanup方法,在批量记录处理开始之前和结束之后被调用。  本文将简要展示怎样在Spark中重现以上过程,您将发现不需要逐字翻译Mapper和Reducer!  作为元组的键值对  假定我们需要计算大文本中每一行的长度,并且报告每个长度的行数。在HadoopMapReduce中,我们首先使用一个Mapper,生成为以行的长度作为key,1作为value的键值对。  public class LineLengthMapper extends  Mapper<LongWritable, Text, IntWritable, IntWritable> {  @Override  protected void map(LongWritable lineNumber, Text line, Context context)  throws IOException, InterruptedException {  context.write(new IntWritable(line.getLength()), new IntWritable(1));  }  }  值得注意的是Mappers和Reducers只对键值对进行操作。所以由TextInputFormat提供输入给LineLengthMapper,实际上也是以文本中位置为key(很少这么用,但是总是需要有东西作为Key),文本行为值的键值对。  与之对应的Spark实现:  lines.map(line => (line.length, 1))  Spark中,输入只是String构成的RDD,而不是key-value键值对。Spark中对key-value键值对的表示是一个Scala的元组,用(A,B)这样的语法来创建。上面的map操作的结果是(Int,Int)元组的RDD。当一个RDD包含很多元组,它获得了多个方法,如reduceByKey,这对再现MapReduce行为将是至关重要的。  Reduce  reduce()与reduceBykey()  统计行的长度的键值对,需要在Reducer中对每种长度作为key,计算其行数的总和作为value。  public class LineLengthReducer extends  Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {  @Override  protected void reduce(IntWritable length, Iterable<IntWritable> counts,  Context context) throws IOException, InterruptedException {  int sum = 0;  for (IntWritable count : counts) {  sum += count.get();  }  context.write(length, new IntWritable(sum));  }  }  Spark中与上述Mapper,Reducer对应的实现只要一行代码:  val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)  Spark的RDD API有个reduce方法,但是它会将所有key-value键值对reduce为单个value。这并不是Hadoop MapReduce的行为,Spark中与之对应的是ReduceByKey。  另外,Reducer的Reduce方法接收多值流,并产生0,1或多个结果。而reduceByKey,它接受的是一个将两个值转化为一个值的函数,在这里,就是把两个数字映射到它们的和的简单加法函数。此关联函数可以被调用者用来reduce多个值到一个值。与Reducer方法相比,他是一个根据Key来Reduce Value的更简单而更精确的API。  Mapper  map() 与 flatMap()  现在,考虑一个统计以大写字母开头的单词的个数的算法。对于每行输入文本,Mapper可能产生0个,1个或多个键值对。  public class CountUppercaseMapper extends  Mapper<LongWritable, Text, Text, IntWritable> {  @Override  protected void map(LongWritable lineNumber, Text line, Context context)  throws IOException, InterruptedException {  for (String word : line.toString().split(" ")) {  if (Character.isUpperCase(word.charAt(0))) {  context.write(new Text(word), new IntWritable(1));  }  }  }  }  Spark对应的写法:  lines.flatMap(  _.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1))  )  简单的Spark map函数不适用于这种场景,因为map对于每个输入只能产生单个输出,但这个例子中一行需要产生多个输出。所以,和MapperAPI支持的相比,Spark的map函数语义更简单,应用范围更窄。  Spark的解决方案是首先将每行映射为一组输出值,这组值可能为空值或多值。随后会通过flatMap函数被扁平化。数组中的词会被过滤并被转化为函数中的元组。这个例子中,真正模仿Mapper行为的是flatMap,而不是map。  groupByKey()  写一个统计次数的reducer是简单的,在Spark中,reduceByKey可以被用来统计每个单词的总数。比如出于某种原因要求输出文件中每个单词都要显示为大写字母和其数量,在MapReduce中,实现如下:  public class CountUppercaseReducer extends  Reducer<Text, IntWritable, Text, IntWritable> {  @Override  protected void reduce(Text word, Iterable<IntWritable> counts, Context context)  throws IOException, InterruptedException {  int sum = 0;  for (IntWritable count : counts) {  sum += count.get();  }  context  .write(new Text(word.toString().toUpperCase()), new IntWritable(sum));  }  }  但是redeceByKey不能单独在Spark中工作,因为他保留了原来的key。为了在Spark中模拟,我们需要一些更像Reducer API的操作。我们知道Reducer的reduce方法接受一个key和一组值,然后完成一组转换。groupByKey和一个连续的map操作能够达到这样的目标:  groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }  groupByKey只是将某一个key的所有值收集在一起,并且不提供reduce功能。以此为基础,任何转换都可以作用在key和一系列值上。此处,将key转变为大写字母,将values直接求和。  setup()和cleanup()  在MapReduce中,Mapper和Reducer可以声明一个setup方法,在处理输入之前执行,来进行分配数据库连接等昂贵资源,同时可以用cleanup函数可以释放资源。  public class SetupCleanupMapper extends  Mapper<LongWritable, Text, Text, IntWritable> {  private Connection dbConnection;  @Override  protected void setup(Context context) {  dbConnection = ...;  }  ...  @Override  protected void cleanup(Context context) {  dbConnection.close();  }  }  Spark中的map和flatMap方法每次只能在一个input上操作,而且没有提供在转换大批值前后执行代码的方法,看起来,似乎可以直接将setup和cleanup代码放在Sparkmap函数调用之前和之后:  val dbConnection = ...  lines.map(... dbConnection.createStatement(...) ...)  dbConnection.close() // Wrong!  然而这种方法却不可行,原因在于:  · 它将对象dbConnection放在map函数的闭包中,这需要他是可序列化的(比如,通过java.io.Serializable实现)。而数据库连接这种对象一般不能被序列化。  · map是一种转换,而不是操作,并且拖延执行。连接对象不能被及时关闭。  · 即便如此,它也只能关闭driver上的连接,而不是释放被序列化拷贝版本分配的资源连接。  事实上,map和flatMap都不是Spark中Mapper的最接近的对应函数,Spark中Mapper的最接近的对应函数是十分重要的mapPartitions()方法,这个方法能够不仅完成单值对单值的映射,也能完成一组值对另一组值的映射,很像一个批映射(bulkmap)方法。这意味着mapPartitions()方法能够在开始时从本地分配资源,并在批映射结束时释放资源。  添加setup方法是简单的,添加cleanup会更困难,这是由于检测转换完成仍然是困难的。例如,这样是能工作的:  lines.mapPartitions { valueIterator =>  val dbConnection = ... // OK  val transformedIterator = valueIterator.map(... dbConnection ...)  dbConnection.close() // Still wrong! May not have evaluated iterator  transformedIterator  }  一个完整的范式应该看起来类似于:  lines.mapPartitions { valueIterator =>  if (valueIterator.isEmpty) {  Iterator[...]()  } else {  val dbConnection = ...  valueIterator.map { item =>  val transformedItem = ...  if (!valueIterator.hasNext) {  dbConnection.close()  }  transformedItem  }  }  }  虽然后者代码翻译注定不如前者优雅,但它确实能够完成工作。  flatMapPartitions方法并不存在,然而,可以通过调用mapPartitions,后面跟一个flatMap(a= > a)的调用达到同样效果。  带有setup和cleanup的Reducer对应只需仿照上述代码使用groupByKey后面跟一个mapPartition函数。  别急,等一下,还有更多  MapReduce的开发者会指出,还有更多的还没有被提及的API:  · MapReduce支持一种特殊类型的Reducer,也称为Combiner,可以从Mapper中减少洗牌(shuffled)数据大小。  · 它还支持同通过Partitioner实现的自定义分区,和通过分组Comparator实现的自定义分组。  · Context对象授予Counter API的访问权限以及它的累积统计。  · Reducer在其生命周期内一直能看到已排序好的key 。  · MapReduce有自己的Writable序列化方案。  · Mapper和Reducer可以一次发射多组输出。  · MapReduce有几十个调优参数。  有很多方法可以在Spark中实现这些方案,使用类似Accumulator的API,类似groupBy和在不同的这些方法中加入partitioner参数的方法,Java或Kryo序列化,缓存和更多。由于篇幅限制,在这篇文章中就不再累赘介绍了。  需要指出的是,MapReduce的概念仍然有用。只不过现在有了一个更强大的实现,并利用函数式语言,更好地匹配其功能性。理解Spark RDD API和原来的Mapper和ReducerAPI之间的差异,可以帮助开发者更好地理解所有这些函数的工作原理,以及理解如何利用Spark发挥其优势。
2023-05-31 14:29:231

js reduce错误TypeError: reduce of empty array with no initial value

js reduce()  方法对数组中的每个元素执行一个由您提供的 reducer 函数(升序执行),将其结果汇总为单个返回值。 例如: reducer 函数接收4个参数:     Accumulator (acc) (累计器)     Current Value (cur) (当前值)     Current Index (idx) (当前索引)     Source Array (src) (源数组)   您的 reducer 函数的返回值分配给累计器,该返回值在数组的每个迭代中被记住,并最后成为最终的单个结果值。 如果数组为空且没有提供initialValue,会抛出错误TypeError: reduce of empty array with no initial value 可以通过添加initialValue来解决。 详见: https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Array/Reduce
2023-05-31 14:29:291

React状态管理:react-redux和redux-saga(适合由vue转到react的同学)

注意:本文不会把所有知识点都写一遍,并不适合纯新手阅读 首先 Redux 是一种状态管理方案,本身和react并没有什么联系,redux也可以结合其他框架来用。 react-redux 是基于react的一种状态管理实现,他不像vuex那样直接内置在create-react-app里,需要自己去安装。 react-redux有三个重要概念,分别是:store,action,reducer。 1.store。store就是存储全局数据状态的仓库。像这样: 需要注意的是,state是 只读的 ,意味着我们不能 initState.xxx1 = "233" 这样去修改store里的数据。 其实,工作的项目里,仓库并不是放在store.js里,store.js里一般是做一些创建store,注册saga的操作,state一般是放在具体模块的reducer.js里(一个模块应该有action.js,reducer.js,saga.js三个文件) 2.action。 上文说了,state是只读的,所以我们只能通过派发action的方式修改store里的数据。 派发action这个说法我一直觉得很拗口,其实就是dispatch action的直译。 action包含两部分:type和payload。 type其实就是action的标识,一个常量的字符串,用来说明这个action是干嘛的,比如说 type:"GET_USERNAME" ,说明这个action是用来获取username的,payload就是参数了,就是调用api需要的参数。 注意:action是派发给store的。 3.reducer。 reducer接收一个state和action,返回一个state。 store在接收到action之后会把action和当前的state传给reducer,然后reducer根据action的type去判断执行什么样的操作,然后返回一个新的state给store,比如: return {...state,name:"接口获取到的username"} 。 工作流程图如下: 这个流程还是蛮好懂的,接下来我们来说下reducer要注意的地方。(这里牵扯到我们为什么还要用redux-saga) reducer必须是一个 纯函数 纯函数是指一个函数的返回结果只依赖于该函数传入的参数,而不能产生 副作用 副作用是指异步操作,DOM操作等... 好的,没明白是吧,说人话,reducer里是不能进行异步请求的!我们在工作中肯定会调用接口异步获取数据的,这样的话只靠react-redux是无法满足我们的需求的,而redux-saga就是来解决这个问题的。 如果你熟悉vuex,就知道vuex里有mutation和action,其中mutation提交更新数据的方法,只能是同步的,而action中就可以包含异步操作了,而且action提交的是mutation。 mutation和action的关系就近似于reducer和saga的关系:saga里调用接口获取到数据之后再提交action给reducer,最终返回新的state给store的还是reducer。 saga包含watch-saga函数和worker-saga函数,watch-saga用于监听系统派发出来的action,watch-saga一旦监听到了某个action就执行对应的worker-saga,worker-saga里进行异步操作(调用接口请求数据等),拿到数据之后派发第二个action给store,store再把state和action给reducer(至此就和无saga时的流程一样了)。 加入了saga,工作流程发生了变化: 我们注意到工作流中的action有两个,可以理解为第一个action是给watch-saga的,第二个action时异步操作完之后worker-saga给store的。 加入saga后的工作流程如下:
2023-05-31 14:29:541

Hive常用算子实现原理简述--MapReduce版

Hive中的常用算子包括distinct、join、group by、order by、distribute by、sort by、count等,这些操作符在SQL中使用起来很方便,能快速达到我们想要的效果,但是这些算子在底层是怎么实现的呢? order by很容易想到执行原理,在一个reduce中将所有记录按值排序即可。因此order by在数据量大的情况下执行时间非常长,容易out of memory,非特殊业务需求一般不使用。distribute by也比较明显,根据hash值将distribute的值分发到不同的reduce。sort by是小号的order by,只负责将本reducer中的值排序,达到局部有序的效果。sort by和distribute by配合使用风味更佳,二者可以合并简写为cluster by。count则更加明晰,在combiner或reducer处按相同键累加值就能得到。 比较复杂的是distinct、join、group by,本文重点讨论这三个算子在MapReduce引擎中的大致实现原理。班门弄斧,抛砖引玉。 map阶段,将group by后的字段组合作为key,如果group by单字段那么key就一个。将group by之后要进行的聚合操作字段作为值,如要进行count,则value是1;如要sum另一个字段,则value就是该字段。 shuffle阶段,按照key的不同分发到不同的reducer。注意此时可能因为key分布不均匀而出现数据倾斜的问题。 reduce阶段,将相同key的值累加或作其他需要的聚合操作,得到结果。 对group by的过程讲解的比较清楚的是这篇文章 http://www.mamicode.com/info-detail-2292193.html 图文并茂,很生动。 实例如下图,对应语句是 select rank, isonline, count(*) from city group by rank, isonline; 如果group by出现数据倾斜,除去替换key为随机数、提前挑出大数量级key值等通用调优方法,适用于group by的特殊方法有以下几种: (1)set hive.map.aggr=true,即开启map端的combiner,减少传到reducer的数据量,同时需设置参数hive.groupby.mapaggr.checkinterval 规定在 map 端进行聚合操作的条目数目。 (2)设置mapred.reduce.tasks为较大数量,降低每个reducer处理的数据量。 (3)set hive.groupby.skewindata=true,该参数可自动进行负载均衡。生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group ByKey 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce中),最后完成最终的聚合操作。 Hive中有两种join方式:map join和common join 如果不显式指定map side join,或者没有达到触发自动map join的条件,那么会进行reduce端的join,即common join,这种join包含map、shuffle、reduce三个步骤。 (1)Map阶段 读取源表的数据,Map输出时候以Join on条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key。Map输出的value为join之后所关心的(select或者where中需要用到的)列;同时在value中还会包含表的Tag信息,用于标明此value对应哪个表。然后按照key进行排序。 (2)Shuffle阶段 根据key的值进行hash,并将key/value按照hash值推送至不同的reduce中,这样确保两个表中相同的key位于同一个reduce中 (3)Reduce阶段 根据key的值完成join操作,期间通过Tag来识别不同表中的数据。 以下面的SQL为例,可用下图所示过程大致表达其join原理。 SELECT u.name, o.orderid FROM user u JOIN order o ON u.uid = o.uid; 关联字段是uid,因此以uid为map阶段的输出key,value为选取的字段name和标记源表的tag。shuffle阶段将相同key的键值对发到一起,reduce阶段将不同源表、同一key值的记录拼接起来,可能存在一对多的情况。 如果指定使用map join的方式,或者join的其中一张表小于某个体积(默认25MB),则会使用map join来执行。具体小表有多小,由参数 hive.mapjoin.smalltable.filesize 来决定。 Hive0.7之前,需要使用hint提示 /*+ mapjoin(table) */才会执行MapJoin,否则执行Common Join,但在0.7版本之后,默认自动会转换Map Join,由参数 hive.auto.convert.join 来控制,默认为true。 以下图为例说明map join如何执行,该图来自 http://lxw1234.com/archives/2015/06/313.htm ,博主是一个水平深厚又乐于分享的前辈,图片水印上也有其网址。 yarn会启动一个Local Task(在客户端本地执行的Task)--Task A,负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache中。 接下来是Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。 由于MapJoin没有Reduce,所以由Map直接输出结果文件,有多少个Map Task,就有多少个结果文件。 distinct一般和group by同时出现。 当distinct一个字段时,将group by的字段和distinct的字段组合在一起作为map输出的key,value设置为1,同时将group by的字段定为分区键,这可以确保相同group by字段的记录都分到同一个reducer,并且map的输入天然就是按照组合key排好序的。根据分区键将记录分发到reduce端后,按顺序取出组合键中的distinct字段,这时distinct字段也是排好序的。依次遍历distinct字段,每找到一个不同值,计数器就自增1,即可得到count distinct结果。例如下面的SQL语句,过程可以下图示意。 我暂时没有理解这是怎么实现的,别人写的也没有看明白。有善良的学富五车的大佬指点一下吗?
2023-05-31 14:34:381

hadoop使用reducejoin更新图书表的作者和编辑

1. 首先,需要将图书表和作者表以及编辑表按照作者和编辑的名称进行分组,生成三个不同的MapReduce任务。2. 在每个Map阶段,需要将图书表中的作者和编辑名称作为键,将作者表和编辑表中的相同名称作为值,将这些键值对写入输出。3. 在Reduce阶段中,需要将每个键对应的值进行合并,并将合并后的结果写入输出,这样就得到了每个作者和每个编辑所对应的所有图书。4. 接下来,需要进行ReduceJoin操作,将作者和编辑的信息与图书表中的相关信息进行关联。这可以通过使用MapReduce任务来实现。
2023-05-31 14:34:4611

为什么会有mapreduce和spark

 MapReduce从出现以来,已经成为Apache Hadoop计算范式的扛鼎之作。它对于符合其设计的各项工作堪称完美:大规模日志处理,ETL批处理操作等。  随着Hadoop使用范围的不断扩大,人们已经清楚知道MapReduce不是所有计算的最佳框架。Hadoop 2将资源管理器YARN作为自己的顶级组件,为其他计算引擎的接入提供了可能性。如Impala等非MapReduce架构的引入,使平台具备了支持交互式SQL的能力。  今天,Apache Spark是另一种这样的替代,并且被称为是超越MapReduce的通用计算范例。也许您会好奇:MapReduce一直以来已经这么有用了,怎么能突然被取代?毕竟,还有很多ETL这样的工作需要在Hadoop上进行,即使该平台目前也已经拥有其他实时功能。  值得庆幸的是,在Spark上重新实现MapReduce一样的计算是完全可能的。它们可以被更简单的维护,而且在某些情况下更快速,这要归功于Spark优化了刷写数据到磁盘的过程。Spark重新实现MapReduce编程范式不过是回归本源。Spark模仿了Scala的函数式编程风格和API。而MapReduce的想法来自于函数式编程语言LISP。  尽管Spark的主要抽象是RDD(弹性分布式数据集),实现了Map,reduce等操作,但这些都不是Hadoop的Mapper或Reducer API的直接模拟。这些转变也往往成为开发者从Mapper和Reducer类平行迁移到Spark的绊脚石。  与Scala或Spark中经典函数语言实现的map和reduce函数相比,原有Hadoop提供的Mapper和Reducer API 更灵活也更复杂。这些区别对于习惯了MapReduce的开发者而言也许并不明显,下列行为是针对Hadoop的实现而不是MapReduce的抽象概念:  · Mapper和Reducer总是使用键值对作为输入输出。  · 每个Reducer按照Key对Value进行reduce。  · 每个Mapper和Reducer对于每组输入可能产生0个,1个或多个键值对。  · Mapper和Reducer可能产生任意的keys和values,而不局限于输入的子集和变换。  Mapper和Reducer对象的生命周期可能横跨多个map和reduce操作。它们支持setup和cleanup方法,在批量记录处理开始之前和结束之后被调用。  本文将简要展示怎样在Spark中重现以上过程,您将发现不需要逐字翻译Mapper和Reducer!  作为元组的键值对  假定我们需要计算大文本中每一行的长度,并且报告每个长度的行数。在HadoopMapReduce中,我们首先使用一个Mapper,生成为以行的长度作为key,1作为value的键值对。  public class LineLengthMapper extends  Mapper<LongWritable, Text, IntWritable, IntWritable> {  @Override  protected void map(LongWritable lineNumber, Text line, Context context)  throws IOException, InterruptedException {  context.write(new IntWritable(line.getLength()), new IntWritable(1));  }  }  值得注意的是Mappers和Reducers只对键值对进行操作。所以由TextInputFormat提供输入给LineLengthMapper,实际上也是以文本中位置为key(很少这么用,但是总是需要有东西作为Key),文本行为值的键值对。  与之对应的Spark实现:  lines.map(line => (line.length, 1))  Spark中,输入只是String构成的RDD,而不是key-value键值对。Spark中对key-value键值对的表示是一个Scala的元组,用(A,B)这样的语法来创建。上面的map操作的结果是(Int,Int)元组的RDD。当一个RDD包含很多元组,它获得了多个方法,如reduceByKey,这对再现MapReduce行为将是至关重要的。  Reduce  reduce()与reduceBykey()  统计行的长度的键值对,需要在Reducer中对每种长度作为key,计算其行数的总和作为value。  public class LineLengthReducer extends  Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {  @Override  protected void reduce(IntWritable length, Iterable<IntWritable> counts,  Context context) throws IOException, InterruptedException {  int sum = 0;  for (IntWritable count : counts) {  sum += count.get();  }  context.write(length, new IntWritable(sum));  }  }  Spark中与上述Mapper,Reducer对应的实现只要一行代码:  val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)  Spark的RDD API有个reduce方法,但是它会将所有key-value键值对reduce为单个value。这并不是Hadoop MapReduce的行为,Spark中与之对应的是ReduceByKey。  另外,Reducer的Reduce方法接收多值流,并产生0,1或多个结果。而reduceByKey,它接受的是一个将两个值转化为一个值的函数,在这里,就是把两个数字映射到它们的和的简单加法函数。此关联函数可以被调用者用来reduce多个值到一个值。与Reducer方法相比,他是一个根据Key来Reduce Value的更简单而更精确的API。  Mapper  map() 与 flatMap()  现在,考虑一个统计以大写字母开头的单词的个数的算法。对于每行输入文本,Mapper可能产生0个,1个或多个键值对。  public class CountUppercaseMapper extends  Mapper<LongWritable, Text, Text, IntWritable> {  @Override  protected void map(LongWritable lineNumber, Text line, Context context)  throws IOException, InterruptedException {  for (String word : line.toString().split(" ")) {  if (Character.isUpperCase(word.charAt(0))) {  context.write(new Text(word), new IntWritable(1));  }  }  }  }  Spark对应的写法:  lines.flatMap(  _.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1))  )  简单的Spark map函数不适用于这种场景,因为map对于每个输入只能产生单个输出,但这个例子中一行需要产生多个输出。所以,和MapperAPI支持的相比,Spark的map函数语义更简单,应用范围更窄。  Spark的解决方案是首先将每行映射为一组输出值,这组值可能为空值或多值。随后会通过flatMap函数被扁平化。数组中的词会被过滤并被转化为函数中的元组。这个例子中,真正模仿Mapper行为的是flatMap,而不是map。  groupByKey()  写一个统计次数的reducer是简单的,在Spark中,reduceByKey可以被用来统计每个单词的总数。比如出于某种原因要求输出文件中每个单词都要显示为大写字母和其数量,在MapReduce中,实现如下:  public class CountUppercaseReducer extends  Reducer<Text, IntWritable, Text, IntWritable> {  @Override  protected void reduce(Text word, Iterable<IntWritable> counts, Context context)  throws IOException, InterruptedException {  int sum = 0;  for (IntWritable count : counts) {  sum += count.get();  }  context  .write(new Text(word.toString().toUpperCase()), new IntWritable(sum));  }  }  但是redeceByKey不能单独在Spark中工作,因为他保留了原来的key。为了在Spark中模拟,我们需要一些更像Reducer API的操作。我们知道Reducer的reduce方法接受一个key和一组值,然后完成一组转换。groupByKey和一个连续的map操作能够达到这样的目标:  groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }  groupByKey只是将某一个key的所有值收集在一起,并且不提供reduce功能。以此为基础,任何转换都可以作用在key和一系列值上。此处,将key转变为大写字母,将values直接求和。  setup()和cleanup()  在MapReduce中,Mapper和Reducer可以声明一个setup方法,在处理输入之前执行,来进行分配数据库连接等昂贵资源,同时可以用cleanup函数可以释放资源。  public class SetupCleanupMapper extends  Mapper<LongWritable, Text, Text, IntWritable> {  private Connection dbConnection;  @Override  protected void setup(Context context) {  dbConnection = ...;  }  ...  @Override  protected void cleanup(Context context) {  dbConnection.close();  }  }  Spark中的map和flatMap方法每次只能在一个input上操作,而且没有提供在转换大批值前后执行代码的方法,看起来,似乎可以直接将setup和cleanup代码放在Sparkmap函数调用之前和之后:  val dbConnection = ...  lines.map(... dbConnection.createStatement(...) ...)  dbConnection.close() // Wrong!  然而这种方法却不可行,原因在于:  · 它将对象dbConnection放在map函数的闭包中,这需要他是可序列化的(比如,通过java.io.Serializable实现)。而数据库连接这种对象一般不能被序列化。  · map是一种转换,而不是操作,并且拖延执行。连接对象不能被及时关闭。  · 即便如此,它也只能关闭driver上的连接,而不是释放被序列化拷贝版本分配的资源连接。  事实上,map和flatMap都不是Spark中Mapper的最接近的对应函数,Spark中Mapper的最接近的对应函数是十分重要的mapPartitions()方法,这个方法能够不仅完成单值对单值的映射,也能完成一组值对另一组值的映射,很像一个批映射(bulkmap)方法。这意味着mapPartitions()方法能够在开始时从本地分配资源,并在批映射结束时释放资源。  添加setup方法是简单的,添加cleanup会更困难,这是由于检测转换完成仍然是困难的。例如,这样是能工作的:  lines.mapPartitions { valueIterator =>  val dbConnection = ... // OK  val transformedIterator = valueIterator.map(... dbConnection ...)  dbConnection.close() // Still wrong! May not have evaluated iterator  transformedIterator  }  一个完整的范式应该看起来类似于:  lines.mapPartitions { valueIterator =>  if (valueIterator.isEmpty) {  Iterator[...]()  } else {  val dbConnection = ...  valueIterator.map { item =>  val transformedItem = ...  if (!valueIterator.hasNext) {  dbConnection.close()  }  transformedItem  }  }  }  虽然后者代码翻译注定不如前者优雅,但它确实能够完成工作。  flatMapPartitions方法并不存在,然而,可以通过调用mapPartitions,后面跟一个flatMap(a= > a)的调用达到同样效果。  带有setup和cleanup的Reducer对应只需仿照上述代码使用groupByKey后面跟一个mapPartition函数。  别急,等一下,还有更多  MapReduce的开发者会指出,还有更多的还没有被提及的API:  · MapReduce支持一种特殊类型的Reducer,也称为Combiner,可以从Mapper中减少洗牌(shuffled)数据大小。  · 它还支持同通过Partitioner实现的自定义分区,和通过分组Comparator实现的自定义分组。  · Context对象授予Counter API的访问权限以及它的累积统计。  · Reducer在其生命周期内一直能看到已排序好的key 。  · MapReduce有自己的Writable序列化方案。  · Mapper和Reducer可以一次发射多组输出。  · MapReduce有几十个调优参数。  有很多方法可以在Spark中实现这些方案,使用类似Accumulator的API,类似groupBy和在不同的这些方法中加入partitioner参数的方法,Java或Kryo序列化,缓存和更多。由于篇幅限制,在这篇文章中就不再累赘介绍了。  需要指出的是,MapReduce的概念仍然有用。只不过现在有了一个更强大的实现,并利用函数式语言,更好地匹配其功能性。理解Spark RDD API和原来的Mapper和ReducerAPI之间的差异,可以帮助开发者更好地理解所有这些函数的工作原理,以及理解如何利用Spark发挥其优势。
2023-05-31 14:35:091

MapReduce执行过程

MapReduce存在以下4个独立的实体。 1. JobClient:运行于client node,负责将MapReduce程序打成Jar包存储到HDFS,并把Jar包的路径提交到Jobtracker,由Jobtracker进行任务的分配和监控。 2. JobTracker:运行于name node,负责接收JobClient提交的Job,调度Job的每一个子task运行于TaskTracker上,并监控它们,如果发现有失败的task就重新运行它。 3. TaskTracker:运行于data node,负责主动与JobTracker通信,接收作业,并直接执行每一个任务。 4. HDFS:用来与其它实体间共享作业文件。具体细节如下: 1.JobClient通过RPC协议向JobTracker请求一个新应用的ID,用于MapReduce作业的ID 2.JobTracker检查作业的输出说明。例如,如果没有指定输出目录或目录已存在,作业就不提交,错误抛回给JobClient,否则,返回新的作业ID给JobClient 3.JobClient将作业所需的资源(包括作业JAR文件、配置文件和计算所得得输入分片)复制到以作业ID命名的HDFS文件夹中 4.JobClient通过submitApplication()提交作业 5.JobTracker收到调用它的submitApplication()消息后,进行任务初始化 6.JobTracker读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个TaskTracker map任务不是随随便便地分配给某个TaskTracker的,这里有个概念叫:数据本地化(Data-Local)。意思是:将map任务分配给含有该map处理的数据块的TaskTracker上,同事将程序jar包复制到该TaskTracker上来运行,这叫“运算移动,数据不移动”。而分配reduce任务时并不考虑数据本地化 7.TaskTracker通过心跳机制领取任务(任务的描述信息) 8.TaskTracker读取HDFS上的作业资源(JAR包、配置文件等) 9.TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask) 10.TaskTracker将Reduce结果写入到HDFS当中 注:以HDFS的一个块的大小(默认64M)为一个分片  15年2.7.3版本开始,block size由64 MB变成了128 MB的。Shuffle分析Shuffle的中文意思是“洗牌”,如果我们这样看:一个map产生的数据,结果通过hash过程分区缺分配给了不同的reduce任务,是不是一个对数据洗牌的过程呢?  shuffle的概念:   Collections.shuffle(List list):随机打乱list里的元素顺序。   MapReduce里的Shuffle:描述着数据从map task输出到reduce task输入的这段过程。 Map端流程分析 1 每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(默认64M)为一个分片,当然我们也可以设置块的大小。map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。 2 在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。然后对每个分区中的数据进行排序,如果此时设置了Combiner,将排序后的结果进行Combianer操作,这样做的目的是让尽可能少的数据写入到磁盘。 3 当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和combiner操作,目的有两个:1、尽量减少每次写入磁盘的数据量;2、尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以。 数据压缩:Gzip、Lzo、snappy。 4 将分区中的数据拷贝给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和obTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就OK了。 reduce端流程分析 1 reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接收的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merg.percent决定),则对数据合并后溢写到磁盘中。 2 随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省空间。其实不管在map端还是在reduce端,MapReduce都是反复地执行排序,合并操作,现在终于明白了有些人为什么会说:排序是hadoop的灵魂。 3 合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。 4 Reducer的输入文件。不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDSF上。 详细过程: 1 每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后在对磁盘中这个map task产生的所有临时文件做一个合并,生成最终的正式输出文件,然后等待reduce task来拉数据。 2 在map task执行时,它的输入数据来源于HDFS的block,当然在MapReduce概念中,map task只读取split。split与block对应关系可能是多对一,默认是一对一。在wordcount例子里,假设map的输入数据都是是像“aaa”这样的字符串。 3 在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对:key是“aaa”,value是数值1。因为当前map端只做加1的操作,在reduce task里采取合并结果集。前面我们知道这个job有3个reduce task。那到底当前的“aaa”究竟该丢给哪个reduce去处理呢?是需要现在做决定的。 4 MapReduce提供Partitioner接口,作用就是根据key或value及reduce的数量来决定当前的输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数据取模(取余)。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以定制并设置到job上。 5 在例子中,“aaa”经过Partition后返回0,也就是这对值应当交由第一个reduce来处理。接下来,需要将数据写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然,写入之前,key与value值都会被序列化成字节数组。 6 内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为spill,中文可理解为溢写。溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。比例默认是0.8,也就是当缓冲区的数据值已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。map task的输出结果还可以往剩下的20MB内存中写,互不影响。 7 当溢写线程启动后,需要对这80MB空间内的key做排序(sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。 8 因为map task的输出是需要发送到不同的reduce端去,而内存缓冲区没有对将发送到相同reduce端的数据做合并,那么这种合并应该是体现在磁盘文件中的。从官方图上也可以看到写到磁盘中的一些文件是对不同的reduce端的数值做过合并。所以溢写过程一个很重要的细节在于,如果有很多个key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一块,减少与partition相关的索引记录。   在针对每个reduce端而合并数据时,有些数据可能像这样:“aaa”/1,“aaa”/1。对于wordcount例子,只是简单地统计单词出现的次数,如果在同一个map task的结果中有很多像“aaa”一样出现多次的key,我们就应该把它们的值合并到一块,这个过程叫reduce也叫combine。但MapReduce的术语中,reduce只值reduce端执行从多个map task取数据做计算的过程。除reduce外,非正式地合并数据只能算作combine了。其实大家知道的,MapReduce中将Combiner等同于Reducer。   如果client设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。 9 每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当map task真正完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件。最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果很少,当map执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫Merge。Merge是怎样的?如前面的例子,“aaa”从某个map task读取过来时值是5,从另外一个map读取时值是8,因为他们有相同的key,所以要merge成group。   什么是group:对于“aaa”就是像真阳的:{“aaa”,[5,8,2,...]},数组中的值就是从不同的溢写文件中读取出来的,然后再把这些值加起来。请注意,因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中,如果client设置过Combiner,也会使用Combiner来合并相同的key。 至此,map端的所有工作都已经结束,最终生成的这个文件也存放在TaskTracker够得到的某个本地目录中。每个reduce task不断地通过RPC从JobTRacker那获取map task是否完成的信息,如果reduce task得到通知,获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程开始启动。 Reduce端的shuffle过程: 1 copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过http方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。 2 Merge阶段。这里的merge和map端的merge动作相同,只是数组中存放的是不同map端copy来的数值。copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle使用。 3 Merge有三种形式:1、内存到内存;2、内存到磁盘;3、磁盘到磁盘。默认情况下第一种形式不启用,让人比较困惑。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map端类似,这也是溢写的过程,在这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。Map端流程分析 1 每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(默认64M)为一个分片,当然我们也可以设置块的大小。map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。 2 在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。然后对每个分区中的数据进行排序,如果此时设置了Combiner,将排序后的结果进行Combianer操作,这样做的目的是让尽可能少的数据写入到磁盘。 3 当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和combiner操作,目的有两个:1、尽量减少每次写入磁盘的数据量;2、尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以。 数据压缩:Gzip、Lzo、snappy。 4 将分区中的数据拷贝给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和obTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就OK了。 reduce端流程分析 1 reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接收的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merg.percent决定),则对数据合并后溢写到磁盘中。 2 随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省空间。其实不管在map端还是在reduce端,MapReduce都是反复地执行排序,合并操作,现在终于明白了有些人为什么会说:排序是hadoop的灵魂。 3 合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。 4 Reducer的输入文件。不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDSF上。 注意:对MapReduce的调优在很大程度上就是对MapReduce Shuffle的性能的调优。 三、内存缓冲区:MapOutputBuffer 两级索引结构: 环形缓冲区: 1 kvoffsets缓冲区:也叫偏移量索引数组,用于保存key/value信息在位置索引kvindices中的偏移量。当kvoffsets的使用率超过io.sort.spill.percent(默认为80%)后,便会触发一次SpillThread线程的“溢写”操作,也就是开始一次spill阶段的操作。 2 kvindices缓冲区:也叫位置索引数组,用于保存key/value在数据缓冲区kvbuffer中的起始位置。 3 kvbuffer数据缓冲区:用于保存实际的key/value的值。默认情况下该缓冲区最多可以使用io.sort.mb的95%,当kvbuffer使用率超过io.sort.spill.percent(默认80%)后,便会触发一次SpillThread线程的“溢写”操作,也就是开始一次spill阶段的操作。
2023-05-31 14:35:191

redux与react-redux

redux 与 react-redux 直接一起使用, 让我总分不清楚这两个各自的职责,分别整理一下。 reudx 对于redux 整个过程如上所示。 1. 用户在UI组件中通过 store.dispatch触发action ; 2. store 自动调用reducer,并传入之前的state,以及1中用户的action, 经过reducer返回新的state; 3. store.subscribe(listener) 订阅state的变化,可通过setState更新react UI。 redux 整个设计是: 所有你“写”的逻辑都集中在一个单独的函数(reducer)中,并且执行这些逻辑的唯一方式就是传给 Redux 一个能够描述当时情景的普通对象(action)。Redux store 调用这些逻辑函数,并传入当前的 state tree 以及这些描述对象,返回新的 state tree,接着 Redux store 便开始通知这些订阅者(subscriber)state tree 已经改变了。 因此要求 reducer 是纯函数和可预测,不能突变。 react-redux react-redux 主要是redux执行的第3步(标红部分)。 使用mapStateToProps订阅 Store,每当state更新的时候,就会自动执行,重新计算 UI 组件的参数,从而触发 UI 组件的重新渲染。 mapStateToProps 是输入逻辑(将state输入到react的UI中); mapDispatchToProps定义了哪些用户的操作应该当作 Action,传给 Store。 mapDispatchToProps是输出逻辑(将用户的操作变成action,从react的UI中发出)。 容器组件 = connect(mapStateToProps,mapDispatchToProps)(UI 组件); mapStateToProps 中使用的state ,来自于provider组件中注入的store。其实现是react 的context属性。 资料:
2023-05-31 14:35:261

redux和react有什么关系

WhatRedux 是 JavaScript 状态容器,提供可预测化的状态管理。一个完整单独的State Tree操作State Tree的Producer纯函数(可拆分为多个子项)通过Action来表达修改State的意图Differ和Fulx类似,也是一种推荐的数据传输方式。只是对比起flux的Dispatcher、Store再分类成Store、Action和Reducer,其中,Action是数据的传输形式,Reducer是事件的集合以及操作的处理(例如增删改查) Redux 由 Flux 演变而来,但受 Elm 的启发,避开了 Flux 的复杂性。Flux 常常被表述为 (state, action) => state。但是用纯函数,而不是用事件处理器来进行。(No EventEmitter)和flux的区别flux和redux的不同从flux的store,view,dispatcher(ACTION_TYPE)演变成了:view,ACTION_TYPE,Reducer,Store(redux中自己有,你在上面注册调用事件即可。或者不注册直接使用默认的dispatcher就好)Redux 并没有 dispatcher 的概念,store(dispatcher)的部分已经在store自动实现了。各层的职责:view负责展示和发送事件,Action_type负责事件信息并且获取数据(同步或者异步)传给reducer,reducer负责处理数据。Redux 设想你永远不会变动你的数据,因此每次都会返回一个新的stateWhy在 React 中,UI 以组件的形式来搭建,组件之间可以嵌套组合。另,React 中组件间通信的数据流是单向的,顶层组件可以通过 props 属性向下层组件传递数据,而下层组件不能向上层组件传递数据,兄弟组件之间同样不能。这样简单的单向数据流支撑起了 React 中的数据可控性。 
2023-05-31 14:35:351

createStore原理及作用

createStore创建一个 Redux store 来以存放应用中所有的 state,应用中应有且仅有一个 store。其中暴露 dispatch, subscribe, getState, replaceReducer 方法。 首先定义了一个 ActionTypes 对象,它是一个 action,是一个 Redux 的私有 action,不允许外界触发,用来初始化 Store 的状态树和改变 reducers 后初始化 Store 的状态树。 它可以接受三个参数,reducer、preloadedState、enhancer: 调用完函数它返回的接口是 dispatch、subscribe、getStatere、placeReducer,这也是我们开发中主要使用的几个接口。
2023-05-31 14:35:421

reducer是什么牌子

reducer减速器双语对照词典结果:reducer[英][rɪ"dju:sə][美][rɪ"dju:sə]n.缩减者,减压器,还原剂; 减速器; 以上结果来自金山词霸例句:1.The reducer gear is the hardened face. 齿轮减速器是硬的脸。2.Cylindrical gear reducer with long service life. 具有使用寿命长圆柱齿轮减速机。3.The forestry sector is the biggest carbon reducer, and electricity generation the biggest carbon source. 林业部门是最大的减碳者,而电力部门则是最大的排放者。
2023-05-31 14:36:021

reducer ecc是什么管件

Ecc Reducer 是偏心异径管件。“管件中reducer是指大小头,如: Reducer 大小头 Concentric reducer 同心大小头 Eccentric reducer 偏心大小头”。
2023-05-31 14:36:101

reducer.ecc 法兰中什么意思

这是偏心变径的意思,和法兰不挨边吧
2023-05-31 14:36:182

mr输出结果文件数量和reducer

MapReduce也有相应的输出格式。默认情况下只有一个 Reduce,输出只有一个文件,默认文件名为 part-r-00000,输出文件的个数与 Reduce 的个数一致。 如果有两个Reduce,输出结果就有两个文件,第一个为part-r-00000,第二个为part-r-00001,依次类推MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
2023-05-31 14:36:251

退烧英语

na.bring down a feverPanadol; Fever Reducer; Acetaminophen正确表达:bring down a fever退烧例句:This kind of medicine can bring down a fever.这种药能退烧。“退烧”还可以用reduce fever来表示,而且,“退烧”可以理解为“体温降下来”所以用英语说就是:The temperature comes down.如果想说“已经退烧了”,最简单的说法是:The fever is gone.发烧时吃的“退烧药”,用英语来表示是fever reducer
2023-05-31 14:36:311

如何在Windows下面运行hadoop的MapReduce程序

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。
2023-05-31 14:36:542

hive 动态分区后可以指定reduce数吗

reducer个数的设定极大影响执行效率 1. hive.exec.reducers.bytes.per.reducer(默认为1000^3) 2. hive.exec.reducers.max(默认为999) 计算reducer数的公式很简单: N=min(参数2,总输入数据量/参数1) 通常情况下,有必要手动指定reducer个数。考虑到map阶段的输出数据量通常会比输入有大幅减少,因此即使不设定reducer个数,重设参数2还是必要的。依据Hadoop的经验,可以将参数2设定为0.95*(集群中TaskTracker个数)。 正确的reduce任务的 个数应该是 0.95或者1.75 ×(节点数 ×mapred.tasktracker.tasks.maximum参数值)
2023-05-31 14:37:071

reducerecc是什么管件

EccReducer偏心异径管ConReducer同心异径管ECC和CON都是缩写,Eccentricconcentric的缩写。
2023-05-31 14:37:141

redux 多个reducer怎么connect

通结构: export function myAction(id) { return { type: "floor", loadStatus: "loading", loadNum: id } } reducer:-
2023-05-31 14:37:211

如何在hadoop环境下执行mapreduce任务

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。
2023-05-31 14:37:292

Hadoop,MapReduce,YARN和Spark的区别与联系

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。
2023-05-31 14:38:312

con reducer and ecc reducer中con、ecc分别表示什么?

Ecc Reducer 偏心异径管 Con Reducer同心异径管 ECC 和CON都是缩写,Eccentric concentric的缩写.
2023-05-31 14:38:431

Hadoop:是什么,如何工作,可以用来做什么

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:     1.JobClient 写代码,配置作业,提交作业。     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。     4.HDFS:保存作业数据、配置信息等,保存作业结果。Map/Reduce 作业总体执行流程:     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果而对于每个作业的执行,又包含:     输入准备 ----> 任务执行 ----> 输出结果作业提交JobClient:     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。     submitJob方法作业提交的过程:     1.向JobTracker请求一个新的JobId。     2.检查作业相关路径,如果路径不正确就会返回错误。     3.计算作业输入分片及其划分信息。     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。作业的初始化JobTracker:     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。     初始化步骤:          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。          3.最后就是创建两个初始化task,进行map和reduce的初始化。任务的分配JobTracker:    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。任务的执行TaskTracker:     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。     在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。     对于单个Map,任务执行的简单流程是:     1.分配任务执行参数     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)     3.配置log文件夹,配置map任务的通信和输出参数     4.读取input split,生成RecordReader读取数据     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。Streaming和Pipes:     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。     Streaming:使用标准输入输出Streaming与进程进行通信。     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。     进度和状态更新:     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。     状态消息与客户端的通信:     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。     作业的完成:     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。     失败     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。     1.任务失败         子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。     2.tasktracker失败     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。     3.jobtracker失败     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。作业调度:     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)     Capacity Scheduler:     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。shuffle和排序:     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。     整个shuffle的流程应该是这样:     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出     Map端:     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。     Reduce端:     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。
2023-05-31 14:38:532