关于SparkRdd和SparkSql的几个指标统计,scala语言,打包上传到spark集群,yarn模式运行

news2025/7/13 23:51:36

需求:

❖ 要求:分别用SparkRDD, SparkSQL两种编程方式完成下列数据分析,结合webUI监控比较性能优劣并给出结果的合理化解释.
1、分别统计用户,性别,职业的个数:
2、查看统计年龄分布情况(按照年龄分段为7段)
3、查看统计职业分布情况(按照职业统计人数)
4、统计最高评分,最低评分,平均评分,中位评分,平均每个用户的评分次数,平均每部影片被评分次数:
5、统计评分分布情况
6、统计不同用户的评分次数。
7、统计不同类型的电影分布情况
8、统计每年的电影发布情况。
9、统计每部电影有多少用户评价,总评分情况,平均分情况
10、统计每个用户评价次数,评价总分以及平均分情况
11、求被评分次数最多的 10 部电影,并给出评分次数(电影名,评分次数)
12、分别求男性,女性当中评分最高的 10 部电影(性别,电影名,影评分)
13、分别求男性,女性看过最多的 10 部电影(性别,电影名)
14、年龄段在“18-24”的男人,最喜欢看 10 部电影
15、求 movieid = 2116 这部电影各年龄段(年龄段为7段)的平均影评(年龄段,影评分)
16、求最喜欢看电影(影评次数最多)的那位女性评最高分的 10 部电影的平均影评分(观影者,电影名,影评分)
17、求好片(评分>=4.0)最多的那个年份的最好看的 10 部电影
18、求1997年上映的电影中,评分最高的10部喜剧类电影
19、该影评库中各种类型电影中评价最高的 5 部电影(类型,电影名,平均影评分)
20、各年评分最高的电影类型(年份,类型,影评分)

构建maven工程

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dataAnalysis</groupId>
    <artifactId>SparkRddAndSparkSQL</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
           <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
           <scope>provided</scope>

        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0</version>
           <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
           <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <finalName>MovieDataAnalysisBySparkRDD</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>Run</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>



</project>

hive-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!--mysql连接信息-->
    <!-- jdbc连接的URL -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false</value>
	</property>
    <!-- jdbc连接的Driver-->
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
	</property>

	<!-- jdbc连接的username-->
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>

    <!-- jdbc连接的password -->
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
    </property>
    <!-- Hive默认在HDFS的工作目录 -->
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>
    
    <!-- 指定hiveserver2连接的端口号 -->
    <property>
        <name>hive.server2.thrift.port</name>
        <value>10000</value>
    </property>
   <!-- 指定hiveserver2连接的host -->
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>hadoop102</value>
	</property>

    

    <!-- 元数据存储授权  -->
    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
	</property>
	<!-- Hive元数据存储版本的验证 -->
		<property>
			<name>hive.metastore.schema.verification</name>
			<value>false</value>
	</property>

	<!-- hiveserver2的高可用参数,开启此参数可以提高hiveserver2的启动速度 -->
	<property>
		<name>hive.server2.active.passive.ha.enable</name>
		<value>true</value>
	</property>
	
	<!--配置hiveserver2高可用-->
	<property>
		<name>hive.server2.support.dynamic.service.discovery</name>
		<value>true</value>
	</property>
	<property>
		<name>hive.server2.zookeeper.namespace</name>
		<value>hiveserver2_zk</value>
	</property>
	<property>
		<name>hive.zookeeper.quorum</name>
		<value> hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
	</property>
	<property>
		<name>hive.zookeeper.client.port</name>
		<value>2181</value>
	</property>
	<property>
		<name>hive.server2.thrift.bind.host</name>
		<value>hadoop102</value>
	</property>
	<!--配置metastore高可用--><!-- 指定存储元数据要连接的地址 -->
	<property>
		<name>hive.metastore.uris</name>
		<value>thrift://hadoop102:9083,thrift://hadoop104:9083</value>
	</property>

	
	<!--Spark依赖位置(注意:端口号8020必须和namenode的端口号一致)-->
	<property>
		<name>spark.yarn.jars</name>
		<value>hdfs://yang-HA/spark-jars/*</value>
	</property>
	  
	<!--Hive执行引擎-->
	<property>
		<name>hive.execution.engine</name>
		<value>spark</value>
	</property>

	<!--Hive和Spark连接超时时间-->
	<property>
		<name>hive.spark.client.connect.timeout</name>
		<value>10000ms</value>
	</property>
	
