Skip to content

Commit

Permalink
Omit SIGTERM to certain processes during stop (#136)
Browse files Browse the repository at this point in the history
* Refactor os/kill.KillTree
* sendSignalsToProcesses: don't interrupt on kill error
* Add os/kill.KillTreeWithExcludes
* Add tests for KillTreeWithExcludes and KillTree
* Add configuration of SIGTERM excludes via env var
* Fix KillTree tests on Linux platforms
* Set go 1.13.1 on travis
* Add README entry about kill tree exclude, remove unused wrapWithStopAndCont parameter
* Use good old KillTree for SIGTERMs when ALLEGRO_EXECUTOR_SIGTERM_EXCLUDE_PROCESSES not set
* Don't exclude processes from kill for which name could not be read
* Bring back 'pgrep -g' requirement to readme
  • Loading branch information
pbetkier authored and AlfredBroda committed Dec 4, 2019
1 parent f073a42 commit a178620
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 30 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: go
go:
- 1.10.x
- 1.13.1

script:
- make
Expand All @@ -19,4 +19,4 @@ deploy:
file_glob: true
file: target/*.zip
on:
tags: true
tags: true
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ It is performed in the following steps:
3. Wait `KillPolicyGracePeriod` (can be overridden with Task Kill Policy Grace Period).
4. Send SIGKILL to process tree.

The executor can be configured to exclude certain processes from the SIGTERM signal. Provide
the process names to exclude in the `ALLEGRO_EXECUTOR_SIGTERM_EXCLUDE_PROCESSES` environment variable
as a comma-separated string. This feature requires `pgrep -g` to be available on the machine.

## Log scraping

By default executor forwards service stdout/stderr to its own standard streams.
Expand Down Expand Up @@ -188,4 +192,4 @@ Mesos Executor is distributed under the [Apache 2.0 License](LICENSE).
[10]: https://mesos.apache.org/documentation/latest/mesos-containerizer/
[11]: https://www.elastic.co/products/logstash
[12]: https://brandur.org/logfmt
[14]: https://godoc.org/github.com/allegro/mesos-executor/servicelog
[14]: https://godoc.org/github.com/allegro/mesos-executor/servicelog
6 changes: 3 additions & 3 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
type Command interface {
Start() error
Wait() <-chan TaskExitState
Stop(gracePeriod time.Duration)
Stop(gracePeriod time.Duration, sigtermExcludeProcesses []string)
}

type cancellableCommand struct {
Expand Down Expand Up @@ -102,13 +102,13 @@ func (c *cancellableCommand) waitForCommand() {
close(c.doneChan)
}

func (c *cancellableCommand) Stop(gracePeriod time.Duration) {
func (c *cancellableCommand) Stop(gracePeriod time.Duration, sigtermExcludeProcesses []string) {
// Return if Stop was already called.
if c.killing {
return
}
c.killing = true
err := osutil.KillTree(syscall.SIGTERM, int32(c.cmd.Process.Pid))
err := osutil.KillTreeWithExcludes(syscall.SIGTERM, int32(c.cmd.Process.Pid), sigtermExcludeProcesses)
if err != nil {
log.WithError(err).Errorf("There was a problem with sending %s to %d children", syscall.SIGTERM, c.cmd.Process.Pid)
return
Expand Down
5 changes: 4 additions & 1 deletion executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type Config struct {
// Range in which certificate will be considered as expired. Used to
// prevent shutdown of all tasks at once.
RandomExpirationRange time.Duration `default:"3h" split_words:"true"`

// SigtermExcludeProcesses specifies process names to omit when sending SIGTERM to process tree during shutdown
SigtermExcludeProcesses []string `split_words:"true"`
}

var errMustAbort = errors.New("received abort signal from mesos, will attempt to re-subscribe")
Expand Down Expand Up @@ -522,7 +525,7 @@ func (e *Executor) shutDown(taskInfo *mesos.TaskInfo, cmd Command) {
TaskInfo: mesosutils.TaskInfo{TaskInfo: *taskInfo},
}
_, _ = e.hookManager.HandleEvent(beforeTerminateEvent, true) // ignore errors here, so every hook will have a chance to be called
cmd.Stop(gracePeriod) // blocking call
cmd.Stop(gracePeriod, e.config.SigtermExcludeProcesses) // blocking call
}

func taskExitToEvent(exitStateChan <-chan TaskExitState, events chan<- Event) {
Expand Down
138 changes: 124 additions & 14 deletions os/kill.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
package os

import (
"bufio"
"fmt"
"os/exec"
"strconv"
"strings"
"syscall"

"github.com/shirou/gopsutil/process"
Expand All @@ -13,26 +17,36 @@ import (
// KillTree sends signal to whole process tree, starting from given pid as root.
// Order of signalling in process tree is undefined.
func KillTree(signal syscall.Signal, pid int32) error {
proc, err := process.NewProcess(pid)
pgids, err := getProcessGroupsInTree(pid)
if err != nil {
return err
}

signals := wrapWithStopAndCont(signal)
return sendSignalsToProcessGroups(signals, pgids)
}

func getProcessGroupsInTree(pid int32) ([]int, error) {
proc, err := process.NewProcess(pid)
if err != nil {
return nil, err
}

processes := getAllChildren(proc)
processes = append(processes, proc)

curPid := syscall.Getpid()
curPgid, err := syscall.Getpgid(curPid)
if err != nil {
return fmt.Errorf("error getting current process pgid: %s", err)
return nil, fmt.Errorf("error getting current process pgid: %s", err)
}

var pgids []int
pgidsSeen := map[int]bool{}
for _, proc := range processes {
pgid, err := syscall.Getpgid(int(proc.Pid))
if err != nil {
return fmt.Errorf("error getting child process pgid: %s", err)
return nil, fmt.Errorf("error getting child process pgid: %s", err)
}
if pgid == curPgid {
continue
Expand All @@ -42,8 +56,7 @@ func KillTree(signal syscall.Signal, pid int32) error {
pgidsSeen[pgid] = true
}
}

return wrapWithStopAndCont(signal, pgids)
return pgids, nil
}

// getAllChildren gets whole descendants tree of given process. Order of returned
Expand All @@ -61,27 +74,124 @@ func getAllChildren(proc *process.Process) []*process.Process {
// wrapWithStopAndCont wraps original process tree signal sending with SIGSTOP and
// SIGCONT to prevent processes from forking during termination, so we will not
// have orphaned processes after.
func wrapWithStopAndCont(signal syscall.Signal, pgids []int) error {
func wrapWithStopAndCont(signal syscall.Signal) []syscall.Signal {
signals := []syscall.Signal{syscall.SIGSTOP, signal}
if signal != syscall.SIGKILL { // no point in sending any signal after SIGKILL
signals = append(signals, syscall.SIGCONT)
}
return signals
}

for _, currentSignal := range signals {
if err := sendSignalToProcessGroups(currentSignal, pgids); err != nil {
return err
// sendSignalsToProcessGroups delivers each of the given signals, in order, to
// every listed process group. A failed delivery is only logged, so the
// remaining groups and signals are still attempted; the function always
// returns nil.
func sendSignalsToProcessGroups(signals []syscall.Signal, pgids []int) error {
	for _, sig := range signals {
		for _, group := range pgids {
			log.Infof("Sending signal %s to pgid %d", sig, group)
			// The negated id is what addresses the process group rather than a single process.
			if killErr := syscall.Kill(-group, sig); killErr != nil {
				log.Infof("Error sending signal to pgid %d: %s", group, killErr)
			}
		}
	}
	return nil
}

func sendSignalToProcessGroups(signal syscall.Signal, pgids []int) error {
// KillTreeWithExcludes signals the process tree rooted at pid while skipping
// processes whose names are listed in processesToExclude.
//
// With an empty exclude list it delegates to KillTree. Otherwise the process
// groups found in the tree are expanded into individual pids, the excluded
// processes are filtered out, and the signal (wrapped by wrapWithStopAndCont)
// is sent to the remaining pids instead of to the pgids.
func KillTreeWithExcludes(signal syscall.Signal, pid int32, processesToExclude []string) error {
	log.Infof("Will send signal %s to tree starting from %d", signal.String(), pid)

	// Nothing to exclude: fall back to group-based signalling.
	if len(processesToExclude) == 0 {
		return KillTree(signal, pid)
	}

	groups, err := getProcessGroupsInTree(pid)
	if err != nil {
		return err
	}
	log.Infof("Found process groups: %v", groups)

	candidates, err := findProcessesInGroups(groups)
	if err != nil {
		return err
	}
	log.Infof("Found processes in groups: %v", candidates)

	targets, err := excludeProcesses(candidates, processesToExclude)
	if err != nil {
		return err
	}

	return sendSignalsToProcesses(wrapWithStopAndCont(signal), targets)
}

func findProcessesInGroups(pgids []int) ([]int, error) {
var pids []int
for _, pgid := range pgids {
log.Infof("Sending signal %s to pgid %d", signal, pgid)
err := syscall.Kill(-pgid, signal)
cmd := exec.Command("pgrep", "-g", strconv.Itoa(pgid))
output, err := cmd.CombinedOutput()
if err != nil {
return nil, fmt.Errorf("'pgrep -g %d' failed: %s", pgid, err)
}
if !cmd.ProcessState.Success() {
return nil, fmt.Errorf("'pgrep -g %d' failed, output was: '%s'", pgid, output)
}

scanner := bufio.NewScanner(strings.NewReader(string(output)))
for scanner.Scan() {
pid, err := strconv.Atoi(scanner.Text())
if err != nil {
return nil, fmt.Errorf("cannot convert pgrep output: %s. Output was '%s'", err, output)
}
pids = append(pids, pid)
}
}

return pids, nil
}

func excludeProcesses(pids []int, processesToExclude []string) ([]int, error) {
var retainedPids []int
for _, pid := range pids {
proc, err := process.NewProcess(int32(pid))
if err != nil {
return nil, err
}

name, err := proc.Name()
if err != nil {
log.Infof("Error sending signal to pgid %d: %s", pgid, err)
return err
log.Infof("Could not get process name of %d, will not exclude it from kill", pid)
} else if isExcluded(name, processesToExclude) {
log.Infof("Excluding process %s with pid %d from kill", name, pid)
continue
}

retainedPids = append(retainedPids, pid)
}

return retainedPids, nil
}

// isExcluded reports whether name matches any entry of namesToExclude,
// comparing case-insensitively. An empty or nil exclude list never matches.
func isExcluded(name string, namesToExclude []string) bool {
	for _, exclude := range namesToExclude {
		// EqualFold compares under Unicode case folding without allocating
		// lowered copies of both strings on every iteration.
		if strings.EqualFold(name, exclude) {
			return true
		}
	}

	return false
}

func sendSignalsToProcesses(signals []syscall.Signal, pids []int) error {
for _, signal := range signals {
for _, pid := range pids {
log.Infof("Sending signal %s to pid %d", signal, pid)
err := syscall.Kill(pid, signal)
if err != nil {
log.Infof("Error sending signal to pid %d: %s", pid, err)
}
}
}
return nil
Expand Down
Loading

0 comments on commit a178620

Please sign in to comment.