Aller au contenu
Docs Try Aspire
Docs Try

Connect to Apache Kafka

Ce contenu n’est pas encore disponible dans votre langue.

Apache Kafka logo

This page describes how consuming apps connect to a Kafka resource that’s already modeled in your AppHost. For the AppHost API surface — adding a Kafka server, Kafka UI, data volumes, and more — see Apache Kafka Hosting integration.

When you reference a Kafka resource from your AppHost, Aspire injects the connection information into the consuming app as environment variables. Your app can either read those environment variables directly — the pattern works the same from any language — or, in C#, use the Aspire Confluent Kafka client integration for automatic dependency injection of producers and consumers, health checks, and telemetry.

Aspire exposes each property as an environment variable named [RESOURCE]_[PROPERTY]. For instance, the Host property of a resource called kafka becomes KAFKA_HOST.

The Kafka server resource exposes the following connection properties:

Property NameDescription
HostThe host-facing Kafka listener hostname or IP address
PortThe host-facing Kafka listener port

Example connection string:

ConnectionStrings__kafka: localhost:9092

Pick the language your consuming app is written in. Each example assumes your AppHost adds a Kafka resource named kafka and references it from the consuming app.

For C# apps, the recommended approach is the Aspire Confluent Kafka client integration. It registers an IProducer<TKey, TValue> and/or IConsumer<TKey, TValue> through dependency injection and adds health checks and telemetry automatically. If you’d rather read environment variables directly, use the ConnectionStrings section value as the bootstrap server address.

Install the 📦 Aspire.Confluent.Kafka NuGet package in the client-consuming project:

.NET CLI — Add Aspire.Confluent.Kafka package
dotnet add package Aspire.Confluent.Kafka

In Program.cs, call AddKafkaProducer on your IHostApplicationBuilder to register an IProducer<TKey, TValue>:

C# — Program.cs
builder.AddKafkaProducer<string, string>(connectionName: "kafka");

Resolve the producer through dependency injection:

C# — ExampleService.cs
public class ExampleService(IProducer<string, string> producer)
{
// Use producer...
}

Call AddKafkaConsumer to register an IConsumer<TKey, TValue>:

C# — Program.cs
builder.AddKafkaConsumer<string, string>(connectionName: "kafka");

Resolve the consumer through dependency injection:

C# — ExampleService.cs
public class ExampleService(IConsumer<string, string> consumer)
{
// Use consumer...
}

To register multiple producer or consumer instances with different connection names, use the keyed variants:

C# — Program.cs
builder.AddKeyedKafkaProducer<string, string>(name: "orders");
builder.AddKeyedKafkaConsumer<string, string>(name: "events");

Then resolve each instance by key:

C# — ExampleService.cs
public class ExampleService(
[FromKeyedServices("orders")] IProducer<string, string> ordersProducer,
[FromKeyedServices("events")] IConsumer<string, string> eventsConsumer)
{
// Use producers and consumers...
}

The Aspire Confluent Kafka client integration offers multiple ways to provide configuration.

Connection strings. When using a connection string from the ConnectionStrings configuration section, pass the connection name to AddKafkaProducer or AddKafkaConsumer:

C# — Program.cs
builder.AddKafkaProducer<string, string>("kafka");

The connection string is resolved from the ConnectionStrings section:

JSON — appsettings.json
{
"ConnectionStrings": {
"kafka": "localhost:9092"
}
}

Configuration providers. The client integration supports Microsoft.Extensions.Configuration. It loads KafkaProducerSettings or KafkaConsumerSettings from appsettings.json (or any other configuration source) using the Aspire:Confluent:Kafka:Producer and Aspire:Confluent:Kafka:Consumer keys:

JSON — appsettings.json
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
},
"Consumer": {
"DisableHealthChecks": false,
"Config": {
"ClientId": "my-consumer",
"GroupId": "my-group"
}
}
}
}
}
}

The Config properties bind to instances of ProducerConfig and ConsumerConfig from the Confluent.Kafka library.

Named configuration. The integration supports named configuration for multiple instances:

JSON — appsettings.json
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"orders": {
"Config": { "Acks": "All" }
},
"audit": {
"Config": { "Acks": "Leader" }
}
}
}
}
}
}

Inline delegates. Pass an Action<KafkaProducerSettings> to configure settings inline, for example to disable health checks:

C# — Program.cs
builder.AddKafkaProducer<string, string>(
"kafka",
static settings => settings.DisableHealthChecks = true);

To configure the underlying Confluent.Kafka builder, pass an Action<ProducerBuilder<TKey, TValue>>:

C# — Program.cs
builder.AddKafkaProducer<string, MyMessage>(
"kafka",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
});

Aspire client integrations enable health checks by default. The Confluent Kafka client integration adds:

  • The Aspire.Confluent.Kafka.Producer health check when DisableHealthChecks is false.
  • The Aspire.Confluent.Kafka.Consumer health check when DisableHealthChecks is false.
  • Integration with the /health HTTP endpoint, where all registered health checks must pass before the app is considered ready to accept traffic.

The Aspire Confluent Kafka client integration automatically configures logging, tracing, and metrics through OpenTelemetry.

Logging categories:

  • Aspire.Confluent.Kafka

Tracing: The Apache Kafka integration doesn’t currently emit distributed traces.

Metrics emitted through OpenTelemetry:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

If you prefer not to use the Aspire client integration, you can read the Aspire-injected bootstrap server address from the environment and pass it directly to Confluent.Kafka:

C# — Program.cs
using Confluent.Kafka;
var bootstrapServers = Environment.GetEnvironmentVariable("KAFKA_HOST") + ":"
+ Environment.GetEnvironmentVariable("KAFKA_PORT");
using var producer = new ProducerBuilder<string, string>(
new ProducerConfig { BootstrapServers = bootstrapServers })
.Build();
// Use producer...