</configuration>

run.scala

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
 * @description:
 * @author: 宇文智
 * @date 2022/5/18 10:22
 * @version 1.0
 */
object MovieDataAnalysisBySparkRDD {

  //使用spark Rdd分别统计用户,性别,职业的个数
  def demand_1_by_sparkRdd(spark:SparkSession): Unit = {

    val sc = spark.sparkContext

    //分别统计用户,性别,职业的个数:    用户身份::性别::年龄阶段::职业::邮政编码
    //local[*]:默认模式。自动帮你按照CPU最多核来设置线程数。比如CPU有8核,Spark帮你自动设置8个线程计算。
    val users: RDD[String] = sc.textFile("hdfs://yang-HA/movie/users.dat")

    println("--------查看 users ADD 血缘依赖关系-----------")
    println(users.toDebugString)
    //users会重复使用,将数据缓存
    //users.cache()

    //使用persist方法更改存储级别cache()是rdd.persist(StorageLevel.MEMORY_ONLY)的简写
    users.persist(StorageLevel.MEMORY_AND_DISK_2)

    //设置检查点 如果checkpoint之后的出问题 ,避免数据从头开始计算,而且减少开销
    //会立即启动一个新的job来专门的做checkpoint运算,
    // 所以建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job
    // 只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
    sc.setCheckpointDir("hdfs://yang-HA/spark_checkpoint")

    val quotas: ListBuffer[MovieQuota] = ListBuffer()

    sc.setJobGroup("008", "使用rdd,计算用户总数")
    val cnt: Long = users.filter(!_.isEmpty).count() //count()为action算子,生成一个job => job_00
    sc.setJobGroup("008", "使用rdd,计算用户总数")
    println("用户总数: " + cnt)
    quotas.append(MovieQuota("001", "用户总数", cnt.toString))


    println("---------使用系统累加器,计算用户总数---------")
    sc.setJobGroup("007", "使用系统累加器,计算用户总数")
    val sum: LongAccumulator = sc.longAccumulator("sum")
    users.foreach(l=>
      sum.add(1)
    )
    sc.setJobGroup("007", "使用系统累加器,计算用户总数")
    println("用户总数:" + sum.value)

    sc.setJobGroup("2", "使用rdd算子,计算男女人数")
    val gender: RDD[(String, Int)] = users.map(line => {
      val lineArr: Array[String] = line.split("::")
      (lineArr(1), 1)
    }).reduceByKey(_ + _) //reduceByKey 自带缓存

    val genderCount: Array[String] = gender.map(x => { //foreach为action算子,生成一个job => job2
      if (x._1.equals("M")) {
        "男性人数:" + x._2
      } else {
        "女性人数:" + x._2
      }
    }).collect() //job_02
    sc.setJobGroup("2", "使用rdd算子,计算男女人数")
    quotas.append(MovieQuota("002", "男女统计", genderCount.mkString(",")))

    sc.setJobGroup("3", "使用自定义累加器,计算男女人数")
    val accumulator = new MyAccumulator()
    sc.register(accumulator)
    users.map(data => {
      val arr: Array[String] = data.split("::")
      accumulator.add(arr(1))
    }).collect() //job_03
    sc.setJobGroup("3", "使用自定义累加器,计算男女人数")
    println(accumulator.value)

    println("---------使用自定义累加器,计算进行职业统计---------")

    sc.setJobGroup("4", "使用自定义累加器,计算进行职业统计")
    val professionAcc = new MyAccumulator()
    sc.register(professionAcc)
    users.map(line => {
      professionAcc.add(line.split("::")(3))
    }).collect() //job_04
    sc.setJobGroup("4", "使用自定义累加器,计算进行职业统计")
    println(professionAcc.value)


    sc.setJobGroup("b", "checkpoint 容错机制开启一个job ,重新计算数据并存储在hdfs")
    val professionCount: RDD[(String, Int)] = users.map(line => {
      val arr: Array[String] = line.split("::")
      (arr(3), 1)
    }).reduceByKey(_ + _)
    professionCount.cache() //只缓存在内存中
    professionCount.checkpoint() //job_05
    sc.setJobGroup("b", "checkpoint 容错机制开启一个job ,重新计算数据并存储在hdfs")

    sc.setJobGroup("a", "使用rdd算子,计算进行职业统计")
    val profession: RDD[String] = sc.textFile("hdfs://yang-HA/movie/profession.dat")
    val professionRelation: RDD[(String, String)] = profession.map(line => {
      val arr: Array[String] = line.split(":")
      (arr(0), arr(1))
    })
    val quotas1: Array[MovieQuota] = professionCount.join(professionRelation).map(line => {
      MovieQuota("003", "职业统计", line._2._2.trim + ": " + line._2._1)
    }).collect() //job_06
    sc.setJobGroup("a", "使用rdd算子,计算进行职业统计")

    quotas.appendAll(quotas1)

    loadDataToHiveLocation(quotas,spark)

  }

