目录
1.说明
2.怎样设置默认切片数
2.1 RDD默认切片设置
2.2 SparkSQL默认切片设置
3. makeRDD 切片原理
4. textFile 切片原理
4.1 切片规则
4.2 怎样设置切片大小
4.3 测试代码
5.hadoopFile 切片原理
5.1 说明
5.2 切片规则
5.3 怎样设置切片大小
5.4 代码测试
5.5 minPartitions 在 CombineTextInputFormat 中的作用?
5.6 重点关注
1.说明
在spark中为我们提供了用来读取数据的方法
     比如 makeRDD、parallelize、textFile、hadoopFile等方法
     
 这些方法按照数据源可以分为两类 文件系统、Driver内存中的集合数据
 当我们使用指定的方法读取数据后,会按照指定的切片个数对文件进行切片
2.怎样设置默认切片数
在我们在使用RDD的算子时,经常会遇到可以显式的指定切片个数,或者隐式的使用默认切片个数,下面会告诉我们,怎样设置默认切片个数
2.1 RDD默认切片设置
1.驱动程序中设置
val sparkconf: SparkConf = new SparkConf().setAppName("测试默认切片数")
   .set("spark.default.parallelism","1000")
   .setMaster("local[100]")
2.spark-shell或spark-submit 设置
spark-shell \
--master yarn \
--name "spark-shell-tmp" \
--conf spark.default.parallelism=1000 \
--driver-memory 40G \
--executor-memory 40G \
--num-executors 40 \
--executor-cores 6 \
3.不指定 spark.default.parallelism 参数时,将使用默认值
    local模式:
        local[100] :  100
        local      :  客户端机器核数
    集群模式(yarn):
        2 或者 核数总和 
源码:

查看默认切片数:
// 获取默认切片数
val parallelism = sc.defaultParallelism 
2.2 SparkSQL默认切片设置
-- 设置默认切片数
set spark.sql.shuffle.partitions=1000;
默认值:
  当不设置时,默认为200
注意:
  spark.default.parallelism 只有在处理RDD时才会起作用,对SparkSQL的无效
  spark.sql.shuffle.partitions 则是对sparks SQL专用的设置 
3. makeRDD 切片原理
可用通过 makeRDD算子 将Driver中序列集合中数据转换成RDD,在转换的过程中,会根据指定的切片个数 和 集合索引对集合切片
切片规则:
根据集合长度和切片数将集合切分成若干子集合(和集合元素内容无关)
示例代码:
  test("makeRDD - 切片逻辑") {
    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)
    val rdd: RDD[(String, String)] = sc.makeRDD(List(
      ("张飞1", "张飞java scala spark")
      , ("张飞2", "张飞java scala spark")
      , ("刘备3", "刘备java spark")
      , ("刘备4", "刘备java scala spark")
      , ("刘备5", "刘备scala spark")
      , ("关羽6", "关羽java scala spark")
      , ("关羽7", "关羽java scala")
      , ("关羽8", "关羽java scala spark")
      , ("关羽9", "关羽java spark")))
    // 查看每个分区的内容
    rdd.mapPartitionsWithIndex(
      (i, iter) => {
        println(s"分区编号$i :${iter.mkString(" ")}");
        iter
      }
    ).collect()
    rdd.getNumPartitions
    sc.stop()
  }
 
结果:
源码阅读:
1. 通过SparkContext创建rdd
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
2. ParallelCollectionRDD类中的 getPartitions方法
override def getPartitions: Array[Partition] = {
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
3. ParallelCollectionRDD对象的slice方法(核心切片逻辑)
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  // 对切片数做合法性校验
  if (numSlices < 1) {
    throw new IllegalArgumentException("Positive number of partitions required")
  }
  // TODO 通过 集合长度和切片数 获取每个切片的位置信息
  // 从这可以得出 对集合的切片只和 集合索引和切片数相关,和集合内容无关
  // 将 集合索引按照切片数 切分成若干元素
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
  }
  // 对集合类型做判断
  seq match {
    case r: Range =>
      positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
        // If the range is inclusive, use inclusive range for the last slice
        if (r.isInclusive && index == numSlices - 1) {
          new Range.Inclusive(r.start + start * r.step, r.end, r.step)
        } else {
          new Range.Inclusive(r.start + start * r.step, r.start + (end - 1) * r.step, r.step)
        }
      }.toSeq.asInstanceOf[Seq[Seq[T]]]
    case nr: NumericRange[T] =>
      // For ranges of Long, Double, BigInteger, etc
      val slices = new ArrayBuffer[Seq[T]](numSlices)
      var r = nr
      for ((start, end) <- positions(nr.length, numSlices)) {
        val sliceSize = end - start
        slices += r.take(sliceSize).asInstanceOf[Seq[T]]
        r = r.drop(sliceSize)
      }
      slices.toSeq
    case _ =>
      val array = seq.toArray // To prevent O(n^2) operations for List etc
      positions(array.length, numSlices).map { case (start, end) =>
          array.slice(start, end).toSeq
      }.toSeq
  }
} 
4. textFile 切片原理
textFile使用的MapReduce框架中TextInputFormat类完成对文件切片和读取切片中数据

