From 244833d0c2b20f40052cffb51308a7dac6a1c61c Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Tue, 29 Jun 2021 12:46:19 +0300 Subject: [PATCH] Refactor add_cloud_metadata to handle ECS fields easier (#26438) (cherry picked from commit f8bb3a2cb3223d2484d6af1af21b3b877915146b) --- .../add_cloud_metadata/add_cloud_metadata.go | 41 ++++++------------- .../add_cloud_metadata/http_fetcher.go | 2 +- .../provider_alibaba_cloud.go | 2 +- .../add_cloud_metadata/provider_aws_ec2.go | 2 +- .../add_cloud_metadata/provider_azure_vm.go | 2 +- .../provider_digital_ocean.go | 2 +- .../add_cloud_metadata/provider_google_gce.go | 32 ++++++++------- .../provider_openstack_nova.go | 2 +- .../provider_tencent_cloud.go | 2 +- 9 files changed, 37 insertions(+), 50 deletions(-) diff --git a/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go b/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go index 5a511d96b46..974b8f85bf9 100644 --- a/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go +++ b/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go @@ -118,25 +118,10 @@ func (p *addCloudMetadata) Run(event *beat.Event) (*beat.Event, error) { return event, nil } - // If cloud key exists in event already and overwrite flag is set to false, this processor will not overwrite the - // cloud fields. For example aws module writes cloud.instance.* to events already, with overwrite=false, - // add_cloud_metadata should not overwrite these fields with new values. - if !p.initData.overwrite { - cloudValue, _ := event.GetValue("cloud") - if cloudValue != nil { - err := p.extractECSMeta(event, meta) - if err != nil { - return nil, err - } - return event, nil - } - } - - err := p.extractECSMeta(event, meta) + err := p.addMeta(event, meta) if err != nil { return nil, err } - _, err = event.PutValue("cloud", meta) return event, err } @@ -144,23 +129,21 @@ func (p *addCloudMetadata) String() string { return "add_cloud_metadata=" + p.getMeta().String() } -func (p *addCloudMetadata) extractECSMeta(event *beat.Event, meta common.MapStr) error { - // handle ECS fields first - if !p.initData.overwrite { - orchestratorValue, _ := event.GetValue("orchestrator") - if orchestratorValue != nil { - meta.Delete("orchestrator") - return nil +func (p *addCloudMetadata) addMeta(event *beat.Event, meta common.MapStr) error { + for key, metaVal := range meta { + // If key exists in event already and overwrite flag is set to false, this processor will not overwrite the + // meta fields. For example aws module writes cloud.instance.* to events already, with overwrite=false, + // add_cloud_metadata should not overwrite these fields with new values. + if !p.initData.overwrite { + v, _ := event.GetValue(key) + if v != nil { + continue + } } - } - orchestratorFields, err := meta.GetValue("orchestrator") - if err == nil { - _, err = event.PutValue("orchestrator", orchestratorFields) + _, err := event.PutValue(key, metaVal) if err != nil { return err } } - meta.Delete("orchestrator") - return nil } diff --git a/libbeat/processors/add_cloud_metadata/http_fetcher.go b/libbeat/processors/add_cloud_metadata/http_fetcher.go index e337edd5be9..0af5693526a 100644 --- a/libbeat/processors/add_cloud_metadata/http_fetcher.go +++ b/libbeat/processors/add_cloud_metadata/http_fetcher.go @@ -75,7 +75,7 @@ func (f *httpMetadataFetcher) fetchMetadata(ctx context.Context, client http.Cli // Apply schema. res.metadata = f.conv(res.metadata) - res.metadata["provider"] = f.provider + res.metadata.Put("cloud.provider", f.provider) return res } diff --git a/libbeat/processors/add_cloud_metadata/provider_alibaba_cloud.go b/libbeat/processors/add_cloud_metadata/provider_alibaba_cloud.go index 65dd9c23286..cf82c2d621a 100644 --- a/libbeat/processors/add_cloud_metadata/provider_alibaba_cloud.go +++ b/libbeat/processors/add_cloud_metadata/provider_alibaba_cloud.go @@ -36,7 +36,7 @@ var alibabaCloudMetadataFetcher = provider{ m["service"] = common.MapStr{ "name": "ECS", } - return common.MapStr(m) + return common.MapStr{"cloud": m} } urls, err := getMetadataURLs(c, ecsMetadataHost, []string{ diff --git a/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go b/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go index cbd82571468..350ec63b8c6 100644 --- a/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go +++ b/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go @@ -45,7 +45,7 @@ var ec2MetadataFetcher = provider{ "account": s.Object{"id": c.Str("accountId")}, "image": s.Object{"id": c.Str("imageId")}, }.Apply(m) - return out + return common.MapStr{"cloud": out} } fetcher, err := newMetadataFetcher(config, "aws", nil, metadataHost, ec2Schema, ec2InstanceIdentityURI) diff --git a/libbeat/processors/add_cloud_metadata/provider_azure_vm.go b/libbeat/processors/add_cloud_metadata/provider_azure_vm.go index b3f2a0b3222..8ba8a2284b3 100644 --- a/libbeat/processors/add_cloud_metadata/provider_azure_vm.go +++ b/libbeat/processors/add_cloud_metadata/provider_azure_vm.go @@ -50,7 +50,7 @@ var azureVMMetadataFetcher = provider{ }, "region": c.Str("location"), }.Apply(m) - return out + return common.MapStr{"cloud": out} } fetcher, err := newMetadataFetcher(config, "azure", azHeaders, metadataHost, azSchema, azMetadataURI) diff --git a/libbeat/processors/add_cloud_metadata/provider_digital_ocean.go b/libbeat/processors/add_cloud_metadata/provider_digital_ocean.go index 04da6228378..63c90bc5a96 100644 --- a/libbeat/processors/add_cloud_metadata/provider_digital_ocean.go +++ b/libbeat/processors/add_cloud_metadata/provider_digital_ocean.go @@ -41,7 +41,7 @@ var doMetadataFetcher = provider{ "name": c.Str("serviceName"), }, }.Apply(m) - return out + return common.MapStr{"cloud": out} } doMetadataURI := "/metadata/v1.json" diff --git a/libbeat/processors/add_cloud_metadata/provider_google_gce.go b/libbeat/processors/add_cloud_metadata/provider_google_gce.go index b9d6d4cc203..d6cb4bb74c1 100644 --- a/libbeat/processors/add_cloud_metadata/provider_google_gce.go +++ b/libbeat/processors/add_cloud_metadata/provider_google_gce.go @@ -49,14 +49,15 @@ var gceMetadataFetcher = provider{ gceMetadataURI := "/computeMetadata/v1/?recursive=true&alt=json" gceHeaders := map[string]string{"Metadata-Flavor": "Google"} gceSchema := func(m map[string]interface{}) common.MapStr { - out := common.MapStr{ + cloud := common.MapStr{ "service": common.MapStr{ "name": "GCE", }, } + meta := common.MapStr{} trimLeadingPath := func(key string) { - v, err := out.GetValue(key) + v, err := cloud.GetValue(key) if err != nil { return } @@ -64,7 +65,7 @@ var gceMetadataFetcher = provider{ if !ok { return } - out.Put(key, path.Base(p)) + cloud.Put(key, path.Base(p)) } if instance, ok := m["instance"].(map[string]interface{}); ok { @@ -77,6 +78,10 @@ var gceMetadataFetcher = provider{ "type": c.Str("machineType"), }, "availability_zone": c.Str("zone"), + }.ApplyTo(cloud, instance) + trimLeadingPath("machine.type") + trimLeadingPath("availability_zone") + s.Schema{ "orchestrator": s.Object{ "cluster": c.Dict( "attributes", @@ -85,29 +90,27 @@ var gceMetadataFetcher = provider{ "kubeconfig": c.Str("kubeconfig"), }), }, - }.ApplyTo(out, instance) - trimLeadingPath("machine.type") - trimLeadingPath("availability_zone") + }.ApplyTo(meta, instance) } - if kubeconfig, err := out.GetValue("orchestrator.cluster.kubeconfig"); err == nil { + if kubeconfig, err := meta.GetValue("orchestrator.cluster.kubeconfig"); err == nil { kubeConfig, ok := kubeconfig.(string) if !ok { - out.Delete("orchestrator.cluster.kubeconfig") + meta.Delete("orchestrator.cluster.kubeconfig") } cc := &KubeConfig{} err := yaml.Unmarshal([]byte(kubeConfig), cc) if err != nil { - out.Delete("orchestrator.cluster.kubeconfig") + meta.Delete("orchestrator.cluster.kubeconfig") } if len(cc.Clusters) > 0 { if cc.Clusters[0].Cluster.Server != "" { - out.Delete("orchestrator.cluster.kubeconfig") - out.Put("orchestrator.cluster.url", cc.Clusters[0].Cluster.Server) + meta.Delete("orchestrator.cluster.kubeconfig") + meta.Put("orchestrator.cluster.url", cc.Clusters[0].Cluster.Server) } } } else { - out.Delete("orchestrator") + meta.Delete("orchestrator") } if project, ok := m["project"].(map[string]interface{}); ok { @@ -118,10 +121,11 @@ var gceMetadataFetcher = provider{ "account": s.Object{ "id": c.Str("projectId"), }, - }.ApplyTo(out, project) + }.ApplyTo(cloud, project) } - return out + meta.DeepUpdate(common.MapStr{"cloud": cloud}) + return meta } fetcher, err := newMetadataFetcher(config, provider, gceHeaders, metadataHost, gceSchema, gceMetadataURI) diff --git a/libbeat/processors/add_cloud_metadata/provider_openstack_nova.go b/libbeat/processors/add_cloud_metadata/provider_openstack_nova.go index 01ada43cfb3..9922e853d2e 100644 --- a/libbeat/processors/add_cloud_metadata/provider_openstack_nova.go +++ b/libbeat/processors/add_cloud_metadata/provider_openstack_nova.go @@ -49,7 +49,7 @@ func buildOpenstackNovaCreate(scheme string) func(provider string, c *common.Con m["service"] = common.MapStr{ "name": "Nova", } - return common.MapStr(m) + return common.MapStr{"cloud": m} } urls, err := getMetadataURLsWithScheme(c, scheme, metadataHost, []string{ diff --git a/libbeat/processors/add_cloud_metadata/provider_tencent_cloud.go b/libbeat/processors/add_cloud_metadata/provider_tencent_cloud.go index 0f09f4944ae..f562e2e9609 100644 --- a/libbeat/processors/add_cloud_metadata/provider_tencent_cloud.go +++ b/libbeat/processors/add_cloud_metadata/provider_tencent_cloud.go @@ -36,7 +36,7 @@ var qcloudMetadataFetcher = provider{ m["service"] = common.MapStr{ "name": "CVM", } - return common.MapStr(m) + return common.MapStr{"cloud": m} } urls, err := getMetadataURLs(c, qcloudMetadataHost, []string{