From e0df9036f0626299ae609ed8bab37fad8f8684bf Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sat, 22 Jun 2024 06:39:08 +0930 Subject: [PATCH] x-pack/filebeat/input/entityanalytics/provider/jamf: add jamf provider --- CHANGELOG.next.asciidoc | 1 + .../inputs/input-entity-analytics.asciidoc | 175 ++++++- .../filebeat/input/entityanalytics/input.go | 1 + .../entityanalytics/provider/jamf/.gitignore | 1 + .../entityanalytics/provider/jamf/conf.go | 182 +++++++ .../provider/jamf/conf_test.go | 121 +++++ .../entityanalytics/provider/jamf/jamf.go | 485 ++++++++++++++++++ .../provider/jamf/jamf_test.go | 157 ++++++ .../entityanalytics/provider/jamf/metrics.go | 50 ++ .../provider/jamf/state_string.go | 30 ++ .../provider/jamf/statestore.go | 198 +++++++ .../provider/jamf/statestore_test.go | 226 ++++++++ 12 files changed, 1625 insertions(+), 2 deletions(-) create mode 100644 x-pack/filebeat/input/entityanalytics/provider/jamf/.gitignore create mode 100644 x-pack/filebeat/input/entityanalytics/provider/jamf/conf.go create mode 100644 x-pack/filebeat/input/entityanalytics/provider/jamf/conf_test.go create mode 100644 x-pack/filebeat/input/entityanalytics/provider/jamf/jamf.go create mode 100644 x-pack/filebeat/input/entityanalytics/provider/jamf/jamf_test.go create mode 100644 x-pack/filebeat/input/entityanalytics/provider/jamf/metrics.go create mode 100644 x-pack/filebeat/input/entityanalytics/provider/jamf/state_string.go create mode 100644 x-pack/filebeat/input/entityanalytics/provider/jamf/statestore.go create mode 100644 x-pack/filebeat/input/entityanalytics/provider/jamf/statestore_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8dd90c6ee5c6..d838ac16ca5e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -281,6 +281,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix handling of infinite rate values in CEL rate limit handling logic. 
{pull}39940[39940] - Allow elision of set and append failure logging. {issue}34544[34544] {pull}39929[39929] - Add ability to remove request trace logs from CEL input. {pull}39969[39969] +- Add Jamf entity analytics provider. {pull}39996[39996] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc b/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc index 41c015761231..c1ee4fd5f839 100644 --- a/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc @@ -18,6 +18,7 @@ The following identity providers are supported: - <> - <> +- <> - <> ==== Configuration options @@ -521,6 +522,176 @@ For Example, `http-request-trace-*.ndjson`. Enabling this option compromises security and should only be used for debugging. +[id="provider-jamf"] +==== Jamf Computer Management (`jamf`) + +The `jamf` provider allows the input to retrieve computer records from the +Jamf API. + +[float] +==== How It Works + +[float] +===== Overview + +The Jamf provider periodically contacts the Jamf API, retrieving updates for +computers, updates its internal cache of managed computer metadata, and ships +updated metadata to Elasticsearch. + +Fetching and shipping updates occurs in one of two processes: *full +synchronizations* and *incremental updates*. Full synchronizations will send +the entire list of computers in state, along with write markers to +indicate the start and end of the synchronization event. Incremental updates +will only send data for changed computers records during that event. Changes +on a user or device can come in many forms, whether it be a change to the +user's metadata, or a user was added or deleted. + +[float] +===== API Interactions + +The provider periodically retrieves changes to user/device metadata from the +Jamf computers-preview API. 
This is done through calls to: + +- https://developer.jamf.com/jamf-pro/reference/get_preview-computers[/api/preview/computers] + +Updates are tracked by the provider by retaining a record of the time of the last +noted update in the returned user list. During provider updates the Jamf provider +makes use of the Jamf API's query filtering to only request records updated at or +since the provider's recorded last update. + +[float] +===== Sending Computer Metadata to Elasticsearch + +During a full synchronization, all users/devices stored in state will be sent +to the output, while incremental updates will only send users and devices +that have been updated. Full synchronizations will be bounded on either side +by write marker documents, which will look something like this: + +["source","json",subs="attributes"] +---- +{ + "@timestamp": "2022-11-04T09:57:19.786056-05:00", + "event": { + "action": "started", + "start": "2022-11-04T09:57:19.786056-05:00" + }, + "labels": { + "identity_source": "jamf-1" + } +} +---- + +Documents will show the current state of the computer record. 
+ +Example document: + +["source","json",subs="attributes"] +---- +{ + "device": { + "id": "5982CE36-4526-580B-B4B9-ECC6782535BC" + }, + "event": { + "action": "device-discovered" + }, + "jamf": { + "location": { + "username": "john.doe", + "position": "Unknown Developer" + }, + "site": null, + "name": "acme-C07DM3AZQ6NV", + "udid": "5982CE36-4526-580B-B4B9-ECC6782535BC", + "serialNumber": "C07DM3AZQ6NV", + "operatingSystemVersion": "14.0", + "operatingSystemBuild": "23A344", + "operatingSystemSupplementalBuildVersion": null, + "operatingSystemRapidSecurityResponse": null, + "macAddress": "64:0B:D7:AA:E4:B2", + "assetTag": null, + "modelIdentifier": "Macmini9,1", + "mdmAccessRights": 0, + "lastContactDate": "2024-04-18T14:26:51.514Z", + "lastReportDate": "2024-06-19T15:54:37.692Z", + "lastEnrolledDate": "2023-02-22T10:46:17.199Z", + "ipAddress": null, + "managementId": "1a59c510-b3a9-41cb-8afa-3d4187ac60d0", + "isManaged": true + }, + "labels": { + "identity_source": "jamf-1" + } +} +---- + +[float] +==== Configuration + +Example configuration: + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: entity-analytics + enabled: true + id: jamf-1 + provider: jamf + dataset: "all" + sync_interval: "12h" + update_interval: "30m" + jamf_tenant: "JAMF_TENANT" + jamf_username: "JAMF_USERNAME" + jamf_password: "JAMF_PASSWORD" +---- + +The `jamf` provider supports the following configuration: + +[float] +===== `jamf_tenant` + +The Jamf tenant host. Field is required. + +[float] +===== `jamf_username` + +The Jamf username, used for authentication. Field is required. + +[float] +===== `jamf_password` + +The Jamf user password, used for authentication. Field is required. + +[float] +===== `page_size` + +The number of computer records to collect with each API request. Defaults to https://developer.jamf.com/jamf-pro/reference/get_preview-computers[API default]. + +[float] +===== `sync_interval` + +The interval in which full synchronizations should occur. 
The interval must be
+longer than the update interval (`update_interval`). Expressed as a duration
+string (e.g., 1m, 3h, 24h). Defaults to `24h` (24 hours).
+
+[float]
+===== `update_interval`
+
+The interval in which incremental updates should occur. The interval must be
+shorter than the full synchronization interval (`sync_interval`). Expressed as a
+duration string (e.g., 1m, 3h, 24h). Defaults to `15m` (15 minutes).
+
+[float]
+===== `tracer.filename`
+
+It is possible to log HTTP requests and responses to the Jamf API to a local file-system for debugging configurations.
+This option is enabled by setting the `tracer.filename` value. Additional options are available to
+tune log rotation behavior.
+
+To differentiate the trace files generated from different input instances, a placeholder `*` can be added to the filename and will be replaced with the input instance id.
+For Example, `http-request-trace-*.ndjson`.
+
+Enabling this option compromises security and should only be used for debugging.
+
 [id="provider-okta"]
 ==== Okta User Identities (`okta`)
 
