微信自动回复助手

作者:user 发布日期: 浏览量:51

使用 wxauto

  • 安装
pip install wxauto 

# 如果需要 requests
# pip install requests
  • 在功能实现目录下创建 db.json、users.txt 文件
# db.json
{}
# users.txt
# 一行写一个群名称或者用户名称
Family
张三
xxx
  • 功能实现
import os
import time
import json
import requests
from wxauto import WeChat

# 加载历史消息记录
DB = {}

if os.path.exists("db.json"):
    fp = open("db.json", encoding='utf-8', mode='r')
    DB = json.load(fp)
    fp.close()

# 读取 users.txt 加载监听用户
MONITOR_LIST = []
fp = open("users.txt", encoding="utf-8", mode='r')
for line in fp:
    MONITOR_LIST.append(line)
fp.close()

# 1、打开微信
wx = WeChat()

# 2、监听账户列表(好有或群名称)
for user in MONITOR_LIST:
    # print("user =>", user)
    wx.AddListenChat(who=user)

# 3、监听消息
while True:
    listen_list = wx.GetListenMessage()
    for chat_win, message_list in listen_list.items():
        chat_user = chat_win.who
        # print("chat_user =>", chat_user, message_list)

        # 获取最新的聊天的消息
        interval_list = []
        for msg in message_list:
            # print("msg =>", msg)
            if msg[0] == "Self" or msg[0] == "Time" or msg[0] == "SYS":
                continue
            interval_list.append({"role": "user", "content": msg[1]})

        if not interval_list:
            continue

        # 拼接历史聊天记录
        # for interval in interval_list:
        #     print("interval =>", interval)

        history_list = DB.get(chat_user, [])
        history_list.extend(interval_list)

        # 调用本地 DeepSeek
        res = requests.post(
            url="http://127.0.0.1:9001/chat/",
            json={
                "model": "deepseek-chat",
                "messages": history_list,
                "stream": False
            },
            timeout=30
        )
        data_dict = res.json()
        # print("data_dict =>", data_dict)
        res_msg_dict = data_dict['data']

        # 获取 DeepSeek 回复内容,微信回复
        res_content = res_msg_dict['content']
        chat_win.SendMsg(res_content)
        print("DeepSeek回复 =>", res_content)

        # 保存聊天记录
        history_list.append(res_msg_dict)
        DB[chat_user] = history_list
        with open("db.json", encoding="utf-8", mode='w') as fp:
            json.dump(DB, fp, ensure_ascii=False, indent=2)