From f003f8beede1a9d7ea77a3224ef990ff2140bfe7 Mon Sep 17 00:00:00 2001
From: Adam Wegrzynek <adam.wegrzynek@cern.ch>
Date: Fri, 10 Nov 2023 11:50:37 +0100
Subject: [PATCH] Use dedicated infx instance

---
 examples/12-KafkaToInfluxDb.cxx | 24 +++++++++++++++---------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/examples/12-KafkaToInfluxDb.cxx b/examples/12-KafkaToInfluxDb.cxx
index 1e594b47..218f4614 100644
--- a/examples/12-KafkaToInfluxDb.cxx
+++ b/examples/12-KafkaToInfluxDb.cxx
@@ -40,9 +40,11 @@ int main(int argc, char* argv[])
     ("kafka-host", boost::program_options::value<std::string>()->required(), "Kafka broker hostname")
     ("influxdb-url", boost::program_options::value<std::string>()->required(), "InfluxDB hostname")
     ("influxdb-token", boost::program_options::value<std::string>()->required(), "InfluxDB token")
-    ("influxdb-orgid", boost::program_options::value<std::string>(), "InfluxDB organization ID")
     ("influxdb-org", boost::program_options::value<std::string>()->default_value("cern"), "InfluxDB organisation")
-    ("influxdb-bucket", boost::program_options::value<std::string>()->default_value("aliecs"), "InfluxDB bucket");
+    ("influxdb-bucket", boost::program_options::value<std::string>()->default_value("aliecs"), "InfluxDB bucket")
+    ("influxdb-dpl-url", boost::program_options::value<std::string>(), "InfluxDB DPL ID")
+    ("influxdb-dpl-orgid", boost::program_options::value<std::string>(), "InfluxDB DPL organization ID")
+    ("influxdb-dpl-token", boost::program_options::value<std::string>(), "InfluxDB DPL token");
   boost::program_options::variables_map vm;
   boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
   boost::program_options::notify(vm);
@@ -58,8 +60,12 @@ int main(int argc, char* argv[])
   httpTransport->addHeader("Authorization: Token " + vm["influxdb-token"].as<std::string>());
   auto influxdbBackend = std::make_unique<backends::InfluxDB>(std::move(httpTransport));
 
-  auto influxBucketApi = std::make_unique<transports::HTTP>(vm["influxdb-url"].as<std::string>() + "/api/v2/buckets");
-  influxBucketApi->addHeader("Authorization: Token " + vm["influxdb-token"].as<std::string>());
+  std::unique_ptr<transports::HTTP> influxBucketApi;
+  if (vm.count("influxdb-dpl-orgid") && vm.count("influxdb-dpl-url") && vm.count("influxdb-dpl-token")) {
+    MonLogger::Get() << "Creating bucket HTTP API for " << vm["influxdb-dpl-url"].as<std::string>() << MonLogger::End();
+    influxBucketApi.reset(new transports::HTTP(vm["influxdb-dpl-url"].as<std::string>() + "/api/v2/buckets"));
+    influxBucketApi->addHeader("Authorization: Token " + vm["influxdb-dpl-token"].as<std::string>());
+  }
 
   for (;;) {
     auto changes = kafkaConsumer->pull();
@@ -70,6 +76,7 @@ int main(int argc, char* argv[])
       if (stateChange.envinfo().state().empty()) {
         continue;
       }
+      int run = stateChange.envinfo().runnumber();
       auto metric = Metric{"run_times"};
       if (change.first.find("leave") != std::string::npos) {
         metric.addValue(stateChange.timestamp(), "eor");
@@ -77,14 +84,13 @@ int main(int argc, char* argv[])
       } else {
         metric.addValue(stateChange.envinfo().runtype(), "type").addValue(stateChange.envinfo().enterstatetimestamp(), "sor");
         MonLogger::Get() << stateChange.envinfo().environmentid() << "/" << stateChange.envinfo().runnumber() << " " << change.first << " SOR: " << stateChange.envinfo().enterstatetimestamp() << MonLogger::End();
+        if (vm.count("influxdb-dpl-orgid") && run > 1) {
+          MonLogger::Get() << "Request sent to create bucket " << stateChange.envinfo().runnumber() << " on " << vm["influxdb-dpl-url"].as<std::string>() << MonLogger::End();
+          influxBucketApi->send(getCreateBucketBody(vm["influxdb-dpl-orgid"].as<std::string>(), stateChange.envinfo().runnumber()));
+        }
       }
-      int run = stateChange.envinfo().runnumber();
       if (run > 1) {
         influxdbBackend->sendWithRun(metric, stateChange.envinfo().environmentid(), std::to_string(run));
-        if (vm.count("influxdb-orgid")) {
-          MonLogger::Get() << "Request sent to create bucket " << stateChange.envinfo().runnumber() << " on " << vm["influxdb-url"].as<std::string>() << MonLogger::End();
-          influxBucketApi->send(getCreateBucketBody(vm["influxdb-orgid"].as<std::string>(), stateChange.envinfo().runnumber()));
-        }
       }
     }
   }