Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Analytics Adapter: agma #3400

Merged
merged 10 commits into from
Mar 12, 2024
28 changes: 28 additions & 0 deletions analytics/agma/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# agma Analytics

In order to use the Agma Analytics Adapter, please adjust the accounts / endpoint with the data provided by agma (https://www.agma-mmc.de).

## Configuration

```yaml
analytics:
agma:
# Required: enable the module
enabled: true
hhhjort marked this conversation as resolved.
Show resolved Hide resolved
# Required: set the accounts you want to track
accounts:
- code: "my-code" # Required: provied by agma
publisher_id: "123" # Required: Exchange specific publisher_id
site_app_id: "openrtb2-site.id-or-app.id" # optional: scope to the publisher with an openrtb2 Site object id or App object id
# Optional properties (advanced configuration)
endpoint:
url: "https://go.pbs.agma-analytics.de/v1/prebid-server" # Check with agma if your site needs an extra url
timeout: "2s"
gzip: true
buffers: # Flush events when (first condition reached)
# Size of the buffer in bytes
size: "2MB" # greater than 2MB (size using SI standard eg. "44kB", "17MB")
count : 100 # greater than 100 events
timeout: "15m" # greater than 15 minutes (parsed as golang duration)

```
266 changes: 266 additions & 0 deletions analytics/agma/agma_module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
package agma

import (
"bytes"
"errors"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/benbjohnson/clock"
"github.com/docker/go-units"
"github.com/golang/glog"
"github.com/prebid/go-gdpr/vendorconsent"
"github.com/prebid/prebid-server/v2/analytics"
"github.com/prebid/prebid-server/v2/config"
"github.com/prebid/prebid-server/v2/openrtb_ext"
)

type httpSender = func(payload []byte) error

const (
agmaGVLID = 1122
p9 = 9
)

type AgmaLogger struct {
sender httpSender
clock clock.Clock
accounts []config.AgmaAnalyticsAccount
eventCount int64
maxEventCount int64
maxBufferByteSize int64
maxDuration time.Duration
mux sync.RWMutex
sigTermCh chan os.Signal
buffer bytes.Buffer
bufferCh chan []byte
}

func newAgmaLogger(cfg config.AgmaAnalytics, sender httpSender, clock clock.Clock) (*AgmaLogger, error) {
pSize, err := units.FromHumanSize(cfg.Buffers.BufferSize)
if err != nil {
return nil, err
}
pDuration, err := time.ParseDuration(cfg.Buffers.Timeout)
if err != nil {
return nil, err
}
if len(cfg.Accounts) == 0 {
return nil, errors.New("Please configure at least one account for Agma Analytics")
}

buffer := bytes.Buffer{}
buffer.Write([]byte("["))

return &AgmaLogger{
sender: sender,
clock: clock,
accounts: cfg.Accounts,
maxBufferByteSize: pSize,
eventCount: 0,
maxEventCount: int64(cfg.Buffers.EventCount),
maxDuration: pDuration,
buffer: buffer,
bufferCh: make(chan []byte),
sigTermCh: make(chan os.Signal, 1),
}, nil
}

func NewModule(httpClient *http.Client, cfg config.AgmaAnalytics, clock clock.Clock) (analytics.Module, error) {
sender, err := createHttpSender(httpClient, cfg.Endpoint)
if err != nil {
return nil, err
}

m, err := newAgmaLogger(cfg, sender, clock)
if err != nil {
return nil, err
}

signal.Notify(m.sigTermCh, os.Interrupt, syscall.SIGTERM)

go m.start()

return m, nil
}

func (l *AgmaLogger) start() {
ticker := l.clock.Ticker(l.maxDuration)
for {
select {
case <-l.sigTermCh:
glog.Infof("[AgmaAnalytics] Received Close, trying to flush buffer")
l.flush()
return
case event := <-l.bufferCh:
l.bufferEvent(event)
if l.isFull() {
l.flush()
}
case <-ticker.C:
l.flush()
}
}
}

func (l *AgmaLogger) bufferEvent(data []byte) {
l.mux.Lock()
defer l.mux.Unlock()

l.buffer.Write(data)
l.buffer.WriteByte(',')
l.eventCount++
}

func (l *AgmaLogger) isFull() bool {
l.mux.RLock()
defer l.mux.RUnlock()
return l.eventCount >= l.maxEventCount || int64(l.buffer.Len()) >= l.maxBufferByteSize
}

func (l *AgmaLogger) flush() {
l.mux.Lock()

hhhjort marked this conversation as resolved.
Show resolved Hide resolved
if l.eventCount == 0 || l.buffer.Len() == 0 {
l.mux.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if you can remove all of the Unlock calls, and instead utilize a defer l.mux.Unlock() instead to ensure it's calling when the function returns. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the initial design i had in the first Adapter, as per #3299 (comment) - which this PR is based of - I'm handling this explicitly

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there may be another solution to the issue:

defer {
    l.reset()
    l.mux.Unlock() }

This would prevent the reset and unlock becoming detached from each other. But in this case you don't want this pattern, as you have a path that doesn't call l.reset().

return
}

// Close the json array, remove last ,
l.buffer.Truncate(l.buffer.Len() - 1)
l.buffer.Write([]byte("]"))

payload := make([]byte, l.buffer.Len())
_, err := l.buffer.Read(payload)
if err != nil {
l.reset()
l.mux.Unlock()
glog.Warning("[AgmaAnalytics] fail to copy the buffer")
return
Comment on lines +139 to +143
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to cover these error lines?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I believe this situation can only occur if the buffer is empty and does not reset. However, this would effectively test the buffer's Read functionality. Do you have a specific test case in mind?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay to leave it uncovered.

}

go l.sender(payload)

l.reset()
l.mux.Unlock()
}

func (l *AgmaLogger) reset() {
l.buffer.Reset()
l.buffer.Write([]byte("["))
l.eventCount = 0
}

func (l *AgmaLogger) extractPublisherAndSite(requestWrapper *openrtb_ext.RequestWrapper) (string, string) {
publisherId := ""
appSiteId := ""
if requestWrapper.Site != nil {
if requestWrapper.Site.Publisher != nil {
publisherId = requestWrapper.Site.Publisher.ID
}
appSiteId = requestWrapper.Site.ID
}
if requestWrapper.App != nil {
if requestWrapper.App.Publisher != nil {
publisherId = requestWrapper.App.Publisher.ID
}
appSiteId = requestWrapper.App.ID
}
return publisherId, appSiteId
}

func (l *AgmaLogger) shouldTrackEvent(requestWrapper *openrtb_ext.RequestWrapper) (bool, string) {
userExt, err := requestWrapper.GetUserExt()
if err != nil || userExt == nil {
return false, ""
}
consent := userExt.GetConsent()
if consent == nil {
return false, ""
}
consentStr := *consent
parsedConsent, err := vendorconsent.ParseString(consentStr)
if err != nil {
return false, ""
}

p9Allowed := parsedConsent.PurposeAllowed(p9)
agmaAllowed := parsedConsent.VendorConsent(agmaGVLID)
if !p9Allowed || !agmaAllowed {
return false, ""
}

publisherId, appSiteId := l.extractPublisherAndSite(requestWrapper)
if publisherId == "" && appSiteId == "" {
return false, ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you cover this line?

}

for _, account := range l.accounts {
if account.PublisherId == publisherId {
if account.SiteAppId == "" {
return true, account.Code
}
if account.SiteAppId == appSiteId {
return true, account.Code
}
}
}

return false, ""
}

func (l *AgmaLogger) LogAuctionObject(event *analytics.AuctionObject) {
if event == nil || event.Status != http.StatusOK || event.RequestWrapper == nil {
return
}
shouldTrack, code := l.shouldTrackEvent(event.RequestWrapper)
if !shouldTrack {
return
}
data, err := serializeAnayltics(event.RequestWrapper, EventTypeAuction, code, event.StartTime)
if err != nil {
glog.Errorf("[AgmaAnalytics] Error serializing auction object: %v", err)
return
}
l.bufferCh <- data
}

func (l *AgmaLogger) LogAmpObject(event *analytics.AmpObject) {
if event == nil || event.Status != http.StatusOK || event.RequestWrapper == nil {
return
}
shouldTrack, code := l.shouldTrackEvent(event.RequestWrapper)
if !shouldTrack {
return
}
data, err := serializeAnayltics(event.RequestWrapper, EventTypeAmp, code, event.StartTime)
if err != nil {
glog.Errorf("[AgmaAnalytics] Error serializing amp object: %v", err)
return
}
l.bufferCh <- data
}

func (l *AgmaLogger) LogVideoObject(event *analytics.VideoObject) {
if event == nil || event.Status != http.StatusOK || event.RequestWrapper == nil {
return
}
shouldTrack, code := l.shouldTrackEvent(event.RequestWrapper)
if !shouldTrack {
return
}
data, err := serializeAnayltics(event.RequestWrapper, EventTypeVideo, code, event.StartTime)
if err != nil {
glog.Errorf("[AgmaAnalytics] Error serializing video object: %v", err)
return
}
l.bufferCh <- data
}

func (l *AgmaLogger) LogCookieSyncObject(event *analytics.CookieSyncObject) {}
func (l *AgmaLogger) LogNotificationEventObject(event *analytics.NotificationEvent) {}
func (l *AgmaLogger) LogSetUIDObject(event *analytics.SetUIDObject) {}
Loading
Loading