很多初次接触到StructuredStreaming 应该会写一个这样的案例
- py脚本不断产生数据写入linux本地, 通过hdfs dfs 建目录文件来实时存储到HDFS中
1. 指定数据schema: 实时json数据
2. 数据源地址:HDFS
3. 结果落地位置: HDFS
这个小案例重点在于数据传输
- item源码:
    // 1. 创建sparksession
    val spark: SparkSession = SparkSession.builder().appName("HDFS_source")
      .master("local[4]").getOrCreate()
    // 1. 指定data源schema---json
    val schema = new StructType()
      .add("name", dataType = "string")
      .add("age", dataType = "integer")
    // 2.指定源址hdfssource
    val source = spark.readStream
      .schema(schema)
      .json("hdfs://hadoop102:8020/dataset/dataset")
    // 3.结果
    val outputPath = "hdfs://hadoop102:8020/filetmp" // 结果存储路径hdfs
    source.writeStream
      .outputMode(OutputMode.Append())
      .format("json")
      .option("checkpointLocation", "hdfs://hadoop102:8020/checkpoint") // hdfs检查点的位置
      .start(outputPath)
      .awaitTermination()
 
报错信息:java.lang.IllegalArgumentException: 'path' is not specified
就是没有指定流处理的sink path在start()中传入sink path 即可;

指定checkpointLocation 地址做容错(也就是检查点)
format落地格式 (parquet , json ...)具体场景具体分析
如果只是对数据进行处理然后打印到console 不用指定sink path


















