发布时间:星期四
文章《推荐系统05-用户基于内容召回集》更新完毕,开始阅读 d9a044a65dbfc77da26925c52cc58bd6318693e1
推荐系统05-用户基于内容召回集
1、相关环境
- hadoop-2.10.0
- hive-3.1.2
- hbase-2.2.2
- spark-2.4.4

2、相关表结构(HBase)
alter 'multiple_recall', {NAME=>'content', TTL=>2592000, VERSIONS=>9999} 3、相关Python实现 # -*- coding:utf-8 -*- import os import sys import numpy as np
from datetime import datetime
BASE_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, os.path.join(BASE_PATH)) print sys.path
from offline import BaseSparkSession default_encoding = 'utf-8'
if sys.getdefaultencoding() != default_encoding: reload(sys)
sys.setdefaultencoding(default_encoding)
os.environ['PYSPARK_PYTHON'] = 'F:\\develop\\python\\Python27\\python.exe' os.environ['HADOOP_HOME'] = 'F:\\develop\\hadoop\\hadoop-2.10.0'
os.environ['HADOOP_CONF_DIR'] = 'F:\\develop\\hadoop\\hadoop-2.10.0-conf' os.environ['SPARK_HOME'] = 'F:\\develop\\spark\\spark-2.4.4-bin-hadoop2.7' class UserRecallBasedContent(BaseSparkSession): def __init__(self):
self.SPARK_APP_NAME = 'user_recall_based_content' self.SPARK_MASTER_URL = 'yarn' self.SPARK_YARN_QUEUE = 'queue3' self.ENABLE_HIVE_SUPPORT = True
self.spark_session = self.create_spark_session() # 生成用户召回结果
def gen_user_recall(self):
self.spark_session.sql(\ # 用户文章点击行为
sql = \= 1\
user_article_click_behaviour_df = self.spark_session.sql(sql) user_article_click_behaviour_df.show() # 用户召回数据存储HBase
def insert_user_recall(partition): sys.path.insert(0,
os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) import json
from recoutils.hbase_utils import HBaseUtils
hbase_utils = HBaseUtils(host=\ for row in partition: # 获取文章相似文章列表 article_id = row.article_id
art_sim_art_result = hbase_utils.read_rows(\ [b\ columns=[b\ if art_sim_art_result: # 排序TOPK
sorted_result = sorted(art_sim_art_result[0][1].items(), key=lambda item: item[1], reverse=True)
art_sim_art_list sorted_result][:5]
print \ # 获取历史召回数据
history_recall_list = hbase_utils.read_cells(\
\
history_recommend_list = []
for history_recall in history_recall_list:
history_recommend_list.extend(eval(history_recall)) # 过滤历史召回数据
recommend_list = list(set(art_sim_art_list) - set(history_recommend_list)) # 存储召回数据和历史召回数据 if recommend_list:
hbase_utils.insert(\
\
{\json.dumps(recommend_list).encode()})
hbase_utils.insert(\
\
{\json.dumps(recommend_list).encode()})
user_article_click_behaviour_df.foreachPartition(insert_user_recall) if __name__ == '__main__':
user_recall_based_content = UserRecallBasedContent() user_recall_based_content.gen_user_recall()
=
[art_sim[0].split(\
for
art_sim
in
\