4.1 切片规则
1.对job输入路径中的每个文件单独切片
2.判断每个文件是否支持切片
true : 按照指定切片大小对文件切片
false: 文件整体作为一个切片
4.2 怎样设置切片大小
// 切片大小计算规则
    splitSize = Math.max(minSize, Math.min(goalSize, blockSize))
// 参数说明
    1.minSize
    set mapreduce.input.fileinputformat.split.minsize=256000000 或 
    set mapred.min.split.size=256000000
    默认值 minSize=1L
    2.goalSize
    goalSize=所有文件大小总和/指定的切片个数
    3.blockSize
    本地目录32M|HDFS目录128M或256M(看hdfs文件块具体配置)
// 需求 
    1.真实切片大小 < blockSize
    goalSize=所有文件大小总和/指定的切片个数 < blockSize 即(创建rdd时调大切片个数)
    2.真实切片大小 > blockSize
    set mapreduce.input.fileinputformat.split.minSize=大于blockSize值 
4.3 测试代码
  test("textFile - 切片逻辑") {
    // 初始化 spark配置实例
    val sf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("Test textFile")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sf)
    sc.hadoopConfiguration.setInt("mapred.min.split.size", 469000000)
    // sc.hadoopConfiguration.setInt("mapreduce.input.fileinputformat.split.minsize", 256000000)
    // 读取目录下的所有文件
    val rdd: RDD[String] = sc.textFile("src/main/resources/data/dir/dir3/LOL.map", 1000)
    // 打印分区个数
    println("切片个数:"+rdd.getNumPartitions)
    sc.stop()
  }
 
执行结果:

5.hadoopFile 切片原理
5.1 说明
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
功能:
  读取HDFS文件或本地文件来创建RDD(使用MapReduce框架中InputFormat类)
参数:
  path: 指定job的输入路径
  inputFormatClass: 对输入文件切片和读取的实现类
  keyClass: key的数据类型
  valueClass: value的数据类型
  minPartitions: 最小切片数 
5.2 切片规则
根据指定的切片大小进行切片,允许将多个文件合并成换一个切片对象
5.3 怎样设置切片大小
指定切片大小(默认值Long.MaxValue)
set mapred.max.split.size=切片大小 或
set mapreduce.input.fileinputformat.split.maxsize=切片大小 
5.4 代码测试
  test("spark中使用 CombineTextInputFormat") {
    // 初始化 spark配置实例
    val sf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sf)
    // 读取目录下的所有文件
    val input = "src/main/resources/data/dir/dir3"
        val combineRDD: RDD[(LongWritable, Text)] = sc.hadoopFile[LongWritable, Text
         , org.apache.hadoop.mapred.lib.CombineTextInputFormat](input, 10000)
    //    val combineRDD: RDD[(LongWritable, Text)] = sc.hadoopFile[LongWritable, Text
    //      , org.apache.hadoop.mapred.TextInputFormat](input, 10000)
    sc.hadoopConfiguration.setInt("mapred.max.split.size", 128000000)
    //sc.hadoopConfiguration.setInt("mapreduce.input.fileinputformat.split.maxsize", 128000000)
    println("切片个数:" + combineRDD.getNumPartitions)
    //combineRDD.map(_._2.toString).foreach(println(_))
    //combineRDD.collect()
    //combineRDD.had
    sc.stop()
  }
 
执行结果:
5.5 minPartitions 在 CombineTextInputFormat 中的作用?
CombineTextInputFormat切片逻辑和 最小切片数(minPartitions) 无关
查看 org.apache.hadoop.mapred.lib.CombineTextInputFormat类 getSplits方法
TODO: numSplits指定的切片个数,并没有使用
  public InputSplit[] getSplits(JobConf job, int numSplits) 
    throws IOException {
    List<org.apache.hadoop.mapreduce.InputSplit> newStyleSplits =
      super.getSplits(Job.getInstance(job));
    InputSplit[] ret = new InputSplit[newStyleSplits.size()];
    for(int pos = 0; pos < newStyleSplits.size(); ++pos) {
      org.apache.hadoop.mapreduce.lib.input.CombineFileSplit newStyleSplit = 
        (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) newStyleSplits.get(pos);
      ret[pos] = new CombineFileSplit(job, newStyleSplit.getPaths(),
        newStyleSplit.getStartOffsets(), newStyleSplit.getLengths(),
        newStyleSplit.getLocations());
    }
    return ret;
  } 
5.6 重点关注
对计算任务而言,合并小文件是一把双刃剑,合并小文件后 就舍弃了数据本地化,则加了网络IO的开销,需要根据实际情况合理的选择 切片策略
CombineTextInputFormat源码参考:https://blog.csdn.net/wawmg/article/details/17095125


















