# Connect to Apache Kafka
This page describes how consuming apps connect to a Kafka resource that’s already modeled in your AppHost. For the AppHost API surface — adding a Kafka server, Kafka UI, data volumes, and more — see Apache Kafka Hosting integration.
When you reference a Kafka resource from your AppHost, Aspire injects the connection information into the consuming app as environment variables. Your app can either read those environment variables directly — the pattern works the same from any language — or, in C#, use the Aspire Confluent Kafka client integration for automatic dependency injection of producers and consumers, health checks, and telemetry.
## Connection properties

Aspire exposes each property as an environment variable named `[RESOURCE]_[PROPERTY]`. For instance, the `Host` property of a resource called `kafka` becomes `KAFKA_HOST`.
The Kafka server resource exposes the following connection properties:
| Property Name | Description |
|---|---|
| `Host` | The host-facing Kafka listener hostname or IP address |
| `Port` | The host-facing Kafka listener port |
Example connection string:
```
ConnectionStrings__kafka: localhost:9092
```

## Connect from your app

Pick the language your consuming app is written in. Each example assumes your AppHost adds a Kafka resource named `kafka` and references it from the consuming app.
For C# apps, the recommended approach is the Aspire Confluent Kafka client integration. It registers an IProducer<TKey, TValue> and/or IConsumer<TKey, TValue> through dependency injection and adds health checks and telemetry automatically. If you’d rather read environment variables directly, use the ConnectionStrings section value as the bootstrap server address.
### Install the client integration

Install the 📦 Aspire.Confluent.Kafka NuGet package in the consuming client project:

```shell
dotnet add package Aspire.Confluent.Kafka
```

Or reference the package directly in the project file:

```xml
<PackageReference Include="Aspire.Confluent.Kafka" Version="*" />
```

### Add a Kafka producer
In Program.cs, call `AddKafkaProducer` on your `IHostApplicationBuilder` to register an `IProducer<TKey, TValue>`:

```csharp
builder.AddKafkaProducer<string, string>(connectionName: "kafka");
```

Resolve the producer through dependency injection:

```csharp
public class ExampleService(IProducer<string, string> producer)
{
    // Use producer...
}
```

### Add a Kafka consumer
Call `AddKafkaConsumer` to register an `IConsumer<TKey, TValue>`:

```csharp
builder.AddKafkaConsumer<string, string>(connectionName: "kafka");
```

Resolve the consumer through dependency injection:

```csharp
public class ExampleService(IConsumer<string, string> consumer)
{
    // Use consumer...
}
```

### Add keyed Kafka producers or consumers
To register multiple producer or consumer instances with different connection names, use the keyed variants:

```csharp
builder.AddKeyedKafkaProducer<string, string>(name: "orders");
builder.AddKeyedKafkaConsumer<string, string>(name: "events");
```

Then resolve each instance by key:

```csharp
public class ExampleService(
    [FromKeyedServices("orders")] IProducer<string, string> ordersProducer,
    [FromKeyedServices("events")] IConsumer<string, string> eventsConsumer)
{
    // Use producers and consumers...
}
```

### Configuration
The Aspire Confluent Kafka client integration offers several ways to provide configuration.
**Connection strings.** When using a connection string from the `ConnectionStrings` configuration section, pass the connection name to `AddKafkaProducer` or `AddKafkaConsumer`:

```csharp
builder.AddKafkaProducer<string, string>("kafka");
```

The connection string is resolved from the `ConnectionStrings` section:

```json
{
  "ConnectionStrings": {
    "kafka": "localhost:9092"
  }
}
```

**Configuration providers.** The client integration supports Microsoft.Extensions.Configuration. It loads `KafkaProducerSettings` or `KafkaConsumerSettings` from appsettings.json (or any other configuration source) using the `Aspire:Confluent:Kafka:Producer` and `Aspire:Confluent:Kafka:Consumer` keys:

```json
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        },
        "Consumer": {
          "DisableHealthChecks": false,
          "Config": {
            "ClientId": "my-consumer",
            "GroupId": "my-group"
          }
        }
      }
    }
  }
}
```

The `Config` properties bind to instances of `ProducerConfig` and `ConsumerConfig` from the Confluent.Kafka library.
**Named configuration.** The integration supports named configuration for multiple instances:

```json
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "orders": {
            "Config": {
              "Acks": "All"
            }
          },
          "audit": {
            "Config": {
              "Acks": "Leader"
            }
          }
        }
      }
    }
  }
}
```

**Inline delegates.** Pass an `Action<KafkaProducerSettings>` to configure settings inline, for example to disable health checks:

```csharp
builder.AddKafkaProducer<string, string>(
    "kafka",
    static settings => settings.DisableHealthChecks = true);
```

To configure the underlying Confluent.Kafka builder, pass an `Action<ProducerBuilder<TKey, TValue>>`:

```csharp
builder.AddKafkaProducer<string, MyMessage>(
    "kafka",
    static producerBuilder =>
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    });
```

### Client integration health checks
Aspire client integrations enable health checks by default. The Confluent Kafka client integration adds:

- The `Aspire.Confluent.Kafka.Producer` health check when `DisableHealthChecks` is `false`.
- The `Aspire.Confluent.Kafka.Consumer` health check when `DisableHealthChecks` is `false`.
- Integration with the `/health` HTTP endpoint, where all registered health checks must pass before the app is considered ready to accept traffic.
### Observability and telemetry

The Aspire Confluent Kafka client integration automatically configures logging and metrics through OpenTelemetry.

Logging categories:

- `Aspire.Confluent.Kafka`

Tracing: the Apache Kafka integration doesn't currently emit distributed traces.

Metrics emitted through OpenTelemetry:

- `messaging.kafka.network.tx`
- `messaging.kafka.network.transmitted`
- `messaging.kafka.network.rx`
- `messaging.kafka.network.received`
- `messaging.publish.messages`
- `messaging.kafka.message.transmitted`
- `messaging.receive.messages`
- `messaging.kafka.message.received`
### Read environment variables in C#

If you prefer not to use the Aspire client integration, you can read the Aspire-injected bootstrap server address from the environment and pass it directly to Confluent.Kafka:
```csharp
using Confluent.Kafka;

// Read the Aspire-injected bootstrap server properties.
var bootstrapServers =
    Environment.GetEnvironmentVariable("KAFKA_HOST") + ":" +
    Environment.GetEnvironmentVariable("KAFKA_PORT");

using var producer = new ProducerBuilder<string, string>(
        new ProducerConfig { BootstrapServers = bootstrapServers })
    .Build();

// Use producer...
```

Use confluent-kafka-go, the official Go client for Apache Kafka:
```shell
go get github.com/confluentinc/confluent-kafka-go/v2/kafka
```

Read the injected environment variables and connect:
```go
package main

import (
	"fmt"
	"os"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	// Read the Aspire-injected bootstrap server properties.
	host := os.Getenv("KAFKA_HOST")
	port := os.Getenv("KAFKA_PORT")
	bootstrapServers := fmt.Sprintf("%s:%s", host, port)

	producer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": bootstrapServers,
	})
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// Use producer to send messages...
}
```

Install confluent-kafka-python, the official Python client for Apache Kafka:
```shell
pip install confluent-kafka
```

Read the injected environment variables and connect:
```python
import os

from confluent_kafka import Producer

# Read the Aspire-injected bootstrap server properties.
host = os.getenv("KAFKA_HOST")
port = os.getenv("KAFKA_PORT")
bootstrap_servers = f"{host}:{port}"

producer = Producer({"bootstrap.servers": bootstrap_servers})
```
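confluent-kafka delivers messages asynchronously, so production code usually registers a delivery callback rather than assuming success. A minimal sketch (the function name `delivery_report` is illustrative, not part of the library):

```python
# Delivery callback for confluent-kafka's Producer.produce (sketch; the
# name delivery_report is ours). The client invokes it once per message,
# during poll() or flush(), with either an error or the delivered message.
def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}]")
```

Pass it when producing, for example `producer.produce("my-topic", value=b"hello", callback=delivery_report)`, and call `producer.flush()` before exit so pending callbacks fire.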
```python
# Use producer to send messages...
producer.flush()
```

To consume messages:
```python
import os

from confluent_kafka import Consumer

host = os.getenv("KAFKA_HOST")
port = os.getenv("KAFKA_PORT")
bootstrap_servers = f"{host}:{port}"

consumer = Consumer({
    "bootstrap.servers": bootstrap_servers,
    "group.id": "my-group",
    "auto.offset.reset": "earliest",
})

consumer.subscribe(["my-topic"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is not None and not msg.error():
            print(f"Received: {msg.value().decode('utf-8')}")
finally:
    consumer.close()
```

Install kafkajs, the most popular TypeScript/JavaScript client for Apache Kafka:
```shell
npm install kafkajs
```

Read the injected environment variables and connect:
```typescript
import { Kafka } from 'kafkajs';

// Read the Aspire-injected connection properties.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: [`${process.env.KAFKA_HOST}:${process.env.KAFKA_PORT}`],
});

const producer = kafka.producer();
await producer.connect();

// Use producer to send messages...
await producer.disconnect();
```

To consume messages:
```typescript
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: [`${process.env.KAFKA_HOST}:${process.env.KAFKA_PORT}`],
});

const consumer = kafka.consumer({ groupId: 'my-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log(`Received: ${message.value?.toString()}`);
  },
});
```
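A small refinement that applies to any of these clients: fail fast when the Aspire-injected variables are absent, for example when the app is launched outside the AppHost. A sketch (the helper name `brokersFromEnv` is ours, not part of kafkajs or Aspire):

```typescript
// Build the kafkajs `brokers` array from the Aspire-injected variables,
// throwing early with a clear message if they are missing.
export function brokersFromEnv(): string[] {
  const host = process.env.KAFKA_HOST;
  const port = process.env.KAFKA_PORT;
  if (!host || !port) {
    throw new Error(
      'KAFKA_HOST/KAFKA_PORT are not set. Is this app running under the AppHost with a reference to the kafka resource?',
    );
  }
  return [`${host}:${port}`];
}
```

Then construct the client with `new Kafka({ clientId: 'my-app', brokers: brokersFromEnv() })`.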