Shuffle原理剖析

Shuffle,是指对Map输出结果进行分区、排序、合并等处理并交给Reduce的过程。分为Map端的操作和Reduce端的操作。
Shuffle过程
-  
Map端的Shuffle
Map的输出结果首先被缓存到内存,当缓存区容量到达80%(缓冲区默认100MB),就启动溢写操作。当启动溢写操作时,首先需要把缓存中的数据进行分区,然后对每个分区的数据进行排序和合并(combine),之后再写入磁盘文件。每次溢写操作会生成一个新的磁盘文件,随着Map任务的执行,磁盘中就会生成多个溢写文件。在Map任务全部结束前,这些溢写文件会被归并成一个大的磁盘文件,然后通知相应的Reduce任务来领取属于自己处理的数据。
 -  
在Reduce端的Shuffle过程
Reduce任务从Map端的不同Map机器领回属于自己处理的那部分数据,然后对数据进行合并排序后交给Reduce处理
 
作用
-  
保证每一个Reduce任务处理的数据大致是一致的
 -  
Map任务输出的key相同,一定是相同分区,并且肯定是相同的Reduce处理的,保证计算结果的准确性
 -  
Reduce任务的数量决定了分区的数量,Reduce任务越多计算处理的并行度也就越高
Reduce任务的数量(默认为1)可以通过:
job.setNumReduceTasks(数量) 
特点
- Map端溢写时,key相同的一定是在相同的分区
 - Map端溢写时,排序减少了Reduce的全局排序的复杂度
 - Map端溢写是,合并(combiner【可选】)减少溢写文件的体积,提高了Reduce任务在Fetch数据时的效率,它是一种MapReduce优化策略
 - Reduce端计算或者输出时,它的数据都是有序的
 
Shuffle源码追踪
-  
MapTask

 -  
ReduceTask
(略)
建议阅读
 
数据清洗
数据清洗指将原始数据处理成有价值的数据的过程,就称为数据清洗。
企业大数据开发的基本流程:
- 采集数据(flume、logstash)先保存到MQ(Kafka)中
 - 将MQ中的暂存数据存放到HDFS中保存
 - 数据清洗(低价值密度的数据处理)存放到HDFS
 - 算法干预(MapReduce),计算结果保存到HDFS或者HBase
 - 计算结果的可视化展示(Echarts、HCharts)
 
需求
现有某系统某天的Nginx的访问日志,格式如下:
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
110.52.250.126 - - [30/May/2013:17:38:20 +0800] "GET /data/cache/style_1_widthauto.css?y7a HTTP/1.1" 200 1292
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_1.gif HTTP/1.1" 200 680
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/common/hot_2.gif HTTP/1.1" 200 682
27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/image/filetype/common.gif HTTP/1.1" 200 90
 
大数据处理的算法,需要参数客户端的ip地址、请求时间、资源、响应状态码
正则表达式提取数据
Regex Expression主要作用字符串
匹配、抽取和替换
语法
| 规则 | 解释 | 
|---|---|
| . | 匹配任意字符 | 
| \d | 匹配任意数字 | 
| \D | 匹配任意非数字 | 
| \w | 配置a-z和A-Z | 
| \W | 匹配非a-z和A-Z | 
| \s | 匹配空白符 | 
| ^ | 匹配字符串的开头 | 
| $ | 匹配字符串的末尾 | 
规则的匹配次数
| 语法 | 解释 | 
|---|---|
| * | 规则匹配0到N次 | 
| ? | 规则匹配1次 | 
| {n} | 规则匹配N次 | 
| {n,m} | 规则匹配n到m次 | 
| + | 规则匹配1到N次(至少一次) | 
应用
# 匹配手机号码 11位数值构成
\d{11}
# 邮箱地址校验  @
.+@.+
 
使用正则表达式提取Nginx访问日志中的四项指标
测试站点:http://regex101.com
分析后得到需要的正则表达式
^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*\[(.*)\]\s"\w*\s(.*)\sHTTP\/1.1"\s(\d{3}).*$
 
使用MapReduce分布式并行计算框架进行数据清洗
注意: 因为数据清洗不涉及统计计算,所以MapReduce程序通常只有map任务,而没有Reduce任务
job.setNumReduceTasks(0)
实现代码
数据清洗的Mapper
package com.baizhi.dataclean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class DataCleanMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    /**
     * @param key
     * @param value   nginx访问日志中的一行记录(原始数据)
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String regex = "^(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}).*\\[(.*)\\]\\s\"\\w*\\s(.*)\\sHTTP\\/1.1\"\\s(\\d{3}).*$";
        String line = value.toString();
        final Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
        final Matcher matcher = pattern.matcher(line);
        while (matcher.find()) {
            // 四项关键指标  ip 请求时间 请求资源 响应状态码
            String clientIp = matcher.group(1);
            // yyyy-MM-dd HH:mm:ss
            String accessTime = matcher.group(2);
            String accessResource = matcher.group(3);
            String status = matcher.group(4);
            // 30/May/2013:17:38:21 +0800
            // 30/05/2013:17:38:21
            SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
            try {
                Date date = sdf.parse(accessTime);
                SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String finalDate = sdf2.format(date);
                context.write(new Text(clientIp + " " + finalDate + " " + accessResource + " " + status), null);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }
}
 
初始化类
package com.baizhi.dataclean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class DataCleanApplication {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration(), "data clean");
        job.setJarByClass(DataCleanApplication.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.setInputPaths(job,new Path("file:///E:/access.log"));
        TextOutputFormat.setOutputPath(job,new Path("file:///E:/final"));
        job.setMapperClass(DataCleanMapper.class);
        // 注意:数据清洗通常只有map任务而没有reduce
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.waitForCompletion(true);
    }
}
 
数据倾斜
数据分区默认策略
数据倾斜指大量的key相同的数据交由一个reduce任务统计计算,造成”闲的闲死,忙的忙死“这样的现象。不符合分布式并行计算的设计初衷的。
现象
- 某一个reduce运行特别耗时
 - Reduce任务内存突然溢出
 
解决方案
- 增大Reduce任务机器JVM的内存(硬件的水平扩展)
 - 增加Reduce任务的数量,每个Reduce任务只负责极少部分的数据处理,并且Reduce任务的数量增加提高了数据计算的并行度
 
Reduce任务的正确数量: 0.95或者1.75 * (NodeManage数量 * 每个节点最大容器数量)
- 自定义分区规则Partitioner
 
package com.baizhi.partition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * 自定义分区规则
 */
public class CustomPartitioner extends Partitioner<Text, LongWritable> {
    /**
     * @param key
     * @param value
     * @param i     numReduceTasks
     * @return 分区序号
     */
    public int getPartition(Text key, LongWritable value, int i) {
        if (key.toString().equals("CN-GD")) return 0;
        else if (key.toString().equals("CN-GX")) return 1;
        else if (key.toString().equals("CN-HK")) return 2;
        else if (key.toString().equals("JP-TY")) return 3;
        else return 4;
    }
}
 
- 合适使用
Combiner,将key相同的value进行整合合并 
在combiner合并时,v必须得能支持迭代计算,并且不能够影响Reduce任务的输入
combiner通常就是Reducer任务
// 优化策略:combiner合并操作
job.setCombinerClass(MyReducer.class);
                ![[论文笔记]思维链提示的升级版——回退提示](https://img-blog.csdnimg.cn/img_convert/23e94b28e7419e3047befef347c9dd40.png)









![[java]小程序,输出20个随机数,并统计每个随机数出现的次数](https://img-blog.csdnimg.cn/d5de42f869834b67b3e63fd13e9e4332.png)







