COW微信机器人项目的运行中我遇到了许多问题,需要理清程序的运行原理和执行过程,下面分享我的总结:
app.py 调用微信渠道 channel.startup() ->
wechat_channel.py 循环运行 itchat.run()(内部持续轮询消息) -> 装饰器注册监听 @itchat.msg_register handler_single_msg handler_group_msg
-> handle_single(msg) handle_group(msg) 列入任务序列 self.produce(context) ->
chat_channel.py threading.Thread(target=self.consume) 定义 produce() -> 取出任务序列 consume() -> self._generate_reply(context) ->
bridge.py 桥接机器人处理程序 fetch_reply_content() -> get_bot("chat") -> 创建机器人 create_bot() ->
bot_factory.py 定义 def create_bot(bot_type) ->
chat_gpt_bot.py 调用GPT模型处理函数 ChatGPTBot() -> def reply(self, query, context):
缩进对齐的行属于同一程序文件,自上而下依次执行;每段首行为该程序文件的名字,其后为该文件中调用或定义的函数。
app.py
# Entry point excerpt from app.py: build the channel, load plugins, start the loop.
start_channel(channel_name)
def start_channel(channel_name: str):
# The factory resolves the concrete channel implementation by name.
channel = channel_factory.create_channel(channel_name)
PluginManager().load_plugins()
# startup() enters the channel's message loop (see wechat_channel.py below).
channel.startup()
channel_factory.py
def create_channel(channel_type) -> Channel:
# Excerpt: for the WeChat channel type this constructs a WechatChannel
# (the dispatch on channel_type is trimmed in these notes).
WechatChannel()
wechat_channel.py
def startup(self):
# Log in via QR-code scan, then enter itchat's blocking receive loop.
itchat.auto_login()
itchat.run()
register.py
def run(self, debug=True, blockThread=True):
# Inner worker (excerpt from itchat): polls for messages while the client
# is alive; the matching except clause is trimmed in these notes.
def reply_fn():
try:
while self.alive:
# Each iteration pulls one message off the queue and dispatches it.
self.configured_reply()
def configured_reply(self):
''' determine the type of message and reply if its method is defined
however, I use a strange way to determine whether a msg is from massive platform
I haven't found a better solution here
The main problem I'm worrying about is the mismatching of new friends added on phone
If you have any good idea, pleeeease report an issue. I will be more than grateful.
'''
try:
# logger.info('msg: %s', msg)
# print(f"msg: {msg}")
# Block up to 5 seconds waiting for the next message from itchat's queue.
msg = self.msgList.get(timeout=5)
except Queue.Empty:
# No message arrived within the timeout; return and let run() poll again.
pass
else:
# A plain User sender means a one-to-one (friend) chat rather than a group.
if isinstance(msg['User'], templates.User):
# Look up the handler registered for this message type via @msg_register.
replyFn = self.functionDict['FriendChat'].get(msg['Type'])
if replyFn is None:
r = None
else:
# NOTE(review): excerpt — the matching except clause is trimmed in these notes.
try:
r = replyFn(msg)
# A non-None handler result is sent back to the message's origin.
if r is not None:
self.send(r, msg.get('FromUserName'))
queue.py
def get(self, block=True, timeout=None):
# Excerpt from the stdlib Queue.get: the lock/timeout handling around the
# actual dequeue is trimmed in these notes.
item = self._get()
messagequeue.py
def __getitem__(self, value):
# Message behaves like a dict; key lookup is delegated to the parent class.
return super(Message, self).__getitem__(value)
register.py
def msg_register(self, msgType, isFriendChat=False, isGroupChat=False, isMpChat=False):
''' a decorator constructor
return a specific decorator based on information given '''
# Normalize a single type into a list so one decorator can cover many types.
if not (isinstance(msgType, list) or isinstance(msgType, tuple)):
msgType = [msgType]
def _msg_register(fn):
# Register fn as the handler for every requested message type
# (excerpt: the group/MP branches are trimmed in these notes).
for _msgType in msgType:
if isFriendChat:
self.functionDict['FriendChat'][_msgType] = fn
# Return fn unchanged so the decorator is transparent to the caller.
return fn
return _msg_register
wechat_channel.py
@itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING])
def handler_single_msg(msg):
# Full version: wrap the raw itchat dict in WechatMessage; unsupported
# message types raise NotImplementedError and the message is skipped.
try:
cmsg = WechatMessage(msg, False)
except NotImplementedError as e:
logger.debug("[WX]single message {} skipped: {}".format(msg["MsgId"], e))
return None
WechatChannel().handle_single(cmsg)
return None
# Simplified restatement of the same handler (notes only): wrap, then dispatch.
def handler_single_msg(msg):
cmsg = WechatMessage(msg, False)
WechatChannel().handle_single(cmsg)
class WechatChannel(ChatChannel):
def handle_single(self, cmsg: ChatMessage):
# Excerpt: context construction from cmsg is trimmed; the built context
# is queued for the consumer thread via ChatChannel.produce().
self.produce(context)
wechat_message.py
class WechatMessage(ChatMessage):
# Adapter that normalizes a raw itchat message dict into the project's ChatMessage.
def __init__(self, itchat_msg, is_group=False):
super().__init__(itchat_msg)
self.msg_id = itchat_msg["MsgId"]
self.create_time = itchat_msg["CreateTime"]
self.is_group = is_group
# Map itchat's TEXT type onto the project's ContextType (excerpt: branches
# for other types such as VOICE/PICTURE are trimmed in these notes).
if itchat_msg["Type"] == TEXT:
self.ctype = ContextType.TEXT
self.content = itchat_msg["Text"]
chat_channel.py
class ChatChannel(Channel):
# Base class for chat channels: owns the producer/consumer pipeline (excerpt).
def __init__(self):
# Background consumer thread drains the per-session context queues.
_thread = threading.Thread(target=self.consume)
def produce(self, context: Context):
# Enqueue the incoming context on its session's queue.
self.sessions[session_id][0].put(context)
def consume(self):
# NOTE(review): consume() is expected to take contexts OUT of the session
# queue; this excerpt repeats the put line — verify against chat_channel.py.
self.sessions[session_id][0].put(context)
# Hand the context to the thread pool for asynchronous handling.
future: Future = handler_pool.submit(self._handle, context)
def _handle(self, context: Context):
# Generate, decorate, then send the reply.
reply = self._generate_reply(context)
reply = self._decorate_reply(context, reply)
# step that sends the reply
self._send_reply(context, reply)
def _generate_reply(self, context: Context, reply: Reply = Reply()) -> Reply:
reply = super().build_reply_content(context.content, context)
def build_reply_content(self, query, context: Context = None) -> Reply:
# Delegates to the Bridge, which routes the query to the configured bot.
return Bridge().fetch_reply_content(query, context)
bridge.py
def fetch_reply_content(self, query, context: Context) -> Reply:
# Route the query to the "chat" bot and return its Reply.
return self.get_bot("chat").reply(query, context)
def get_bot(self, typename):
# Create and cache the bot instance for this type (excerpt: the
# lazy-initialization guard is trimmed in these notes).
self.bots[typename] = create_bot(self.btype[typename])
bot_factory.py
def create_bot(bot_type):
# Factory excerpt: instantiates the bot matching bot_type, e.g. one of:
ChatGPTBot()
OpenAIBot()
chat_gpt_bot.py
class ChatGPTBot(Bot, OpenAIImage):
# Bot backed by the OpenAI chat completion API (excerpt).
def __init__(self):
# Session manager keeps per-conversation history for the chosen model.
self.sessions = SessionManager(ChatGPTSession, model=conf().get("model") or "gpt-3.5-turbo")
self.args = {
"model": conf().get("model") or "gpt-3.5-turbo", # name of the chat model
"temperature": conf().get("temperature", 0.9), # in [0,1]; larger values make replies more random
# "max_tokens":4096, # maximum number of tokens in the reply
"top_p": conf().get("top_p", 1),
"frequency_penalty": conf().get("frequency_penalty", 0.0), # in [-2,2]; larger values favor more varied content
"presence_penalty": conf().get("presence_penalty", 0.0), # in [-2,2]; larger values favor more varied content
"request_timeout": conf().get("request_timeout", None), # request timeout; the OpenAI API defaults to 600s, hard questions usually need longer
"timeout": conf().get("request_timeout", None), # retry window; retries happen automatically within this time
}
def reply(self, query, context=None):
# Excerpt: session lookup and new_args construction are trimmed in these notes.
reply_content = self.reply_text(session, api_key, args=new_args)
# Record the assistant reply (and token usage) back into the session history.
self.sessions.session_reply(reply_content["content"], session_id, reply_content["total_tokens"])
reply = Reply(ReplyType.TEXT, reply_content["content"])