Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Harden shutdown logic #1037

Merged
merged 10 commits into from
Apr 21, 2015
27 changes: 22 additions & 5 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
// let the user know we're going.
fmt.Printf("Initializing daemon...\n")

ctx := req.Context()

go func() {
select {
case <-ctx.Context.Done():
fmt.Println("Received interrupt signal, shutting down...")
}
}()

// first, whether user has provided the initialization flag. we may be
// running in an uninitialized state.
initialize, _, err := req.Option(initOptionKwd).Bool()
Expand Down Expand Up @@ -111,7 +120,6 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
return
}

ctx := req.Context()
cfg, err := ctx.GetConfig()
if err != nil {
res.SetError(err, cmds.ErrNormal)
Expand Down Expand Up @@ -149,7 +157,19 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
res.SetError(err, cmds.ErrNormal)
return
}
defer node.Close()

defer func() {
// We wait for the node to close first, as the node has children
// that it will wait for before closing, such as the API server.
node.Close()

select {
case <-ctx.Context.Done():
log.Info("Gracefully shut down daemon")
default:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the purpose of the default here? Wouldnt we want to wait and print the log message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's based on the possibility that the node/daemon may shut down through other means, eg through an API call or similar that wouldn't cancel the context, only close the node. In that case we wouldn't want to block. But if that's not really an option, or we do want to go through the context for those shutdowns as well, it shouldn't have a default.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha, thanks!

}
}()

req.Context().ConstructNode = func() (*core.IpfsNode, error) {
return node, nil
}
Expand Down Expand Up @@ -262,9 +282,6 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
corehttp.VersionOption(),
}

// our global interrupt handler can now try to stop the daemon
close(req.Context().InitDone)

if rootRedirect != nil {
opts = append(opts, rootRedirect)
}
Expand Down
121 changes: 63 additions & 58 deletions cmd/ipfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"runtime"
"runtime/pprof"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -39,7 +40,6 @@ const (
cpuProfile = "ipfs.cpuprof"
heapProfile = "ipfs.memprof"
errorFormat = "ERROR: %v\n\n"
shutdownMessage = "Received interrupt signal, shutting down..."
)

