Skip to content
Docs Try Aspire
Docs Try

Polyglot Task Queue

Aspire sample
TypeScript AppHost

Clone, run, and explore this sample

Distributed task processing with Python, C#, and Node.js workers using RabbitMQ and end-to-end OpenTelemetry tracing.

DashboardDockerJavaScriptMetricsNode.jsPythonRabbitMQTypeScript
AppHost

The entry point that composes every resource and dependency in this sample's distributed application.

View on GitHub
apphost.mts
import { ContainerLifetime, UrlDisplayLocation, createBuilder } from "./.aspire/modules/aspire.mjs";
const builder = await createBuilder();
const dc = await builder.addDockerComposeEnvironment("dc");
const rabbitmq = await builder.addRabbitMQ("messaging")
.withManagementPlugin()
.withComputeEnvironment(dc)
.withLifetime(ContainerLifetime.Persistent)
.withUrlForEndpoint("tcp", async (url) =>
{
url.displayLocation = UrlDisplayLocation.DetailsOnly;
})
.withUrlForEndpoint("management", async (url) =>
{
url.displayText = "RabbitMQ Management UI";
});
const api = await builder.addNodeApp("api", "./api", "index.js")
.withHttpEndpoint({ env: "PORT" })
.withHttpHealthCheck({ path: "/health" })
.withComputeEnvironment(dc)
.waitFor(rabbitmq)
.withReference(rabbitmq);
const frontend = await builder.addViteApp("frontend", "./frontend")
.withReference(api)
.withUrl("", { displayText: "Task Queue UI" })
.withBrowserLogs();
await builder.addPythonApp("worker-python", "./worker-python", "main.py")
.withUv()
.withComputeEnvironment(dc)
.waitFor(rabbitmq)
.withReference(rabbitmq);
await builder.addCSharpApp("worker-csharp", "./worker-csharp")
.withComputeEnvironment(dc)
.waitFor(rabbitmq)
.withReference(rabbitmq);
await api.publishWithContainerFiles(frontend, "public");
await builder.build().run();

Distributed task processing with Python, C#, and Node.js workers using RabbitMQ and end-to-end OpenTelemetry tracing.

graph TB
    subgraph Frontend
        UI[Vite React UI]
    end

    subgraph "Node.js API"
        HTTP[HTTP Endpoints]
        Cache[(In-Memory Cache)]
        PubChan[Publisher Channel]
        ConsChan[Consumer Channel]
    end

    subgraph RabbitMQ
        TasksQ[tasks queue]
        StatusQ[task_status queue]
        ResultsQ[results queue]
    end

    subgraph Workers
        PythonW[Python Worker<br/>pandas/numpy]
        CSharpW[C# Worker<br/>strong typing]
    end

    UI -->|HTTP| HTTP
    HTTP -->|Read| Cache
    HTTP -->|Publish| PubChan
    PubChan --> TasksQ
    PubChan --> StatusQ
    ConsChan -.->|Consume & Update| StatusQ
    ConsChan -.->|Consume & Update| ResultsQ
    ConsChan --> Cache
    TasksQ --> PythonW
    TasksQ --> CSharpW
    PythonW --> StatusQ
    PythonW --> ResultsQ
    CSharpW --> StatusQ
    CSharpW --> ResultsQ

RabbitMQ is the source of truth - API maintains in-memory cache built from consuming messages.

  • addRabbitMQ: Message queue with management plugin
  • addNodeApp: Express API with dual RabbitMQ channels (publisher + consumer)
  • addPythonApp: Python worker with pandas/numpy for data analysis
  • addCSharpApp: C# worker with strong typing for reports
  • addViteApp: React + TypeScript frontend
  • OpenTelemetry: End-to-end distributed tracing across all services
  • Trace Context Propagation: OpenTelemetry context through RabbitMQ headers
  • Messaging Semantic Conventions: Standardized span attributes for queue operations
  • Event-Driven State: Stateless API, RabbitMQ as source of truth
Terminal window
aspire run
Terminal window
aspire run # Run locally
aspire deploy # Deploy to Docker Compose
aspire do docker-compose-down-dc # Teardown deployment

This sample is intended for local demo use. RabbitMQ messages are treated as trusted internal messages, and the public HTTP endpoints are unauthenticated.

The Node.js API applies simple guardrails for the demo: JSON request bodies are limited to 64 KB, task type must be analyze or report, task data is capped at 10,000 characters, and GET /tasks returns the latest 100 tasks by default with ?limit= capped at 500. Production services should add explicit input schemas, authentication and authorization, rate limits, a RabbitMQ retry/dead-letter policy, and capacity limits for queues, stored results, and in-memory caches.

The workers acknowledge or drop invalid and failed messages instead of requeueing indefinitely. For production, configure bounded retries and a dead-letter exchange so poison messages can be inspected without blocking queue processing.

Relevant references:

RabbitMQ Setup - Message queue with management UI:

const rabbitmq = await builder.addRabbitMQ("messaging")
.withManagementPlugin()
.withLifetime(ContainerLifetime.Persistent)
.withUrlForEndpoint("management", async (url) =>
{
url.displayText = "RabbitMQ Management UI";
});

OpenTelemetry - Automatic configuration via environment variables:

  • OTEL_EXPORTER_OTLP_ENDPOINT: Aspire dashboard endpoint
  • OTEL_SERVICE_NAME: Service identifier for traces

Service References - Automatic connection injection:

const api = await builder.addNodeApp("api", "./api", "index.js")
.waitFor(rabbitmq)
.withReference(rabbitmq); // Injects MESSAGING_URI

Trace Context Propagation - OpenTelemetry context through message headers:

// Node.js - Inject context
propagation.inject(context.active(), headers);
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), { headers });
// Extract context
const parentContext = propagation.extract(context.active(), msg.properties.headers);
# Python - Extract context
ctx = propagate.extract(dict(message.headers) if message.headers else {})
with tracer.start_as_current_span('process task', context=ctx):
# Process task

Messaging Semantic Conventions:

  • messaging.system: "rabbitmq"
  • messaging.destination.name: Queue name
  • messaging.operation: "publish" or "process"
Preview

Sample screenshots

Select the image to zoom in.

Screenshot of the Polyglot Task Queue sample UI
Screenshot of the Polyglot Task Queue sample UI