python用multiprocessing起子进程,父进程被杀掉时,如何让子进程也自动退出

最近用python3写了一段代码,代码会起一个子进程做一些辅助的事情。但发现父进程被杀死后,这个子进程变成了孤儿进程,仍然在运行。
起进程的代码大概如下:

from multiprocessing import Process, Queue
__q = Queue()
__p = Process(target=__run, args=(__q,))
__p.start()

很不方便,那怎样让父进程退出后,子进程也跟着退出呢。
一番google之后,原来可以用一行代码解决:

from multiprocessing import Process, Queue
__q = Queue()
__p = Process(target=__run, args=(__q,))
__p.daemon = True    # 加上这一行代码
__p.start()

设置子进程的daemon属性为True。这样,当在控制台上用ctrl-c,退发送SIGTERM杀掉父进程后,子进程也会跟着退出,不会成为孤儿进程。
参考python官方的解释:

daemon
The process’s daemon flag, a Boolean value. This must be set before start() is called.
The initial value is inherited from the creating process.
When a process exits, it attempts to terminate all of its daemonic child processes.

官方解释的重点是daemon flag必须在start()前被设置。
还有一个坑人的地方,貌似父进程必须被SIGTERM信号杀死,或者自行自然退出,子进程才会退出,否则还是变成了孤儿进程。在stackoverflow上有人提到了这点:
https://stackoverflow.com/questions/25542110/kill-child-process-if-parent-is-killed-in-python

I want to mention, that if parent process was killed using SIGKILL (kill -9) then daemon processes won’t stop.

我想,这或许应该是由于-9(SIGKILL)会立刻中止父进程,于是父进程没有机会通知子进程,比如用atexit设置的进程退出时调用的hook函数就没机会执行了。

python自制包并用pip免提交到pypi仅安装到本机

不得不说python的自制包的相关工具真是多且混乱,什么setuptools,什么distutils,什么wheel,什么egg!!怎么有这么多啊??

而且我的需求且且是创建一个自制包管理自己常用的代码,也必不想提交到PyPI,仅仅只需要安装到本机就行。

下面就是几个关键步骤。

  1. 文件目录布局
    ├── package1
    │ └──-├── init.py
    | |── mod1.py
    │ └── mod2.py
    ├── setup.py
    |── README.md
  2. 编写setup.py文件,类似如下:
from setuptools import setup, find_packages

setup(
    name="kamustools",
    version="1.0.1",
    author="Kamuszhou",
    author_email="zausiu@gmail.com",
    description="tools used by kamuszhou exclusively.",
    license="BSD",
    keywords="kamus",
    url="https://ykyi.net",
    packages=find_packages(),
    long_description="Long descrition is actually short...",
    classifiers=[
        "Development Status :: 3 - Alpha",
        "Topic :: Utilities",
        "License :: OSI Approved :: BSD License",
    ],
)
  1. python3 setup.py sdist bdist_wheel
  2. pip install ./dist/kamustools-1.0.1.tar.gz

就这四步,其他各种功能,以后要用了再慢慢看吧,实在太庞杂了。满足我自己需求的这四个简单步骤就记录在这里。

如何定制一个python的logging Handler

gevent貌似和logging冲害,如何定制一个日志File Handler

上个月用python的gevent协程库写了一个tcp服务。日志库使用python标准日志库logging。一个月后,发现一个偶发的bug。这个bug发生时,用python的标准日志库自带的FileHandler写的日志会发往socket占用的文件描述符fd。结果就是,客户端收到了本要打印到磁盘上的日志。花了不少时间定位排查这个bug,仍然没有结果。我开始怀疑是gevent协程库和python的标准日志库logging有冲突。协程库会错误的把logging打开的文件描述符fd关闭并分配给新创建的socket,于是日志就打印到socket占用的fd了。

后来,写了一个检测脚本用来监控这个情况发生的概率。该脚本每分钟会检查进程占用的所有fd,一但发现用来打印本地日志的文件fd不见了,就重启服务进程。核心代码如下:

lsof -p pid | grep -q{log_file_name}
if [ $? != 0 ]
then
    # 报警代码 ...
fi

发现bug发生的频率大概是一个星期一次。但这毕竟根本上解决不了问题啊,又找不到bug的原因,怎么办?

那就自定义一个File Handler吧!

决定自定义一个File Handler,这个File Handler工作在另外一个单独的进程,这样无论如何日志用的fd都不会跟主进程的各种socket用的fd冲突了吧。
代码如下,主要用到的技术进程通讯和用python的__getattribute__魔法把截获类实例的方法调用。这样,只需要把旧的代码中的File Handler(我用了TimedRotatingFileHandler)换成自定义的Handler Class,所有其他旧代码都无需改动。

#!/usr/bin/env python3

'''
created by kamuszhou*AT*tencent.com, zausiu*AT*gmail.com http://ykyi.net
Nov 20, 2018
'''

import logging
import logging.handlers
from functools import partial
from multiprocessing import Process, Queue


class MySpecialHandler(logging.StreamHandler):
    def __init__(self, *args, **kargs):
        self._q = Queue()
        self._p = Process(target=MySpecialHandler.__run, args=(self._q,))
        self._p.start()
        self._q.put(('__init__', args, kargs))

    def join(self):
        self._p.join()

    def __run(q):
        handler = None
        while True:
            # op, params = q.get()
            method, args, kwargs = q.get()
            if method == '__init__':
                handler = logging.handlers.TimedRotatingFileHandler(*args, **kwargs)
            else:  # 主进程的日志调用实际上被转到这里
                getattr(handler, method)(*args, **kwargs)

    def __proxy(self, name, *args, **kwargs):
        # 把调用的方法名和方法参数通过Queue传到专门的日志进程。
        self._q.put((name, args, kwargs))
        fun = logging.StreamHandler.__getattribute__(self, name)
        # print('call method: ', name, args, kwargs)
        # 如果是setLevel函数,再调用一次父类的方法
        if name in {'setLevel'}:
            return fun(*args, **kwargs)

    def __getattribute__(self, name):
        '''
        Hook大法!截获所有方法
        '''
        attr = logging.StreamHandler.__getattribute__(self, name)
        if hasattr(attr, '__call__') and name not in {'join', 'emit'}:
            return partial(MySpecialHandler.__proxy, self, name)
        else:
            return attr


if __name__ == '__main__':
    handler = MySpecialHandler('/data/tmp/ttt.log', when='D', interval=1, backupCount=90)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s: %(levelname)s %(message)s')
    handler.setFormatter(formatter)
    logger = logging.getLogger(__name__)
    logger.propagate = False  # Don't propagate the logging to ROOT
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    logger.debug('debug testtttttttt')
    logger.info('info testtttttttttt')
    logger.warn('warn testtttttttttt')
    logger.error('error testtttttttt')
    logger.critical('critical testttttt')
    handler.join()

这里自定义的日志进程类只是一个很粗糙的实现,一但跑起来,只能手动杀进程。反正我的使用场景是一个服务。所以,我也懒得加‘优雅的退出代码’。

另外,这里创建自定义日志Handler的父类是StreamHandler,它还有一个重要的函数是emit。如果想定制这么一个Handler,把日志发给kafka而不需要起进程。则子类需要重写父类的emit方法。比如:

    def emit(self, record):
        msg = self.format(record)  # 日志会以record的形式传入该函数,用format把它格式化
        self.kafka_broker.send(msg, self.topic)