Skip to content

Commit

Permalink
Use dedicated infx instance
Browse files Browse the repository at this point in the history
  • Loading branch information
awegrzyn committed Nov 10, 2023
1 parent 278c774 commit f003f8b
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions examples/12-KafkaToInfluxDb.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ int main(int argc, char* argv[])
("kafka-host", boost::program_options::value<std::string>()->required(), "Kafka broker hostname")
("influxdb-url", boost::program_options::value<std::string>()->required(), "InfluxDB hostname")
("influxdb-token", boost::program_options::value<std::string>()->required(), "InfluxDB token")
("influxdb-orgid", boost::program_options::value<std::string>(), "InfluxDB organization ID")
("influxdb-org", boost::program_options::value<std::string>()->default_value("cern"), "InfluxDB organisation")
("influxdb-bucket", boost::program_options::value<std::string>()->default_value("aliecs"), "InfluxDB bucket");
("influxdb-bucket", boost::program_options::value<std::string>()->default_value("aliecs"), "InfluxDB bucket")
("influxdb-dpl-url", boost::program_options::value<std::string>(), "InfluxDB DPL ID")
("influxdb-dpl-orgid", boost::program_options::value<std::string>(), "InfluxDB DPL organization ID")
("influxdb-dpl-token", boost::program_options::value<std::string>(), "InfluxDB DPL token");
boost::program_options::variables_map vm;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
boost::program_options::notify(vm);
Expand All @@ -58,8 +60,12 @@ int main(int argc, char* argv[])
httpTransport->addHeader("Authorization: Token " + vm["influxdb-token"].as<std::string>());
auto influxdbBackend = std::make_unique<backends::InfluxDB>(std::move(httpTransport));

auto influxBucketApi = std::make_unique<transports::HTTP>(vm["influxdb-url"].as<std::string>() + "/api/v2/buckets");
influxBucketApi->addHeader("Authorization: Token " + vm["influxdb-token"].as<std::string>());
std::unique_ptr<transports::HTTP> influxBucketApi;
if (vm.count("influxdb-dpl-orgid") && vm.count("influxdb-dpl-url") && vm.count("influxdb-dpl-token")) {
MonLogger::Get() << "Creating bucket HTTP API for " << vm["influxdb-dpl-url"].as<std::string>() << MonLogger::End();
influxBucketApi.reset(new transports::HTTP(vm["influxdb-dpl-url"].as<std::string>() + "/api/v2/buckets"));
influxBucketApi->addHeader("Authorization: Token " + vm["influxdb-dpl-token"].as<std::string>());
}

for (;;) {
auto changes = kafkaConsumer->pull();
Expand All @@ -70,21 +76,21 @@ int main(int argc, char* argv[])
if (stateChange.envinfo().state().empty()) {
continue;
}
int run = stateChange.envinfo().runnumber();
auto metric = Metric{"run_times"};
if (change.first.find("leave") != std::string::npos) {
metric.addValue(stateChange.timestamp(), "eor");
MonLogger::Get() << stateChange.envinfo().environmentid() << "/" << stateChange.envinfo().runnumber() << " " << change.first << " EOR: " << stateChange.timestamp() << MonLogger::End();
} else {
metric.addValue(stateChange.envinfo().runtype(), "type").addValue(stateChange.envinfo().enterstatetimestamp(), "sor");
MonLogger::Get() << stateChange.envinfo().environmentid() << "/" << stateChange.envinfo().runnumber() << " " << change.first << " SOR: " <<stateChange.envinfo().enterstatetimestamp() << MonLogger::End();
if (vm.count("influxdb-dpl-orgid") && run > 1) {
MonLogger::Get() << "Request sent to create bucket " << stateChange.envinfo().runnumber() << " on " << vm["influxdb-dpl-url"].as<std::string>() << MonLogger::End();
influxBucketApi->send(getCreateBucketBody(vm["influxdb-dpl-orgid"].as<std::string>(), stateChange.envinfo().runnumber()));
}
}
int run = stateChange.envinfo().runnumber();
if (run > 1) {
influxdbBackend->sendWithRun(metric, stateChange.envinfo().environmentid(), std::to_string(run));
if (vm.count("influxdb-orgid")) {
MonLogger::Get() << "Request sent to create bucket " << stateChange.envinfo().runnumber() << " on " << vm["influxdb-url"].as<std::string>() << MonLogger::End();
influxBucketApi->send(getCreateBucketBody(vm["influxdb-orgid"].as<std::string>(), stateChange.envinfo().runnumber()));
}
}
}
}
Expand Down

0 comments on commit f003f8b

Please sign in to comment.