Sidekiq 信号处理源码分析

2016-11-21 10:40:10来源:作者:Martin人点击

第七城市
引言

在之前的文章 《Sidekiq任务调度流程分析》中,我们一起仔细分析了 Sidekiq 是如何基于多线程完成队列任务处理以及调度的。我们在之前的分析里,看到了不管是 Sidekiq::Scheduled::Poller还是 Sidekiq::Processor的核心代码里,都会有一个由 @done实例变量控制的循环体:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73def start @thread ||= safe_thread("scheduler") do initial_wait while !@done # 这是 poller 的循环控制 enqueue wait end Sidekiq.logger.info("Scheduler exiting...") endend # https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77def run begin while !@done # 这是我们常说的 worker 循环控制 process_one end @mgr.processor_stopped(self) rescue Sidekiq::Shutdown @mgr.processor_stopped(self) rescue Exception => ex @mgr.processor_died(self, ex) endend

也就是说,这些 @done实例变量决定了 poller线程跟 worker线程是否循环执行?一旦 @done被改为 true,那循环体就不再执行,线程自然也就是退出了。于是,单从这些代码,我们可以断定, Sidekiq 就是通过设置 @done的值来通知一个线程安全退出(graceful exit)的。我们也知道,生产环境中,我们是通过发送信号的方式来告诉 sidekiq 退出或者进入静默(quiet)状态的,那么,这里的 @done是怎么跟信号处理联系起来的呢?这些就是今天这篇文章的重点了!

注意 今天的分析所参考的 sidekiq 的源码对应版本是 4.2.3; 今天所讨论的内容,将主要围绕系统信号处理进行分析,无关细节将不赘述,如有需要,请自行翻阅 sidekiq 源码; 今天的文章跟上篇的《Sidekiq任务调度流程分析》紧密相关,上篇文章介绍的启动过程跟任务调度会帮助这篇文章的理解,如果还没有阅读上篇文章的,建议先阅读后再来阅读这一篇信号处理的文章。 你将了解到什么? Sidekiq 信号处理机制; 为什么重启 Sidekiq 时, USR1信号(即进入 quiet模式)需要尽可能早,而进程的退出重启需要尽可能晚。 从头再来

因为前一篇文章着眼于任务调度,所以略过了其他无关细节,包括信号处理,这篇文章则将镜头对准信号处理,所以让我们从头再来一遍,只是这一次,我们只关心与信号处理有关的代码。

依旧是从 cli.rb文件开始,它是 Sidekiq 核心代码的生命起点,因为 Sidekiq 命令行启动后,它是第一个被执行的代码,Sidekiq 启动过程中调用了 Sidekiq::CLI#run方法:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L49-L106def run boot_system print_banner self_read, self_write = IO.pipe %w(INT TERM USR1 USR2 TTIN).each do |sig| begin trap sig do self_write.puts(sig) end rescue ArgumentError puts "Signal #{sig} not supported" end end # ... other codes begin launcher.run while readable_io = IO.select([self_read]) signal = readable_io.first[0].gets.strip handle_signal(signal) end rescue Interrupt logger.info 'Shutting down' launcher.stop # Explicitly exit so busy Processor threads can't block # process shutdown. logger.info "Bye!" exit(0) end

以上的代码就是整个 Sidekiq 最顶层的信号处理的核心代码了,让我们慢慢分析! 首先, self_read, self_write = IO.pipe创建了一个模拟管道的 IO 对象,并且同时返回这个 管道的一个写端以及一个读端,通过这两端,就可以实现对管道的读写了。需要注意的是, IO.pipe创建的读端在读的时候不会自动生成 EOF符,所以这就要求读时,写端是关闭的,而写时,读端是关闭的,一句话说,就是这样的管道不允许读写端同时打开。关于 IO.pipe还有挺多细节跟需要注意的点,如果还需要了解,请阅读 官方文档。

上面说的管道本质上只是一个 IO 对象而已,暂时不用纠结太多,让我们接着往下读:

