Skip to content

Commit

Permalink
os/gcron: add graceful shutdown support (#3625)
Browse files Browse the repository at this point in the history
  • Loading branch information
vector233 authored Jun 25, 2024
1 parent f8272bc commit 8824b8b
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 4 deletions.
5 changes: 5 additions & 0 deletions os/gcron/gcron.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,8 @@ func Start(name ...string) {
func Stop(name ...string) {
defaultCron.Stop(name...)
}

// StopGracefully Blocks and waits all current running jobs done.
func StopGracefully() {
defaultCron.StopGracefully()
}
16 changes: 12 additions & 4 deletions os/gcron/gcron_cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package gcron

import (
"context"
"sync"
"time"

"github.com/gogf/gf/v2/container/garray"
Expand All @@ -19,10 +20,11 @@ import (

// Cron stores all the cron job entries.
type Cron struct {
idGen *gtype.Int64 // Used for unique name generation.
status *gtype.Int // Timed task status(0: Not Start; 1: Running; 2: Stopped; -1: Closed)
entries *gmap.StrAnyMap // All timed task entries.
logger glog.ILogger // Logger, it is nil in default.
idGen *gtype.Int64 // Used for unique name generation.
status *gtype.Int // Timed task status(0: Not Start; 1: Running; 2: Stopped; -1: Closed)
entries *gmap.StrAnyMap // All timed task entries.
logger glog.ILogger // Logger, it is nil in default.
jobWaiter sync.WaitGroup // Graceful shutdown when cron jobs are stopped.
}

// New returns a new Cron object with default settings.
Expand Down Expand Up @@ -187,6 +189,12 @@ func (c *Cron) Stop(name ...string) {
}
}

// StopGracefully Blocks and waits all current running jobs done.
func (c *Cron) StopGracefully() {
c.status.Set(StatusStopped)
c.jobWaiter.Wait()
}

// Remove deletes scheduled task which named `name`.
func (c *Cron) Remove(name string) {
if v := c.entries.Get(name); v != nil {
Expand Down
2 changes: 2 additions & 0 deletions os/gcron/gcron_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ func (e *Entry) checkAndRun(ctx context.Context) {
e.Close()

case StatusReady, StatusRunning:
e.cron.jobWaiter.Add(1)
defer func() {
e.cron.jobWaiter.Done()
if exception := recover(); exception != nil {
// Exception caught, it logs the error content to logger in default behavior.
e.logErrorf(ctx,
Expand Down
25 changes: 25 additions & 0 deletions os/gcron/gcron_z_example_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ package gcron_test

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcron"
"github.com/gogf/gf/v2/os/glog"
)
Expand All @@ -21,3 +25,24 @@ func ExampleCronAddSingleton() {
})
select {}
}

func ExampleCronGracefulShutdown() {
_, err := gcron.Add(ctx, "*/2 * * * * *", func(ctx context.Context) {
g.Log().Debug(ctx, "Every 2s job start")
time.Sleep(5 * time.Second)
g.Log().Debug(ctx, "Every 2s job after 5 second end")
}, "MyCronJob")
if err != nil {
panic(err)
}

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

sig := <-quit
glog.Printf(ctx, "Signal received: %s, stopping cron", sig)

glog.Print(ctx, "Waiting for all cron jobs to complete...")
gcron.StopGracefully()
glog.Print(ctx, "All cron jobs completed")
}
42 changes: 42 additions & 0 deletions os/gcron/gcron_z_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ package gcron_test
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"testing"
"time"

"github.com/gogf/gf/v2/container/garray"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcron"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/test/gtest"
)

Expand Down Expand Up @@ -277,3 +281,41 @@ func TestCron_DelayAddTimes(t *testing.T) {
t.Assert(cron.Size(), 0)
})
}

func TestCron_JobWaiter(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
var err error
s1 := garray.New(true)
s2 := garray.New(true)
_, err = gcron.Add(ctx, "* * * * * *", func(ctx context.Context) {
g.Log().Debug(ctx, "Every second")
s1.Append(struct{}{})
}, "MyFirstCronJob")
t.Assert(err, nil)
_, err = gcron.Add(ctx, "*/2 * * * * *", func(ctx context.Context) {
g.Log().Debug(ctx, "Every 2s job start")
time.Sleep(3 * time.Second)
s2.Append(struct{}{})
g.Log().Debug(ctx, "Every 2s job after 3 second end")
}, "MySecondCronJob")
t.Assert(err, nil)

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

go func() {
time.Sleep(4 * time.Second) // Ensure that the job is triggered twice
glog.Print(ctx, "Sending SIGINT")
quit <- syscall.SIGINT // Send SIGINT
}()

sig := <-quit
glog.Printf(ctx, "Signal received: %s, stopping cron", sig)

glog.Print(ctx, "Waiting for all cron jobs to complete...")
gcron.StopGracefully()
glog.Print(ctx, "All cron jobs completed")
t.Assert(s1.Len(), 4)
t.Assert(s2.Len(), 2)
})
}

0 comments on commit 8824b8b

Please sign in to comment.