  //使用sparkSQL分别统计用户,性别,职业的个数
  def demand_1_by_sparkSql(spark:SparkSession): Unit = {

    spark.sparkContext.setJobGroup("sparksql", "sparksql")

    val ds: Dataset[String] = spark.read.textFile("hdfs://yang-HA/movie/users.dat")

    import spark.implicits._

    val userDS: Dataset[user] = ds.map(line => {
      val lineArr: Array[String] = line.split("::")
      user(lineArr(0), lineArr(1), lineArr(2), lineArr(3), lineArr(4))
    })
    val professionDS: Dataset[profession] = spark.read.textFile("hdfs://yang-HA/movie/profession.dat").map(line => {
      val lineArr: Array[String] = line.split(":")
      profession(lineArr(0), lineArr(1).trim)
    })

    userDS.groupBy("professionId").count
      .join(professionDS, List("professionId"), "left")
      .orderBy("professionId")
      .createOrReplaceTempView("tmp1")
    userDS.createOrReplaceTempView("user")

    spark.sql(
      """
        |set hive.exec.dynamic.partition.mode=nonstrict
        |""".stripMargin)

    spark.sql(
      """
        |insert into table spark_data_analysis_quota.movie_quota partition(dt)
        |select '004','sparkSql职业统计',concat_ws(':',trim(professionName),count), current_date() dt from tmp1 ;
        |""".stripMargin)

  }

