如何定制一个python的logging Handler

gevent貌似和logging冲害,如何定制一个日志File Handler

上个月用python的gevent协程库写了一个tcp服务。日志库使用python标准日志库logging。一个月后,发现一个偶发的bug。这个bug发生时,用python的标准日志库自带的FileHandler写的日志会发往socket占用的文件描述符fd。结果就是,客户端收到了本要打印到磁盘上的日志。花了不少时间定位排查这个bug,仍然没有结果。我开始怀疑是gevent协程库和python的标准日志库logging有冲突。协程库会错误的把logging打开的文件描述符fd关闭并分配给新创建的socket,于是日志就打印到socket占用的fd了。

后来,写了一个检测脚本用来监控这个情况发生的概率。该脚本每分钟会检查进程占用的所有fd,一但发现用来打印本地日志的文件fd不见了,就重启服务进程。核心代码如下:

lsof -p pid | grep -q{log_file_name}
if [ $? != 0 ]
then
    # 报警代码 ...
fi

发现bug发生的频率大概是一个星期一次。但这毕竟根本上解决不了问题啊,又找不到bug的原因,怎么办?

那就自定义一个File Handler吧!

决定自定义一个File Handler,这个File Handler工作在另外一个单独的进程,这样无论如何日志用的fd都不会跟主进程的各种socket用的fd冲突了吧。
代码如下,主要用到的技术进程通讯和用python的__getattribute__魔法把截获类实例的方法调用。这样,只需要把旧的代码中的File Handler(我用了TimedRotatingFileHandler)换成自定义的Handler Class,所有其他旧代码都无需改动。

#!/usr/bin/env python3

'''
created by kamuszhou*AT*tencent.com, zausiu*AT*gmail.com http://ykyi.net
Nov 20, 2018
'''

import logging
import logging.handlers
from functools import partial
from multiprocessing import Process, Queue


class MySpecialHandler(logging.StreamHandler):
    def __init__(self, *args, **kargs):
        self._q = Queue()
        self._p = Process(target=MySpecialHandler.__run, args=(self._q,))
        self._p.start()
        self._q.put(('__init__', args, kargs))

    def join(self):
        self._p.join()

    def __run(q):
        handler = None
        while True:
            # op, params = q.get()
            method, args, kwargs = q.get()
            if method == '__init__':
                handler = logging.handlers.TimedRotatingFileHandler(*args, **kwargs)
            else:  # 主进程的日志调用实际上被转到这里
                getattr(handler, method)(*args, **kwargs)

    def __proxy(self, name, *args, **kwargs):
        # 把调用的方法名和方法参数通过Queue传到专门的日志进程。
        self._q.put((name, args, kwargs))
        fun = logging.StreamHandler.__getattribute__(self, name)
        # print('call method: ', name, args, kwargs)
        # 如果是setLevel函数,再调用一次父类的方法
        if name in {'setLevel'}:
            return fun(*args, **kwargs)

    def __getattribute__(self, name):
        '''
        Hook大法!截获所有方法
        '''
        attr = logging.StreamHandler.__getattribute__(self, name)
        if hasattr(attr, '__call__') and name not in {'join', 'emit'}:
            return partial(MySpecialHandler.__proxy, self, name)
        else:
            return attr


if __name__ == '__main__':
    handler = MySpecialHandler('/data/tmp/ttt.log', when='D', interval=1, backupCount=90)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s: %(levelname)s %(message)s')
    handler.setFormatter(formatter)
    logger = logging.getLogger(__name__)
    logger.propagate = False  # Don't propagate the logging to ROOT
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    logger.debug('debug testtttttttt')
    logger.info('info testtttttttttt')
    logger.warn('warn testtttttttttt')
    logger.error('error testtttttttt')
    logger.critical('critical testttttt')
    handler.join()

这里自定义的日志进程类只是一个很粗糙的实现,一但跑起来,只能手动杀进程。反正我的使用场景是一个服务。所以,我也懒得加‘优雅的退出代码’。

另外,这里创建自定义日志Handler的父类是StreamHandler,它还有一个重要的函数是emit。如果想定制这么一个Handler,把日志发给kafka而不需要起进程。则子类需要重写父类的emit方法。比如:

    def emit(self, record):
        msg = self.format(record)  # 日志会以record的形式传入该函数,用format把它格式化
        self.kafka_broker.send(msg, self.topic)

Leave a Reply

Your email address will not be published. Required fields are marked *