Skip to content

Commit

Permalink
simplfied parsing of zk topic info
Browse files Browse the repository at this point in the history
  • Loading branch information
andreas-schroeder committed Oct 17, 2016
1 parent 08a5082 commit 2e45e47
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 114 deletions.
15 changes: 3 additions & 12 deletions check/cluster_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,8 @@ func (check *HealthCheck) checkTopics(metadata *proto.MetadataResp, zkTopics []Z
topicStatus.ZooKeeper = "Missing ZooKeeper metadata"
}

zkPartitionMap := make(map[int32]ZkPartition)
if ok {
for _, partition := range zkTopic.Partitions {
zkPartitionMap[partition.ID] = partition
}
}

for _, partition := range topic.Partitions {
pStatus := checkPartition(&partition, zkPartitionMap, &topicStatus)
pStatus := checkPartition(&partition, &zkTopic, &topicStatus)
topicStatus.Status = worstStatus(topicStatus.Status, pStatus)
}

Expand All @@ -74,17 +67,15 @@ func (check *HealthCheck) checkTopics(metadata *proto.MetadataResp, zkTopics []Z
return
}

func checkPartition(partition *proto.MetadataRespPartition, zkPartitionMap map[int32]ZkPartition, topicStatus *TopicStatus) string {
func checkPartition(partition *proto.MetadataRespPartition, zkTopic *ZkTopic, topicStatus *TopicStatus) string {
status := PartitionStatus{Status: green}

replicas := partition.Replicas

zkPartition, ok := zkPartitionMap[partition.ID]
replicas, ok := zkTopic.Partitions[partition.ID]
if !ok {
status.Status = red
status.ZooKeeper = "Missing ZooKeeper metadata"
} else {
replicas = zkPartition.Replicas
}

if len(partition.Isrs) < len(replicas) {
Expand Down
22 changes: 6 additions & 16 deletions check/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,28 +192,18 @@ func (check *HealthCheck) createTopic(name string, forHealthCheck bool) (err err

}

func findPartition(ID int32, partitions []ZkPartition) (*ZkPartition, bool) {
for _, p := range partitions {
if p.ID == ID {
return &p, true
}
}

return nil, false
}

func maybeExpandReplicationTopic(zk ZkConnection, brokerID, partitionID int32, topicName, chroot string) error {
partitions, err := zkPartitions(zk, topicName, chroot)
topic := ZkTopic{Name: topicName}
err := zkPartitions(&topic, zk, topicName, chroot)
if err != nil {
return errors.Wrap(err, "Unable to determine if replication topic should be expanded")
}

partition, ok := findPartition(partitionID, partitions)
replicas, ok := topic.Partitions[partitionID]
if !ok {
return fmt.Errorf(`Cannot find partition with ID %d in topic "%s"`, partitionID, topicName)
}

replicas := partition.Replicas
if !contains(replicas, brokerID) {
log.Info("Expanding replication check topic to include broker ", brokerID)
replicas = append(replicas, brokerID)
Expand Down Expand Up @@ -296,18 +286,18 @@ func (check *HealthCheck) closeConnection(deleteTopicIfPresent bool) {
}

func (check *HealthCheck) deleteTopic(zkConn ZkConnection, chroot, name string, partitionID int32) error {
partitions, err := zkPartitions(zkConn, name, chroot)
topic := ZkTopic{Name: name}
err := zkPartitions(&topic, zkConn, name, chroot)
if err != nil {
return err
}

partition, ok := findPartition(partitionID, partitions)
replicas, ok := topic.Partitions[partitionID]
if !ok {
return fmt.Errorf(`Cannot find partition with ID %d in topic "%s"`, partitionID, name)
}

brokerID := int32(check.config.brokerID)
replicas := partition.Replicas
if len(replicas) > 1 {
log.Info("Shrinking replication check topic to exclude broker ", brokerID)
replicas = delAll(replicas, brokerID)
Expand Down
11 changes: 6 additions & 5 deletions check/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,8 @@ func healthyZkTopics() []ZkTopic {
return []ZkTopic{
{
Name: "some-topic",
Partitions: []ZkPartition{
{
ID: 2,
Replicas: []int32{2, 1},
},
Partitions: map[int32][]int32{
2: {2, 1},
},
},
}
Expand Down Expand Up @@ -322,6 +319,10 @@ func newZkTestCheck(ctrl *gomock.Controller) (check *HealthCheck, zookeeper *Moc
return
}

func (zookeeper *MockZkConnection) mockGet(path, data string) {
zookeeper.EXPECT().Get(path).Return([]byte(data), nil, nil)
}

func (zookeeper *MockZkConnection) mockTopicGet(name string) {
zookeeper.EXPECT().Get("/brokers/topics/"+name).Return([]byte(`{"version":1,"partitions":{"0":[1]}}`), nil, nil)
}
Expand Down
78 changes: 5 additions & 73 deletions check/zk_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package check

import (
"encoding/json"
"fmt"
"strconv"
"time"

Expand All @@ -11,12 +10,7 @@ import (

type ZkTopic struct {
Name string
Partitions []ZkPartition
}

type ZkPartition struct {
ID int32
Replicas []int32
Partitions map[int32][]int32 `json:"partitions"`
}

func (check *HealthCheck) getZooKeeperMetadata() (topics []ZkTopic, brokers []int32, err error) {
Expand Down Expand Up @@ -67,83 +61,21 @@ func zkTopics(zk ZkConnection, chroot string) (topics []ZkTopic, err error) {

for _, topicName := range topicNames {
topic := ZkTopic{Name: topicName}
partitions, err := zkPartitions(zk, topicName, chroot)
err := zkPartitions(&topic, zk, topicName, chroot)
if err != nil {
return nil, err
}
topic.Partitions = partitions
topics = append(topics, topic)
}
return
}

func zkPartitions(zk ZkConnection, name, chroot string) (partitions []ZkPartition, err error) {
func zkPartitions(topic *ZkTopic, zk ZkConnection, name, chroot string) error {
topicPath := chroot + "/brokers/topics/" + name
dataBytes, _, err := zk.Get(topicPath)
if err != nil {
return nil, err
}
partitions, err = parseZkPartitions(dataBytes, topicPath)
return
}

func parseZkPartitions(dataBytes []byte, topicPath string) (zkPartitions []ZkPartition, err error) {
var topic interface{}
err = json.Unmarshal(dataBytes, &topic)
if err != nil {
return nil, err
}

parseError := func(detailsFmt string, a ...interface{}) error {
details := fmt.Sprintf(detailsFmt, a...)
return errors.New(fmt.Sprintf(
"Unexpected partition data content: %s\nPath is %s\nPartition data is %s", details, topicPath, string(dataBytes)))
}

parseZkReplicas := func(replicas interface{}) (zkReplicas []int32, err error) {
switch replicas := replicas.(type) {
default:
return nil, parseError("Unable to parse replica id array from %v of type %T", replicas, replicas)
case []interface{}:
for _, replica := range replicas {
switch replica := replica.(type) {
default:
return nil, parseError("Unable to parse replica id from %s of type %T", replica, replica)
case float64:
zkReplicas = append(zkReplicas, int32(replica))
}
}
}
return
return err
}

switch topic := topic.(type) {
default:
return nil, parseError("Data is no json object, but %T", topic)
case map[string]interface{}:
partitions, ok := topic["partitions"]
if !ok {
return nil, parseError("Json attribute 'partitions' not found")
}
switch partitions := partitions.(type) {
default:
return nil, parseError("Json 'partitions' attribute does not contain json object, but %T", partitions)
case map[string]interface{}:
for id, replicas := range partitions {
id, err := strconv.Atoi(id)
if err != nil {
return nil, parseError("Unable to parse partition ID from %s: %s", id, err.Error())
}
zkPartition := ZkPartition{ID: int32(id)}
zkReplicas, err := parseZkReplicas(replicas)
if err != nil {
return nil, err
}
zkPartition.Replicas = zkReplicas
zkPartitions = append(zkPartitions, zkPartition)
}
}
}

return
return json.Unmarshal(dataBytes, &topic)
}
24 changes: 16 additions & 8 deletions check/zk_metadata_test.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
package check

import (
"github.com/golang/mock/gomock"
"testing"
)

func Test_parseZkPartitions_WhenDataParsable_ReturnsParsedPartitions(t *testing.T) {
dataBytes := []byte(`{"version":1,"partitions":{"12":[2,3,1],"8":[1,2,3],"19":[3,1,2],"23":[1,3,2],"4":[3,2,1],"15":[2,1,3],"11":[1,3,2],"9":[2,1,3],"22":[3,2,1],"26":[1,2,3],"13":[3,1,2],"24":[2,3,1],"16":[3,2,1],"5":[1,3,2],"10":[3,2,1],"21":[2,1,3],"6":[2,3,1],"1":[3,1,2],"17":[1,3,2],"25":[3,1,2],"14":[1,2,3],"0":[2,3,1],"20":[1,2,3],"27":[2,1,3],"2":[1,2,3],"18":[2,3,1],"7":[3,1,2],"29":[1,3,2],"3":[2,1,3],"28":[3,2,1]}}`)
func Test_zkPartitions_WhenDataParsable_ReturnsParsedPartitions(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

partitions, err := parseZkPartitions(dataBytes, "/path")
zkConn := NewMockZkConnection(ctrl)

data := `{"version":1,"partitions":{"12":[2,3,1],"8":[1,2,3],"19":[3,1,2],"23":[1,3,2],"4":[3,2,1],"15":[2,1,3],"11":[1,3,2],"9":[2,1,3],"22":[3,2,1],"26":[1,2,3],"13":[3,1,2],"24":[2,3,1],"16":[3,2,1],"5":[1,3,2],"10":[3,2,1],"21":[2,1,3],"6":[2,3,1],"1":[3,1,2],"17":[1,3,2],"25":[3,1,2],"14":[1,2,3],"0":[2,3,1],"20":[1,2,3],"27":[2,1,3],"2":[1,2,3],"18":[2,3,1],"7":[3,1,2],"29":[1,3,2],"3":[2,1,3],"28":[3,2,1]}}`
zkConn.mockGet("/brokers/topics/test-topic", data)

topic := ZkTopic{Name: "test-topic"}
err := zkPartitions(&topic, zkConn, "test-topic", "")

if err != nil {
t.Errorf("Parsing produced error: %s", err)
}

if len(partitions) != 30 {
t.Errorf("Parsing produced %d partitions, expected 30", len(partitions))
if len(topic.Partitions) != 30 {
t.Errorf("Parsing produced %d partitions, expected 30", len(topic.Partitions))
}

for _, partition := range partitions {
if len(partition.Replicas) != 3 {
t.Errorf("Partition %d has %d replicas, expected 3", partition.ID, len(partition.Replicas))
for id, replicas := range topic.Partitions {
if len(replicas) != 3 {
t.Errorf("Partition %d has %d replicas, expected 3", id, len(replicas))
}
}
}

0 comments on commit 2e45e47

Please sign in to comment.