%w(INT TERM USR1 USR2 TTIN).each do |sig| begin trap sig do self_write.puts(sig) end rescue ArgumentError puts "Signal #{sig} not supported" endend

这段代码就比较有意思了,最外层遍历了一个系统信号的数组,然后逐个信号进行监听(trap,或者叫捕捉?)。让我们聚焦在 trap方法的调用跟其 block 上,查阅 Ruby 文档,发现 trap是 Signal模块下的一个方法, Signal主要是处理与系统信号有关的任务,然后 trap的作用是:

Specifies the handling of signals. The first parameter is a signal name (a string such as “SIGALRM”, “SIGUSR1”, and so on) or a signal number…

所以,前面的那段代码的意思就很容易理解了,Sidekiq 注册了对 INT、 TERM、 USR1、 USR2以及 TTIN等系统信号的处理,而在进程收到这些信号时,就会执行 self_write.puts(sig),也就是将收到的信号通过之前介绍的管道写端 self_write记录下来。什么?只记录下来,那还得处理啊?!

稍安勿躁,让我们接着往下分析 Sidekiq::CLI#run方法末尾的代码:

begin launcher.run while readable_io = IO.select([self_read]) signal = readable_io.first[0].gets.strip handle_signal(signal) endrescue Interrupt logger.info 'Shutting down' launcher.stop # Explicitly exit so busy Processor threads can't block # process shutdown. logger.info "Bye!" exit(0)end

看到没有,这里有个循环,循环控制条件里, readable_io = IO.select([self_read])是从前面的管道的读端 self_read阻塞地等待信号的到达。对于 IO.select, Ruby 官方文档介绍如下:

Calls select(2) system call. It monitors given arrays of IO objects, waits until one or more of IO objects are ready for reading, are ready for writing, and have pending exceptions respectively, and returns an array that contains arrays of those IO objects.

所以这里就是说 Sidekiq 主线程首先负责执行完其他初始化工作,最后阻塞在信号等待以及处理。在其等到新的信号之后,进入上面代码展示的循环体:

signal = readable_io.first[0].gets.striphandle_signal(signal)

这里语法细节先不深究,我们看下这两行代码第一行是从前面说的管道中读取信号,并且将信号传递给 handle_signal方法,让我们接着往下看 handle_signal方法的定义:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/cli.rb#L125-L153def handle_signal(sig) Sidekiq.logger.debug "Got #{sig} signal" case sig when 'INT' # Handle Ctrl-C in JRuby like MRI # http://jira.codehaus.org/browse/JRUBY-4637 raise Interrupt when 'TERM' # Heroku sends TERM and then waits 10 seconds for process to exit. raise Interrupt when 'USR1' Sidekiq.logger.info "Received USR1, no longer accepting new work" launcher.quiet when 'USR2' if Sidekiq.options[:logfile] Sidekiq.logger.info "Received USR2, reopening log file" Sidekiq::Logging.reopen_logs end when 'TTIN' Thread.list.each do |thread| Sidekiq.logger.warn "Thread TID-#{thread.object_id.to_s(36)} #{thread['label']}" if thread.backtrace Sidekiq.logger.warn thread.backtrace.join("/n") else Sidekiq.logger.warn "<no backtrace available>" end end endend

这里的代码挺长,但是一点都不难理解,我简单解释下就够了。当进程:

收到 TERM或者 INT信号时,直接抛出 Interrupt中断; 收到 USR1信号时,则通知 launcher执行 .quiet方法,Sidekiq 在这里进入 Quiet 模式(怎么进入?); 收到 USR2信号时,重新打开日志; 收到 TTIN信号时,打印所有线程当前正在执行的代码列表。

到此,一个信号从收到被存下,到被取出处理的大致过程就是这样的,至于具体的处理方式,我们下个章节详细展开。现在有一点需要补充的是,上面讲当 Sidekiq 收到 TERM或者 INT信号时,都会抛出 Interrupt中断异常,那这个异常又是如何处理的呢?我们回过头去看刚才最开始的 Sidekiq::CLI#run方法末尾的代码:

