文章目录  
 1. 打开项目 2. 查看数据集 2.1 查看JSON格式数据 2.2 查看CSV格式数据 2.3 查看TXT格式数据 3. 添加单元测试依赖 4. 创建数据加载与保存对象 4.1 创建Spark会话对象 4.2 创建加载JSON数据方法 4.3 创建加载CSV数据方法 4.4 创建加载Text数据方法 4.5 创建加载JSON数据扩展方法 4.6 创建加载CSV数据扩展方法 4.7 创建加载Text数据扩展方法 4.8 创建保存文本文件方法 4.9 查看程序完整代码 5. 实战小结   
 
打开SparkSQLDataSource项目 查看users.json文件 { "name" :  "李小玲" ,  "gender" :  "女" ,  "age" :  45 } 
{ "name" :  "童安格" ,  "gender" :  "男" ,  "age" :  26 } 
{ "name" :  "陈燕文" ,  "gender" :  "女" ,  "age" :  18 } 
{ "name" :  "王晓明" ,  "gender" :  "男" ,  "age" :  32 } 
{ "name" :  "张丽华" ,  "gender" :  "女" ,  "age" :  29 } 
{ "name" :  "刘伟强" ,  "gender" :  "男" ,  "age" :  40 } 
{ "name" :  "赵静怡" ,  "gender" :  "女" ,  "age" :  22 } 
{ "name" :  "孙强东" ,  "gender" :  "男" ,  "age" :  35 } 
查看users.csv文件 name, gender, age
李小玲, 女, 45 
童安格, 男, 26 
陈燕文, 女, 18 
王晓明, 男, 32 
张丽华, 女, 29 
刘伟强, 男, 40 
赵静怡, 女, 22 
孙强东, 男, 35 
查看users.txt文件 李小玲 女 45 
童安格 男 26 
陈燕文 女 18 
王晓明 男 32 
张丽华 女 29 
刘伟强 男 40 
赵静怡 女 22 
孙强东 男 35 
在pom.xml里添加单元测试框架依赖 < dependency> < groupId> </ groupId> < artifactId> </ artifactId> < version> </ version> </ dependency> 刷新项目依赖 创建net.huawei.practice包 在practice子包里创建DataLoadAndSave对象 创建DataLoadAndSave伴生类 创建spark常量 
val  spark =  SparkSession. builder( )  
  . appName( "DataLoadAndSave" )  
  . master( "local[*]" )  
  . getOrCreate( )  
创建loadJSONData()方法 
def  loadJSONData( filePath:  String ) :  DataFrame =  {    
  spark. read. json( filePath)                          
}                                                    
在伴生类里创建单元测试方法testLoadJSONData()方法 @Test                                                       
def  testLoadJSONData( ) :  Unit  =  {                            
  
  val  df =  DataLoadAndSave. loadJSONData( "data/users.json" )  
  
  df. show( )                                                 
}                                                           
运行testLoadJSONData()测试方法,查看结果 创建loadCSVData()方法 
def  loadCSVData( filePath:  String ) :  DataFrame =  {        
  spark. read                                           
    . option( "header" ,  "true" )                           
    . option( "inferSchema" ,  "true" )                      
    . csv( filePath)                                      
}                                                       
在伴生类里创建单元测试方法testLoadCSVData()方法 @Test                                                        
def  testLoadCSVData( ) :  Unit  =  {                              
  
  val  df =  DataLoadAndSave. loadCSVData( "data/users.csv" )     
  
  df. show( )                                                  
}                                                            
运行testLoadCSVData()测试方法,查看结果 创建loadTextData()方法 
def  loadTextData( filePath:  String ) :  DataFrame =  {    
  spark. read. text( filePath)                          
}                                                    
在伴生类里创建单元测试方法testLoadTextData()方法 运行testLoadTextData()测试方法,查看结果 创建loadJSONDataExpand()方法 
def  loadJSONDataExpand( filePath:  String ) :  DataFrame =  {  
  spark. read. format( "json" ) . load( filePath)               
}                                                        
在伴生类里创建单元测试方法testLoadJSONDataExpand()方法 运行testLoadJSONDataExpand()测试方法,查看结果 创建loadCSVDataExpand()方法 
def  loadCSVDataExpand( filePath:  String ) :  DataFrame =  {     
  spark. read. format( "csv" )                                 
    . option( "header" ,  "true" )                              
    . option( "inferSchema" ,  "true" )                         
    . load( filePath)                                        
}                                                          
在伴生类里创建单元测试方法testLoadCSVDataExpand()方法 运行testLoadCSVDataExpand()测试方法,查看结果 创建loadTextDataExpand()方法 
def  loadTextDataExpand( filePath:  String ) :  DataFrame =  {    
  spark. read. format( "text" ) . load( filePath)                 
}                                                          
在伴生类里创建单元测试方法testLoadTextDataExpand()方法 运行testLoadTextDataExpand()测试方法,查看结果 创建saveTextFile()方法 
def  saveTextFile( inputPath:  String ,  outputPath:  String ) :  Unit  =  { 
  
  val  df =  spark. read. format( "text" ) . load( inputPath)              
  
  df. write. mode( "overwrite" ) . format( "text" ) . save( outputPath)      
}                                                                 
在伴生类里创建单元测试方法testSaveTextFile()方法 运行testSaveTextFile()测试方法,查看结果 package  net. huawei. practice 
import  org. apache. spark. sql.  { DataFrame,  SparkSession} 
import  org. junit.  Test
object  DataLoadAndSave { 
  
