Do not deify machine learning,it is not that cool
25 Oct 2013
之前学习Erlang时实现过一个粗糙的Erlang分布式计算系统EDCFP,因为Erlang天然的分布式特性支持,以及异构机器之间互调函数的极大便利性,所以做这种事情一(hao)点(wu)都(ke)不(ji)费(han)事(liang)。现在回头看EDCFP,真是too young too simple,it’s totally naive。于是花点心思重构了一下,以期普适性有所增强。
###一,核心思想
仍然是pmap的思想,即在map中新建进程,大家分头行动。如果在同一台机器上,模型类似多进程编程;如果是在多台机器上,就是分布式计算了。但是对erlang来说,二者并无太大差别,所以用erlang写分布式程序跟用传统语言写多进程程序感觉是差不多的。要说差别,就是比传统语言的多进程编程模型还要简洁,要考虑的杂事更少了,比如互斥锁什么的,因为erlang中不同进程之间的交互是通过异步消息传输进行的,而非共享内存。所以很少存在竞争资源的情况。
这个framework的分布式架构模仿Hadoop,采用Master-Slave的主从结构,不同的是Master同样参与计算。节点之间的安全通信就交给erlang虚拟机来解决啦,我们只需要在不同节点的home目录下放置一个内容相同的cookie文件即可。考虑到分布式计算作业中不同节点之间的相互影响很小,因为即使某个节点的进程挂掉了,打个log等master善后处理即可(不过,让人呵呵的是这个framework中log模块还没有加,所以这还只是一个坑:(),不需要其他节点都停下来。所以新建进程使用spawn,而不是spawn_link。重构后的系统支持本地模式(即只调用Master节点进行计算)和分布式模式。
###二,更新内容
本系统的历史故事请移步这里,这次重构主要集中在增加使用的灵活性和健壮性方面。变动主要在mprd_master.erl文件,新增修改内容包括:
###三,用法示例
由于系统很不成熟,用法并不像Hadoop那样有严格的规范流程,看到大跌眼镜之处请默默谅解。
首先以一个简单示例入手,还是那个求阶乘的栗子(当然你可以换成斐波那契数列,乘方等)。这个例子是按照ErlangMapReduceFramework的标准用法来的。首先编写用户函数:
-module(factorial). -export([my_map/1, my_reduce/1]). fact(0) -> 1; fact(N) when N < 0 -> io:format("参数错误~n"); fact(N) when N > 0 -> N * fact(N - 1). % used at all nodes my_map(InDat) -> fact(InDat). % used only at master side my_reduce([]) -> []; my_reduce(OutDat) -> io:format("my reduce come in~n", []), io:format("~w~n", [OutDat]).
我们使用my_map进行阶乘的计算,使用my_reduce对最终结果进行处理,这里我们采取直接输出的处理方式 。
看一下master节点的代码:
-module(mprd_master). -compile(export_all). map(Func, UserReduce, List, SlaveNum) -> Pid = self(), Pids = lists:map(fun(I) -> spawn(fun() -> do_work(Pid, Func, I) end) end, List), case SlaveNum > 0 of true -> Res = gather(Pids, SlaveNum); _ -> Res = Pids end, R = reduce(Res), case whereis(master) of undefined -> ok; _ -> unregister(master) end, UserReduce(R). reduce([]) -> []; reduce([H | T]) -> receive {H, Res} -> [Res | reduce(T)] end. gather(Pids, 0) -> Pids; gather(Pids, SlaveNum) -> receive {finished, SlaveRes} -> Res = lists:append(Pids, SlaveRes), gather(Res, SlaveNum - 1) end. print(Ele) -> io:format("~w~n", [Ele]). do_work(Parent, Func, I) -> Parent ! {self(), (catch Func(I))}. my_spawn({SlaveNode, L}, Func) -> spawn(SlaveNode, mprd_slave, map, [Func, L, master, node()]). my_split([], _, _, L) -> L; my_split(List, Len, NodeCnt, L) when length(List) >= Len -> case length(L) of NodeCnt -> [List | L]; _ -> {H, T} = lists:split(Len, List), my_split(T, Len, NodeCnt, [H | L]) end; my_split(List, Len, _, L) -> L. start(Func, UserReduce, L) -> %spawn(mprd_master, map, [Func, UserReduce, L, 0]). map(Func, UserReduce, L, 0). start(SlaveNodes, Func, UserReduce, L) when length(SlaveNodes) > length(L) -1 -> io:format("Make sure the number of slave node is less than the length of List please!\n"); start(SlaveNodes, Func, UserReduce, L) -> % slave + master Nodes = length(SlaveNodes) + 1, Len = length(L) div Nodes, [H | Lists] = my_split(L, Len, length(SlaveNodes), []), io:format("Master: ~w~n", [H]), XS = lists:zip(SlaveNodes, Lists), io:format("~p~n", [XS]), register(master, spawn(mprd_master, map, [Func, UserReduce, H, length(SlaveNodes)])), [my_spawn(X, Func) || X <- XS], ok.
slave节点代码:
-module(mprd_slave). -compile(export_all). map(Func, List, MasterName, MasterNode) -> Pids = lists:map(fun(I) -> spawn(fun() -> do_work(MasterName, MasterNode, Func, I) end) end, List), {MasterName, MasterNode} ! {finished, Pids}. do_work(MasterName, MasterNode, Func, I) -> {MasterName, MasterNode} ! {self(), (catch Func(I))}.
用法如下:
1,环境准备,生成输入数据
这次使用的是3台slave + 1台master的架构,环境都是Centos X64, erlang版本为:
Erlang R16B02 (erts-5.10.3) [source] [64-bit] [smp:16:16] [async-threads:10] [hipe] [kernel-poll:false]
节点名称分别为master,qb2,qb3,qb4
输入数据:
2,调用master函数,将用户函数和输入数据传入
首先是单机版:
(master@QBHadoop1)4> mprd_master:start(fun(X) -> factorial:my_map(X) end, fun(X) -> factorial:my_reduce(X) end, L).
输出:
[1,2,6,24,120,720,5040,40320,362880,3628800,39916800,479001600,
6227020800,87178291200,1307674368000,20922789888000,355687428096000,
6402373705728000,121645100408832000,2432902008176640000]
然后是集群版:
(master@QBHadoop1)2> Slaves=[qb2@QBHadoop2, qb3@QBHadoop3, qb4@QBHadoop4]. (master@QBHadoop1)4> mprd_master:start(Slaves, fun(X) -> factorial:my_map(X) end, fun(X) -> factorial:my_reduce(X) end, L).
输出:
[20922789888000,355687428096000,6402373705728000,121645100408832000,
2432902008176640000,39916800,479001600,6227020800,
87178291200,1307674368000,720,5040,40320,362880,3628800,1,2,6,24,120]
图中红色圈出的是为每个节点分配的list。由于各个节点计算完毕的时间不同,因此结果列表和输入列表顺序是不一致的。
下面再来看一个非常规用法的示例:快速排序的并行版本。
单进程版的快排erlang代码这里有。并行版本代码如下:
-module(qsort). -compile(export_all). my_reduce([]) -> []; my_reduce(OutDat) -> OutDat. qsort([]) -> []; qsort([Pivot]) -> [Pivot]; qsort([Pivot | Rest]) -> L = [X || X <- Rest, X =< Pivot], R = [X || X <- Rest, X > Pivot], [SortL, SortR] = mprd_master:start(fun qsort/1, fun my_reduce/1, [L, R]), SortL ++ [Pivot] ++ SortR.
输出:
注意,这里的并行只是单机多进程模式,而非分布式模式。因为排序时列表拆分需要保证列表之间有序,所以列表的自动拆分对排序这种情况是不适用的。这也暴露了这个系统封装性做的还远远不够啊远远不够。同时,这里说的“非常规用法”是指我们在应用程序中调用了framework的入口函数,和传统用法中的直接调用framework的入口函数,然后把用户函数传递进去的方式不同,感觉很诡异吧~ ###四,总结 Erlang内置的分布式支持,对用户极其友好的的节点通信认证机制等特性,非常适合用来做分布式程序的开发。尤其对于数据分布在多台业务机上的情形,可以考虑使用erlang来实现一些简单的并发数据统计的工作,而不需要先把数据拉到同一台处理机上再做处理了。