begin launcher.run while readable_io = IO.select([self_read]) signal = readable_io.first[0].gets.strip handle_signal(signal) endrescue Interrupt logger.info 'Shutting down' launcher.stop # Explicitly exit so busy Processor threads can't block # process shutdown. logger.info "Bye!" exit(0)end

原来是 run方法在处理信号时,声明了 rescue Interrupt,捕捉了 Interrupt中断异常,并且在异常处理时打印必要日志,同时执行 launcher.stop通知各个线程停止工作,最后调用 exit方法强制退出进程,到此,一个 Sidekiq 进程就彻底退出了。 但是问题又来了,信号处理的大致过程我是知道了,但是具体的 launcher.quiet跟 launcher.stop都干了些什么呢?

Sidekiq::Launcher#quiet 源码探索

老规矩,先上代码:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L32-L36def quiet @done = true @manager.quiet @poller.terminateend

代码只有短短三行。 Launcher 对象首先设置自己的实例变量 @done的值为 true,接着执行 @manager.quiet以及 @poller.terminate。看方法命名上理解,应该是 Luancher 对象又将 quiet 的消息传递给了 @manager即 Sidekiq::Manager对象,同时通知 @poller即 Sidekiq::Scheduled::Poller对象结束工作。那到底是不是真的这样呢?让我们继续深挖!

Sidekiq::Manager#quiet

让我们来看看 Sidekiq::Manager#quiet方法的代码

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L51-L58def quiet return if @done @done = true logger.info { "Terminating quiet workers" } @workers.each { |x| x.terminate } fire_event(:quiet, true)end

上面的代码也很短,首先将 Sidekiq::Manager对象自身的 @done实例变量的值设置为 true,接着对其所管理的每一个 worker,都发出一个 terminate消息。让我们接着往下看 worker 对象( Sidekiq::Processor对象)的 #terminate方法定义:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L42-L46def terminate(wait=false) @done = true return if !@thread @thread.value if waitend

这里的代码依然保持了精短的特点!跟上一层逻辑一样,worker 在处理 terminate时,同样设置自己的 @done实例变量为 true后返回,但是,如果其参数 wait为 true,则会保持主线程等待,直到 @thread线程退出( @thread.value相当于执行 @thread.join并且返回线程的返回值,可参考 Ruby 文档)。

那么,这里就要问了,worker 设置 @done为 true 是要干嘛?这里好像也没有做什么特别的事啊?!勿急,还记得上篇文章介绍 worker 运行时的核心代码吗?

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77def run begin while !@done process_one end @mgr.processor_stopped(self) rescue Sidekiq::Shutdown @mgr.processor_stopped(self) rescue Exception => ex @mgr.processor_died(self, ex) endend

看到了吧, @done变量可是一个重要的开关,当 @done为 false时,worker 一直周而复始地从队列中取任务并且老老实实干活;而当 @done为 true时,worker 在处理完当前的任务之后,便不再执行新的任务,执行 @msg.processor_stopped(self)通知 worker 管理器自己已经退出工作,最终 #run方法返回。由于 #run方法是在独立线程里执行的,所以当 #run方法返回时,其所在的线程自然也就退出了。

那关于 worker 的 quiet 模式进入过程就是这么简单,通过一个共享变量 @done便实现了对工作线程的控制。

Sidekiq::Scheduled::Poller#terminate

前面说到 Sidekiq::Launcher#quiet执行时,先将消息传递给了 worker 管理器,随后执行了 @poller.terminate,那我们来看看 #terminate方法的定义:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L53-L61def terminate @done = true if @thread t = @thread @thread = nil @sleeper << 0 t.value endend

又是如此简短的代码。poller 退出的逻辑跟 worker 退出的逻辑非常一致,都是同样先设置自己的 @done实例变量的值为 true,接着等待线程 @thread退出,最后 poller 返回。

