Apache Kafka client integration

To get started with the Aspire Apache Kafka client integration, install the 📦 Aspire.Confluent.Kafka NuGet package:

.NET CLI: add the Aspire.Confluent.Kafka package
dotnet add package Aspire.Confluent.Kafka

In the Program.cs file of your client-consuming project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue> for use via the dependency injection container:

builder.AddKafkaProducer<string, string>("kafka");

You can then retrieve the IProducer<TKey, TValue> instance using dependency injection:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}
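
As a rough illustration of what "use producer" could look like, the following sketch publishes one message every few seconds. The topic name `demo-topic` and the five-second interval are assumptions made for this example, not part of the integration; the `ProduceAsync` call and `Message<TKey, TValue>` type come from the Confluent.Kafka client.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Hypothetical topic name, used only for illustration.
    private const string Topic = "demo-topic";

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5));

        // WaitForNextTickAsync throws OperationCanceledException on shutdown,
        // which BackgroundService treats as a normal stop.
        while (await timer.WaitForNextTickAsync(stoppingToken))
        {
            var message = new Message<string, string>
            {
                Key = Guid.NewGuid().ToString(),
                Value = $"Hello at {DateTimeOffset.UtcNow:O}"
            };

            // Awaiting ProduceAsync waits for the delivery report of this message.
            DeliveryResult<string, string> result =
                await producer.ProduceAsync(Topic, message, stoppingToken);

            Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
        }
    }
}
```

Awaiting each send keeps the example simple; a high-throughput producer would more likely use the non-blocking `Produce` method with a delivery handler.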

To register an IConsumer<TKey, TValue> for use via the dependency injection container, call the AddKafkaConsumer extension method:

builder.AddKafkaConsumer<string, string>("kafka");

You can then retrieve the IConsumer<TKey, TValue> instance using dependency injection:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}
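
A minimal sketch of a consume loop is shown below. It assumes a hypothetical topic named `demo-topic` and assumes that a consumer group ID has already been supplied through configuration, since Confluent.Kafka requires one before subscribing. Because `Consume` blocks the calling thread, the loop runs on a thread-pool thread via `Task.Run` so that host startup isn't held up.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Hypothetical topic name, used only for illustration.
    private const string Topic = "demo-topic";

    protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
        Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);

    private void ConsumeLoop(CancellationToken stoppingToken)
    {
        consumer.Subscribe(Topic);

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // Blocks until a message arrives or the token is cancelled.
                ConsumeResult<string, string> result = consumer.Consume(stoppingToken);
                Console.WriteLine($"{result.Message.Key}: {result.Message.Value}");
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the host is shutting down.
        }
        finally
        {
            // Leave the consumer group cleanly so partitions are reassigned promptly.
            consumer.Close();
        }
    }
}
```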

To register multiple producer or consumer instances with different connection names, use the keyed registration methods:

  • AddKeyedKafkaProducer: Registers a keyed Kafka producer.
  • AddKeyedKafkaConsumer: Registers a keyed Kafka consumer.
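
The keyed methods listed above might be used as in the following sketch. It assumes two connection names, `kafka1` and `kafka2`, and assumes that the connection name doubles as the service key, which is the usual convention for keyed Aspire client registrations. The `Worker` class is a placeholder for illustration.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Register one keyed producer per connection name (names are assumptions).
builder.AddKeyedKafkaProducer<string, string>("kafka1");
builder.AddKeyedKafkaProducer<string, string>("kafka2");

builder.Services.AddHostedService<Worker>();

builder.Build().Run();

// Resolve each producer by its key using the FromKeyedServices attribute.
internal sealed class Worker(
    [FromKeyedServices("kafka1")] IProducer<string, string> primary,
    [FromKeyedServices("kafka2")] IProducer<string, string> secondary) : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Use primary and secondary...
        return Task.CompletedTask;
    }
}
```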

When you reference a Kafka resource using WithReference, the Kafka server resource makes the following connection properties available to the consuming project:

Property Name    Description
Host             The host-facing Kafka listener hostname or IP address
Port             The host-facing Kafka listener port

Example properties:

Host: localhost
Port: 9092

The Apache Kafka integration provides multiple options to configure the connection.

When using a connection string from the ConnectionStrings configuration section, provide the name when calling builder.AddKafkaProducer() or builder.AddKafkaConsumer():

builder.AddKafkaProducer<string, string>("kafka");

Example configuration:

{
  "ConnectionStrings": {
    "kafka": "broker:9092"
  }
}

The Apache Kafka integration supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings or KafkaConsumerSettings from configuration using the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer keys. Example appsettings.json:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

The Config properties bind to instances of ProducerConfig and ConsumerConfig.

The Apache Kafka integration supports named configuration for multiple instances:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "kafka1": {
            "DisableHealthChecks": false,
            "Config": {
              "Acks": "All"
            }
          },
          "kafka2": {
            "DisableHealthChecks": true,
            "Config": {
              "Acks": "Leader"
            }
          }
        }
      }
    }
  }
}

Use the connection names when calling the registration methods:

builder.AddKafkaProducer<string, string>("kafka1");
builder.AddKafkaConsumer<string, string>("kafka2");

You can pass an Action<KafkaProducerSettings> delegate to set up options inline:

builder.AddKafkaProducer<string, string>(
    "kafka",
    static settings => settings.DisableHealthChecks = true);
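
The same inline pattern should apply to consumers through an Action<KafkaConsumerSettings> delegate. The sketch below sets two common ConsumerConfig values; the group ID `demo-group` is an assumption for illustration, and the lambda parameter is typed explicitly so that the settings overload is selected unambiguously.

```csharp
using Aspire.Confluent.Kafka;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

builder.AddKafkaConsumer<string, string>(
    "kafka",
    static (KafkaConsumerSettings settings) =>
    {
        // Hypothetical consumer group name; Kafka requires one to subscribe.
        settings.Config.GroupId = "demo-group";

        // Start from the oldest available offset when no committed offset exists.
        settings.Config.AutoOffsetReset = AutoOffsetReset.Earliest;
    });

builder.Build().Run();
```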

To configure Confluent.Kafka builders, pass an Action<ProducerBuilder<TKey, TValue>>:

builder.AddKafkaProducer<string, MyMessage>(
    "kafka",
    static producerBuilder =>
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    });
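
The MyMessage and MyMessageSerializer types are not defined in the example above. One possible shape, assuming a simple record serialized as UTF-8 JSON, is sketched here; the record's properties are invented for illustration, while ISerializer<T> and SerializationContext come from Confluent.Kafka.

```csharp
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical message type; the real shape depends on your application.
public sealed record MyMessage(string Id, string Text);

// Serializes MyMessage values to UTF-8 encoded JSON bytes.
public sealed class MyMessageSerializer : ISerializer<MyMessage>
{
    public byte[] Serialize(MyMessage data, SerializationContext context) =>
        JsonSerializer.SerializeToUtf8Bytes(data);
}
```

A consumer of the same topic would need a matching IDeserializer<MyMessage> registered through the consumer builder's SetValueDeserializer method.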

By default, Aspire integrations enable health checks for all services. The Apache Kafka integration handles the following health check scenarios:

  • Adds the Aspire.Confluent.Kafka.Producer health check when DisableHealthChecks is false.
  • Adds the Aspire.Confluent.Kafka.Consumer health check when DisableHealthChecks is false.
  • Integrates with the /health HTTP endpoint.

The Apache Kafka integration uses the following log categories:

  • Aspire.Confluent.Kafka

The Apache Kafka integration doesn’t emit distributed traces.

The Apache Kafka integration emits the following metrics using OpenTelemetry:

  • Aspire.Confluent.Kafka
    • messaging.kafka.network.tx
    • messaging.kafka.network.transmitted
    • messaging.kafka.network.rx
    • messaging.kafka.network.received
    • messaging.publish.messages
    • messaging.kafka.message.transmitted
    • messaging.receive.messages
    • messaging.kafka.message.received