Skip to content

Commit

Permalink
Merge pull request #19 from AlexStocks/add_metrix
Browse files Browse the repository at this point in the history
add lock
  • Loading branch information
chejinge authored Jun 4, 2024
2 parents 12a3ccc + 4202962 commit 3651017
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 77 deletions.
18 changes: 8 additions & 10 deletions codis/pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,16 +786,14 @@ func (p *Proxy) Stats(flags StatsFlags) *Stats {
}

func (s *Proxy) CmdInfo(interval int64) *CmdInfo {
cmdInfo := &CmdInfo{}

cmdInfo.Total = OpTotal()
cmdInfo.Fails = OpFails()
cmdInfo.Redis.Errors = OpRedisErrors()
cmdInfo.QPS = OpQPS()

cmdInfo.Cmd = GetOpStatsByInterval(interval)

return cmdInfo
info := &CmdInfo{
Total: OpTotal(),
Fails: OpFails(),
QPS: OpQPS(),
Cmd: GetOpStatsByInterval(interval),
}
info.Redis.Errors = OpRedisErrors()
return info
}

func StatsSetLogSlowerThan(ms int64) {
Expand Down
77 changes: 44 additions & 33 deletions codis/pkg/proxy/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,46 +690,57 @@ func (s *Session) handleRequestSlotsMapping(r *Request, d *Router) error {
}

func (s *Session) getOpStats(opstr string, create bool) *opStats {
cmdstats.RLock()
e := s.stats.opmap[opstr]
cmdstats.RUnlock()
if e != nil || !create {
return e
}
cmdstats.Lock()
defer cmdstats.Unlock()
e = cmdstats.opmap[opstr]
if e == nil {
e = &opStats{opstr: opstr}
for i := 0; i < IntervalNum; i++ {
e.delayInfo[i] = &delayInfo{interval: IntervalMark[i]}
}
s.stats.opmap[opstr] = e
}
return e
var (
ok bool
stat *opStats
)

func() {
cmdstats.opmapLock.RLock()
defer cmdstats.opmapLock.RUnlock()
stat, ok = s.stats.opmap[opstr]
}()
if (ok && stat != nil) || !create {
return stat
}
cmdstats.opmapLock.Lock()
defer cmdstats.opmapLock.Unlock()
stat, ok = cmdstats.opmap[opstr]
if ok && stat != nil {
return stat
}
stat = &opStats{opstr: opstr}
for i := 0; i < IntervalNum; i++ {
stat.delayInfo[i] = &delayInfo{interval: IntervalMark[i]}
}
s.stats.opmap[opstr] = stat

return stat
}

func (s *Session) incrOpStats(r *Request, t redis.RespType) {

if r == nil {
return
}
responseTime := time.Now().UnixNano() - r.ReceiveTime
var e *opStats
e = s.stats.opmap[r.OpStr]
if e == nil {
e = getOpStats(r.OpStr, true)
s.stats.opmap[r.OpStr] = e
}
e.incrOpStats(responseTime, redis.RespType(t))
e = s.stats.opmap["ALL"]
if e == nil {
e = getOpStats("ALL", true)
s.stats.opmap["ALL"] = e
}
e.incrOpStats(responseTime, redis.RespType(t))
e.calls.Incr()
e.nsecs.Add(time.Now().UnixNano() - r.ReceiveTime)
var (
ok bool
stat *opStats
)
stat, ok = s.stats.opmap[r.OpStr]
if !ok || stat == nil {
stat = getOpStats(r.OpStr, true)
s.stats.opmap[r.OpStr] = stat
}
stat.incrOpStats(responseTime, redis.RespType(t))
stat, ok = s.stats.opmap["ALL"]
if !ok || stat == nil {
stat = getOpStats("ALL", true)
s.stats.opmap["ALL"] = stat
}
stat.incrOpStats(responseTime, redis.RespType(t))
stat.calls.Incr()
stat.nsecs.Add(time.Now().UnixNano() - r.ReceiveTime)
switch t {
case redis.TypeError:
incrOpRedisErrors()
Expand Down
75 changes: 41 additions & 34 deletions codis/pkg/proxy/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ var (
)

var cmdstats struct {
sync.RWMutex //Lock only for opmap.
opmap map[string]*opStats
opmapLock sync.RWMutex //Lock only for opmap.
opmap map[string]*opStats

total atomic2.Int64
fails atomic2.Int64
Expand All @@ -126,7 +126,7 @@ var cmdstats struct {
}

qps atomic2.Int64
tpdelay [TPMaxNum]int64 //us
tpDelay [TPMaxNum]int64 //us
refreshPeriod atomic2.Int64
logSlowerThan atomic2.Int64
autoSetSlowFlag atomic2.Bool
Expand Down Expand Up @@ -184,11 +184,11 @@ func init() {
//init tp delay array
for i := 0; i < TPMaxNum; i++ {
if i < TPFirstGradeSize {
cmdstats.tpdelay[i] = int64(i+1) * TPFirstGrade
cmdstats.tpDelay[i] = int64(i+1) * TPFirstGrade
} else if i < TPFirstGradeSize+TPSecondGradeSize {
cmdstats.tpdelay[i] = TPFirstGradeSize*TPFirstGrade + int64(i-TPFirstGradeSize+1)*TPSecondGrade
cmdstats.tpDelay[i] = TPFirstGradeSize*TPFirstGrade + int64(i-TPFirstGradeSize+1)*TPSecondGrade
} else {
cmdstats.tpdelay[i] = TPFirstGradeSize*TPFirstGrade + TPSecondGradeSize*TPSecondGrade + int64(i-TPFirstGradeSize-TPSecondGradeSize+1)*TPThirdGrade
cmdstats.tpDelay[i] = TPFirstGradeSize*TPFirstGrade + TPSecondGradeSize*TPSecondGrade + int64(i-TPFirstGradeSize-TPSecondGradeSize+1)*TPThirdGrade
}
}

Expand All @@ -206,17 +206,19 @@ func init() {
normalized := math.Max(0, float64(delta)) * float64(time.Second) / float64(time.Since(start))
cmdstats.qps.Set(int64(normalized + 0.5))

cmdstats.RLock()
for i := 0; i < IntervalNum; i++ {
/*if int64(float64(time.Since(LastRefreshTime[i]))/float64(time.Second)) < IntervalMark[i] {
continue
}*/
for _, v := range cmdstats.opmap {
v.RefreshOpStats(i)
func() {
cmdstats.opmapLock.RLock()
defer cmdstats.opmapLock.RUnlock()
for i := 0; i < IntervalNum; i++ {
/*if int64(float64(time.Since(LastRefreshTime[i]))/float64(time.Second)) < IntervalMark[i] {
continue
}*/
for _, v := range cmdstats.opmap {
v.RefreshOpStats(i)
}
LastRefreshTime[i] = time.Now()
}
LastRefreshTime[i] = time.Now()
}
cmdstats.RUnlock()
}()
}
}()
}
Expand Down Expand Up @@ -304,10 +306,10 @@ func (s *delayInfo) refresh4TpInfo(cmd string) {
}

if index1 >= 0 && index2 >= index1 && index3 >= index2 && index4 >= index3 && index4 < TPMaxNum {
s.tp90 = cmdstats.tpdelay[index1]
s.tp99 = cmdstats.tpdelay[index2]
s.tp999 = cmdstats.tpdelay[index3]
s.tp9999 = cmdstats.tpdelay[index4]
s.tp90 = cmdstats.tpDelay[index1]
s.tp99 = cmdstats.tpDelay[index2]
s.tp999 = cmdstats.tpDelay[index3]
s.tp9999 = cmdstats.tpDelay[index4]
return
}

Expand Down Expand Up @@ -500,24 +502,29 @@ func OpQPS() int64 {
}

func getOpStats(opstr string, create bool) *opStats {
cmdstats.RLock()
s := cmdstats.opmap[opstr]
cmdstats.RUnlock()

if s != nil || !create {
var (
ok bool
s *opStats
)
func() {
cmdstats.opmapLock.RLock()
defer cmdstats.opmapLock.RUnlock()
s, ok = cmdstats.opmap[opstr]
}()
if (ok && s != nil) || !create {
return s
}

cmdstats.Lock()
cmdstats.opmapLock.Lock()
defer cmdstats.opmapLock.Unlock()
s = cmdstats.opmap[opstr]
if s == nil {
if !ok || s == nil {
s = &opStats{opstr: opstr}
for i := 0; i < IntervalNum; i++ {
s.delayInfo[i] = &delayInfo{interval: IntervalMark[i]}
}
cmdstats.opmap[opstr] = s
}
cmdstats.Unlock()
return s
}

Expand All @@ -537,34 +544,34 @@ func (s sliceOpStats) Less(i, j int) bool {

func GetOpStatsAll() []*OpStats {
var all = make([]*OpStats, 0, 128)
cmdstats.RLock()
cmdstats.opmapLock.RLock()
defer cmdstats.opmapLock.Unlock()
for _, s := range cmdstats.opmap {
all = append(all, s.GetOpStatsByInterval(1))
}
cmdstats.RUnlock()
sort.Sort(sliceOpStats(all))
return all
}

func GetOpStatsByInterval(interval int64) []*OpStats {
var all = make([]*OpStats, 0, 128)
cmdstats.RLock()
cmdstats.opmapLock.RLock()
defer cmdstats.opmapLock.RUnlock()
for _, s := range cmdstats.opmap {
for i := 0; i < IntervalNum; i++ {
s.RefreshOpStats(i)
}
all = append(all, s.GetOpStatsByInterval(interval))
}
cmdstats.RUnlock()
sort.Sort(sliceOpStats(all))
return all
}

func ResetStats() {
// Since the session has already obtained the struct from cmdstats.opmap, it cannot be reassigned and can only be reset to zero.
// Therefore, the command count will not decrease after the reset.
cmdstats.RLock()
defer cmdstats.RUnlock()
cmdstats.opmapLock.RLock()
defer cmdstats.opmapLock.RUnlock()
for _, v := range cmdstats.opmap {
v.calls.Set(0)
v.nsecs.Set(0)
Expand Down

0 comments on commit 3651017

Please sign in to comment.