那么,poller 的 @done是不是也是用来控制线程退出呢?答案是肯定的!

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/scheduled.rb#L63-L73def start @thread ||= safe_thread("scheduler") do initial_wait while !@done enqueue wait end Sidekiq.logger.info("Scheduler exiting...") endend

还记得上面这段代码吗? poller 在每次将定时任务压回任务队列之后,等待一定时间,然后重新检查 @done的值,如果为 true,则 poller 直接返回退出,因为 #start方法里的循环体在新线程中执行,当循环结束时,线程自然也退出了。

小结 当 Sidekiq 收到 USR1系统信号时,Sidekiq 主线程向 @launcher发送 quiet消息, @launcher又将消息传递给 @manager,同时向 @poller发出 terminate消息; @manager在收到 quiet消息时,逐一对运行中的 worker 发送 terminate消息,worker 收到消息后,设置自己的 @done为 true,标识不再处理新任务,当前任务处理完成后退出线程; @poller在收到 terminate消息后,也是设置自己的 @done为 true,在本次任务执行完毕后,线程也退出; Sidekiq 进入 quiet 模式之后,所有未处理任务以及新任务都不再处理,直到 sidekiq 的下一次重启。 Sidekiq::Launcher#stop 源码探索

前面介绍的是 Sidekiq 进入 quiet 模式的过程,那 Sidekiq 的停止过程又是怎样的呢?

让我们从 Sidekiq::Launcher#stop方法开始寻找答案:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/launcher.rb#L41-L56def stop deadline = Time.now + @options[:timeout] @done = true @manager.quiet @poller.terminate @manager.stop(deadline) # Requeue everything in case there was a worker who grabbed work while stopped # This call is a no-op in Sidekiq but necessary for Sidekiq Pro. strategy = (@options[:fetch] || Sidekiq::BasicFetch) strategy.bulk_requeue([], @options) clear_heartbeatend

首先, Sidekiq::Launcher对象设定了一个强制退出的 deadline,时间是以当前时间加上配置的 timeout,这个时间 默认是 8 秒。

接着,设定对象本身的 @done变量的值为 true,然后分别对 @manager和 @poller发送 quiet和 terminate消息,这个过程就是我们上面说的 Sidekiq::Launcher#quiet的过程,所以,这里的代码主要是 Sidekiq 要确保退出前已经通知各个线程准备退出。

接下来的代码就比较重要了,我们先看这一行:

@manager.stop(deadline)

在通知完 @manager进入 quiet 模式之后,launcher 向 @manager发送了 stop消息,并且同时传递了 deadline参数。让我们接着继续往下看:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L61-L83PAUSE_TIME = STDOUT.tty? ? 0.1 : 0.5def stop(deadline) quiet fire_event(:shutdown, true) # some of the shutdown events can be async, # we don't have any way to know when they're done but # give them a little time to take effect sleep PAUSE_TIME return if @workers.empty? logger.info { "Pausing to allow workers to finish..." } remaining = deadline - Time.now while remaining > PAUSE_TIME return if @workers.empty? sleep PAUSE_TIME remaining = deadline - Time.now end return if @workers.empty? hard_shutdownend

上面的代码,manager 首先调用了自身的 quiet方法(这里就真的多此一举了,因为外层的 launcher 已经调用过一次了),然后 manager 执行 sleep系统调用进入休眠,持续时间为 0.5 秒,休眠结束后检查所有 worker 是否已经都退出,如果退出,则直接返回,任务提前结束;如果仍有 worker 未退出,则检查当前时间是否接近强制退出的 deadline,如果不是,则重复“检查所有 worker 退出 – 休眠” 的过程,直到 deadline 来临,或者 worker 线程都已经全部退出。如果最后到达 deadline,仍有 worker 线程未退出,则最后执行 hard_shutdown。

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/manager.rb#L108-L135def hard_shutdown cleanup = nil @plock.synchronize do cleanup = @workers.dup end if cleanup.size > 0 jobs = cleanup.map {|p| p.job }.compact # ... other codes strategy = (@options[:fetch] || Sidekiq::BasicFetch) strategy.bulk_requeue(jobs, @options) end cleanup.each do |processor| processor.kill endend

