import org.apache.spark.{Partitioner, SparkConf, SparkContext} object PartitionCustom { // 分区器决定哪一个元素进入某一个分区 // 目标: 把10个分区器,偶数分在第一个分区,奇数分在第二个分区 // 自定义分区器 // 1. 创建一个类继承Partitioner // 2. 重写两个方法 // 3. 在创建RDD的时候,partitionBy方法 指定分区器 // 创建一个类继承Partitioner class MyPartitioner extends Partitioner{ override def numPartitions: Int = 2 // 两个分区,编号就是:0,1 // key - value override def getPartition(key: Any): Int = { if(key.asInstanceOf[Int] % 2 == 0){ 0 }else{ 1 } } } def main(args: Array[String]): Unit = { // 创建SparkContext val conf = new SparkConf().setAppName("PartitionCustom").setMaster("local[*]") val sc = new SparkContext(conf) // 初始数据 val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) //val rdd = sc.parallelize(List( (1,1), (2,2)) // 自定义分区器使用的前提:数据是key-value类型 val rdd1 = rdd.map(num =>(num,num)) // 使用自定义分区器 val rdd2 = rdd1.partitionBy(new MyPartitioner) // 在分区完成之后的基础上,只保留key val rdd3 = rdd2.map(t => t._1) rdd3.saveAsTextFile("output6") } }