  //统计最高评分,最低评分,平均评分,中位评分,平均每个用户的评分次数,平均每部影片被评分次数:
  def demand_2_by_sparkRdd(spark:SparkSession): Unit = {
    var sc = spark.sparkContext
    //统计最高评分,最低评分,平均评分,中位评分,平均每个用户的评分次数,平均每部影片被评分次数:
    //每个用户的评分次数: 评分总次数 / 评分总人数(需去重)
    //最高评分=每部影片评分中取最大值,最低评分同理
    val rating: RDD[String] = sc.textFile("hdfs://yang-HA/movie/ratings.dat")
    val quotas = new ListBuffer[MovieQuota]


    /*var rank = 0
    var beforeVal = -1.0


    rating.map(line => {
      val arr: Array[String] = line.split("::")
      (arr(1), arr(2).toDouble)
    }).groupByKey().map {
      case (k, v) => {
        val sum1: Double = v.sum
        (k, sum1 / v.size)
      }
    }.collect().sortWith((kv1, kv2) => {
      kv1._2 > kv2._2
    }).map(kv=>{
      if(kv._2 != beforeVal){
        beforeVal = kv._2
        rank+=1
      }
      (kv,rank)
    }).filter(_._2==1).foreach(println)*/

    import spark.implicits._
    val ratingTuples: RDD[(String, String, String, String)] = rating.map(line => {
      val arr: Array[String] = line.split("::")
      (arr(0), arr(1), arr(2), arr(3))
    })
    ratingTuples.cache()
    //userId, movieID, rating, timestamp
    val ratingCnt: Long = ratingTuples.count()
    val NumberPeoples: Long = ratingTuples.map(_._1).distinct(8).count()
    val movieCnt: Long = ratingTuples.map(_._2).distinct(8).count()

    quotas.append(MovieQuota("005","平均每个用户的评分次数,平均每部影片被评分次数",(ratingCnt/NumberPeoples+","+ratingCnt/movieCnt)))

    ratingTuples.map(line => (line._2, line._3.toDouble)).groupByKey().map(kv => {
      //统计最高评分,最低评分,平均评分
      var median = 0
      if (kv._2.size % 2 == 1) {
        //奇数
        median = (kv._2.size + 1) / 2
      } else {
        //偶数
        median = kv._2.size / 2
      }
      // println(median)
      val medianVal: Double = kv._2.toList.sortWith((v1, v2) => {
        v1 > v2
      }).apply(median - 1)

      val avgVal: Double = kv._2.sum / kv._2.size

      MovieQuota("006", "最高评分,最低评分,平均评分,中位评分", (kv._1, kv._2.max, kv._2.min, f"$avgVal%.3f", medianVal).toString())
    }).toDS.createOrReplaceTempView("tmp2")

    spark.sql(
      """
        |insert into table spark_data_analysis_quota.movie_quota partition(dt)
        |select  *,current_date() dt from tmp2
        |""".stripMargin)

    loadDataToHiveLocation(quotas,spark)

  }

  //加载数据到hive表
  def loadDataToHiveLocation(quotas: Seq[MovieQuota],spark:SparkSession): Unit ={

    import spark.implicits._
    val sc: SparkContext = spark.sparkContext

    quotas.toDS.createOrReplaceTempView("quotas")

    spark.sql(
      """
        |msck repair table spark_data_analysis_quota.movie_quota;
        |""".stripMargin)

    spark.sql(
      s"""
        |insert into table spark_data_analysis_quota.movie_quota partition(dt)
        |select *, current_date() dt from quotas
        |""".stripMargin)

    sc.setJobGroup("c", "保存文件到hive表的location")
  }



}

object Run {
  def main(args: Array[String]): Unit = {

    // 设置访问HDFS集群的用户名
    System.setProperty("HADOOP_USER_NAME", "atguigu")
    System.setProperty("file.encoding", "UTF-8")

    // 1 创建上下文环境配置对象
    val conf: SparkConf = new SparkConf()
      .setAppName("movie_data_analysis")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //替换默认序列化机制
      .registerKryoClasses(Array(classOf[MovieQuota])) //注册使用kryo序列化的自定义类
      .setMaster("yarn")

    // 2 创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

    spark.sql(
      """
        |set hive.exec.dynamic.partition.mode=nonstrict
        |""".stripMargin)

    //使用spark Rdd分别统计用户,性别,职业的个数
    MovieDataAnalysisBySparkRDD.demand_1_by_sparkRdd(spark)
    //使用sparkSQL分别统计用户,性别,职业的个数
    MovieDataAnalysisBySparkRDD.demand_1_by_sparkSql(spark)
    //使用spark Rdd统计最高评分,最低评分,平均评分,中位评分,平均每个用户的评分次数,平均每部影片被评分次数:
    MovieDataAnalysisBySparkRDD.demand_2_by_sparkRdd(spark)

    spark.close()
  }
}

case class MovieQuota(var quota_id: String, var quota_name: String, var quota_value: String) {
  override def toString: String = {
    quota_id + '\t' + quota_name + '\t' + quota_value
  }
}


case class profession(professionId: String, professionName: String)

case class rating(userId: String, movieID: String, rating: String, timestamp: String)

case class user(userId: String, gender: String, ageGrades: String, professionId: String, postalCode: String)

case class movie(movieID: String, title: String, genres: String)


//根据输入字段,统计字段总数
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

  private val genderCountMap: mutable.Map[String, Long] = mutable.Map[String, Long]()

  override def isZero: Boolean = genderCountMap.isEmpty

  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    new MyAccumulator
  }

  override def reset(): Unit = genderCountMap.clear

  override def add(v: String): Unit = {

    if (v.equals("M")) {
      genderCountMap("男性") = genderCountMap.getOrElse("男性", 0L) + 1L
    } else if (v.equals("F")) {
      genderCountMap("女性") = genderCountMap.getOrElse("女性", 0L) + 1L
    } else {
      genderCountMap(v) = genderCountMap.getOrElse(v, 0L) + 1L
    }
  }

  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    other.value.foreach { case (key, value) => {
      genderCountMap(key) = genderCountMap.getOrElse(key, 0L) + value
    }
    }
  }

  override def value: mutable.Map[String, Long] = this.genderCountMap
}

目标表ddl

CREATE  TABLE `spark_data_analysis_quota.movie_quota`(
     `quota_id` string COMMENT '指标id',
     `quota_name` string COMMENT '指标名',
     `quota_value` string COMMENT '指标值')
    COMMENT '电影指标分析表'
    PARTITIONED BY (
        `dt` string)
    clustered by (quota_id) into 3 buckets
    stored as orc

使用maven install 打jar 包,放到spark 集群上。启动大数据各集群组件,执行 run_spark_job_byJar.sh

$SPARK_HOME/bin/spark-submit \
--class Run \
--master yarn \
--deploy-mode cluster \
--queue spark \
--conf spark.executor.extraJavaOptions="-Dfile.encoding=UTF-8" \
--conf spark.driver.extraJavaOptions="-Dfile.encoding=UTF-8" \
MovieDataAnalysisBySparkRDD.jar \

查看http://hadoop104:8088/cluster yarn历史服务器
在这里插入图片描述
在这里插入图片描述

点击history,跳转到[spark 历史服务器(在hadoop102上启动sbin/start-history-server.sh)]http://hadoop102:4000

查看 spark 作业日志
在这里插入图片描述
在这里插入图片描述

附:

集群启停脚本

cat hadoopHA.sh 
#!/bin/bash
if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit ;
fi

start_cluster(){
        echo " =================== 启动 hadoop集群 ==================="
        echo " --------------- 启动 hdfs ---------------"
        ssh hadoop102 "/opt/module/hadoopHA/sbin/start-dfs.sh"
        echo " --------------- 启动 yarn ---------------"
        ssh hadoop103 "/opt/module/hadoopHA/sbin/start-yarn.sh"
        echo " --------------- 启动 historyserver ---------------"
        ssh hadoop102 "/opt/module/hadoopHA/bin/mapred --daemon start historyserver"
	echo "---------启动spark日志服务器----------"
	ssh hadoop102 "/opt/module/spark/sbin/start-history-server.sh "
	echo "-----启动hiveservice------"
	ssh hadoop102 "/home/atguigu/bin/hiveservices.sh start"
}


stop_cluster(){
        echo " =================== 关闭 hadoop集群 ==================="
	echo "----------关闭hiveservice-------------"
	ssh hadoop102 "/home/atguigu/bin/hiveservices.sh stop"
        echo " --------------- 关闭 historyserver ---------------"
        ssh hadoop102 "/opt/module/hadoopHA/bin/mapred --daemon stop historyserver"
        echo " --------------- 关闭 yarn ---------------"
        ssh hadoop103 "/opt/module/hadoopHA/sbin/stop-yarn.sh"
        echo " --------------- 关闭 hdfs ---------------"
        ssh hadoop102 "/opt/module/hadoopHA/sbin/stop-dfs.sh"
	echo "---------停止spark日志服务器----------"
	ssh hadoop102 "/opt/module/spark/sbin/stop-history-server.sh "
}

case $1 in
"start")
	echo "--------启动zookeeper----------"
	sh /home/atguigu/bin/dataCollectSystem/zk.sh start
	echo "-------启动大数据高可用集群-------"
	start_cluster	
;;
"stop")
	stop_cluster
	echo "----------关闭zookeeper------------"
	sh /home/atguigu/bin/dataCollectSystem/zk.sh stop
;;
"restart")
	echo "---------重启集群---------"
	stop_cluster
	start_cluster
;;
"status")
	echo " =================hadoopHA集群 各个节点状态==========="
	echo " ==========hadoop102,nn1========="
	n1_port=`ssh hadoop102 "jps | grep -v Jps | grep NameNode"` 
	nn1=`hdfs haadmin -getServiceState nn1`
	echo ${n1_port}" "${nn1}
	
	echo " ==========hadoop103,nn2,rm1========="
	n2_port=`ssh hadoop103 "jps | grep -v Jps | grep NameNode"`
	nn2=`hdfs haadmin -getServiceState nn2`
	echo ${n2_port}" "${nn2}
	rm1_port=`ssh hadoop103 "jps | grep -v Jps | grep ResourceManager"`
	rm1=`yarn rmadmin -getServiceState rm1`
	echo ${rm1_port}" "${rm1}
	
	echo " ==========hadoop104,rm2========="
	rm2_port=`ssh hadoop104 "jps | grep -v Jps | grep ResourceManager"`
	rm2=`yarn rmadmin -getServiceState rm2`
	echo ${rm2_port}" "${rm2}
;;
*)
    echo "Input Args Error..."
;;
esac

cat hiveservices.sh 

#!/bin/bash
HIVE_LOG_DIR=$HIVE_HOME/logs
if [ ! -d $HIVE_LOG_DIR ]
then
	mkdir -p $HIVE_LOG_DIR
fi
#检查进程是否运行正常,参数1为进程名,参数2为进程端口
function check_process()
{
    pid=$(ps -ef 2>/dev/null | grep -v grep | grep -i $1 | awk '{print $2}')
    ppid=$(netstat -nltp 2>/dev/null | grep $2 | awk '{print $7}' | cut -d '/' -f 1)
    echo $pid
    [[ "$pid" =~ "$ppid" ]] && [ "$ppid" ] && return 0 || return 1
}

function hive_start()
{
    metapid=$(check_process HiveMetastore 9083)
    cmd="nohup hive --service metastore >$HIVE_LOG_DIR/metastore.log 2>&1 &"
    cmd=$cmd" sleep 4; hdfs dfsadmin -safemode wait >/dev/null 2>&1"
    [ -z "$metapid" ] && eval $cmd || echo "Metastroe服务已启动"
    server2pid=$(check_process HiveServer2 10000)
    cmd="nohup hive --service hiveserver2 >$HIVE_LOG_DIR/hiveServer2.log 2>&1 &"
    [ -z "$server2pid" ] && eval $cmd || echo "HiveServer2服务已启动"
}

function hive_stop()
{
    metapid=$(check_process HiveMetastore 9083)
    [ "$metapid" ] && kill $metapid || echo "Metastore服务未启动"
    server2pid=$(check_process HiveServer2 10000)
    [ "$server2pid" ] && kill $server2pid || echo "HiveServer2服务未启动"
}

case $1 in
"start")
    hive_start
    ;;
"stop")
    hive_stop
    ;;
"restart")
    hive_stop
    sleep 2
    hive_start
    ;;
"status")
    check_process HiveMetastore 9083 >/dev/null && echo "Metastore服务运行正常" || echo "Metastore服务运行异常"
    check_process HiveServer2 10000 >/dev/null && echo "HiveServer2服务运行正常" || echo "HiveServer2服务运行异常"
    ;;
*)
    echo Invalid Args!
    echo 'Usage: '$(basename $0)' start|stop|restart|status'
    ;;