这里 hard_shutdown方法在执行时,首先克隆了当前仍未退出的 @workers列表,接着获取每个 worker 当前正在处理的任务,将这些正在执行中的任务数据通过 strategy.bulk_requeue(jobs, @options)重新写回队列,而最后对每一个 worker 发送 kill消息:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L48-L58def kill(wait=false) @done = true return if !@thread @thread.raise ::Sidekiq::Shutdown @thread.value if waitend

worker 在收到 kill消息时,首先设置自己的 @done为 true,最后向 worker 所关联的线程抛出 ::Sidekiq::Shutdown异常。让我们看看 worker 的线程又是如何处理异常的:

# https://github.com/mperham/sidekiq/blob/5ebd857e3020d55f5c701037c2d7bedf9a18e897/lib/sidekiq/processor.rb#L66-L77def run begin while !@done process_one end @mgr.processor_stopped(self) rescue Sidekiq::Shutdown @mgr.processor_stopped(self) rescue Exception => ex @mgr.processor_died(self, ex) endend

又回到 worker 的 run方法这里,可以看到, run方法捕捉了 Sidekiq::Shutdown异常,并且在处理异常时,只是执行 @mgr.processor_stopped(self),通知 manager 自己已经退出,由于已经跳出正常流程,worker 的 run方法返回,线程也因此得以退出。至此,worker 也都正常退出了。

小结 launcher 在执行退出时,首先按照 quiet 的流程先通知各个线程准备退出; 接着 launcher 向 manager 下达 stop指令,并且给出最后期限( deadline); manager 在给定的限时内,尽可能等待所有 worker 执行完自己退出,对于到达限时仍未退出的 worker,manager 备份了每个 worker 的当前任务,重新加入队列,确保任务至少完整执行一次,然后通过向线程抛出异常的方式,迫使 worker 的线程被动退出。 总结 Sidekiq 简单高效利用了系统信号,并且有比较清晰明了的信号处理过程; Sidekiq 在信号处理的过程中,各个组件协调很有条理,消息逐级传递,而且对被强制停止的任务也有备份方案; 我们可以从 Sidekiq 的系统信号处理机制上借鉴不少东西,比如常用系统信号的分类处理等; 对于多线程的控制,通过共享变量以及异常的方式做到 graceful以及 hard两种方式的退出处理。 还有很多,一百个人心中有一百个哈姆莱特,同样一份代码,不同的人学习阅读,肯定收获不同,你可以在评论区留下你的感悟,跟看到这篇文章的人一起分享! 问题思考 为了尽可能确保所有 Sidekiq 的任务能够正常主动退出,所以在部署脚本中,都会尽可能早地让 Sidekiq 进入 quiet 模式,但是 Sidekiq 的 quiet 是不可逆的,所以一旦部署脚本中途失败,Sidekiq 得不到重启,将会一直保持 quiet 状态,如果长时间未重启,任务就会积压。所以,一般我都会在部署脚本中,额外捕捉部署脚本失败异常,然后主动执行 sidekiq 的重启。 如果你的部署脚本中有涉及 Sidekiq 的,一定要注意检查部署失败是否会影响 Sidekiq 的状态 虽然 Sidekiq 在强制退出当前长时间未退出的任务时,会将 job 的数据写回队列,等待重启后重新执行,那么这里就有个细节需要注意了,就是你的 job 必须是幂等的,否则就不能允许重新执行了。所以,请注意, 如果你有需要长时间运行的 job,请注意检查其幂等性。

好了,今天就写到这吧!仍然挺长一篇,啰嗦了。感谢看到这里!

第七城市

最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台