pip install wxauto
# 如果需要 requests
# pip install requests
# db.json
{}
# users.txt
# 一行写一个群名称或者用户名称
Family
张三
xxx
import os
import time
import json
import requests
from wxauto import WeChat
# 加载历史消息记录
DB = {}
if os.path.exists("db.json"):
fp = open("db.json", encoding='utf-8', mode='r')
DB = json.load(fp)
fp.close()
# 读取 users.txt 加载监听用户
MONITOR_LIST = []
fp = open("users.txt", encoding="utf-8", mode='r')
for line in fp:
MONITOR_LIST.append(line)
fp.close()
# 1、打开微信
wx = WeChat()
# 2、监听账户列表(好有或群名称)
for user in MONITOR_LIST:
# print("user =>", user)
wx.AddListenChat(who=user)
# 3、监听消息
while True:
listen_list = wx.GetListenMessage()
for chat_win, message_list in listen_list.items():
chat_user = chat_win.who
# print("chat_user =>", chat_user, message_list)
# 获取最新的聊天的消息
interval_list = []
for msg in message_list:
# print("msg =>", msg)
if msg[0] == "Self" or msg[0] == "Time" or msg[0] == "SYS":
continue
interval_list.append({"role": "user", "content": msg[1]})
if not interval_list:
continue
# 拼接历史聊天记录
# for interval in interval_list:
# print("interval =>", interval)
history_list = DB.get(chat_user, [])
history_list.extend(interval_list)
# 调用本地 DeepSeek
res = requests.post(
url="http://127.0.0.1:9001/chat/",
json={
"model": "deepseek-chat",
"messages": history_list,
"stream": False
},
timeout=30
)
data_dict = res.json()
# print("data_dict =>", data_dict)
res_msg_dict = data_dict['data']
# 获取 DeepSeek 回复内容,微信回复
res_content = res_msg_dict['content']
chat_win.SendMsg(res_content)
print("DeepSeek回复 =>", res_content)
# 保存聊天记录
history_list.append(res_msg_dict)
DB[chat_user] = history_list
with open("db.json", encoding="utf-8", mode='w') as fp:
json.dump(DB, fp, ensure_ascii=False, indent=2)