`

hadoop优化

 
阅读更多

一. conf/hadoop-site.xml配置, 略过.
 
二. 注重job重用, 主要是设计key和自定义OutputFormat, 将能合并的mapred job合并.
举例 : 用户访问行为(userid, ip, cookie), 分别统计每个用户的ip数和cookie数.
(a). 把userid和字段存储到key中
public class UserKey implements WritableComparable<UserKey>{
int userId;//userid
byte field;//0代表ip, 1代表cookie
@Override
public int compareTo(UserKey o) {
if(userId > o.userId)return 1;
if(userId < o.userId)return -1;
if(field > o.field)return 1;
if(field < o.field)return -1;
return 0;
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub 
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub 
}
}
(b). 实现自定义的OutputFormat, 下面是两处关键代码如下 : 
(x). 
SequenceFile.Writer[] writers = new SequenceFile.Writer[2];
writers[0] = SequenceFile.createWriter(FileSystem.get(conf), conf, "ip", IntWritable.class, IntWritable.class, CompressionType.BLOCK, new DefaultCodec());
writers[1] = SequenceFile.createWriter(FileSystem.get(conf), conf, "field", IntWritable.class, IntWritable.class, CompressionType.BLOCK, new DefaultCodec());
(xx). 
writers[key.field].append(key.userId, value.get());
 
三. 避免不必要的reduce任务.
(1). 假定要处理的数据是排序且已经分区的. 或者对于一份数据, 需要多次处理, 可以先排序分区.
(2). 自定义InputSplit, 将单个分区作为单个mapred的输入.
(3). 在map中处理数据, Reducer设置为空.
这样, 既重用了已有的 "排序", 也避免了多余的reduce任务.
 
四. 使用自定义的MapRunnable.
hadoop自带了两个MapRunnable, 
(1). 一个是默认的单线程MapRunnable, org.apache.hadoop.mapred.MapRunner
(2).另一个是多线程的, org.apache.hadoop.mapred.lib.MultithreadedMapRunner.
根据特定情况, 可以自定义MapRunnable, 
(1). 启用多线程, 比如web爬行时, 可启用多线程抓取网页.
(2). 避免map时, 单台tasktracker上辅助数据冗余, 比如在多模匹配时, 避免生成多份DFA.
 
五. 在某些情况下, 利用数据分布特性设计PARTITIONER的分区算法, 避免单个mapred消耗时间过长.
比如处理大量字符串时, 
(1). 已知首字不同的字符串之间不存在任何关联关系
(2). 原始数据在某些 "首字" 上分布密集, 另一些 "首字" 上分布稀疏.
例如, 原始数据中, 1亿个以3开头, 1亿个以7开头, 3个以6开头.
那么, 
(1). 如果以首字对4求余分区, 则 "1亿个以3开头" 和 "1亿个以7开头"将落在同一分区.若hadoop群集只支持同时2个map任务, 则...
(2). 如果以首字对3求余分区, 则 "1亿个以3开头" 和 "1亿个以7开头"将落在不同分区.
 
六. 最大限度地重用对象, 避免对象的生成/销毁开销.
该点在hadoop自带的org.apache.hadoop.mapred.MapRunner中尤为突出, 
它使用同一个key对象和同一个value对象不停读取原始数据, 再将自身交给mapper处理.
(此处注意, 若要保留该对象的即时状态, 需要clone, 深克隆或浅克隆.)
 
七. 在逻辑意义上, 合并同一对象. 如dotnet和java中的字符串INTERN技术.
 
八. 根据已有条件, 简化循环判定. 
比如, for(int i = 0; i < end && i < size; i++);
可以改成 : 
end = end < size ? end : size;
for(int i = 0; i < end; i++);
 
九. 降低多线程数目, 而让固定数目的线程循环处理.
比如, 一台机器8个CPU, 现在需要处理80亿个数据, 
那么下面两个方案 : 
(1). 启动800个线程, 每个线程处理80亿/800个数据.
(2). 启动8个线程(注意, 此处是8个), 每个线程循环处理, 每次循环处理100万个.
通常我个人选择方案(2).因为 : 
(1). 最大限度利用了CPU. 
(2). 避免了线程调度.
(3). 在java中, 可以使用AtomicInteger控制线程循环, AtomicInteger的效率很高.
 
十. 使用位移替代浮点数计算. 比如用 100 >> 3替代100 * 0.125.
(另外, 我们会需要将某个中间值乘以一个调节因子(经验值), 比如乘以0.12, 
如果乘以0.12和0.124 "差不多" 时, 可以考虑直接使用位移).
 
十一. 避免循环体内不必要的判断逻辑, 与第八条不同.
比如, 处理10亿个数据, 每遇到一个有效数据时, 需要同前一个有效数据进行关联处理(或与前一个中间值进行关联处理), 
for(int i = 0; i < size; i++)
{
//1. 判定是否存在前一个有效数据
//2. 如果不存在前一个有效数据, 则continue;
//3. 如果存在前一个有效数据, 则进行关联处理, 再continue.
}
通常在此种需求下, 一旦遇到一个有效数据, 必定会产生一个可供后续紧邻数据关联的值, 
那么 : 
int i = 0; 
for(int i = 0; i < size; i++)
{
//1. data[i]是否有效?
//2. data[i]无效, continue;
//3. data[i]有效, break;
}
for(; i < size; i++)
{
//与前一个有效数据进行关联处理, 再continue.
}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics