3.6.3 特征处理原则
- 离散数据 
  
- one-hot编码
 
 - 连续数据 
  
- 归一化
 
 - 图片/文本 
  
- 文章标签/关键词提取
 - embedding
 
 
 
3.6.4 优化训练方式
- 使用Batch SGD优化 
  
- 加入正则化防止过拟合
 
 
3.6.5 spark LR 进行预估
- 目的:通过LR模型进行CTR预估
 - 步骤: 
  
- 1、需要通过spark读取HIVE外部表,需要新的sparksession配置 
    
- 增加HBASE配置
 
 - 2、读取用户点击行为表,与用户画像和文章画像,构造训练样本
 - 3、LR模型进行训练
 - 4、LR模型预测、结果评估
 
 - 1、需要通过spark读取HIVE外部表,需要新的sparksession配置 
    
 
创建环境
import os
import sys
# 如果当前代码文件运行测试需要加入修改路径,避免出现后导包问题
BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(0, os.path.join(BASE_DIR))
PYSPARK_PYTHON = "/miniconda2/envs/reco_sys/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel
from offline import SparkSessionBase
class CtrLogisticRegression(SparkSessionBase):
    SPARK_APP_NAME = "ctrLogisticRegression"
    ENABLE_HIVE_SUPPORT = True
    def __init__(self):
        self.spark = self._create_spark_hbase()
ctr = CtrLogisticRegression()
 
- 1、这里注意的是_create_spark_hbase,我们后面需要通过spark读取HIVE外部表,需要新的配置
 
    def _create_spark_hbase(self):
        conf = SparkConf()  # 创建spark config对象
        config = (
            ("spark.app.name", self.SPARK_APP_NAME),  # 设置启动的spark的app名称,没有提供,将随机产生一个名称
            ("spark.executor.memory", self.SPARK_EXECUTOR_MEMORY),  # 设置该app启动时占用的内存用量,默认2g
            ("spark.master", self.SPARK_URL),  # spark master的地址
            ("spark.executor.cores", self.SPARK_EXECUTOR_CORES),  # 设置spark executor使用的CPU核心数,默认是1核心
            ("spark.executor.instances", self.SPARK_EXECUTOR_INSTANCES),
            ("hbase.zookeeper.quorum", "192.168.19.137"),
            ("hbase.zookeeper.property.clientPort", "22181")
        )
        conf.setAll(config)
        # 利用config对象,创建spark session
        if self.ENABLE_HIVE_SUPPORT:
            return SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
        else:
            return SparkSession.builder.config(conf=conf).getOrCreate()
 
- 2、读取用户点击行为表,与用户画像和文章画像,构造训练样本 
  
- 目标值:clicked
 - 特征值: 
    
- 用户画像关键词权重:权重值排序TOPK,这里取10个
 - 文章频道号:channel_id, ID类型通常要做one_hot编码,变成25维度(25个频道) 
      
- 这里由于我们的历史点击日志测试时候是只有18号频道,所以不进行转换
 
 - 文章向量:articlevector
 - 总共:10 + 1+ 100 = 110
 
 
 
 
进行行为日志数据读取
ctr.spark.sql("use profile")
# +-------------------+----------+----------+-------+
# |            user_id|article_id|channel_id|clicked|
# +-------------------+----------+----------+-------+
# |1105045287866466304|     14225|         0|  false|
user_article_basic = ctr.spark.sql("select * from user_article_basic").select(
    ['user_id', 'article_id', 'channel_id', 'clicked'])
 
用户画像读取处理与日志数据合并
user_profile_hbase = ctr.spark.sql(
    "select user_id, information.birthday, information.gender, article_partial, env from user_profile_hbase")
user_profile_hbase = user_profile_hbase.drop('env')
# +--------------------+--------+------+--------------------+
# |             user_id|birthday|gender|     article_partial|
# +--------------------+--------+------+--------------------+
# |              user:1|     0.0|  null|Map(18:Animal -> ...|
_schema = StructType([
    StructField("user_id", LongType()),
    StructField("birthday", DoubleType()),
    StructField("gender", BooleanType()),
    StructField("weights", MapType(StringType(), DoubleType()))
])
def get_user_id(row):
    return int(row.user_id.split(":")[1]), row.birthday, row.gender, row.article_partial
 
读取用户画像HIVE的外部表,构造样本
user_profile_hbase_temp = user_profile_hbase.rdd.map(get_user_id)
user_profile_hbase_schema = ctr.spark.createDataFrame(user_profile_hbase_temp, schema=_schema)
train = user_article_basic.join(user_profile_hbase_schema, on=['user_id'], how='left').drop('channel_id')
 
 

文章频道与向量读取合并,删除无用的特征
- 由于黑马用户信息中,大多数没有相关特征,直接删除
 
# +-------------------+----------+-------+--------+------+--------------------+
# |            user_id|article_id|clicked|birthday|gender|             weights|
# +-------------------+----------+-------+--------+------+--------------------+
# |1106473203766657024|     13778|  false|     0.0|  null|Map(18:text -> 0....|
ctr.spark.sql("use article")
article_vector = ctr.spark.sql("select * from article_vector")
train = train.join(article_vector, on=['article_id'], how='left').drop('birthday').drop('gender')
# +-------------------+-------------------+-------+--------------------+----------+--------------------+
# |         article_id|            user_id|clicked|             weights|channel_id|       articlevector|
# +-------------------+-------------------+-------+--------------------+----------+--------------------+
# |              13401|                 10|  false|Map(18:tp2 -> 0.2...|        18|[0.06157120217893...|
 
 
合并文章画像的权重特征
ctr.spark.sql("use article")
article_profile = ctr.spark.sql("select * from article_profile")
def article_profile_to_feature(row):
    try:
        weights = sorted(row.keywords.values())[:10]
    except Exception as e:
        weights = [0.0] * 10
    return row.article_id, weights
article_profile = article_profile.rdd.map(article_profile_to_feature).toDF(['article_id', 'article_weights'])
article_profile.show()
train = train.join(article_profile, on=['article_id'], how='left')
 
 
 
进行用户的权重特征筛选处理,类型处理
- 用户权重排序筛选,缺失值 
  
- 获取用户对应文章频道号的关键词权重
 - 若无:生成默认值
 
 
train = train.dropna()
columns = ['article_id', 'user_id', 'channel_id', 'articlevector', 'weights', 'clicked']
def feature_preprocess(row):
    from pyspark.ml.linalg import Vectors
    try:
        weights = sorted([row.weights[key] for key in row.weights.keys() if key[:2] == str(row.channel_id)])[
                  :10]
    except Exception:
        weights = [0.0] * 10
    return row.article_id, row.user_id, row.channel_id, Vectors.dense(row.articlevector), Vectors.dense(
        weights), int(row.clicked)
train = train.rdd.map(feature_preprocess).toDF(columns)
 
结果:
 
输入模型的特征格式指定,通过VectorAssembler()收集
cols = ['article_id', 'user_id', 'channel_id', 'articlevector', 'weights', 'article_weights', 'clicked']
train_version_two = VectorAssembler().setInputCols(cols[2:6]).setOutputCol("features").transform(train)
 
合并特征向量(channel_id1个+用户特征权重10个+文章向量100个+文章关键词权重) = 121个特征
 
lr = LogisticRegression()
model = lr.setLabelCol("clicked").setFeaturesCol("features").fit(train_version_two)
model.save("hdfs://hadoop-master:9000/headlines/models/lr.obj")
 
3.6.5 点击率预测结果
使用model模型加载预估
online_model = LogisticRegressionModel.load("hdfs://hadoop-master:9000/headlines/models/CtrLogistic.obj")
res_transfrom = online_model.transform(train_version_two)
res_transfrom.select(["clicked", "probability", "prediction"]).show()
 
probability结果中有对某个文章点击(1为目标)的概率,和不点击(0为目标)的概率
def vector_to_double(row):
    return float(row.clicked), float(row.probability[1]) 
score_label = res_transfrom.select(["clicked", "probability"]).rdd.map(vector_to_double)
 
3.6.6 模型评估-Accuracy与AUC
画出ROC图,使用训练的时候的模型model中会有
import matplotlib.pyplot as plt
plt.figure(figsize=(5,5))
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(model.summary.roc.select('FPR').collect(),
         model.summary.roc.select('TPR').collect())
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()
 
结果
 
- from pyspark.mllib.evaluation import BinaryClassificationMetrics
 
from pyspark.mllib.evaluation import BinaryClassificationMetrics
metrics = BinaryClassificationMetrics(score_label)
metrics.areaUnderROC
0.7364334522585716
 
其它方法
from sklearn.metrics import roc_auc_score, accuracy_score
import numpy as np
arr = np.array(score_label.collect())
 
评估AUC与准确率
accuracy_score(arr[:, 0], arr[:, 1].round())
0.9051438053097345
roc_auc_score(arr[:, 0], arr[:, 1])
0.719274521004087
                


















