Apache Kafka client integration

To get started with the Aspire Apache Kafka client integration, install the 📦 Aspire.Confluent.Kafka NuGet package:

.NET CLI — Add Aspire.Confluent.Kafka package
dotnet add package Aspire.Confluent.Kafka

In the Program.cs file of your client-consuming project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue> for use via the dependency injection container:

builder.AddKafkaProducer<string, string>("kafka");

You can then retrieve the IProducer<TKey, TValue> instance using dependency injection:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}
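As a rough sketch of what "use producer" might look like, the worker below publishes one message every few seconds. The topic name `orders` and the message contents are assumptions made for illustration; only `IProducer<TKey, TValue>.ProduceAsync` and `Message<TKey, TValue>` come from the Confluent.Kafka library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Hypothetical topic name, used only for illustration.
    private const string Topic = "orders";

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var message = new Message<string, string>
            {
                Key = Guid.NewGuid().ToString(),
                Value = $"Heartbeat at {DateTimeOffset.UtcNow:O}"
            };

            // The returned task completes once delivery has been reported.
            await producer.ProduceAsync(Topic, message, stoppingToken);

            await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
        }
    }
}
```

The injected producer is owned by the dependency injection container, so the worker does not dispose it.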

To register an IConsumer<TKey, TValue> for use via the dependency injection container, call the AddKafkaConsumer extension method:

builder.AddKafkaConsumer<string, string>("kafka");

You can then retrieve the IConsumer<TKey, TValue> instance using dependency injection:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}
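A minimal sketch of a consume loop follows, assuming a hypothetical topic named `orders` and a consumer whose configuration already sets a group ID (Confluent.Kafka consumers require one). `Consume` blocks the calling thread, so this sketch runs the loop on a background task; other designs are possible.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume is a blocking call, so keep it off the host's startup path.
        return Task.Run(() =>
        {
            // Hypothetical topic name, used only for illustration.
            consumer.Subscribe("orders");

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    ConsumeResult<string, string> result = consumer.Consume(stoppingToken);
                    Console.WriteLine($"{result.Message.Key}: {result.Message.Value}");
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the host shuts down.
            }
            finally
            {
                // Leave the consumer group cleanly.
                consumer.Close();
            }
        });
    }
}
```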

There might be situations where you want to register multiple producer or consumer instances with different connection names. To register keyed Kafka clients, call the appropriate method:

  • AddKeyedKafkaProducer: Registers a keyed Kafka producer.
  • AddKeyedKafkaConsumer: Registers a keyed Kafka consumer.
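The keyed registrations above could be combined as in the following sketch. The connection names `kafka-orders` and `kafka-audit`, and the topic names, are hypothetical; the sketch assumes each name also has a matching connection string or configuration entry, and that keyed instances are resolved with the standard `[FromKeyedServices]` attribute from Microsoft.Extensions.DependencyInjection.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Hypothetical connection names; each is also used as the service key.
builder.AddKeyedKafkaProducer<string, string>("kafka-orders");
builder.AddKeyedKafkaProducer<string, string>("kafka-audit");

builder.Services.AddHostedService<Worker>();
builder.Build().Run();

internal sealed class Worker(
    [FromKeyedServices("kafka-orders")] IProducer<string, string> ordersProducer,
    [FromKeyedServices("kafka-audit")] IProducer<string, string> auditProducer)
    : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var message = new Message<string, string> { Key = "order-1", Value = "created" };

        // Each producer talks to the broker behind its own connection name.
        await ordersProducer.ProduceAsync("orders", message, stoppingToken);
        await auditProducer.ProduceAsync("audit", message, stoppingToken);
    }
}
```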

When you reference a Kafka resource using WithReference, its connection properties are made available to the consuming project. The Kafka server resource exposes the following connection properties:

Property Name   Description
Host            The host-facing Kafka listener hostname or IP address
Port            The host-facing Kafka listener port

Example properties:

Host: localhost
Port: 9092

The Apache Kafka integration provides multiple options to configure the connection.

When using a connection string from the ConnectionStrings configuration section, provide the name of the connection string when calling builder.AddKafkaProducer() or builder.AddKafkaConsumer():

builder.AddKafkaProducer<string, string>("kafka");

Example configuration:

{
  "ConnectionStrings": {
    "kafka": "broker:9092"
  }
}

The Apache Kafka integration supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings or KafkaConsumerSettings from configuration using the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer keys. Example appsettings.json:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

The Config properties bind to instances of ProducerConfig and ConsumerConfig.
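For the consumer side, an equivalent fragment might look like the sketch below. The group ID value is a placeholder, and the example assumes the Config section binds to ConsumerConfig property names such as GroupId and AutoOffsetReset in the same way the producer example binds Acks.

```json
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Consumer": {
          "DisableHealthChecks": false,
          "Config": {
            "GroupId": "my-consumer-group",
            "AutoOffsetReset": "Earliest"
          }
        }
      }
    }
  }
}
```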

The Apache Kafka integration supports named configuration for multiple instances:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "kafka1": {
            "DisableHealthChecks": false,
            "Config": {
              "Acks": "All"
            }
          },
          "kafka2": {
            "DisableHealthChecks": true,
            "Config": {
              "Acks": "Leader"
            }
          }
        }
      }
    }
  }
}

Use the connection names when calling the registration methods:

builder.AddKafkaProducer<string, string>("kafka1");
builder.AddKafkaConsumer<string, string>("kafka2");

You can pass the Action<KafkaProducerSettings> delegate to set up options inline:

builder.AddKafkaProducer<string, string>(
    "kafka",
    static settings => settings.DisableHealthChecks = true);

To configure Confluent.Kafka builders, pass an Action<ProducerBuilder<TKey, TValue>>:

builder.AddKafkaProducer<string, MyMessage>(
    "kafka",
    static producerBuilder =>
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    });
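The snippet above does not define MyMessage or MyMessageSerializer. One possible shape, shown only as a sketch, is a record serialized to UTF-8 JSON through the Confluent.Kafka `ISerializer<T>` interface. The record's properties are invented for illustration, and the choice of System.Text.Json is an assumption rather than something the integration requires.

```csharp
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical message type; its properties are placeholders.
public sealed record MyMessage(string Id, string Body);

// Encodes MyMessage values as UTF-8 JSON bytes.
public sealed class MyMessageSerializer : ISerializer<MyMessage>
{
    public byte[] Serialize(MyMessage data, SerializationContext context) =>
        JsonSerializer.SerializeToUtf8Bytes(data);
}
```

A consumer reading these messages would need a matching `IDeserializer<MyMessage>` registered on its ConsumerBuilder.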

By default, Aspire integrations enable health checks for all services. The Apache Kafka integration handles the following health check scenarios:

  • Adds the Aspire.Confluent.Kafka.Producer health check when DisableHealthChecks is false.
  • Adds the Aspire.Confluent.Kafka.Consumer health check when DisableHealthChecks is false.
  • Integrates with the /health HTTP endpoint.

The Apache Kafka integration uses the following log categories:

  • Aspire.Confluent.Kafka

The Apache Kafka integration doesn’t emit distributed traces.

The Apache Kafka integration emits the following metrics using OpenTelemetry:

  • Aspire.Confluent.Kafka
    • messaging.kafka.network.tx
    • messaging.kafka.network.transmitted
    • messaging.kafka.network.rx
    • messaging.kafka.network.received
    • messaging.publish.messages
    • messaging.kafka.message.transmitted
    • messaging.receive.messages
    • messaging.kafka.message.received