@@ -550,8 +721,8 @@ The Okta provider periodically contacts the Okta API, retrieving updates for
 users and devices, updates its internal cache of user metadata, and ships
 updated user/device metadata to Elasticsearch.
 
-Fetching and shipping updates occurs in one of two processes: **full
-synchronizations** and *incremental updates*. Full synchronizations will send
+Fetching and shipping updates occurs in one of two processes: *full
+synchronizations* and *incremental updates*. Full synchronizations will send
 the entire list of users and devices in state, along with write markers to
 indicate the start and end of the synchronization event. Incremental updates
 will only send data for changed users and devices during that event. 
Changes diff --git a/x-pack/filebeat/input/entityanalytics/input.go b/x-pack/filebeat/input/entityanalytics/input.go index 703cc4329a8c..7757d4cf33b8 100644 --- a/x-pack/filebeat/input/entityanalytics/input.go +++ b/x-pack/filebeat/input/entityanalytics/input.go @@ -17,6 +17,7 @@ import ( // For provider registration. _ "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/activedirectory" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/azuread" + _ "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/jamf" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/okta" ) diff --git a/x-pack/filebeat/input/entityanalytics/provider/jamf/.gitignore b/x-pack/filebeat/input/entityanalytics/provider/jamf/.gitignore new file mode 100644 index 000000000000..13df6a73f0dd --- /dev/null +++ b/x-pack/filebeat/input/entityanalytics/provider/jamf/.gitignore @@ -0,0 +1 @@ +*.ndjson diff --git a/x-pack/filebeat/input/entityanalytics/provider/jamf/conf.go b/x-pack/filebeat/input/entityanalytics/provider/jamf/conf.go new file mode 100644 index 000000000000..ec3fe7a5d936 --- /dev/null +++ b/x-pack/filebeat/input/entityanalytics/provider/jamf/conf.go @@ -0,0 +1,182 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package jamf + +import ( + "errors" + "time" + + "gopkg.in/natefinch/lumberjack.v2" + + "github.com/elastic/elastic-agent-libs/transport/httpcommon" +) + +// defaultConfig returns a default configuration. 
+func defaultConfig() conf { + maxAttempts := 5 + waitMin := time.Second + waitMax := time.Minute + transport := httpcommon.DefaultHTTPTransportSettings() + transport.Timeout = 30 * time.Second + + return conf{ + SyncInterval: 24 * time.Hour, + UpdateInterval: 15 * time.Minute, + TokenGrace: time.Minute, + Request: &requestConfig{ + Retry: retryConfig{ + MaxAttempts: &maxAttempts, + WaitMin: &waitMin, + WaitMax: &waitMax, + }, + RedirectForwardHeaders: false, + RedirectMaxRedirects: 10, + Transport: transport, + }, + } +} + +// conf contains parameters needed to configure the input. +type conf struct { + JamfTenant string `config:"jamf_tenant" validate:"required"` + JamfUsername string `config:"jamf_username" validate:"required"` + JamfPassword string `config:"jamf_password" validate:"required"` + + // PageSize is the number of entities to collect in each request. + PageSize int `config:"page_size"` + + // TokenGrace is purely here to allow tuning the tolerance for + // token staling. It is intentionally not documented in the + // public documentation as it is not a user control. + TokenGrace time.Duration `config:"token_grace_period"` + + // SyncInterval is the time between full + // synchronisation operations. + SyncInterval time.Duration `config:"sync_interval"` + + // UpdateInterval is the time between + // incremental updated. + UpdateInterval time.Duration `config:"update_interval"` + + // Request is the configuration for establishing + // HTTP requests to the API. + Request *requestConfig `config:"request"` + + // Tracer allows configuration of request trace logging. 
+ Tracer *lumberjack.Logger `config:"tracer"` +} + +type requestConfig struct { + Retry retryConfig `config:"retry"` + RedirectForwardHeaders bool `config:"redirect.forward_headers"` + RedirectHeadersBanList []string `config:"redirect.headers_ban_list"` + RedirectMaxRedirects int `config:"redirect.max_redirects"` + KeepAlive keepAlive `config:"keep_alive"` + + Transport httpcommon.HTTPTransportSettings `config:",inline"` +} + +type retryConfig struct { + MaxAttempts *int `config:"max_attempts"` + WaitMin *time.Duration `config:"wait_min"` + WaitMax *time.Duration `config:"wait_max"` +} + +func (c retryConfig) Validate() error { + switch { + case c.MaxAttempts != nil && *c.MaxAttempts <= 0: + return errors.New("max_attempts must be greater than zero") + case c.WaitMin != nil && *c.WaitMin <= 0: + return errors.New("wait_min must be greater than zero") + case c.WaitMax != nil && *c.WaitMax <= 0: + return errors.New("wait_max must be greater than zero") + } + return nil +} + +func (c retryConfig) getMaxAttempts() int { + if c.MaxAttempts == nil { + return 0 + } + return *c.MaxAttempts +} + +func (c retryConfig) getWaitMin() time.Duration { + if c.WaitMin == nil { + return 0 + } + return *c.WaitMin +} + +func (c retryConfig) getWaitMax() time.Duration { + if c.WaitMax == nil { + return 0 + } + return *c.WaitMax +} + +type keepAlive struct { + Disable *bool `config:"disable"` + MaxIdleConns int `config:"max_idle_connections"` + MaxIdleConnsPerHost int `config:"max_idle_connections_per_host"` // If zero, http.DefaultMaxIdleConnsPerHost is the value used by http.Transport. 
+ IdleConnTimeout time.Duration `config:"idle_connection_timeout"` +} + +func (c keepAlive) Validate() error { + if c.Disable == nil || *c.Disable { + return nil + } + if c.MaxIdleConns < 0 { + return errors.New("max_idle_connections must not be negative") + } + if c.MaxIdleConnsPerHost < 0 { + return errors.New("max_idle_connections_per_host must not be negative") + } + if c.IdleConnTimeout < 0 { + return errors.New("idle_connection_timeout must not be negative") + } + return nil +} + +func (c keepAlive) settings() httpcommon.WithKeepaliveSettings { + return httpcommon.WithKeepaliveSettings{ + Disable: c.Disable == nil || *c.Disable, + MaxIdleConns: c.MaxIdleConns, + MaxIdleConnsPerHost: c.MaxIdleConnsPerHost, + IdleConnTimeout: c.IdleConnTimeout, + } +} + +var ( + errInvalidSyncInterval = errors.New("zero or negative sync_interval") + errInvalidUpdateInterval = errors.New("zero or negative update_interval") + errSyncBeforeUpdate = errors.New("sync_interval not longer than update_interval") +) + +// Validate runs validation against the config. +func (c *conf) Validate() error { + switch { + case c.SyncInterval <= 0: + return errInvalidSyncInterval + case c.UpdateInterval <= 0: + return errInvalidUpdateInterval + case c.SyncInterval <= c.UpdateInterval: + return errSyncBeforeUpdate + } + + if c.Tracer == nil { + return nil + } + if c.Tracer.Filename == "" { + return errors.New("request tracer must have a filename if used") + } + if c.Tracer.MaxSize == 0 { + // By default Lumberjack caps file sizes at 100MB which + // is excessive for a debugging logger, so default to 1MB + // which is the minimum. 
+ c.Tracer.MaxSize = 1 + } + return nil +} diff --git a/x-pack/filebeat/input/entityanalytics/provider/jamf/conf_test.go b/x-pack/filebeat/input/entityanalytics/provider/jamf/conf_test.go new file mode 100644 index 000000000000..f1c7f07a5d63 --- /dev/null +++ b/x-pack/filebeat/input/entityanalytics/provider/jamf/conf_test.go @@ -0,0 +1,121 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package jamf + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/transport/httpcommon" +) + +var validateTests = []struct { + name string + cfg conf + wantErr error +}{ + { + name: "default", + cfg: defaultConfig(), + wantErr: nil, + }, + { + name: "invalid_sync_interval", + cfg: conf{ + SyncInterval: 0, + UpdateInterval: time.Second * 2, + }, + wantErr: errInvalidSyncInterval, + }, + { + name: "invalid_update_interval", + cfg: conf{ + SyncInterval: time.Second, + UpdateInterval: 0, + }, + wantErr: errInvalidUpdateInterval, + }, + { + name: "invalid_relative_intervals", + cfg: conf{ + SyncInterval: time.Second, + UpdateInterval: time.Second * 2, + }, + wantErr: errSyncBeforeUpdate, + }, +} + +func TestConfValidate(t *testing.T) { + for _, test := range validateTests { + t.Run(test.name, func(t *testing.T) { + err := test.cfg.Validate() + if err != test.wantErr { + t.Errorf("unexpected error: got:%v want:%v", err, test.wantErr) + } + }) + } +} + +var keepAliveTests = []struct { + name string + input map[string]interface{} + want httpcommon.WithKeepaliveSettings + wantErr error +}{ + { + name: "keep_alive_none", // Default to the old behaviour of true. 
+ input: map[string]interface{}{}, + want: httpcommon.WithKeepaliveSettings{Disable: true}, + }, + { + name: "keep_alive_true", + input: map[string]interface{}{ + "request.keep_alive.disable": true, + }, + want: httpcommon.WithKeepaliveSettings{Disable: true}, + }, + { + name: "keep_alive_false", + input: map[string]interface{}{ + "request.keep_alive.disable": false, + }, + want: httpcommon.WithKeepaliveSettings{Disable: false}, + }, + { + name: "keep_alive_invalid_max", + input: map[string]interface{}{ + "request.keep_alive.disable": false, + "request.keep_alive.max_idle_connections": -1, + }, + wantErr: errors.New("max_idle_connections must not be negative accessing 'request.keep_alive'"), + }, +} + +func TestKeepAliveSetting(t *testing.T) { + for _, test := range keepAliveTests { + t.Run(test.name, func(t *testing.T) { + test.input["resource.url"] = "localhost" + cfg := config.MustNewConfigFrom(test.input) + conf := defaultConfig() + conf.JamfTenant = "test.domain" + conf.JamfUsername = "test_user" + conf.JamfPassword = "test_password" + err := cfg.Unpack(&conf) + if fmt.Sprint(err) != fmt.Sprint(test.wantErr) { + t.Errorf("unexpected error return from Unpack: got: %v want: %v", err, test.wantErr) + } + if err != nil { + return + } + got := conf.Request.KeepAlive.settings() + if got != test.want { + t.Errorf("unexpected setting for %s: got: %#v\nwant:%#v", test.name, got, test.want) + } + }) + } +} diff --git a/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf.go b/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf.go new file mode 100644 index 000000000000..d750a246eda6 --- /dev/null +++ b/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf.go @@ -0,0 +1,485 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +// Package jamf provides a computer asset provider for Jamf. +package jamf + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/url" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/hashicorp/go-retryablehttp" + "go.elastic.co/ecszap" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/internal/kvstore" + "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider" + "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/jamf/internal/jamf" + "github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/transport/httpcommon" + "github.com/elastic/go-concert/ctxtool" +) + +func init() { + err := provider.Register(Name, New) + if err != nil { + panic(err) + } +} + +// Name of this provider. +const Name = "jamf" + +// FullName of this provider, including the input name. Prefer using this +// value for full context, especially if the input name isn't present in an +// adjacent log field. +const FullName = "entity-analytics-" + Name + +// jamfInput implements the provider.Provider interface. +type jamfInput struct { + *kvstore.Manager + + cfg conf + + client *http.Client + token jamf.Token + + metrics *inputMetrics + logger *logp.Logger +} + +// New creates a new instance of an Jamf entity provider. +func New(logger *logp.Logger) (provider.Provider, error) { + p := jamfInput{ + cfg: defaultConfig(), + } + p.Manager = &kvstore.Manager{ + Logger: logger, + Type: FullName, + Configure: p.configure, + } + + return &p, nil +} + +// configure configures this provider using the given configuration. 
+func (p *jamfInput) configure(cfg *config.C) (kvstore.Input, error) { + err := cfg.Unpack(&p.cfg) + if err != nil { + return nil, fmt.Errorf("unable to unpack %s input config: %w", Name, err) + } + return p, nil +} + +// Name returns the name of this provider. +func (p *jamfInput) Name() string { + return FullName +} + +func (*jamfInput) Test(v2.TestContext) error { return nil } + +// Run will start data collection on this provider. +func (p *jamfInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error { + p.logger = inputCtx.Logger.With("provider", Name, "tenant", p.cfg.JamfTenant) + p.metrics = newMetrics(inputCtx.ID, nil) + defer p.metrics.Close() + + lastSyncTime, _ := getLastSync(store) + syncWaitTime := time.Until(lastSyncTime.Add(p.cfg.SyncInterval)) + lastUpdateTime, _ := getLastUpdate(store) + updateWaitTime := time.Until(lastUpdateTime.Add(p.cfg.UpdateInterval)) + + syncTimer := time.NewTimer(syncWaitTime) + updateTimer := time.NewTimer(updateWaitTime) + + if p.cfg.Tracer != nil { + id := sanitizeFileName(inputCtx.ID) + p.cfg.Tracer.Filename = strings.ReplaceAll(p.cfg.Tracer.Filename, "*", id) + } + + var err error + p.client, err = newClient(ctxtool.FromCanceller(inputCtx.Cancelation), p.cfg, p.logger) + if err != nil { + return err + } + + for { + select { + case <-inputCtx.Cancelation.Done(): + if !errors.Is(inputCtx.Cancelation.Err(), context.Canceled) { + return inputCtx.Cancelation.Err() + } + return nil + case <-syncTimer.C: + start := time.Now() + if err := p.runFullSync(inputCtx, store, client); err != nil { + p.logger.Errorw("Error running full sync", "error", err) + p.metrics.syncError.Inc() + } + p.metrics.syncTotal.Inc() + p.metrics.syncProcessingTime.Update(time.Since(start).Nanoseconds()) + + syncTimer.Reset(p.cfg.SyncInterval) + p.logger.Debugf("Next sync expected at: %v", time.Now().Add(p.cfg.SyncInterval)) + + // Reset the update timer and wait the configured interval. 
If the + // update timer has already fired, then drain the timer's channel + // before resetting. + if !updateTimer.Stop() { + <-updateTimer.C + } + updateTimer.Reset(p.cfg.UpdateInterval) + p.logger.Debugf("Next update expected at: %v", time.Now().Add(p.cfg.UpdateInterval)) + case <-updateTimer.C: + start := time.Now() + if err := p.runIncrementalUpdate(inputCtx, store, client); err != nil { + p.logger.Errorw("Error running incremental update", "error", err) + p.metrics.updateError.Inc() + } + p.metrics.updateTotal.Inc() + p.metrics.updateProcessingTime.Update(time.Since(start).Nanoseconds()) + updateTimer.Reset(p.cfg.UpdateInterval) + p.logger.Debugf("Next update expected at: %v", time.Now().Add(p.cfg.UpdateInterval)) + } + } +} + +func newClient(ctx context.Context, cfg conf, log *logp.Logger) (*http.Client, error) { + c, err := cfg.Request.Transport.Client(clientOptions(cfg.Request.KeepAlive.settings())...) + if err != nil { + return nil, err + } + + c = requestTrace(ctx, c, cfg, log) + + c.CheckRedirect = checkRedirect(cfg.Request, log) + + client := &retryablehttp.Client{ + HTTPClient: c, + Logger: newRetryLog(log), + RetryWaitMin: cfg.Request.Retry.getWaitMin(), + RetryWaitMax: cfg.Request.Retry.getWaitMax(), + RetryMax: cfg.Request.Retry.getMaxAttempts(), + CheckRetry: retryablehttp.DefaultRetryPolicy, + Backoff: retryablehttp.DefaultBackoff, + } + return client.StandardClient(), nil +} + +// requestTrace decorates cli with an httplog.LoggingRoundTripper if cfg.Tracer +// is non-nil. +func requestTrace(ctx context.Context, cli *http.Client, cfg conf, log *logp.Logger) *http.Client { + if cfg.Tracer == nil { + return cli + } + w := zapcore.AddSync(cfg.Tracer) + go func() { + // Close the logger when we are done. 
+		<-ctx.Done()
+		cfg.Tracer.Close()
+	}()
+	core := ecszap.NewCore(
+		ecszap.NewDefaultEncoderConfig(),
+		w,
+		zap.DebugLevel,
+	)
+	traceLogger := zap.New(core)
+
+	const margin = 10e3 // 10kB ought to be enough room for all the remainder of the trace details.
+	maxSize := cfg.Tracer.MaxSize * 1e6
+	cli.Transport = httplog.NewLoggingRoundTripper(cli.Transport, traceLogger, max(0, maxSize-margin), log)
+	return cli
+}
+
+// sanitizeFileName returns name with ":" and "/" replaced with "_", removing
+// repeated instances. The request.tracer.filename may have ":" when an input
+// has cursor config and the macOS Finder will treat this as path-separator and
+// causes to show up strange filepaths.
+func sanitizeFileName(name string) string {
+	name = strings.ReplaceAll(name, ":", string(filepath.Separator))
+	name = filepath.Clean(name)
+	return strings.ReplaceAll(name, string(filepath.Separator), "_")
+}
+
+// clientOptions returns constructed client configuration options. 
+func clientOptions(keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption {
+	return []httpcommon.TransportOption{
+		httpcommon.WithAPMHTTPInstrumentation(),
+		keepalive,
+	}
+}
+
+func checkRedirect(cfg *requestConfig, log *logp.Logger) func(*http.Request, []*http.Request) error {
+	return func(req *http.Request, via []*http.Request) error {
+		log.Debug("http client: checking redirect")
+		if len(via) >= cfg.RedirectMaxRedirects {
+			log.Debug("http client: max redirects exceeded")
+			return fmt.Errorf("stopped after %d redirects", cfg.RedirectMaxRedirects)
+		}
+
+		if !cfg.RedirectForwardHeaders || len(via) == 0 {
+			log.Debugf("http client: nothing to do while checking redirects - forward_headers: %v, via: %#v", cfg.RedirectForwardHeaders, via)
+			return nil
+		}
+
+		prev := via[len(via)-1] // previous request to get headers from
+
+		log.Debugf("http client: forwarding headers from previous request: %#v", prev.Header)
+		req.Header = prev.Header.Clone()
+
+		for _, k := range cfg.RedirectHeadersBanList {
+			log.Debugf("http client: ban header %v", k)
+			req.Header.Del(k)
+		}
+
+		return nil
+	}
+}
+
+// retryLog is a shim for the retryablehttp.Client.Logger.
+type retryLog struct{ log *logp.Logger }
+
+func newRetryLog(log *logp.Logger) *retryLog {
+	return &retryLog{log: log.Named("retryablehttp").WithOptions(zap.AddCallerSkip(1))}
+}
+
+func (l *retryLog) Error(msg string, kv ...interface{}) { l.log.Errorw(msg, kv...) }
+func (l *retryLog) Info(msg string, kv ...interface{})  { l.log.Infow(msg, kv...) }
+func (l *retryLog) Debug(msg string, kv ...interface{}) { l.log.Debugw(msg, kv...) }
+func (l *retryLog) Warn(msg string, kv ...interface{})  { l.log.Warnw(msg, kv...) }
+
+// runFullSync performs a full synchronization. It will fetch computer records
+// from the Jamf API, update the stored state, and publish all known computers
+// (regardless of whether they have been modified) to the given beat.Client. 
+func (p *jamfInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error { + p.logger.Debugf("Running full sync...") + + p.logger.Debugf("Opening new transaction...") + state, err := newStateStore(store) + if err != nil { + return fmt.Errorf("unable to begin transaction: %w", err) + } + p.logger.Debugf("Transaction opened") + defer func() { // If commit is successful, call to this close will be no-op. + closeErr := state.close(false) + if closeErr != nil { + p.logger.Errorw("Error rolling back full sync transaction", "error", closeErr) + } + }() + + ctx := ctxtool.FromCanceller(inputCtx.Cancelation) + p.logger.Debugf("Starting fetch...") + _, err = p.doFetchComputers(ctx, state, true) + if err != nil { + return err + } + + if len(state.computers) != 0 { + tracker := kvstore.NewTxTracker(ctx) + + start := time.Now() + p.publishMarker(start, start, inputCtx.ID, true, client, tracker) + for _, c := range state.computers { + p.publishComputer(c, inputCtx.ID, client, tracker) + } + + end := time.Now() + p.publishMarker(end, end, inputCtx.ID, false, client, tracker) + + tracker.Wait() + } + + if ctx.Err() != nil { + return ctx.Err() + } + + state.lastSync = time.Now() + err = state.close(true) + if err != nil { + return fmt.Errorf("unable to commit state: %w", err) + } + + return nil +} + +// runIncrementalUpdate will run an incremental update. The process is similar +// to full synchronization, except only users which have changed (newly +// discovered, modified, or deleted) will be published. +func (p *jamfInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error { + p.logger.Debugf("Running incremental update...") + + state, err := newStateStore(store) + if err != nil { + return fmt.Errorf("unable to begin transaction: %w", err) + } + defer func() { // If commit is successful, call to this close will be no-op. 
+ closeErr := state.close(false) + if closeErr != nil { + p.logger.Errorw("Error rolling back incremental update transaction", "error", closeErr) + } + }() + + ctx := ctxtool.FromCanceller(inputCtx.Cancelation) + updatedDevices, err := p.doFetchComputers(ctx, state, false) + if err != nil { + return err + } + + var tracker *kvstore.TxTracker + if len(updatedDevices) != 0 { + tracker = kvstore.NewTxTracker(ctx) + for _, d := range updatedDevices { + p.publishComputer(d, inputCtx.ID, client, tracker) + } + tracker.Wait() + } + + if ctx.Err() != nil { + return ctx.Err() + } + + state.lastUpdate = time.Now() + if err = state.close(true); err != nil { + return fmt.Errorf("unable to commit state: %w", err) + } + + return nil +} + +// doFetchComputers handles fetching computer and associated user identities from Jamf. +// If a full synchronization from Jamf is performed. +// Returns a set of modified devices by ID. +func (p *jamfInput) doFetchComputers(ctx context.Context, state *stateStore, fullSync bool) ([]*Computer, error) { + var ( + computers []*Computer + query url.Values + err error + ) + if p.cfg.PageSize > 0 { + query = make(url.Values) + query.Set("page-size", strconv.Itoa(p.cfg.PageSize)) + } + for page, n := 0, 0; ; page++ { + if !p.token.IsValidFor(p.cfg.TokenGrace) { + p.token, err = jamf.GetToken(ctx, p.client, p.cfg.JamfTenant, p.cfg.JamfUsername, p.cfg.JamfPassword) + if err != nil { + return nil, fmt.Errorf("failed to get auth token: %w", err) + } + } + + if query != nil { + query.Set("page", strconv.Itoa(page)) + } + resp, err := jamf.GetComputers(ctx, p.client, p.cfg.JamfTenant, p.token, query) + if err != nil { + p.logger.Debugf("received %d computers from API", len(computers)) + return nil, err + } + if len(resp.Results) == 0 { + break + } + p.logger.Debugf("received batch of %d computers from API", len(resp.Results)) + + if fullSync { + for _, c := range resp.Results { + state.storeComputer(c) + } + } else { + for _, c := range resp.Results { + 
stored, changed := state.storeComputer(c) + if stored == nil { + continue + } + if changed { + computers = append(computers, stored) + } + } + } + + n += len(resp.Results) + if n >= resp.TotalCount { + break + } + } + + p.logger.Debugf("received %d modified computer records from API", len(computers)) + return computers, nil +} + +// publishMarker will publish a write marker document using the given beat.Client. +// If start is true, then it will be a start marker, otherwise an end marker. +func (p *jamfInput) publishMarker(ts, eventTime time.Time, inputID string, start bool, client beat.Client, tracker *kvstore.TxTracker) { + fields := mapstr.M{} + _, _ = fields.Put("labels.identity_source", inputID) + + if start { + _, _ = fields.Put("event.action", "started") + _, _ = fields.Put("event.start", eventTime) + } else { + _, _ = fields.Put("event.action", "completed") + _, _ = fields.Put("event.end", eventTime) + } + + event := beat.Event{ + Timestamp: ts, + Fields: fields, + Private: tracker, + } + tracker.Add() + if start { + p.logger.Debug("Publishing start write marker") + } else { + p.logger.Debug("Publishing end write marker") + } + + client.Publish(event) +} + +// publishComputer will publish a computer document using the given beat.Client. 
+func (p *jamfInput) publishComputer(c *Computer, inputID string, client beat.Client, tracker *kvstore.TxTracker) { + devDoc := mapstr.M{} + + id := "unknown" + if c.Udid != nil { + id = *c.Udid + } + _, _ = devDoc.Put("jamf", c.Computer) + _, _ = devDoc.Put("labels.identity_source", inputID) + _, _ = devDoc.Put("device.id", id) + + switch c.State { + case Deleted: + _, _ = devDoc.Put("event.action", "device-deleted") + case Discovered: + _, _ = devDoc.Put("event.action", "device-discovered") + case Modified: + _, _ = devDoc.Put("event.action", "device-modified") + } + + event := beat.Event{ + Timestamp: time.Now(), + Fields: devDoc, + Private: tracker, + } + tracker.Add() + + p.logger.Debugf("Publishing computer %q", id) + + client.Publish(event) +} diff --git a/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf_test.go b/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf_test.go new file mode 100644 index 000000000000..5661c42cb88a --- /dev/null +++ b/x-pack/filebeat/input/entityanalytics/provider/jamf/jamf_test.go @@ -0,0 +1,157 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package jamf + +import ( + "context" + "crypto/tls" + "encoding/json" + "flag" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + _ "embed" + + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" + "gopkg.in/natefinch/lumberjack.v2" + + "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/jamf/internal/jamf" + "github.com/elastic/elastic-agent-libs/logp" +) + +var trace = flag.Bool("request_trace", false, "enable request tracing during tests") + +//go:embed internal/jamf/testdata/computers.json +var computers []byte + +func TestJamfDoFetch(t *testing.T) { + dbFilename := t.Name() + ".db" + store := testSetupStore(t, dbFilename) + t.Cleanup(func() { + testCleanupStore(store, dbFilename) + }) + + var ( + wantComputers []*Computer + rawComputers jamf.Computers + ) + err := json.Unmarshal(computers, &rawComputers) + if err != nil { + t.Fatalf("failed to unmarshal device data: %v", err) + } + for _, c := range rawComputers.Results { + wantComputers = append(wantComputers, &Computer{ + Computer: c, + State: Discovered, + }) + } + + // Set the number of repeats. 
+ tenant, username, password, client, cleanup, err := testContext() + if err != nil { + t.Fatalf("unexpected error getting env context: %v", err) + } + defer cleanup() + + a := jamfInput{ + cfg: conf{ + JamfTenant: tenant, + JamfUsername: username, + JamfPassword: password, + }, + client: client, + logger: logp.L(), + } + if *trace { + a.cfg.Tracer = &lumberjack.Logger{ + Filename: "test_trace.ndjson", + } + } + a.client = requestTrace(context.Background(), a.client, a.cfg, a.logger) + + ss, err := newStateStore(store) + if err != nil { + t.Fatalf("unexpected error making state store: %v", err) + } + defer ss.close(false) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + t.Run("devices", func(t *testing.T) { + got, err := a.doFetchComputers(ctx, ss, false) + if err != nil { + t.Fatalf("unexpected error from doFetch: %v", err) + } + + if wantComputers != nil && !cmp.Equal(wantComputers, got) { + t.Errorf("unexpected result\n--- want\n+++ got\n%s", cmp.Diff(wantComputers, got)) + } + }) +} + +func testContext() (tenant string, username string, password string, client *http.Client, cleanup func(), err error) { + username = "testuser" + password = "testuser_password" + + var tok jamf.Token + mux := http.NewServeMux() + mux.Handle("/api/v1/auth/token", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + user, pass, ok := r.BasicAuth() + if !ok || user != username || pass != password { + w.WriteHeader(http.StatusUnauthorized) + w.Header().Set("content-type", "application/json;charset=UTF-8") + w.Write([]byte("{\n \"httpStatus\" : 401,\n \"errors\" : [ ]\n}")) + return + } + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + w.Header().Set("content-type", "application/json;charset=UTF-8") + w.Write([]byte("{\n \"httpStatus\" : 405,\n \"errors\" : [ ]\n}")) + return + } + tok.Token = uuid.New().String() + tok.Expires = time.Now().In(time.UTC).Add(time.Hour) + fmt.Fprintf(w, "{\n 
\"token\" : \"%s\",\n \"expires\" : \"%s\"\n}", tok.Token, tok.Expires.Format(time.RFC3339)) + })) + mux.Handle("/api/preview/computers", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Authorization") != "Bearer "+tok.Token || !tok.IsValidFor(0) { + w.WriteHeader(http.StatusUnauthorized) + w.Header().Set("content-type", "application/json;charset=UTF-8") + w.Write([]byte("{\n \"httpStatus\" : 401,\n \"errors\" : [ {\n \"code\" : \"INVALID_TOKEN\",\n \"description\" : \"Unauthorized\",\n \"id\" : \"0\",\n \"field\" : null\n } ]\n}")) + return + } + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + w.Header().Set("content-type", "application/json;charset=UTF-8") + w.Write([]byte("{\n \"httpStatus\" : 405,\n \"errors\" : [ ]\n}")) + return + } + w.Write(computers) + })) + + srv := httptest.NewTLSServer(mux) + u, err := url.Parse(srv.URL) + if err != nil { + srv.Close() + return "", "", "", nil, func() {}, err + } + tenant = u.Host + + cli := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, + } + + return tenant, username, password, cli, srv.Close, nil +} diff --git a/x-pack/filebeat/input/entityanalytics/provider/jamf/metrics.go b/x-pack/filebeat/input/entityanalytics/provider/jamf/metrics.go new file mode 100644 index 000000000000..54186b87fee8 --- /dev/null +++ b/x-pack/filebeat/input/entityanalytics/provider/jamf/metrics.go @@ -0,0 +1,50 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package jamf + +import ( + "github.com/rcrowley/go-metrics" + + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" +) + +// inputMetrics defines metrics for this provider. +type inputMetrics struct { + unregister func() + + syncTotal *monitoring.Uint // The total number of full synchronizations. + syncError *monitoring.Uint // The number of full synchronizations that failed due to an error. + syncProcessingTime metrics.Sample // Histogram of the elapsed full synchronization times in nanoseconds (time of API contact to items sent to output). + updateTotal *monitoring.Uint // The total number of incremental updates. + updateError *monitoring.Uint // The number of incremental updates that failed due to an error. + updateProcessingTime metrics.Sample // Histogram of the elapsed incremental update times in nanoseconds (time of API contact to items sent to output). +} + +// Close removes metrics from the registry. +func (m *inputMetrics) Close() { + m.unregister() +} + +// newMetrics creates a new instance for gathering metrics. +func newMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics { + reg, unreg := inputmon.NewInputRegistry(FullName, id, optionalParent) + + out := inputMetrics{ + unregister: unreg, + syncTotal: monitoring.NewUint(reg, "sync_total"), + syncError: monitoring.NewUint(reg, "sync_error"), + syncProcessingTime: metrics.NewUniformSample(1024), + updateTotal: monitoring.NewUint(reg, "update_total"), + updateError: monitoring.NewUint(reg, "update_error"), + updateProcessingTime: metrics.NewUniformSample(1024), + } + + adapter.NewGoMetrics(reg, "sync_processing_time", adapter.Accept).Register("histogram", metrics.NewHistogram(out.syncProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible. 
+ adapter.NewGoMetrics(reg, "update_processing_time", adapter.Accept).Register("histogram", metrics.NewHistogram(out.updateProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible. + + return &out +} diff --git a/x-pack/filebeat/input/entityanalytics/provider/jamf/state_string.go b/x-pack/filebeat/input/entityanalytics/provider/jamf/state_string.go new file mode 100644 index 000000000000..eae45772b8f1 --- /dev/null +++ b/x-pack/filebeat/input/entityanalytics/provider/jamf/state_string.go @@ -0,0 +1,30 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Code generated by "stringer -type State"; DO NOT EDIT. + +package jamf + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[Discovered-1] + _ = x[Modified-2] + _ = x[Deleted-3] +} + +const _State_name = "DiscoveredModifiedDeleted" + +var _State_index = [...]uint8{0, 10, 18, 25} + +func (i State) String() string { + i -= 1 + if i < 0 || i >= State(len(_State_index)-1) { + return "State(" + strconv.FormatInt(int64(i+1), 10) + ")" + } + return _State_name[_State_index[i]:_State_index[i+1]] +} diff --git a/x-pack/filebeat/input/entityanalytics/provider/jamf/statestore.go b/x-pack/filebeat/input/entityanalytics/provider/jamf/statestore.go new file mode 100644 index 000000000000..ffe2d0714cb6 --- /dev/null +++ b/x-pack/filebeat/input/entityanalytics/provider/jamf/statestore.go @@ -0,0 +1,198 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package jamf + +import ( + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/internal/kvstore" + "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/jamf/internal/jamf" +) + +var ( + computersBucket = []byte("computers") + stateBucket = []byte("state") + + lastSyncKey = []byte("last_sync") + lastUpdateKey = []byte("last_update") + computersLinkKey = []byte("devices_link") +) + +//go:generate stringer -type State +//go:generate go-licenser -license Elastic +type State int + +const ( + Discovered State = iota + 1 + Modified + Deleted +) + +type Computer struct { + jamf.Computer `json:"properties"` + State State `json:"state"` +} + +// stateStore wraps a kvstore.Transaction and provides convenience methods for +// accessing and store relevant data within the kvstore database. +type stateStore struct { + tx *kvstore.Transaction + + // lastSync and lastUpdate are the times of the first update + // or sync operation of users/devices. + lastSync time.Time + lastUpdate time.Time + computers map[string]*Computer +} + +// newStateStore creates a new instance of stateStore. It will open a new write +// transaction on the kvstore and load values from the database. Since this +// opens a write transaction, only one instance of stateStore may be created +// at a time. The close function must be called to release the transaction lock +// on the kvstore database. 
+func newStateStore(store *kvstore.Store) (*stateStore, error) {
+	tx, err := store.BeginTx(true)
+	if err != nil {
+		return nil, fmt.Errorf("unable to open state store transaction: %w", err)
+	}
+
+	s := stateStore{
+		computers: make(map[string]*Computer),
+		tx:        tx,
+	}
+
+	err = s.tx.Get(stateBucket, lastSyncKey, &s.lastSync)
+	if err != nil && !errIsItemNotFound(err) {
+		return nil, fmt.Errorf("unable to get last sync time from state: %w", err)
+	}
+	err = s.tx.Get(stateBucket, lastUpdateKey, &s.lastUpdate)
+	if err != nil && !errIsItemNotFound(err) {
+		return nil, fmt.Errorf("unable to get last update time from state: %w", err)
+	}
+
+	err = s.tx.ForEach(computersBucket, func(key, value []byte) error {
+		var c Computer
+		err = json.Unmarshal(value, &c)
+		if err != nil {
+			return fmt.Errorf("unable to unmarshal computer from state: %w", err)
+		}
+		if c.Udid == nil {
+			return fmt.Errorf("did not get computer id from state: %s", value)
+		}
+		s.computers[*c.Udid] = &c
+
+		return nil
+	})
+	if err != nil && !errIsItemNotFound(err) {
+		return nil, fmt.Errorf("unable to get devices from state: %w", err)
+	}
+
+	return &s, nil
+}
+
+// storeComputer stores a computer. If the computer does not exist in the store,
+// then the computer will be marked as discovered. Otherwise, the user will be
+// marked as modified or deleted depending on the state of the IsManaged field.
+// changed will be returned true if the record is updated in any way.
+func (s *stateStore) storeComputer(c jamf.Computer) (_ *Computer, changed bool) {
+	if c.Udid == nil {
+		return nil, false
+	}
+	stored, ok := s.computers[*c.Udid]
+	if !ok {
+		// Whether this is managed or not, it is discovered. The next sync
+		// will change its state to Deleted if it is unmanaged.
+		curr := &Computer{Computer: c, State: Discovered}
+		s.computers[*c.Udid] = curr
+		return curr, true
+	}
+
+	changed = !c.Equal(stored.Computer)
+	stored.Computer = c
+	if c.IsManaged == nil || !*c.IsManaged { // Assume no flag means unmanaged.
+		stored.State = Deleted
+		return stored, changed
+	}
+	if changed {
+		stored.State = Modified
+	}
+	return stored, changed
+}
+
+// close will close out the stateStore. If commit is true, the staged values on the
+// stateStore will be set in the kvstore database, and the transaction will be
+// committed. Otherwise, all changes will be discarded and the transaction will
+// be rolled back. The stateStore must NOT be used after close is called, rather,
+// a new stateStore should be created.
+func (s *stateStore) close(commit bool) (err error) {
+	if !commit {
+		return s.tx.Rollback()
+	}
+
+	// Fallback in case one of the statements below fails. If everything is
+	// successful and Commit is called, then this call to Rollback will be a no-op.
+	defer func() {
+		if err == nil {
+			return
+		}
+		rollbackErr := s.tx.Rollback()
+		if rollbackErr != nil {
+			err = fmt.Errorf("multiple errors during statestore close: %w", errors.Join(err, rollbackErr))
+		}
+	}()
+
+	if !s.lastSync.IsZero() {
+		err = s.tx.Set(stateBucket, lastSyncKey, &s.lastSync)
+		if err != nil {
+			return fmt.Errorf("unable to save last sync time to state: %w", err)
+		}
+	}
+	if !s.lastUpdate.IsZero() {
+		err = s.tx.Set(stateBucket, lastUpdateKey, &s.lastUpdate)
+		if err != nil {
+			return fmt.Errorf("unable to save last update time to state: %w", err)
+		}
+	}
+
+	for key, value := range s.computers {
+		err = s.tx.Set(computersBucket, []byte(key), value)
+		if err != nil {
+			return fmt.Errorf("unable to save device %q to state: %w", key, err)
+		}
+	}
+
+	return s.tx.Commit()
+}
+
+// getLastSync retrieves the last full synchronization time from the kvstore
+// database. If the value doesn't exist, a zero time.Time is returned.
+func getLastSync(store *kvstore.Store) (time.Time, error) { + var t time.Time + err := store.RunTransaction(false, func(tx *kvstore.Transaction) error { + return tx.Get(stateBucket, lastSyncKey, &t) + }) + + return t, err +} + +// getLastUpdate retrieves the last incremental update time from the kvstore +// database. If the value doesn't exist, a zero time.Time is returned. +func getLastUpdate(store *kvstore.Store) (time.Time, error) { + var t time.Time + err := store.RunTransaction(false, func(tx *kvstore.Transaction) error { + return tx.Get(stateBucket, lastUpdateKey, &t) + }) + + return t, err +} + +// errIsItemNotFound returns true if the error represents an item not found +// error (bucket not found or key not found). +func errIsItemNotFound(err error) bool { + return errors.Is(err, kvstore.ErrBucketNotFound) || errors.Is(err, kvstore.ErrKeyNotFound) +} diff --git a/x-pack/filebeat/input/entityanalytics/provider/jamf/statestore_test.go b/x-pack/filebeat/input/entityanalytics/provider/jamf/statestore_test.go new file mode 100644 index 000000000000..266a129aa4dd --- /dev/null +++ b/x-pack/filebeat/input/entityanalytics/provider/jamf/statestore_test.go @@ -0,0 +1,226 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package jamf + +import ( + "bytes" + "encoding/json" + "errors" + "os" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/internal/kvstore" + "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/jamf/internal/jamf" + "github.com/elastic/elastic-agent-libs/logp" +) + +func TestStateStore(t *testing.T) { + lastSync, err := time.Parse(time.RFC3339Nano, "2023-01-12T08:47:23.296794-05:00") + if err != nil { + t.Fatalf("failed to parse lastSync") + } + lastUpdate, err := time.Parse(time.RFC3339Nano, "2023-01-12T08:50:04.546457-05:00") + if err != nil { + t.Fatalf("failed to parse lastUpdate") + } + + const computersLink = "computers-link" + + t.Run("new", func(t *testing.T) { + dbFilename := "TestStateStore_New.db" + store := testSetupStore(t, dbFilename) + t.Cleanup(func() { + testCleanupStore(store, dbFilename) + }) + + // Inject test values into store. + data := []struct { + key []byte + val any + }{ + {key: lastSyncKey, val: lastSync}, + {key: lastUpdateKey, val: lastUpdate}, + {key: computersLinkKey, val: computersLink}, + } + for _, kv := range data { + err := store.RunTransaction(true, func(tx *kvstore.Transaction) error { + return tx.Set(stateBucket, kv.key, kv.val) + }) + if err != nil { + t.Fatalf("failed to set %s: %v", kv.key, err) + } + } + + ss, err := newStateStore(store) + if err != nil { + t.Fatalf("failed to make new store: %v", err) + } + defer ss.close(false) + + checks := []struct { + name string + got, want any + }{ + {name: "lastSync", got: ss.lastSync, want: lastSync}, + {name: "lastUpdate", got: ss.lastUpdate, want: lastUpdate}, + } + for _, c := range checks { + if !cmp.Equal(c.got, c.want) { + t.Errorf("unexpected results for %s: got:%#v want:%#v", c.name, c.got, c.want) + } + } + }) + + t.Run("close", func(t *testing.T) { + dbFilename := "TestStateStore_Close.db" + store := testSetupStore(t, dbFilename) + t.Cleanup(func() { + 
testCleanupStore(store, dbFilename) + }) + + wantDevices := map[string]*Computer{ + "deviceid": { + State: Discovered, + Computer: jamf.Computer{}, + }, + } + + ss, err := newStateStore(store) + if err != nil { + t.Fatalf("failed to make new store: %v", err) + } + ss.lastSync = lastSync + ss.lastUpdate = lastUpdate + ss.computers = wantDevices + + err = ss.close(true) + if err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + roundTripChecks := []struct { + name string + key []byte + val any + }{ + {name: "lastSyncKey", key: lastSyncKey, val: &ss.lastSync}, + {name: "lastUpdateKey", key: lastUpdateKey, val: &ss.lastUpdate}, + } + for _, check := range roundTripChecks { + want, err := json.Marshal(check.val) + if err != nil { + t.Errorf("unexpected error marshaling %s: %v", check.name, err) + } + var got []byte + err = store.RunTransaction(false, func(tx *kvstore.Transaction) error { + got, err = tx.GetBytes(stateBucket, check.key) + return err + }) + if err != nil { + t.Errorf("unexpected error from store run transaction %s: %v", check.name, err) + } + if !bytes.Equal(got, want) { + t.Errorf("unexpected result after store round-trip for %s: got:%s want:%s", check.name, got, want) + } + } + }) + + t.Run("get_last_sync", func(t *testing.T) { + dbFilename := "TestGetLastSync.db" + store := testSetupStore(t, dbFilename) + t.Cleanup(func() { + testCleanupStore(store, dbFilename) + }) + + err := store.RunTransaction(true, func(tx *kvstore.Transaction) error { + return tx.Set(stateBucket, lastSyncKey, lastSync) + }) + if err != nil { + t.Fatalf("failed to set value: %v", err) + } + + got, err := getLastSync(store) + if err != nil { + t.Errorf("unexpected error from getLastSync: %v", err) + } + if !lastSync.Equal(got) { + t.Errorf("unexpected result from getLastSync: got:%v want:%v", got, lastSync) + } + }) + + t.Run("get_last_update", func(t *testing.T) { + dbFilename := "TestGetLastUpdate.db" + store := testSetupStore(t, dbFilename) + t.Cleanup(func() { + 
testCleanupStore(store, dbFilename) + }) + + err := store.RunTransaction(true, func(tx *kvstore.Transaction) error { + return tx.Set(stateBucket, lastUpdateKey, lastUpdate) + }) + if err != nil { + t.Fatalf("failed to set value: %v", err) + } + + got, err := getLastUpdate(store) + if err != nil { + t.Errorf("unexpected error from getLastUpdate: %v", err) + } + if !lastUpdate.Equal(got) { + t.Errorf("unexpected result from getLastUpdate: got:%v want:%v", got, lastUpdate) + } + }) +} + +func TestErrIsItemFound(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "bucket-not-found", + err: kvstore.ErrBucketNotFound, + want: true, + }, + { + name: "key-not-found", + err: kvstore.ErrKeyNotFound, + want: true, + }, + { + name: "invalid error", + err: errors.New("test error"), + want: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := errIsItemNotFound(test.err) + if got != test.want { + t.Errorf("unexpected result for %s: got:%t want:%t", test.name, got, test.want) + } + }) + } +} + +func testSetupStore(t *testing.T, path string) *kvstore.Store { + t.Helper() + + store, err := kvstore.NewStore(logp.L(), path, 0644) + if err != nil { + t.Fatalf("unexpected error making store: %v", err) + } + return store +} + +func testCleanupStore(store *kvstore.Store, path string) { + _ = store.Close() + _ = os.Remove(path) +}