Skip to content

Commit

Permalink
cluster is yellow when partitions are under-replicated
Browse files Browse the repository at this point in the history
- reports cluster status as json
- detects under-replicated partitions and reports cluster as yellow
- checks broker metadata with ZooKeeper metadata
  • Loading branch information
andreas-schroeder committed Oct 3, 2016
1 parent 3ebbf62 commit 67b333d
Show file tree
Hide file tree
Showing 13 changed files with 486 additions and 83 deletions.
9 changes: 0 additions & 9 deletions check/broker_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,6 @@ func (check *HealthCheck) brokerInSync() bool {
return false
}

contains := func(arr []int32, id int32) bool {
for _, e := range arr {
if e == id {
return true
}
}
return false
}

brokerID := int32(check.config.brokerID)

for _, topic := range metadata.Topics {
Expand Down
14 changes: 7 additions & 7 deletions check/broker_health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func Test_checkBrokerHealth_WhenProducedMessageIsConsumed_ReturnsHealthy(t *test
defer close(stop)

check := newTestCheck()
connection := workingBroker(check, ctrl, check.config.topicName, stop)
connection := workingBroker(check, ctrl, stop)
connection.EXPECT().Metadata().Return(outOfSyncMetadata(), nil).AnyTimes()

status := check.checkBrokerHealth()
Expand All @@ -30,7 +30,7 @@ func Test_checkBrokerHealth_WhenProducedMessageIsNotConsumed_ReturnsUnhealthy(t
defer ctrl.Finish()

check := newTestCheck()
stop := brokenBroker(check, ctrl, check.config.topicName)
stop := brokenBroker(check, ctrl)
defer close(stop)

status := check.checkBrokerHealth()
Expand All @@ -47,7 +47,7 @@ func Test_checkBrokerHealth_WhenProducedMessageIsConsumedAndInSync_ReturnsInSync
defer close(stop)

check := newTestCheck()
connection := workingBroker(check, ctrl, check.config.topicName, stop)
connection := workingBroker(check, ctrl, stop)
connection.EXPECT().Metadata().Return(inSyncMetadata(), nil).AnyTimes()

status := check.checkBrokerHealth()
Expand All @@ -57,8 +57,8 @@ func Test_checkBrokerHealth_WhenProducedMessageIsConsumedAndInSync_ReturnsInSync
}
}

func workingBroker(check *HealthCheck, ctrl *gomock.Controller, topicName string, stop <-chan struct{}) *MockBrokerConnection {
connection, broker, consumer, _ := mockBroker(check, ctrl, topicName)
func workingBroker(check *HealthCheck, ctrl *gomock.Controller, stop <-chan struct{}) *MockBrokerConnection {
connection, broker, consumer, _ := mockBroker(check, ctrl)

go func() {
for {
Expand All @@ -80,8 +80,8 @@ func workingBroker(check *HealthCheck, ctrl *gomock.Controller, topicName string
return connection
}

func brokenBroker(check *HealthCheck, ctrl *gomock.Controller, topicName string) chan struct{} {
_, broker, consumer, _ := mockBroker(check, ctrl, topicName)
func brokenBroker(check *HealthCheck, ctrl *gomock.Controller) chan struct{} {
_, broker, consumer, _ := mockBroker(check, ctrl)

stop := make(chan struct{})
ticker := time.NewTicker(5 * time.Millisecond)
Expand Down
164 changes: 155 additions & 9 deletions check/cluster_health_check.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,38 @@
package check

import "log"
import (
"encoding/json"

log "github.com/Sirupsen/logrus"
"github.com/optiopay/kafka/proto"
)

// ClusterStatus is the cluster health report that checkClusterHealth
// serializes to JSON.
type ClusterStatus struct {
// Status is the overall cluster status: green, yellow or red.
Status string `json:"status"`
// Topics holds the per-topic results collected by checkTopics.
Topics []TopicStatus `json:"topics,omitempty"`
// Brokers lists brokers whose Kafka metadata and ZooKeeper registration
// disagree; note that it is serialized under the key "metadata".
Brokers []BrokerMetadata `json:"metadata,omitempty"`
// ZooKeeper is serialized as "zookeeper-connection".
// NOTE(review): presumably filled in by getZooKeeperMetadata when the
// ZooKeeper lookup fails — confirm against that function.
ZooKeeper string `json:"zookeeper-connection,omitempty"`
}

// TopicStatus is the health report for a single topic inside ClusterStatus.
type TopicStatus struct {
	// Topic is the topic name.
	Topic string `json:"topic"`
	// Status is the worst status among the topic itself and its partitions.
	// The explicit tag keeps the JSON key lower-case, consistent with the
	// "status" key used by ClusterStatus, PartitionStatus and BrokerMetadata.
	Status string `json:"status"`
	// ZooKeeper describes a ZooKeeper metadata problem for this topic
	// (e.g. "Missing ZooKeeper metadata"); empty when there is none.
	ZooKeeper string `json:"zookeeper,omitempty"`
	// Partitions lists only the partitions that are not green.
	Partitions []PartitionStatus `json:"partitions,omitempty"`
}

// PartitionStatus is the health report for a single partition of a topic.
type PartitionStatus struct {
// ID is the partition ID taken from the Kafka metadata response.
ID int32 `json:"id"`
// Status is green, yellow (under-replicated) or red (offline, or missing
// ZooKeeper metadata).
Status string `json:"status"`
// ZooKeeper describes a ZooKeeper metadata problem for this partition;
// empty when there is none.
ZooKeeper string `json:"zookeeper,omitempty"`
// OutOfSyncBrokers lists the replicas that are not part of the
// partition's in-sync replica set.
OutOfSyncBrokers []int32 `json:"out-of-sync-brokers,omitempty"`
}

// BrokerMetadata reports a broker whose presence differs between the Kafka
// metadata response and ZooKeeper.
//
// checkBrokerMetadata builds values of this type with positional composite
// literals, so the field order (ID, Status, Problem) must not change.
type BrokerMetadata struct {
// ID is the broker's node ID; serialized under the key "broker".
ID int32 `json:"broker"`
// Status is the status assigned to the broker entry.
Status string `json:"status"`
// Problem is a human-readable description of the inconsistency,
// e.g. "missing in ZooKeeper".
Problem string `json:"problem"`
}

const (
green = "green"
Expand All @@ -9,24 +41,138 @@ const (
)

// periodically checks health of the Kafka cluster
func (check *HealthCheck) checkClusterHealth() string {
func (check *HealthCheck) checkClusterHealth() []byte {
metadata, err := check.broker.Metadata()

var clusterStatus ClusterStatus = ClusterStatus{Status: red}
if err != nil {
log.Println("Error while retrieving metadata:", err)
return red
} else {
clusterStatus = ClusterStatus{Status: green}
zkTopics, zkBrokers, err := check.getZooKeeperMetadata(&clusterStatus)
if err == nil {
mStatus := check.checkBrokerMetadata(metadata, zkBrokers, &clusterStatus)
tStatus := check.checkTopics(metadata, zkTopics, &clusterStatus)
clusterStatus.Status = worstStatus(tStatus, mStatus)
}
}

data, err := json.Marshal(clusterStatus)

if err != nil {
log.Println("Error while marshaling cluster status", err)
return []byte("{\"status\": \"" + red + "\"}")
} else {
return data
}
}

// checkBrokerMetadata cross-checks the broker IDs found in the Kafka metadata
// response against the broker IDs registered in ZooKeeper.
//
// Every broker that appears on only one side is appended to cluster.Brokers as
// a red entry naming the side it is missing from: first the brokers missing in
// ZooKeeper, then the brokers missing in the metadata response. The result is
// red if at least one mismatch was found and green otherwise.
func (check *HealthCheck) checkBrokerMetadata(metadata *proto.MetadataResp, zkBrokers []int32, cluster *ClusterStatus) (status string) {
	var metaBrokers []int32
	for _, b := range metadata.Brokers {
		metaBrokers = append(metaBrokers, b.NodeID)
	}

	status = green

	// reportMissing flags every ID from candidates that is absent from known.
	reportMissing := func(candidates, known []int32, problem string) {
		for _, id := range candidates {
			if contains(known, id) {
				continue
			}
			cluster.Brokers = append(cluster.Brokers, BrokerMetadata{ID: id, Status: red, Problem: problem})
			status = red
		}
	}

	reportMissing(metaBrokers, zkBrokers, "missing in ZooKeeper")
	reportMissing(zkBrokers, metaBrokers, "missing in Metadata")

	return status
}

func (check *HealthCheck) checkTopics(metadata *proto.MetadataResp, zkTopics []ZkTopic, cluster *ClusterStatus) (status string) {

zkTopicMap := make(map[string]ZkTopic)
for _, topic := range zkTopics {
zkTopicMap[topic.Name] = topic
}

status = green
for _, topic := range metadata.Topics {
for _, partition := range topic.Partitions {
if len(partition.Isrs) == 0 {
return red // offline partitions exist.
zkTopic, ok := zkTopicMap[topic.Name]
topicStatus := TopicStatus{Topic: topic.Name, Status: green}
if !ok {
topicStatus.Status = red
topicStatus.ZooKeeper = "Missing ZooKeeper metadata"
}

zkPartitionMap := make(map[int32]ZkPartition)
if ok {
for _, partition := range zkTopic.Partitions {
zkPartitionMap[partition.ID] = partition
}
if len(partition.Isrs) < len(partition.Replicas) {
return yellow // under-replicated partitions exist.
}

for _, partition := range topic.Partitions {
pStatus := checkPartition(partition, zkPartitionMap, &topicStatus)
topicStatus.Status = worstStatus(topicStatus.Status, pStatus)
}
cluster.Topics = append(cluster.Topics, topicStatus)
status = worstStatus(topicStatus.Status, status)

if topicStatus.Status != green {
log.Infof("Reporting topic %s as %s", topicStatus.Topic, topicStatus.Status)
}
}

return
}

// checkPartition determines the health of a single partition and returns its
// status (green, yellow or red).
//
// partition is the partition as reported by the Kafka metadata response,
// zkPartitionMap holds the ZooKeeper view of the topic's partitions keyed by
// partition ID, and topicStatus is the report of the owning topic. Partitions
// that are not green are appended to topicStatus.Partitions and logged.
//
// Rules:
//   - no ZooKeeper entry for the partition       -> red
//   - fewer in-sync replicas than replicas       -> at least yellow
//   - no in-sync replicas at all                 -> red
func checkPartition(partition proto.MetadataRespPartition, zkPartitionMap map[int32]ZkPartition, topicStatus *TopicStatus) string {
	status := PartitionStatus{ID: partition.ID, Status: green}

	// Use the replica assignment from ZooKeeper when it is available; fall
	// back to the replicas listed in the metadata response otherwise.
	replicas := partition.Replicas
	if zkPartition, ok := zkPartitionMap[partition.ID]; ok {
		replicas = zkPartition.Replicas
	} else {
		status.Status = red
		status.ZooKeeper = "Missing ZooKeeper metadata"
	}

	if len(partition.Isrs) < len(replicas) {
		for _, replica := range replicas {
			if !contains(partition.Isrs, replica) {
				status.OutOfSyncBrokers = append(status.OutOfSyncBrokers, replica)
			}
		}
		// Under-replication raises green to yellow, but it must not mask a
		// red status that was already set for missing ZooKeeper metadata.
		if status.Status == green {
			status.Status = yellow // partition is under-replicated.
		}
	}
	if len(partition.Isrs) == 0 {
		status.Status = red // partition is offline.
	}

	if status.Status != green {
		log.Infof("Reporting partition %d of topic %s as %s", status.ID, topicStatus.Topic, status.Status)
		topicStatus.Partitions = append(topicStatus.Partitions, status)
	}

	return status.Status
}

return green // all replicas up to date.
func worstStatus(s1 string, s2 string) string {
switch s1 {
case green:
return s2
case yellow:
if s2 == green {
return s1
}
return s2
case red:
return s1
}
return s2
}
47 changes: 27 additions & 20 deletions check/cluster_health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,45 @@ package check
import (
"testing"

"encoding/json"
"github.com/golang/mock/gomock"
)

func Test_checkClusterHealth_WhenAllPartitionsReplicated_ReportsGreen(t *testing.T) {
func Test_checkClusterHealth_WhenAllMetadataConsistent_ReportsGreen(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
stop := make(chan struct{})
defer close(stop)

check := newTestCheck()
connection := workingBroker(check, ctrl, check.config.topicName, stop)
connection.EXPECT().Metadata().Return(healthyMetadata(check.config.topicName), nil).AnyTimes()
check, zk := newZkTestCheck(ctrl)
connection := workingBroker(check, ctrl, stop)
connection.EXPECT().Metadata().Return(healthyMetadata("some-topic"), nil).AnyTimes()
zk.mockHealthyMetadata("some-topic")

clusterStatus := check.checkClusterHealth()
var clusterStatus ClusterStatus
json.Unmarshal(check.checkClusterHealth(), &clusterStatus)

if clusterStatus != green {
t.Errorf("CheckHealth reported cluster status as %s, expected %s", clusterStatus, green)
if clusterStatus.Status != green {
t.Errorf("CheckHealth reported cluster status as %v, expected %s", clusterStatus, green)
}
}

func Test_checkClusterHealth_WhenSomePartitionUnderreplicated_ReportsYellow(t *testing.T) {
func Test_checkClusterHealth_WhenSomePartitionUnderReplicated_ReportsYellow(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
stop := make(chan struct{})
defer close(stop)

check := newTestCheck()
connection := workingBroker(check, ctrl, check.config.topicName, stop)
connection.EXPECT().Metadata().Return(outOfSyncMetadata(), nil).AnyTimes()
check, zk := newZkTestCheck(ctrl)
connection := workingBroker(check, ctrl, stop)
connection.EXPECT().Metadata().Return(underReplicatedMetadata(), nil).AnyTimes()
zk.mockHealthyMetadata("some-topic")

clusterStatus := check.checkClusterHealth()
var clusterStatus ClusterStatus
json.Unmarshal(check.checkClusterHealth(), &clusterStatus)

if clusterStatus != yellow {
t.Errorf("CheckHealth reported cluster status as %s, expected %s", clusterStatus, yellow)
if clusterStatus.Status != yellow {
t.Errorf("CheckHealth reported cluster status as %v, expected %s", clusterStatus, yellow)
}
}

Expand All @@ -46,13 +51,15 @@ func Test_checkClusterHealth_WhenSomePartitionOffline_ReportsRed(t *testing.T) {
stop := make(chan struct{})
defer close(stop)

check := newTestCheck()
connection := workingBroker(check, ctrl, check.config.topicName, stop)
connection.EXPECT().Metadata().Return(offlinecMetadata(), nil).AnyTimes()
check, zk := newZkTestCheck(ctrl)
connection := workingBroker(check, ctrl, stop)
connection.EXPECT().Metadata().Return(offlineMetadata(), nil).AnyTimes()
zk.mockHealthyMetadata("some-topic")

clusterStatus := check.checkClusterHealth()
var clusterStatus ClusterStatus
json.Unmarshal(check.checkClusterHealth(), &clusterStatus)

if clusterStatus != red {
t.Errorf("CheckHealth reported cluster status as %s, expected %s", clusterStatus, red)
if clusterStatus.Status != red {
t.Errorf("CheckHealth reported cluster status as %v, expected %s", clusterStatus, red)
}
}
10 changes: 10 additions & 0 deletions check/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type ZkConnection interface {
Exists(path string) (bool, *zk.Stat, error)
Set(path string, data []byte, version int32) (*zk.Stat, error)
Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
Children(path string) ([]string, *zk.Stat, error)
Get(path string) ([]byte, *zk.Stat, error)
}

// Actual implementation based on samuel/go-zookeeper/zk
Expand Down Expand Up @@ -86,3 +88,11 @@ func (zkConn *zkConnection) Set(path string, data []byte, version int32) (*zk.St
// Create forwards the call to the wrapped ZooKeeper connection and returns
// its results unchanged.
func (zkConn *zkConnection) Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error) {
	createdPath, err := zkConn.connection.Create(path, data, flags, acl)
	return createdPath, err
}

// Children forwards the call to the wrapped ZooKeeper connection and returns
// its results unchanged.
func (zkConn *zkConnection) Children(path string) ([]string, *zk.Stat, error) {
	children, stat, err := zkConn.connection.Children(path)
	return children, stat, err
}

// Get forwards the call to the wrapped ZooKeeper connection and returns its
// results unchanged.
func (zkConn *zkConnection) Get(path string) ([]byte, *zk.Stat, error) {
	data, stat, err := zkConn.connection.Get(path)
	return data, stat, err
}
24 changes: 24 additions & 0 deletions check/connectors_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,27 @@ func (_m *MockZkConnection) Create(path string, data []byte, flags int32, acl []
// Create records an expected call to Create on the mock.
func (_mr *_MockZkConnectionRecorder) Create(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
	mock := _mr.mock
	return mock.ctrl.RecordCall(mock, "Create", arg0, arg1, arg2, arg3)
}

// Children dispatches the call to the mock controller and unpacks the three
// recorded return values. A return value of an unexpected type yields the
// zero value for that position.
func (_m *MockZkConnection) Children(path string) ([]string, *zk.Stat, error) {
	results := _m.ctrl.Call(_m, "Children", path)
	children, _ := results[0].([]string)
	stat, _ := results[1].(*zk.Stat)
	err, _ := results[2].(error)
	return children, stat, err
}

// Children records an expected call to Children on the mock.
func (_mr *_MockZkConnectionRecorder) Children(arg0 interface{}) *gomock.Call {
	mock := _mr.mock
	return mock.ctrl.RecordCall(mock, "Children", arg0)
}

// Get dispatches the call to the mock controller and unpacks the three
// recorded return values. A return value of an unexpected type yields the
// zero value for that position.
func (_m *MockZkConnection) Get(path string) ([]byte, *zk.Stat, error) {
	results := _m.ctrl.Call(_m, "Get", path)
	data, _ := results[0].([]byte)
	stat, _ := results[1].(*zk.Stat)
	err, _ := results[2].(error)
	return data, stat, err
}

// Get records an expected call to Get on the mock.
func (_mr *_MockZkConnectionRecorder) Get(arg0 interface{}) *gomock.Call {
	mock := _mr.mock
	return mock.ctrl.RecordCall(mock, "Get", arg0)
}
Loading

0 comments on commit 67b333d

Please sign in to comment.