  val  spark =  SparkSession. builder( )  
    . appName( "DataLoadAndSave" )  
    . master( "local[*]" )  
    . getOrCreate( )  
  
  def  loadJSONData( filePath:  String ) :  DataFrame =  { 
    spark. read. json( filePath) 
  } 
  
  def  loadCSVData( filePath:  String ) :  DataFrame =  { 
    spark. read
      . option( "header" ,  "true" ) 
      . option( "inferSchema" ,  "true" ) 
      . csv( filePath) 
  } 
  
  def  loadTextData( filePath:  String ) :  DataFrame =  { 
    spark. read. text( filePath) 
  } 
  
  def  loadJSONDataExpand( filePath:  String ) :  DataFrame =  { 
    spark. read. format( "json" ) . load( filePath) 
  } 
  
  def  loadCSVDataExpand( filePath:  String ) :  DataFrame =  { 
    spark. read. format( "csv" ) 
      . option( "header" ,  "true" ) 
      . option( "inferSchema" ,  "true" ) 
      . load( filePath) 
  } 
  
  def  loadTextDataExpand( filePath:  String ) :  DataFrame =  { 
    spark. read. format( "text" ) . load( filePath) 
  } 
  
  def  saveTextFile( inputPath:  String ,  outputPath:  String ) :  Unit  =  { 
    
    val  df =  spark. read. format( "text" ) . load( inputPath) 
    
    df. write. mode( "overwrite" ) . format( "text" ) . save( outputPath) 
  } 
} 
class  DataLoadAndSave { 
  @Test 
  def  testLoadJSONData( ) :  Unit  =  { 
    
    val  df =  DataLoadAndSave. loadJSONData( "data/users.json" ) 
    
    df. show( ) 
  } 
  @Test 
  def  testLoadCSVData( ) :  Unit  =  { 
    
    val  df =  DataLoadAndSave. loadCSVData( "data/users.csv" ) 
    
    df. show( ) 
  } 
  @Test 
  def  testLoadTextData( ) :  Unit  =  { 
    
    val  df =  DataLoadAndSave. loadTextData( "data/users.txt" ) 
    
    df. show( ) 
  } 
  @Test 
  def  testLoadJSONDataExpand( ) :  Unit  =  { 
    
    val  df =  DataLoadAndSave. loadJSONDataExpand( "data/users.json" ) 
    
    df. show( ) 
  } 
  @Test 
  def  testLoadCSVDataExpand( ) :  Unit  =  { 
    
    val  df =  DataLoadAndSave. loadCSVDataExpand( "data/users.csv" ) 
    
    df. show( ) 
  } 
  @Test 
  def  testLoadTextDataExpand( ) :  Unit  =  { 
    
    val  df =  DataLoadAndSave. loadTextDataExpand( "data/users.txt" ) 
    
    df. show( ) 
  } 
  @Test 
  def  testSaveTextFile( ) :  Unit  =  { 
    
    DataLoadAndSave. saveTextFile( "data/users.txt" ,  "result/users" ) 
  } 
} 
在本次实战中,我们通过SparkSQLDataSource项目深入学习了如何使用Spark SQL加载和保存不同格式的数据。首先,我们查看了JSON、CSV和TXT格式的数据集,并通过DataLoadAndSave对象实现了数据的加载与保存功能。我们创建了多个方法,如loadJSONData()、loadCSVData()和loadTextData(),分别用于加载不同格式的数据,并通过单元测试验证了这些方法的正确性。此外,我们还扩展了数据加载方法,使用format()方法灵活加载数据,并实现了数据保存功能,如saveTextFile()方法,将数据保存为文本文件。通过本次实战,我们掌握了Spark SQL处理多种数据格式的基本操作,为后续的数据处理和分析打下了坚实基础。