Connect to Azure Event Hubs
Bu içerik henüz dilinizde mevcut değil.
This page describes how consuming apps connect to an Azure Event Hubs resource that’s already modeled in your AppHost. For the AppHost API surface — adding a namespace, hubs, consumer groups, the emulator, and more — see Azure Event Hubs Hosting integration.
When you reference an Azure Event Hubs resource from your AppHost, Aspire injects the connection information into the consuming app as environment variables. Your app can either read those environment variables directly — the pattern works the same from any language — or, in C#, use the Aspire Azure Messaging Event Hubs client integration for automatic dependency injection, health checks, and telemetry.
Connection properties
Section titled “Connection properties”Aspire exposes each property as an environment variable named [RESOURCE]_[PROPERTY]. For instance, the Uri property of a resource called event-hubs becomes EVENT_HUBS_URI.
Event Hubs namespace
Section titled “Event Hubs namespace”The Event Hubs namespace resource exposes the following connection properties:
| Property Name | Description |
|---|---|
Host | The hostname of the Event Hubs namespace |
Port | The port of the Event Hubs namespace (emulator only) |
Uri | The connection URI, with the format sb://myeventhubs.servicebus.windows.net on Azure and sb://localhost:62824 for the emulator |
ConnectionString | Emulator only. Full connection string including SAS key material for the local emulator |
Example connection strings:
Uri: sb://myeventhubs.servicebus.windows.netConnectionString: Endpoint=sb://localhost:62824;SharedAccessKeyName=... (emulator only)Event Hub
Section titled “Event Hub”The Event Hub resource inherits all properties from its parent namespace and adds:
| Property Name | Description |
|---|---|
EventHubName | The name of the Event Hub |
Event Hub consumer group
Section titled “Event Hub consumer group”The Event Hub consumer group resource inherits all properties from its parent hub and adds:
| Property Name | Description |
|---|---|
ConsumerGroupName | The name of the consumer group |
For example, if you add a hub named messages and reference it from a consuming app, the following environment variables are available:
MESSAGES_HOSTMESSAGES_URIMESSAGES_EVENTHUBNAME
Connect from your app
Section titled “Connect from your app”Pick the language your consuming app is written in. Each example assumes your AppHost adds an Azure Event Hubs namespace named event-hubs with a hub named messages, and references them from the consuming app.
For C# apps, the recommended approach is the Aspire Azure Messaging Event Hubs client integration. It registers strongly-typed Event Hubs client instances through dependency injection and adds health checks and telemetry automatically. If you’d rather read environment variables directly, see the Read environment variables section at the end of this tab.
Install the client integration
Section titled “Install the client integration”Install the 📦 Aspire.Azure.Messaging.EventHubs NuGet package in the client-consuming project:
dotnet add package Aspire.Azure.Messaging.EventHubs#:package Aspire.Azure.Messaging.EventHubs@*<PackageReference Include="Aspire.Azure.Messaging.EventHubs" Version="*" />Supported client types
Section titled “Supported client types”The following Event Hubs client types are supported, along with their corresponding options and settings classes:
| Azure client type | Azure options class | Aspire settings class |
|---|---|---|
EventHubProducerClient | EventHubProducerClientOptions | AzureMessagingEventHubsProducerSettings |
EventHubBufferedProducerClient | EventHubBufferedProducerClientOptions | AzureMessagingEventHubsBufferedProducerSettings |
EventHubConsumerClient | EventHubConsumerClientOptions | AzureMessagingEventHubsConsumerSettings |
EventProcessorClient | EventProcessorClientOptions | AzureMessagingEventHubsProcessorSettings |
PartitionReceiver | PartitionReceiverOptions | AzureMessagingEventHubsPartitionReceiverSettings |
Add an Event Hubs producer client
Section titled “Add an Event Hubs producer client”In Program.cs, call AddAzureEventHubProducerClient to register an EventHubProducerClient:
builder.AddAzureEventHubProducerClient(connectionName: "messages");Resolve the client through dependency injection:
public class ExampleService(EventHubProducerClient producerClient){ // Use producerClient...}Add an Event Hubs processor client
Section titled “Add an Event Hubs processor client”To consume events, register an EventProcessorClient:
builder.AddAzureEventProcessorClient(connectionName: "messages");Resolve it through dependency injection:
public class ExampleService(EventProcessorClient processorClient){ // Use processorClient...}All registration APIs
Section titled “All registration APIs”When you need to register a different client type, use the corresponding API:
| Azure client type | Registration API |
|---|---|
EventHubProducerClient | AddAzureEventHubProducerClient |
EventHubBufferedProducerClient | AddAzureEventHubBufferedProducerClient |
EventHubConsumerClient | AddAzureEventHubConsumerClient |
EventProcessorClient | AddAzureEventProcessorClient |
PartitionReceiver | AddAzurePartitionReceiverClient |
Add keyed Event Hubs clients
Section titled “Add keyed Event Hubs clients”To register multiple client instances with different connection names, use the keyed APIs:
builder.AddKeyedAzureEventHubProducerClient(name: "messages");builder.AddKeyedAzureEventHubProducerClient(name: "commands");Then resolve each instance by key:
public class ExampleService( [KeyedService("messages")] EventHubProducerClient messagesClient, [KeyedService("commands")] EventHubProducerClient commandsClient){ // Use clients...}The full set of keyed registration APIs:
| Azure client type | Keyed registration API |
|---|---|
EventHubProducerClient | AddKeyedAzureEventHubProducerClient |
EventHubBufferedProducerClient | AddKeyedAzureEventHubBufferedProducerClient |
EventHubConsumerClient | AddKeyedAzureEventHubConsumerClient |
EventProcessorClient | AddKeyedAzureEventProcessorClient |
PartitionReceiver | AddKeyedAzurePartitionReceiverClient |
For more information, see Keyed services in .NET.
Configuration
Section titled “Configuration”The Aspire Azure Messaging Event Hubs library supports multiple configuration approaches. Either a FullyQualifiedNamespace or a ConnectionString is required.
Fully Qualified Namespace (recommended). Use a fully qualified namespace with the default credential:
{ "ConnectionStrings": { "messages": "{your_namespace}.servicebus.windows.net" }}If no credential is configured, a default credential is used.
Connection string. Alternatively, provide a full connection string:
{ "ConnectionStrings": { "messages": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=accesskeyname;SharedAccessKey=accesskey;EntityPath=messages" }}Configuration providers. The library loads AzureMessagingEventHubsSettings and the associated client options from the Aspire:Azure:Messaging:EventHubs: key prefix, followed by the specific client name. For example, to configure an EventProcessorClient:
{ "Aspire": { "Azure": { "Messaging": { "EventHubs": { "EventProcessorClient": { "EventHubName": "messages", "ClientOptions": { "Identifier": "PROCESSOR_ID" } } } } } }}Named configuration. To configure multiple instances of the same client type with different settings, use named configuration:
{ "Aspire": { "Azure": { "Messaging": { "EventHubs": { "EventProcessorClient": { "processor1": { "EventHubName": "messages", "ClientOptions": { "Identifier": "PROCESSOR_1" } }, "processor2": { "EventHubName": "commands", "ClientOptions": { "Identifier": "PROCESSOR_2" } } } } } } }}Then use the connection names when registering:
builder.AddAzureEventProcessorClient("processor1");builder.AddAzureEventProcessorClient("processor2");Inline delegates. Pass an Action<IAzureClientBuilder<...>> to configure the client builder inline:
builder.AddAzureEventProcessorClient( "messages", configureClientBuilder: clientBuilder => clientBuilder.ConfigureOptions( options => options.Identifier = "PROCESSOR_ID"));For the complete JSON schema, see Aspire.Azure.Messaging.EventHubs/ConfigurationSchema.json.
Client integration health checks
Section titled “Client integration health checks”Aspire client integrations enable health checks by default. The Azure Messaging Event Hubs client integration registers a health check that verifies connectivity to the namespace. The health check is wired into the /health HTTP endpoint.
Observability and telemetry
Section titled “Observability and telemetry”The Aspire Azure Messaging Event Hubs client integration automatically configures logging, tracing, and metrics through OpenTelemetry.
Logging categories:
Azure.CoreAzure.Identity
Tracing activities:
Azure.Messaging.EventHubs.*
Metrics: The Azure Messaging Event Hubs client integration currently doesn’t support metrics by default due to limitations with the Azure SDK for .NET. This section will be updated if that changes.
Read environment variables in C#
Section titled “Read environment variables in C#”If you prefer not to use the Aspire client integration, you can read the connection URI from the environment and use the Azure SDK directly:
using Azure.Messaging.EventHubs.Producer;
var fullyQualifiedNamespace = Environment.GetEnvironmentVariable("EVENT_HUBS_HOST");var eventHubName = Environment.GetEnvironmentVariable("MESSAGES_EVENTHUBNAME");
var producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, new DefaultAzureCredential());
// Use producer to send events...Use the Azure SDK for Go Event Hubs client:
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubsgo get github.com/Azure/azure-sdk-for-go/sdk/azidentityRead the Aspire-injected environment variables and connect:
package main
import ( "context" "os"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs")
func main() { // Read Aspire-injected connection properties namespace := os.Getenv("EVENT_HUBS_HOST") hubName := os.Getenv("MESSAGES_EVENTHUBNAME")
cred, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { panic(err) }
producer, err := azeventhubs.NewProducerClient(namespace, hubName, cred, nil) if err != nil { panic(err) } defer producer.Close(context.Background())
// Use producer to send events...}To consume events, use the EventProcessorClient or ConsumerClient:
package main
import ( "context" "os"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs")
func main() { namespace := os.Getenv("EVENT_HUBS_HOST") hubName := os.Getenv("MESSAGES_EVENTHUBNAME") consumerGroupName := os.Getenv("MESSAGESCONSUMER_CONSUMERGROUPNAME")
cred, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { panic(err) }
consumer, err := azeventhubs.NewConsumerClient( namespace, hubName, consumerGroupName, cred, nil) if err != nil { panic(err) } defer consumer.Close(context.Background())
// Receive events from partition "0"... partitionClient, err := consumer.NewPartitionClient("0", nil) if err != nil { panic(err) } defer partitionClient.Close(context.Background())
events, err := partitionClient.ReceiveEvents(context.Background(), 10, nil) if err != nil { panic(err) } _ = events}Install the azure-eventhub package:
pip install azure-eventhub azure-identityRead the Aspire-injected environment variables and connect:
import osfrom azure.eventhub import EventHubProducerClient, EventDatafrom azure.identity import DefaultAzureCredential
# Read Aspire-injected connection propertiesfully_qualified_namespace = os.getenv("EVENT_HUBS_HOST")eventhub_name = os.getenv("MESSAGES_EVENTHUBNAME")
credential = DefaultAzureCredential()
with EventHubProducerClient( fully_qualified_namespace=fully_qualified_namespace, eventhub_name=eventhub_name, credential=credential,) as producer: event_data_batch = producer.create_batch() event_data_batch.add(EventData("First event")) producer.send_batch(event_data_batch)To consume events, use EventHubConsumerClient:
import osfrom azure.eventhub import EventHubConsumerClientfrom azure.identity import DefaultAzureCredential
fully_qualified_namespace = os.getenv("EVENT_HUBS_HOST")eventhub_name = os.getenv("MESSAGES_EVENTHUBNAME")consumer_group = os.getenv("MESSAGESCONSUMER_CONSUMERGROUPNAME", "$Default")
credential = DefaultAzureCredential()
def on_event(partition_context, event): print(f"Received: {event.body_as_str()}") partition_context.update_checkpoint(event)
with EventHubConsumerClient( fully_qualified_namespace=fully_qualified_namespace, eventhub_name=eventhub_name, consumer_group=consumer_group, credential=credential,) as consumer: consumer.receive(on_event=on_event)Install the @azure/event-hubs package:
npm install @azure/event-hubs @azure/identityRead the Aspire-injected environment variables and connect:
import { EventHubProducerClient } from '@azure/event-hubs';import { DefaultAzureCredential } from '@azure/identity';
// Read Aspire-injected connection propertiesconst fullyQualifiedNamespace = process.env.EVENT_HUBS_HOST!;const eventHubName = process.env.MESSAGES_EVENTHUBNAME!;
const credential = new DefaultAzureCredential();const producer = new EventHubProducerClient( fullyQualifiedNamespace, eventHubName, credential);
const batch = await producer.createBatch();batch.tryAdd({ body: 'First event' });await producer.sendBatch(batch);
await producer.close();To consume events, use EventHubConsumerClient:
import { EventHubConsumerClient } from '@azure/event-hubs';import { DefaultAzureCredential } from '@azure/identity';
const fullyQualifiedNamespace = process.env.EVENT_HUBS_HOST!;const eventHubName = process.env.MESSAGES_EVENTHUBNAME!;const consumerGroup = process.env.MESSAGESCONSUMER_CONSUMERGROUPNAME ?? '$Default';
const credential = new DefaultAzureCredential();const consumer = new EventHubConsumerClient( consumerGroup, fullyQualifiedNamespace, eventHubName, credential);
const subscription = consumer.subscribe({ processEvents: async (events, context) => { for (const event of events) { console.log(`Received: ${event.body}`); } }, processError: async (err, context) => { console.error(err); },});
// Call subscription.close() when done