# Connect to Apache Kafka

<Image
  src={kafkaIcon}
  alt="Apache Kafka logo"
  width={100}
  height={100}
  class:list={'float-inline-left icon'}
  data-zoom-off
/>

This page describes how consuming apps connect to a Kafka resource that's already modeled in your AppHost. For the AppHost API surface — adding a Kafka server, Kafka UI, data volumes, and more — see [Apache Kafka Hosting integration](../apache-kafka-host/).

When you reference a Kafka resource from your AppHost, Aspire injects the connection information into the consuming app as environment variables. Your app can either read those environment variables directly — the pattern works the same from any language — or, in C#, use the Aspire Confluent Kafka client integration for automatic dependency injection of producers and consumers, health checks, and telemetry.

## Connection properties

Aspire exposes each property as an environment variable named `[RESOURCE]_[PROPERTY]`. For instance, the `Host` property of a resource called `kafka` becomes `KAFKA_HOST`.
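To make the naming convention concrete, here's a minimal sketch in Python (the `aspire_env_var` helper is illustrative, not part of any Aspire SDK — it just shows how the `[RESOURCE]_[PROPERTY]` name is composed):

```python
import os

def aspire_env_var(resource: str, prop: str) -> str:
    # Builds the [RESOURCE]_[PROPERTY] environment variable name:
    # resource "kafka" + property "Host" -> "KAFKA_HOST"
    return f"{resource.upper()}_{prop.upper()}"

# For a resource named "kafka", the Host property is read from KAFKA_HOST
host = os.getenv(aspire_env_var("kafka", "Host"))
```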

The Kafka server resource exposes the following connection properties:

| Property Name | Description |
| ------------- | ----------- |
| `Host`        | The host-facing Kafka listener hostname or IP address |
| `Port`        | The host-facing Kafka listener port |

**Example connection string:**

```
ConnectionStrings__kafka: localhost:9092
```
**Note:** Aspire also injects the connection string directly into the `ConnectionStrings` section. The full bootstrap-servers string (for example, `localhost:9092`) is injected under the key matching the resource name.
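Because it's an ordinary environment variable, any language can read that injected value directly — the double underscore in `ConnectionStrings__kafka` is the environment-variable form of a nested configuration key. A minimal Python sketch (the `localhost:9092` fallback is illustrative, for running outside Aspire):

```python
import os

# The AppHost injects the full bootstrap-servers string under
# ConnectionStrings__kafka (double underscore = nested config section).
# The fallback value here is only for running outside of Aspire.
bootstrap_servers = os.getenv("ConnectionStrings__kafka", "localhost:9092")
```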

## Connect from your app

Pick the language your consuming app is written in. Each example assumes your AppHost adds a Kafka resource named `kafka` and references it from the consuming app.

### C\#

For C# apps, the recommended approach is the Aspire Confluent Kafka client integration. It registers an [`IProducer<TKey, TValue>`](https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.IProducer-2.html) and/or [`IConsumer<TKey, TValue>`](https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.IConsumer-2.html) through dependency injection and adds health checks and telemetry automatically. If you'd rather read environment variables directly, use the `ConnectionStrings` section value as the bootstrap server address.

#### Install the client integration

Install the [📦 Aspire.Confluent.Kafka](https://www.nuget.org/packages/Aspire.Confluent.Kafka) NuGet package in the client-consuming project:

<InstallDotNetPackage packageName="Aspire.Confluent.Kafka" />

#### Add a Kafka producer

In _Program.cs_, call `AddKafkaProducer` on your `IHostApplicationBuilder` to register an `IProducer<TKey, TValue>`:

```csharp title="C# — Program.cs"
builder.AddKafkaProducer<string, string>(connectionName: "kafka");
```
**Tip:** The `connectionName` must match the Kafka resource name from the AppHost. For more information, see [Add Kafka server resource](../apache-kafka-host/#add-kafka-server-resource).

Resolve the producer through dependency injection:

```csharp title="C# — ExampleService.cs"
public class ExampleService(IProducer<string, string> producer)
{
    // Use producer...
}
```

#### Add a Kafka consumer

Call `AddKafkaConsumer` to register an `IConsumer<TKey, TValue>`:

```csharp title="C# — Program.cs"
builder.AddKafkaConsumer<string, string>(connectionName: "kafka");
```
**Note:** `Confluent.Kafka.Consumer<TKey, TValue>` requires the `ClientId` property to be set so the broker can track consumed message offsets. Set it through the configuration key `Aspire:Confluent:Kafka:Consumer:Config:ClientId` or via an inline delegate.

Resolve the consumer through dependency injection:

```csharp title="C# — ExampleService.cs"
public class ExampleService(IConsumer<string, string> consumer)
{
    // Use consumer...
}
```

#### Add keyed Kafka producers or consumers

To register multiple producer or consumer instances with different connection names, use the keyed variants:

```csharp title="C# — Program.cs"
builder.AddKeyedKafkaProducer<string, string>(name: "orders");
builder.AddKeyedKafkaConsumer<string, string>(name: "events");
```

Then resolve each instance by key:

```csharp title="C# — ExampleService.cs"
public class ExampleService(
    [FromKeyedServices("orders")] IProducer<string, string> ordersProducer,
    [FromKeyedServices("events")] IConsumer<string, string> eventsConsumer)
{
    // Use producers and consumers...
}
```

#### Configuration

The Aspire Confluent Kafka client integration offers multiple ways to provide configuration.

**Connection strings.** When using a connection string from the `ConnectionStrings` configuration section, pass the connection name to `AddKafkaProducer` or `AddKafkaConsumer`:

```csharp title="C# — Program.cs"
builder.AddKafkaProducer<string, string>("kafka");
```

The connection string is resolved from the `ConnectionStrings` section:

```json title="JSON — appsettings.json"
{
  "ConnectionStrings": {
    "kafka": "localhost:9092"
  }
}
```

**Configuration providers.** The client integration supports `Microsoft.Extensions.Configuration`. It loads `KafkaProducerSettings` or `KafkaConsumerSettings` from _appsettings.json_ (or any other configuration source) using the `Aspire:Confluent:Kafka:Producer` and `Aspire:Confluent:Kafka:Consumer` keys:

```json title="JSON — appsettings.json"
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        },
        "Consumer": {
          "DisableHealthChecks": false,
          "Config": {
            "ClientId": "my-consumer",
            "GroupId": "my-group"
          }
        }
      }
    }
  }
}
```

The `Config` properties bind to instances of `ProducerConfig` and `ConsumerConfig` from the `Confluent.Kafka` library.

**Named configuration.** The integration supports named configuration for multiple instances:

```json title="JSON — appsettings.json"
{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "orders": {
            "Config": { "Acks": "All" }
          },
          "audit": {
            "Config": { "Acks": "Leader" }
          }
        }
      }
    }
  }
}
```

**Inline delegates.** Pass an `Action<KafkaProducerSettings>` to configure settings inline, for example to disable health checks:

```csharp title="C# — Program.cs"
builder.AddKafkaProducer<string, string>(
    "kafka",
    static settings => settings.DisableHealthChecks = true);
```

To configure the underlying `Confluent.Kafka` builder, pass an `Action<ProducerBuilder<TKey, TValue>>`:

```csharp title="C# — Program.cs"
builder.AddKafkaProducer<string, MyMessage>(
    "kafka",
    static producerBuilder =>
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    });
```

#### Client integration health checks

Aspire client integrations enable health checks by default. The Confluent Kafka client integration adds:

- The `Aspire.Confluent.Kafka.Producer` health check when `DisableHealthChecks` is `false`.
- The `Aspire.Confluent.Kafka.Consumer` health check when `DisableHealthChecks` is `false`.
- Integration with the `/health` HTTP endpoint, where all registered health checks must pass before the app is considered ready to accept traffic.

#### Observability and telemetry

The Aspire Confluent Kafka client integration automatically configures logging and metrics through OpenTelemetry.

**Logging** categories:

- `Aspire.Confluent.Kafka`

**Tracing:** The Apache Kafka integration doesn't currently emit distributed traces.

**Metrics** emitted through OpenTelemetry:

- `messaging.kafka.network.tx`
- `messaging.kafka.network.transmitted`
- `messaging.kafka.network.rx`
- `messaging.kafka.network.received`
- `messaging.publish.messages`
- `messaging.kafka.message.transmitted`
- `messaging.receive.messages`
- `messaging.kafka.message.received`

#### Read environment variables in C\#

If you prefer not to use the Aspire client integration, you can read the Aspire-injected bootstrap server address from the environment and pass it directly to `Confluent.Kafka`:

```csharp title="C# — Program.cs"
using Confluent.Kafka;

var bootstrapServers = Environment.GetEnvironmentVariable("KAFKA_HOST") + ":"
    + Environment.GetEnvironmentVariable("KAFKA_PORT");

using var producer = new ProducerBuilder<string, string>(
    new ProducerConfig { BootstrapServers = bootstrapServers })
    .Build();

// Use producer...
```

### Go

Use [`confluent-kafka-go`](https://github.com/confluentinc/confluent-kafka-go), the official Go client for Apache Kafka:

```bash title="Terminal"
go get github.com/confluentinc/confluent-kafka-go/v2/kafka
```
**Note:** [`IBM/sarama`](https://github.com/IBM/sarama) is another popular pure-Go Kafka client. Use `confluent-kafka-go` for the closest feature parity with the Confluent ecosystem; use `sarama` if you prefer a pure-Go implementation without CGo dependencies.

Read the injected environment variables and connect:

```go title="Go — main.go"
package main

import (
    "fmt"
    "os"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
    // Read the Aspire-injected bootstrap server properties
    host := os.Getenv("KAFKA_HOST")
    port := os.Getenv("KAFKA_PORT")
    bootstrapServers := fmt.Sprintf("%s:%s", host, port)

    producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": bootstrapServers,
    })
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    // Use producer to send messages...
}
```

### Python

Install [`confluent-kafka-python`](https://github.com/confluentinc/confluent-kafka-python), the official Python client for Apache Kafka:

```bash title="Terminal"
pip install confluent-kafka
```

Read the injected environment variables and connect:

```python title="Python — app.py"
import os
from confluent_kafka import Producer

# Read the Aspire-injected bootstrap server properties
host = os.getenv("KAFKA_HOST")
port = os.getenv("KAFKA_PORT")
bootstrap_servers = f"{host}:{port}"

producer = Producer({"bootstrap.servers": bootstrap_servers})

# Use producer to send messages...
producer.flush()
```

To consume messages:

```python title="Python — consumer.py"
import os
from confluent_kafka import Consumer

host = os.getenv("KAFKA_HOST")
port = os.getenv("KAFKA_PORT")
bootstrap_servers = f"{host}:{port}"

consumer = Consumer({
    "bootstrap.servers": bootstrap_servers,
    "group.id": "my-group",
    "auto.offset.reset": "earliest",
})

consumer.subscribe(["my-topic"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is not None and not msg.error():
            print(f"Received: {msg.value().decode('utf-8')}")
finally:
    consumer.close()
```

### TypeScript

Install [`kafkajs`](https://kafka.js.org/), the most popular TypeScript/JavaScript client for Apache Kafka:

```bash title="Terminal"
npm install kafkajs
```

Read the injected environment variables and connect:

```typescript title="TypeScript — index.ts"
import { Kafka } from 'kafkajs';

// Read Aspire-injected connection properties
const kafka = new Kafka({
    clientId: 'my-app',
    brokers: [`${process.env.KAFKA_HOST}:${process.env.KAFKA_PORT}`],
});

const producer = kafka.producer();
await producer.connect();

// Use producer to send messages...
await producer.disconnect();
```

To consume messages:

```typescript title="TypeScript — consumer.ts"
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
    clientId: 'my-app',
    brokers: [`${process.env.KAFKA_HOST}:${process.env.KAFKA_PORT}`],
});

const consumer = kafka.consumer({ groupId: 'my-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });

await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        console.log(`Received: ${message.value?.toString()}`);
    },
});
```
**Tip:** If your app expects specific environment variable names different from the Aspire defaults, you can pass individual connection properties from the AppHost. See [Pass custom environment variables](../apache-kafka-host/#pass-custom-environment-variables) in the Hosting integration reference.
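On the consuming side, that pattern is typically a lookup with a fallback: prefer the custom variable when the AppHost passes one, otherwise use the default Aspire-injected properties. A hedged Python sketch (the `MY_KAFKA_BROKERS` name and the `localhost`/`9092` fallbacks are hypothetical, not Aspire defaults):

```python
import os

# Prefer a custom variable if the AppHost passes one (name is hypothetical),
# otherwise fall back to the default Aspire-injected connection properties.
bootstrap_servers = os.getenv("MY_KAFKA_BROKERS") or (
    f"{os.getenv('KAFKA_HOST', 'localhost')}:{os.getenv('KAFKA_PORT', '9092')}"
)
```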