Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

I tried to pass elasticapmtraceparent in kafka header, but the consumer's link time is not continuous #2135

Open
WangJunZzz opened this issue Jul 14, 2023 · 0 comments
Labels

Comments

@WangJunZzz
Copy link

Confluent.Kafka Version:1.9.0
Elastic.Apm Version: 1.22.0

Steps to reproduce the behavior:

  1. produce code
 public async Task SendMessageAsync(string message, string topic)
    {
        var transaction = Agent.Tracer.CurrentTransaction;
        var span = transaction.StartSpan("publish", "kafka");
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:29092",
            Acks = Acks.All,
            MessageSendMaxRetries = 5,
            MessageTimeoutMs = 5000,
            RequestTimeoutMs = 3000
        };
        var headers = new Headers
        {
            // KafkaConsts.elasticapmtraceparent="elasticapmtraceparent"
            { KafkaConsts.elasticapmtraceparent, Encoding.UTF8.GetBytes(span.OutgoingDistributedTracingData.SerializeToString()) }
        };

        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                var dr = await p.ProduceAsync(
                    topic,
                    new Message<Null, string> { Value = message, Headers = headers }
                ).ConfigureAwait(false);
                Console.WriteLine($"topic:{topic};message:{message};");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }

        span.End();
    }
  1. consume code
 public void Subscribe(string groupId, string topic)
    {
        var conf = new ConsumerConfig
        {
            GroupId = groupId,
            BootstrapServers = "localhost:29092",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false 
        };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe(topic);
            CancellationTokenSource cts = new CancellationTokenSource();
            try
            {
                while (true)
                {
                    try
                    {
                        var result = c.Consume(cts.Token);
                        result.Message.Headers.TryGetLastBytes(KafkaConsts.elasticapmtraceparent, out var traceParent);
                        var traceParentString = Encoding.UTF8.GetString(traceParent);
                        var tryDeserializeFromString = DistributedTracingData.TryDeserializeFromString(traceParentString);
                        var transaction = Agent.Tracer.StartTransaction("receive","kafka",tryDeserializeFromString);
                        Thread.Sleep(2000);
                        c.Commit(result);
                        transaction?.End();
                    }
                    catch (ConsumeException e)
                    {
                        System.Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e.Message);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                c.Close();
            }
        }
    }
  1. definition BackgroundService
public class Bootstrapper : BackgroundService
{
    private readonly ConsumerManager _consumerManager;

    public Bootstrapper(ConsumerManager consumerManager)
    {
        _consumerManager = consumerManager;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await Task.Factory.StartNew(() =>
        {
            _consumerManager.Subscribe("groupName", "topicName");
            
        }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }
}
// builder.Services.AddHostedService<Bootstrapper>().AddTransient<ConsumerManager>();
  1. send message
[ApiController]
[Route("[controller]")]
public class WeatherForecastController : ControllerBase
{
    private readonly ProduceManager _produceManager;

    public WeatherForecastController(ProduceManager produceManager)
    {
        _produceManager = produceManager;
    }

    [HttpGet(Name = "GetWeatherForecast")]
    public async Task Get()
    {
        await _produceManager.SendMessageAsync("message", "topicName");
    }
}
  1. result
    image

  2. problem
    Why doesn't the time in place 1 show the time in place 2?
    I expect the total time in 1 to be shown to be 2,022ms
    The same code rabbitmq is fine

@WangJunZzz WangJunZzz added the bug Something isn't working label Jul 14, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant