Apache Kafka client integration
To get started with the Aspire Apache Kafka client integration, install the 📦 Aspire.Confluent.Kafka NuGet package:

```bash
dotnet add package Aspire.Confluent.Kafka
```

```csharp
#:package Aspire.Confluent.Kafka@*
```

```xml
<PackageReference Include="Aspire.Confluent.Kafka" Version="*" />
```

Add Kafka producer
In the Program.cs file of your client-consuming project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue> for use via the dependency injection container:

```csharp
builder.AddKafkaProducer<string, string>("kafka");
```

You can then retrieve the IProducer<TKey, TValue> instance using dependency injection:

```csharp
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}
```

Add Kafka consumer
To register an IConsumer<TKey, TValue> for use via the dependency injection container, call the AddKafkaConsumer extension method:

```csharp
builder.AddKafkaConsumer<string, string>("kafka");
```

You can then retrieve the IConsumer<TKey, TValue> instance using dependency injection:

```csharp
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}
```

Add keyed Kafka producers or consumers
There might be situations where you want to register multiple producer or consumer instances with different connection names. To register keyed Kafka producers or consumers, call the matching method:
- AddKeyedKafkaProducer: Registers a keyed Kafka producer.
- AddKeyedKafkaConsumer: Registers a keyed Kafka consumer.
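
As a minimal sketch of keyed registration, the example below registers two keyed producers and resolves them with the FromKeyedServices attribute. It assumes two Kafka connections named orders and audit exist in configuration, and that the connection name doubles as the service key, as with other Aspire keyed client registrations. The connection names, topic names, and message contents are hypothetical.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Hypothetical connection names; each is assumed to match an entry
// in the ConnectionStrings configuration section.
builder.AddKeyedKafkaProducer<string, string>("orders");
builder.AddKeyedKafkaProducer<string, string>("audit");

builder.Services.AddHostedService<Worker>();

builder.Build().Run();

internal sealed class Worker(
    [FromKeyedServices("orders")] IProducer<string, string> ordersProducer,
    [FromKeyedServices("audit")] IProducer<string, string> auditProducer) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Each producer publishes through the connection it was registered with.
        await ordersProducer.ProduceAsync(
            "orders-topic", // hypothetical topic name
            new Message<string, string> { Key = "order-1", Value = "created" },
            stoppingToken);

        await auditProducer.ProduceAsync(
            "audit-topic", // hypothetical topic name
            new Message<string, string> { Key = "order-1", Value = "order created" },
            stoppingToken);
    }
}
```

Keyed consumers would follow the same pattern with AddKeyedKafkaConsumer and IConsumer<TKey, TValue>, although a consumer typically also needs a group ID supplied through its configuration.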
Connection properties
When you reference a Kafka resource using WithReference, the following connection properties are made available to the consuming project:
Kafka server
The Kafka server resource exposes the following connection properties:
| Property Name | Description |
|---|---|
| Host | The host-facing Kafka listener hostname or IP address |
| Port | The host-facing Kafka listener port |

Example properties:

```
Host: localhost
Port: 9092
```

Configuration
The Apache Kafka integration provides multiple options to configure the connection.
Use a connection string
When using a connection string from the ConnectionStrings configuration section, provide the name when calling builder.AddKafkaProducer() or builder.AddKafkaConsumer():

```csharp
builder.AddKafkaProducer<string, string>("kafka");
```

Example configuration:

```json
{
  "ConnectionStrings": {
    "kafka": "broker:9092"
  }
}
```

Use configuration providers
The Apache Kafka integration supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings or KafkaConsumerSettings from configuration using the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer keys. Example appsettings.json:

```json
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}
```

The Config properties bind to instances of ProducerConfig and ConsumerConfig.
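
To illustrate what this binding means, the following sketch sets the same values in code instead of appsettings.json. It assumes that KafkaProducerSettings exposes the bound ProducerConfig through its Config property, mirroring the JSON shape above, and it uses the Acks enum from Confluent.Kafka.

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

builder.AddKafkaProducer<string, string>("kafka", static settings =>
{
    // Counterpart of "DisableHealthChecks": false in the JSON example.
    settings.DisableHealthChecks = false;

    // Counterpart of "Config": { "Acks": "All" }; Config is assumed
    // to be the Confluent.Kafka ProducerConfig instance.
    settings.Config.Acks = Acks.All;
});

builder.Build().Run();
```

Keeping these values in configuration rather than code lets them differ per environment without a rebuild.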
Use named configuration
The Apache Kafka integration supports named configuration for multiple instances:

```json
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "kafka1": {
            "DisableHealthChecks": false,
            "Config": {
              "Acks": "All"
            }
          },
          "kafka2": {
            "DisableHealthChecks": true,
            "Config": {
              "Acks": "Leader"
            }
          }
        }
      }
    }
  }
}
```

Use the connection names when calling the registration methods:

```csharp
builder.AddKafkaProducer<string, string>("kafka1");
builder.AddKafkaConsumer<string, string>("kafka2");
```

Use inline delegates
You can pass the Action<KafkaProducerSettings> delegate to set up options inline:

```csharp
builder.AddKafkaProducer<string, string>(
    "kafka",
    static settings => settings.DisableHealthChecks = true);
```

To configure Confluent.Kafka builders, pass an Action<ProducerBuilder<TKey, TValue>>:

```csharp
builder.AddKafkaProducer<string, MyMessage>(
    "kafka",
    static producerBuilder =>
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    });
```

Client integration health checks
By default, Aspire integrations enable health checks for all services. The Apache Kafka integration handles the following health check scenarios:
- Adds the Aspire.Confluent.Kafka.Producer health check when DisableHealthChecks is false.
- Adds the Aspire.Confluent.Kafka.Consumer health check when DisableHealthChecks is false.
- Integrates with the /health HTTP endpoint.
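
In an Aspire app, the service defaults project typically maps the health endpoint for you. As a rough sketch for a plain ASP.NET Core app without service defaults (an assumption, not something the text above prescribes), the endpoint could be mapped manually so the Kafka health check becomes reachable:

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = WebApplication.CreateBuilder(args);

// Registers the producer; its health check is added because
// DisableHealthChecks is left at its default of false.
builder.AddKafkaProducer<string, string>("kafka");

// Ensures the health check services are registered.
builder.Services.AddHealthChecks();

var app = builder.Build();

// Exposes the aggregated health status, including the Kafka check, at /health.
app.MapHealthChecks("/health");

app.Run();
```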
Observability and telemetry
Logging
The Apache Kafka integration uses the following log categories:
- Aspire.Confluent.Kafka
Tracing
The Apache Kafka integration doesn’t emit distributed traces.
Metrics
The Apache Kafka integration emits the following metrics using OpenTelemetry:
- Aspire.Confluent.Kafka
  - messaging.kafka.network.tx
  - messaging.kafka.network.transmitted
  - messaging.kafka.network.rx
  - messaging.kafka.network.received
  - messaging.publish.messages
  - messaging.kafka.message.transmitted
  - messaging.receive.messages
  - messaging.kafka.message.received