鳥の巣箱

ネトゲしたり、機械いじったり、ソフト書いたり、山篭ったり、ギャンブルしたりする人

Pythonで終了処理を実行させる

一つの問題です。
よくあるRaspberry PiのGPIOを制御するプログラムです。
以下のようなソースコードがあって

import RPi.GPIO as GPIO
import time

GPIO.setmode(GPIO.BOARD)
GPIO.setup(13, GPIO.OUT)

while True:
    GPIO.output(13, True)
    time.sleep(1)
    GPIO.output(13, False)
    time.sleep(1)
    
GPIO.cleanup()

これを実行中に、Ctrl-Cやkillなどで停止させた場合、GPIO.cleanup()は実行されるでしょうか。



正解は、実行されません。
簡単な話で、while True:で無限ループしている途中で強制終了されるので、当然それ以下の処理は実行されません。

RPi.GPIOの場合、以前のプロセスがcleanupメソッドを実行せずにsetupメソッドを実行した場合、以下のエラーを発生させるようになっています。

RuntimeWarning:This channel is already in use,continuing anyway.Use GPIO.setwarning'(Flase) to disable warnings.

このメッセージを読むと、回避する方法として先頭に

GPIO.setwarning(Flase)

を追記する方法が推奨されていますが、この方法を使うと他のエラーメッセージも非表示となってしまうため、個人的にあまりおススメできません。

そもそも本来実行すべきcleanupメソッドが実行されない問題を先送りするべきではないですね。
RPi.GPIOのcleanup問題くらいだと、それほど重大な問題とはなりにくいですが、理想としては終了処理を実行させることです。


では、どうするのか
以下のクラスを用意しました。

import signal
import sys

class ProcessDestructor():
	def __init__(self, callback):
		self.callback	= callback
		signal.signal(signal.SIGINT, self.__CatcheSignal)
		signal.signal(signal.SIGTERM, self.__CatcheSignal)

	def __CatcheSignal(self, signum, frame):
		self.SignalIgnoreDestruction()
		self.callback()
		self.SignalDefaultDestruction()
		sys.exit()

	@classmethod
	def SignalIgnoreTERM(self):
		signal.signal(signal.SIGTERM, signal.SIG_IGN)

	@classmethod
	def SignalIgnoreINT(self):
		signal.signal(signal.SIGINT, signal.SIG_IGN)

	@classmethod
	def SignalIgnoreDestruction(self):
		self.SignalIgnoreTERM()
		self.SignalIgnoreINT()

	@classmethod
	def SignalDefaultTERM(self):
		signal.signal(signal.SIGTERM, signal.SIG_DFL)

	@classmethod
	def SignalDefaultINT(self):
		signal.signal(signal.SIGINT, signal.SIG_DFL)

	@classmethod
	def SignalDefaultDestruction(self):
		self.SignalDefaultTERM()
		self.SignalDefaultINT()

このクラスは、Ctrl-CやKillコマンドなどのシグナルをプロセスが受け取ったとき、指定されたコールバック関数を実行します。


簡単な使い方を紹介します。

import RPi.GPIO as GPIO
import time
import TsProcessDestructor
import sys

def destructor():
    print('call destructor')
    GPIO.cleanup()    

def main():
    ProcessDestructor   = TsProcessDestructor.ProcessDestructor(destructor)
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(13, GPIO.OUT)

    while True:
        GPIO.output(13, True)
        time.sleep(1)
        GPIO.output(13, False)
        time.sleep(1)

if __name__ == "__main__":
    sys.exit(main())

mainでProcessDestructorクラスのインスタンスを生成します。生成時にはコールバック関数であるdestructorを引数にしています。
これで、終了シグナル検知時にdestructorが呼び出されるようになります。

実際に実行してCtrl-Cをすると、コンソールに「call destructor」が表示されるので、実行されていることがわかります。

これを利用すれば、main関数でループ処理などをしていても、終了時にきちんと終了処理をすることができるので、より安全にアプリを停止させることができます。

TsProcessDestrucor内のクラスメソッド群は、シグナルを実行するか無視するかを切り替えられるものです。
必要に応じて使いましょう。

2021/04/27 追記 multiprocessingでの挙動

このクラスですが、multiprocessingを使っている場合、すべてのプロセスに対してシグナルが発せられるのでプロセスの分だけコールバックが呼び出されてしまいます。
2回目のコールバックではAttribute Errorなどが発生する可能性もあるので、1度だけの発生にしたいところ。
理想はProcessDestrucotrインスタンスの生成したプロセスに対してのみコールバックを呼び出すことなので、少々変更を入れます。

import signal
import sys
from TsMessages import PrintDebugMess, DebugStatus
import os

class ProcessDestructor():
	def __init__(self, callback):
		self.callback	= callback
		self.__pid		= os.getpid()
		signal.signal(signal.SIGINT, self.__CatcheSignal)
		signal.signal(signal.SIGTERM, self.__CatcheSignal)

	def __CatcheSignal(self, signum, frame):
		if self.__pid == os.getpid():
			self.SignalIgnoreDestruction()
			self.callback()
			self.SignalDefaultDestruction()
			sys.exit()

	@classmethod
	def SignalIgnoreTERM(self):
		signal.signal(signal.SIGTERM, signal.SIG_IGN)

	@classmethod
	def SignalIgnoreINT(self):
		signal.signal(signal.SIGINT, signal.SIG_IGN)

	@classmethod
	def SignalIgnoreDestruction(self):
		self.SignalIgnoreTERM()
		self.SignalIgnoreINT()

	@classmethod
	def SignalDefaultTERM(self):
		signal.signal(signal.SIGTERM, signal.SIG_DFL)

	@classmethod
	def SignalDefaultINT(self):
		signal.signal(signal.SIGINT, signal.SIG_DFL)

	@classmethod
	def SignalDefaultDestruction(self):
		self.SignalDefaultTERM()
		self.SignalDefaultINT()

クラスのコンストラクタで生成元のプロセスID(PID)を取得します。
プロセスの分だけ__CatcheSignalが呼び出されてしまいますが、
生成元のPIDと、シグナルによって呼ばれたプロセスのPIDを比較して一致した場合のみコールバックを呼ぶという処理に変更しています。
これでマルチプロセスにも対応できます。