admin 管理员组文章数量: 894198
Python教你从0搭建微信推送斗鱼直播提醒(单房间简化版)
重新推出重制版,单房间推送提醒版本(后文有扩展多房间思路),适合没有服务器的小伙伴学习使用。
Gitee文档同步更新:
语雀文档更为详细(包含成品案例):?# 《斗鱼直播提醒服务》
当然在学习Python的道路上肯定会困难,没有好的学习资料,怎么去学习呢?
学习Python中有不明白推荐加入交流Q群号:928946953
群里有志同道合的小伙伴,互帮互助, 群里有不错的视频学习教程和PDF!
还有大牛解答!
斗鱼直播提醒 ——简化版
功能介绍:
主播开播后或更换房间标题后推送提醒到微信(单房间)
> 理论上可以配置多个房间,可以达到和斗鱼全房间推送的效果一样。
项目截图:
<ignore_js_op>
[TOC]
一、项目准备
1. 用到的技术内容
- 云函数
- 对象存储(为什么使用对象存储,下文有介绍)
- python(3.6/3.7)
- WxPusher
2. 需要的工具
- 腾讯云(云函数和云对象存储)
二、开通对象存储
这里我们使用对象存储相当于数据库的一个作用,为什么使用对象存储呢?
> 对象存储采用扁平的文件组织方式,所以在文件量上升至千万、亿级别,容量在PB级别的时候,这种文件组织方式下的性能优势就显现出来了,文件不在有目录树深度的问题,历史和近线数据有同样的访问效率。另外,对象存储多采用分布式架构,简而言之就是便宜效率高
官方入门步骤
1. 注册腾讯云
官方的注册方法
2. 开通 COS 服务
在 腾讯云控制台 中,选择【云产品】>【对象存储】,进入 COS 控制台,按照界面提示开通 COS 服务。(如果您已开通,请跳过该步骤。)
3. 创建存储桶
我们需要创建一个用于存放对象的存储桶:
-
在 对象存储控制台 左侧导航栏中单击【存储桶列表】,进入存储桶管理页。
-
单击【创建存储桶】,输入以下配置信息,其他配置保持默认即可。
- 名称:输入存储桶名称。名称设置后不可修改。此处举例输入 examplebucket。
- 所属地域:存储桶所属地域,选择与您业务最近的一个地区,例如广州地域。
- 访问权限:存储桶访问权限,此处我们保持默认为“私有读写”。
-
单击【确定】,即可创建完成。
三、云函数的开发
云函数我们使用腾讯云函数作为示例,阿里云也同理。(阿里云似乎是不允许触发器的触发周期低于1分钟)
1. 注册腾讯云
官方的注册方法
2. 创建云函数
- 登录腾讯云,在云产品里找到云函数,在【函数服务】里新建云函数
- 选择【自定义创建】
- 修改【函数名称】、【地域】、【运行环境(Python3.6)】、【提交方法(在线编辑)】、【执行方法(
index.main_handler
)】暂时不用修改默认代码。 - 点击完成
3. 完善功能
在写代码之前,我们需要用到这几个关键信息:
- appid :你的APPID(账号信息里有)
- secret_id : 你的 SecretId (API密钥管理里获取)
- secret_key :你的 SecretKey(API密钥管理里获取)
- region :存储桶所在的地域
- BUCKET:存储桶名称
- token :没有可为空
- FILE_NAME :你要上传的文件名 如room_info
- WxPusherToken :WxPusher的用户appToken
3.1 引入所需包,完成基本配置
复制代码 隐藏代码
# -*- coding: utf8 -*-
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
from qcloud_cos_v5 import CosServiceError
from qcloud_cos_v5 import CosClientError
import datetime
import json
import logging
import requests
from urllib.parse import parse_qslogger = logging.getLogger()
logger.setLevel(logging.ERROR) # 默认打印 INFO 级别日志,可根据需要调整为 DEBUG、WARNING、ERROR、CRITICAL 级日志appid = 123123 # Please replace with your APPID. 请替换为您的 APPID
secret_id = u'xxx' # Please replace with your SecretId. 请替换为您的 SecretId
secret_key = u'xxx' # Please replace with your SecretKey. 请替换为您的 SecretKey
region = u'ap-shanghai' # Please replace with the region where COS bucket located. 请替换为您bucket 所在的地域
BUCKET = 'xxx'
FILE_NAME = 'xxx'
token = ''
WxPusherToken = 'xxxxxx'# 配置存储桶
config = CosConfig(Secret_id=secret_id, Secret_key=secret_key, Region=region, Token=token)
client = CosS3Client(config)
注:qcloud_cos_v5 为 COS的SDK
3.2 创建方法 根据房间号获取直播状态
斗鱼第三方API,通过房间号获取房间信息,自行百度获取。
最后返回 房间信息复制代码 隐藏代码def getRoomInfo(): url = '斗鱼官方API/房间号' try: r = requests.get(url, timeout=5) except requests.exceptions.RequestException as e: print(str(e)) return 'timeout' else: if r.status_code == 404: return '404' json_str = json.loads(r.text) if isinstance(json_str, dict): if 'error' in json_str.keys() and json_str['error'] == 0: room_info = json_str['data'] return room_info
3.3 创建方法 发送WxPusher消息
复制代码 隐藏代码
def send_WxPusher_msg(c):headers = {'Content-Type': 'application/json'}url = ''parameter = {"appToken": WxPusherToken,"content": c,"contentType": 1, # 内容类型 1表示文字 2表示html(只发送body标签内部的数据即可,不包括body标签) 3表示markdown "topicIds":[ 你的主题号]}r = requests.post(url=url, headers=headers,data=json.dumps(parameter, ensure_ascii=False).encode('utf-8'))json_str = json.loads(r.text)datas = json_str['data']for data in datas:if data['code'] == 1001:logger.error(data['status'])
3.4 创建方法 更新(上传)文件
设置更新时间字段,用于锁定更新,在指定时间内不允许更新/重复推送(我猜是腾讯云多线程的问题,会导致重复更新、重复推送)
复制代码 隐藏代码def updateLocalInfo(roominfo): DIC = { "room_id": roominfo["room_id"], "room_status": roominfo["room_status"], "room_name": roominfo["room_name"], "update_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") } response = client.put_object( Bucket=BUCKET, Body=DIC, Key=FILE_NAME ) print("更新成功:",response['ETag'])
3.5 创建方法 获取文件
url获取:上传文件后,在存储桶内点击文件的
详情
,基本信息
的对象地址
信息
如复制代码 隐藏代码def getLocalInfo(): url = '文件的对象地址' r = requests.get(url) info_str = r.text params = parse_qs(info_str) result = {key: params[key][0] for key in params} return result
3.6 创建方法 信息整合
复制代码 隐藏代码
def time_last(time_str): # 计算时间差t_r = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")now_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")t_n = datetime.datetime.strptime(now_time_str, "%Y-%m-%d %H:%M:%S")return (t_n - t_r).secondsdef checkRoomInfo():room = getLocalInfo() # 获取历史房间信息curr_info = getRoomInfo() # 获取当前真实房间信息time_l = time_last(room['update_time']) # 计算当前时间和上次更新时间的时间差# 直播开播提醒if curr_info['room_status'] == "1": # 正在直播if room['room_status'] == "2": # 之前状态为 未开播if time_l > 300: # 300秒内无法重复推送content = '【开播】您关注的主播:' + curr_info['owner_name'] + '开始直播啦' + \'\n房间标题:' + curr_info['room_name'] + \'\n正在直播:' + curr_info['cate_name']send_WxPusher_msg(content)updateLocalInfo(roominfo=curr_info)else:logger.error('【开播】房间号:' + room['room_id'] + '\n上次更新:' + room['update_time'] + '\n时间差:' + str(time_l))else:if room['room_status'] == "1": # 关闭了直播updateLocalInfo(roominfo=curr_info)# 更换房间标题提醒if curr_info['room_name'] != room['room_name']:if time_l > 300:content1 = '【更换标题】您关注的主播:' + curr_info['owner_name'] + '改变了房间标题' + \'\n当前房间标题:' + curr_info['room_name']send_WxPusher_msg(content1)updateLocalInfo(roominfo=curr_info)else:logger.error('【更换标题】房间号:' + room['room_id'] + '\n上次更新:' + room['update_time'] + '\n时间差:' + str(time_l))
到这所有工能就基本全部完成啦~
开发扩展思路
多房间支持:支持一个房间就一定支持多个房间。
- 下载文件:文件内容为字典/json数组,每一个字典记录房间的信息。
- 遍历:获取到存储桶内的房间信息后,用斗鱼API遍历得到每一个房间的真实信息。
- 更新/上传:通过比对真实信息,更新字典数组,最后再上传到COS。
本文标签: Python教你从0搭建微信推送斗鱼直播提醒(单房间简化版)
版权声明:本文标题:Python教你从0搭建微信推送斗鱼直播提醒(单房间简化版) 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/jishu/1688190010h190005.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论