esac

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1103376.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QML自定义可长按短按的SpinBox

默认长按10.短按1 主要难点在区分长按和短按,以1s为界限.这里我使用了四个定时器用于实现加和减的长短按操作. shortClickTimer定时器用来在鼠标松开的时候判断是否是短按: 如果是短按的话我们需要借助forbidClickTimer定时器短暂禁用Click事件,否则会出现长按松开的时候10后…

2022最新版-李宏毅机器学习深度学习课程-P22 卷积神经网络CNN

零、吴恩达专项课程引入 概念导入&#xff1a;边缘检测 假如有一张如下的图像&#xff0c;想让计算机搞清楚图像上有什么物体&#xff0c;有两种方法&#xff1a;检测图像的 垂直边缘 和 水平边缘。 如下图所示&#xff0c;一个 6 * 6 的灰度图像&#xff0c;构造一个 3 * 3 …

基于类电磁机制优化的BP神经网络(分类应用) - 附代码

基于类电磁机制优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码 文章目录 基于类电磁机制优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码1.鸢尾花iris数据介绍2.数据集整理3.类电磁机制优化BP神经网络3.1 BP神经网络参数设置3.2 类电磁机制算法应用 4…

AI绘画使用Stable Diffusion(SDXL)绘制中国古代神兽

一、引言 说到神奇异兽&#xff0c;脑海中首先就会跳出我国古代神话传说中的各种神兽。比如青龙、白虎、朱雀、玄武&#xff0c;再比如麒麟、凤凰、毕方、饕餮等等&#xff0c;这些都是大家耳熟能详的的神兽。 这些神兽不仅体现了人们丰富的创造力和想象力&#xff0c;更是我…

day07_oop

今日内存 1.作业 2.成员变量局部变量 3.构造方法 4.对象创建过程 5.重载 零、复习 昨天 介绍面向对象编程类,属性,方法,对象定义类,设置属性,设置方法,创建对象使用对象,调用属性,调用方法内存图(创建对象,创建多个对象,多个引用指向一个对象) 类和属性,方法的关系 类是模板,其…

CUDA学习笔记3——图像卷积实现

分别采用GPU、CPU对图像进行sobel滤波处理 #include <stdio.h> #include "cuda_runtime.h" #include "device_launch_parameters.h" #include<math.h> #include <malloc.h> #include <opencv2/opencv.hpp>#include <stdlib.h…

DevEco Studio的安装和开发环境配置(详细)

记录一下这段时间备赛金砖职赛的鸿蒙应用开发 下载与安装 下载网址华为开发者联盟-智能终端能力开放,共建开发者生态 (huawei.com) 单击下载 然后跳转到下载的具体版本型号 这里我们选择window版本 下载完解压后 双击运行安装 我们需要找到一个不包含中文的地方&…

文件操作和IO详解

文件操作 和 IO 文件,File 这个概念,在计算机里,也是“一词多用”. 文件的狭义和广义 狭义的文件: 指的是硬盘上的文件和目录(文件夹) 广义的文件: 泛指计算机中很多的软硬件资源.操作系统中,把很多的硬件设备和软件设备都抽象成了文件.按照文件的方式来统一管理.例如网卡,操…

薪资27k,从华为外包测试“跳”入字节,说说我转行做测试的这5年.....

转行测试5年了 当时因为家里催促就业&#xff0c;在其中一个室友的内推下进入了一家英语教培机构&#xff0c;前期上班和工资都还算满意&#xff0c;甚至觉得自己找到了一份很稳定的工作了&#xff0c;可是好景不长&#xff0c;“双减政策”的到来&#xff0c;让公司的经济遭受…

LeetCode1389——按既定顺序创建目标数组

LeetCode1389 思路&#xff1a;先将元素存放在集合中&#xff0c;集合中的add&#xff08;index&#xff0c;value&#xff09;方法可以在指定的位置插入元素。 再创建新的数组&#xff0c;将集合中的元素存入数组&#xff0c;直接用数组的话元素移动不好操作。 public class D…