type cmdInvocation struct {
Expand Down Expand Up @@ -132,15 +132,10 @@ func main() {
os.Exit(1)
}

// our global interrupt handler may try to stop the daemon
// before the daemon is ready to be stopped; this dirty
// workaround is for the daemon only; other commands are always
// ready to be stopped
if invoc.cmd != daemonCmd {
close(invoc.req.Context().InitDone)
}

// ok, finally, run the command invocation.
intrh, ctx := invoc.SetupInterruptHandler(ctx)
defer intrh.Close()

output, err := invoc.Run(ctx)
if err != nil {
printErr(err)
Expand All @@ -157,8 +152,6 @@ func main() {
}

func (i *cmdInvocation) Run(ctx context.Context) (output io.Reader, err error) {
// setup our global interrupt handler.
i.setupInterruptHandler()

// check if user wants to debug. option OR env var.
debug, _, err := i.req.Option("debug").Bool()
Expand Down Expand Up @@ -226,7 +219,6 @@ func (i *cmdInvocation) Parse(ctx context.Context, args []string) error {
if err != nil {
return err
}
i.req.Context().Context = ctx

repoPath, err := getRepoPath(i.req)
if err != nil {
Expand Down Expand Up @@ -279,6 +271,8 @@ func callCommand(ctx context.Context, req cmds.Request, root *cmds.Command, cmd
log.Info(config.EnvDir, " ", req.Context().ConfigRoot)
var res cmds.Response

req.Context().Context = ctx

details, err := commandDetails(req.Path(), root)
if err != nil {
return nil, err
Expand Down Expand Up @@ -474,59 +468,70 @@ func writeHeapProfileToFile() error {
return pprof.WriteHeapProfile(mprof)
}

// listen for and handle SIGTERM
func (i *cmdInvocation) setupInterruptHandler() {
// IntrHandler helps set up an interrupt handler that can
// be cleanly shut down through the io.Closer interface.
type IntrHandler struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on this struct

sig chan os.Signal
wg sync.WaitGroup
}

func NewIntrHandler() *IntrHandler {
ih := &IntrHandler{}
ih.sig = make(chan os.Signal, 1)
return ih
}

func (ih *IntrHandler) Close() error {
close(ih.sig)
ih.wg.Wait()
return nil
}

ctx := i.req.Context()
sig := allInterruptSignals()

// Handle starts handling the given signals, and will call the handler
// callback function each time a signal is catched. The function is passed
// the number of times the handler has been triggered in total, as
// well as the handler itself, so that the handling logic can use the
// handler's wait group to ensure clean shutdown when Close() is called.
func (ih *IntrHandler) Handle(handler func(count int, ih *IntrHandler), sigs ...os.Signal) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on this method

signal.Notify(ih.sig, sigs...)
ih.wg.Add(1)
go func() {
// first time, try to shut down.

// loop because we may be
for count := 0; ; count++ {
<-sig

// if we're still initializing, cannot use `ctx.GetNode()`
select {
default: // initialization not done
fmt.Println(shutdownMessage)
os.Exit(-1)
case <-ctx.InitDone:
}

switch count {
case 0:
fmt.Println(shutdownMessage)
if ctx.Online {
go func() {
// TODO cancel the command context instead
n, err := ctx.GetNode()
if err != nil {
log.Error(err)
fmt.Println(shutdownMessage)
os.Exit(-1)
}
n.Close()
log.Info("Gracefully shut down.")
}()
} else {
os.Exit(0)
}

default:
fmt.Println("Received another interrupt before graceful shutdown, terminating...")
os.Exit(-1)
}
defer ih.wg.Done()
count := 0
for _ = range ih.sig {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can just say:

for range ih.sig {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A, nice!

count++
handler(count, ih)
}
signal.Stop(ih.sig)
}()
}

func allInterruptSignals() chan os.Signal {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT,
syscall.SIGTERM)
return sigc
func (i *cmdInvocation) SetupInterruptHandler(ctx context.Context) (io.Closer, context.Context) {

intrh := NewIntrHandler()
ctx, cancelFunc := context.WithCancel(ctx)

handlerFunc := func(count int, ih *IntrHandler) {
switch count {
case 1:
fmt.Println() // Prevent un-terminated ^C character in terminal

ih.wg.Add(1)
go func() {
defer ih.wg.Done()
cancelFunc()
}()

default:
fmt.Println("Received another interrupt before graceful shutdown, terminating...")
os.Exit(-1)
}
}

intrh.Handle(handlerFunc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

return intrh, ctx
}

func profileIfEnabled() (func(), error) {
Expand Down
63 changes: 46 additions & 17 deletions commands/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,44 @@ func (c *client) Send(req cmds.Request) (cmds.Response, error) {
version := config.CurrentVersionNumber
httpReq.Header.Set("User-Agent", fmt.Sprintf("/go-ipfs/%s/", version))

httpRes, err := http.DefaultClient.Do(httpReq)
if err != nil {
return nil, err
}
ec := make(chan error, 1)
rc := make(chan cmds.Response, 1)
dc := req.Context().Context.Done()

// using the overridden JSON encoding in request
res, err := getResponse(httpRes, req)
if err != nil {
return nil, err
}

if found && len(previousUserProvidedEncoding) > 0 {
// reset to user provided encoding after sending request
// NB: if user has provided an encoding but it is the empty string,
// still leave it as JSON.
req.SetOption(cmds.EncShort, previousUserProvidedEncoding)
go func() {
httpRes, err := http.DefaultClient.Do(httpReq)
if err != nil {
ec <- err
return
}
// using the overridden JSON encoding in request
res, err := getResponse(httpRes, req)
if err != nil {
ec <- err
return
}
rc <- res
}()

for {
select {
case <-dc:
log.Debug("Context cancelled, cancelling HTTP request...")
tr := http.DefaultTransport.(*http.Transport)
tr.CancelRequest(httpReq)
dc = nil // Wait for ec or rc
case err := <-ec:
return nil, err
case res := <-rc:
if found && len(previousUserProvidedEncoding) > 0 {
// reset to user provided encoding after sending request
// NB: if user has provided an encoding but it is the empty string,
// still leave it as JSON.
req.SetOption(cmds.EncShort, previousUserProvidedEncoding)
}
return res, nil
}
}

return res, nil
}

func getQuery(req cmds.Request) (string, error) {
Expand Down Expand Up @@ -162,6 +181,8 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
dec := json.NewDecoder(httpRes.Body)
outputType := reflect.TypeOf(req.Command().Type)

ctx := req.Context().Context

for {
var v interface{}
var err error
Expand All @@ -175,6 +196,14 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
fmt.Println(err.Error())
return
}

select {
case <-ctx.Done():
close(outChan)
return
default:
}

if err == io.EOF {
close(outChan)
return
Expand Down
3 changes: 1 addition & 2 deletions commands/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type Context struct {

node *core.IpfsNode
ConstructNode func() (*core.IpfsNode, error)
InitDone chan bool
}

// GetConfig returns the config of the current Command exection
Expand Down Expand Up @@ -288,7 +287,7 @@ func NewRequest(path []string, opts OptMap, args []string, file files.File, cmd
optDefs = make(map[string]Option)
}

ctx := Context{Context: context.TODO(), InitDone: make(chan bool)}
ctx := Context{Context: context.TODO()}
values := make(map[string]interface{})
req := &request{path, opts, args, file, cmd, ctx, optDefs, values, os.Stdin}
err := req.ConvertOptions()
Expand Down
20 changes: 19 additions & 1 deletion core/corehttp/corehttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package corehttp

import (
"net/http"
"time"

manners "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/braintree/manners"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
Expand Down Expand Up @@ -63,6 +64,9 @@ func listenAndServe(node *core.IpfsNode, addr ma.Multiaddr, handler http.Handler
var serverError error
serverExited := make(chan struct{})

node.Children().Add(1)
defer node.Children().Done()

go func() {
serverError = server.ListenAndServe(host, handler)
close(serverExited)
Expand All @@ -75,8 +79,22 @@ func listenAndServe(node *core.IpfsNode, addr ma.Multiaddr, handler http.Handler
// if node being closed before server exits, close server
case <-node.Closing():
log.Infof("server at %s terminating...", addr)

// make sure keep-alive connections do not keep the server running
server.InnerServer.SetKeepAlivesEnabled(false)

server.Shutdown <- true
<-serverExited // now, DO wait until server exit

outer:
for {
// wait until server exits
select {
case <-serverExited:
break outer
case <-time.After(5 * time.Second):
log.Infof("waiting for server at %s to terminate...", addr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may want to log this immediately the first time. perhaps:

  server.InnerServer.SetKeepAlivesEnabled(false)
  server.Shutdown <- true
outer:
  for {
    log.Infof("waiting for server at %s to terminate...", addr)
    // wait until server exits
    select {
    case <-serverExited:
      break outer
    case <-time.After(5 * time.Second):
    }
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking the log.Infof("server at %s terminating...", addr) line at the start was taking care of that part, so that the timeout logging was more of a "still waiting" kind.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh didnt catch that. sgtm!

}
}
}

log.Infof("server at %s terminated", addr)
Expand Down