Skip to content

Commit

Permalink
fix: eliminate race condition on Monitor.globalTags (influxdata#23467)
Browse files Browse the repository at this point in the history
  • Loading branch information
gwossum authored and chengshiwen committed Aug 27, 2024
1 parent 284dd93 commit c7e1bd0
Showing 1 changed file with 41 additions and 8 deletions.
49 changes: 41 additions & 8 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,39 @@ const (
MonitorRetentionPolicyReplicaN = 1
)

// tags provides thread-safe tag handling
type tags struct {
mu sync.RWMutex
tags map[string]string
}

// NewTags creates a new tags struct to use
func newTags() *tags {
return &tags{
tags: make(map[string]string),
}
}

// Add adds a new tag to the tag collection
func (t *tags) Add(key string, value interface{}) {
t.mu.Lock()
defer t.mu.Unlock()

t.tags[key] = fmt.Sprintf("%v", value)
}

// Tags safely returns a copy of the current tag mapping
func (t *tags) Tags() map[string]string {
t.mu.RLock()
defer t.mu.RUnlock()

r := make(map[string]string)
for k, v := range t.tags {
r[k] = v
}
return r
}

// Monitor represents an instance of the monitor system.
type Monitor struct {
// Build information for diagnostics.
Expand All @@ -42,8 +75,9 @@ type Monitor struct {

wg sync.WaitGroup

globalTags *tags

mu sync.RWMutex
globalTags map[string]string
diagRegistrations map[string]diagnostics.Client
reporter Reporter
done chan struct{}
Expand Down Expand Up @@ -73,7 +107,7 @@ type PointsWriter interface {
// New returns a new instance of the monitor system.
func New(r Reporter, c Config) *Monitor {
return &Monitor{
globalTags: make(map[string]string),
globalTags: newTags(),
diagRegistrations: make(map[string]diagnostics.Client),
reporter: r,
storeEnabled: c.StoreEnabled,
Expand Down Expand Up @@ -138,9 +172,10 @@ func (m *Monitor) WritePoints(p models.Points) error {
return nil
}

if len(m.globalTags) > 0 {
gt := m.globalTags.Tags()
if len(gt) > 0 {
for _, pp := range p {
pp.SetTags(pp.Tags().Merge(m.globalTags))
pp.SetTags(pp.Tags().Merge(gt))
}
}

Expand Down Expand Up @@ -185,9 +220,7 @@ func (m *Monitor) Close() error {
// SetGlobalTag can be used to set tags that will appear on all points
// written by the Monitor.
func (m *Monitor) SetGlobalTag(key string, value interface{}) {
m.mu.Lock()
m.globalTags[key] = fmt.Sprintf("%v", value)
m.mu.Unlock()
m.globalTags.Add(key, value)
}

// RemoteWriterConfig represents the configuration of a remote writer.
Expand Down Expand Up @@ -440,7 +473,7 @@ func (m *Monitor) storeStatistics() {
m.createInternalStorage()
}()

stats, err := m.Statistics(m.globalTags)
stats, err := m.Statistics(m.globalTags.Tags())
if err != nil {
m.Logger.Info("Failed to retrieve registered statistics", zap.Error(err))
return
Expand Down

0 comments on commit c7e1bd0

Please sign in to comment.