嵌入式学习笔记(58)程序运行为什么需要内存?

1.1.1.计算机程序运行的目的 程序 代码 数据 代码就是函数&#xff0c;表示加工数据的动作。 数据包括全局变量和局部变量&#xff0c;表示被加工的东西。 程序运行的目的要么重在数据结果&#xff08;有返回值&#xff09;&#xff0c;要么重在过程&#xff08;无返回值…

Java和前端都不好找工作,计算机毕业可以干嘛?

37了&#xff0c;11年多的Java经验&#xff0c;其他什么mysql&#xff0c;linux&#xff0c;Vue等等都会&#xff0c;现在失业在家已经1年多&#xff0c;不指望入职摸鱼了&#xff0c;寻思着转行中。祝你好运 这是某问答社区&#xff0c;有个大四学生提问&#xff0c;好迷茫啊马…

华为云云耀云服务器L实例评测|部署war格式的web项目

目录 准备服务器安装java安装tomcat配置tomcat部署Web Adaptor总结 对于很多刚开始接触编程的朋友&#xff0c;通常都期待自己能部署一个网站&#xff0c;并可以在公网上访问。这就需要一台云服务器。最近发现华为云 推出了 云耀云服务器L实例&#xff0c;使用后&#xff0c;体…

R语言进度条:txtProgressBar功能使用方法

R语言进度条使用攻略 在数据处理、建模或其他计算密集型任务中&#xff0c;我们常常会执行一些可能需要很长时间的操作。 在这些情况下&#xff0c;展示一个进度条可以帮助我们了解当前任务的进度&#xff0c;以及大约还需要多长时间来完成&#xff0c;R语言提供了几种简单且灵…

多模态论文串讲

多模态论文串讲 近几年&#xff0c;尤其是 CLIP 出现以来&#xff0c;多模态学习的发展异常火爆。除了传统的VQA、图文检索、图像描述等&#xff0c;还有受启发于 CLIP 的新任务 Language Guided Detection/Segmentation、文本图像生成、文本视频生成等。本次串讲主要还是围绕…

MATLAB——概率神经网络分类问题程序

欢迎关注“电击小子程高兴的MATLAB小屋” %% 概率神经网络 %% 解决分类问题 clear all; close all; P[1:8]; Tc[2 3 1 2 3 2 1 1]; Tind2vec(Tc) %数据类型的转换 netnewpnn(P,T); Ysim(net,P); Ycvec2ind(Y) %转换回来

机器人力控制构架

在交互过程中&#xff0c;环境会对末端执行器可以遵循的几何路径设置约束。这种情况通常被称为约束运动。在这种情况下&#xff0c;使用纯运动控制策略来控制交互是失败的&#xff01;&#xff01; 只有任务准确规划&#xff0c;使用运动控制才能成功执行与环境的交互任务。但…

10.17数电第二次实验

就是数码管有4个&#xff0c;前两个来表示分钟&#xff0c;后两个表示秒钟 然后下面十六个led灯来记录时间&#xff0c;小时以十六进制&#xff0c;就是说4个二进制&#xff0c;4个灯为一组&#xff0c;一共可以显示四位小时 首先是要接收信号&#xff0c;应该是要无信号&…

任务调度器

题目链接 任务调度器 题目描述 注意点 tasks[i] 是大写英文字母任务可以以任意顺序执行两个 相同种类 的任务之间必须有长度为整数 n 的冷却时间 解答思路 利用桶思想&#xff0c;将不同的字母放进同一个桶中&#xff0c;桶的数量为tasks中字母出现频率最高的次数&#xf…

保序回归与金融时序数据

保序回归在回归问题中的作用是通过拟合一个单调递增或递减的函数&#xff0c;来保持数据点的相对顺序特性。 一、保序回归的作用 主要用于以下情况&#xff1a; 1. 有序数据&#xff1a;当输入数据具有特定的顺序关系时&#xff0c;保序回归可以帮助保持这种顺序关系。例如&…