Dans l’article précédent, nous avons vu le Pub/Sub Dapr avec des souscriptions HTTP (Minimal API, contrôleurs). Mais Dapr supporte aussi la communication gRPC entre le sidecar et l’application, y compris pour la livraison des messages Pub/Sub. gRPC apporte la sérialisation binaire (Protobuf), des contrats typés et de meilleures performances, ce qui est particulièrement intéressant pour les systèmes à haut débit. Dans cet article, on met en place un système Pub/Sub complet en gRPC.
Cet article fait partie de la série Dapr pour les développeurs .NET : 5 sur 6.
- Part 1 - Présentation de Dapr : le runtime pour applications distribuées
- Part 2 - Dapr : l'invocation de service en .NET
- Part 3 - Dapr : invocation de service gRPC en .NET
- Part 4 - Dapr : la gestion d'état (State Management) en .NET
- Part 5 - Cet article
- Part 6 - Dapr : le Pub/Sub (Publish & Subscribe) en .NET
Rappel : HTTP vs gRPC pour le Pub/Sub
Avec le Pub/Sub HTTP (article précédent), le sidecar Dapr appelle un endpoint HTTP de votre application pour lui livrer les messages :
Sidecar → POST http://localhost:<app-port>/order-created → Application
Avec le Pub/Sub gRPC, le sidecar communique avec votre application via un service gRPC Dapr que votre application implémente. Le sidecar appelle la méthode OnTopicEvent de ce service pour livrer chaque message :
Sidecar → gRPC OnTopicEvent → Application
graph LR
Broker["Broker<br/>(RabbitMQ, Kafka...)"]
Sidecar["Sidecar Dapr"]
App["Application .NET<br/>(gRPC server)"]
Broker -->|Distribue| Sidecar
Sidecar -->|gRPC<br/>OnTopicEvent| App
style Broker fill:#D0021B
style Sidecar fill:#F5A623
style App fill:#4A90E2
Les deux protocoles sont interchangeables : le publisher peut publier en HTTP et le subscriber recevoir en gRPC (ou inversement). Le broker et l’API de publication restent identiques. Seule la livraison au subscriber change.
Quand utiliser gRPC plutôt que HTTP ?
| Critère | HTTP | gRPC |
|---|---|---|
| Simplicité | Plus simple (Minimal API, [Topic]) |
Plus de code (Protobuf, service gRPC) |
| Performance | Bon (JSON, HTTP/1.1) | Meilleur (binaire, HTTP/2) |
| Débit élevé | Suffisant pour la plupart des cas | Préférable pour les volumes importants |
| Contrat | Implicite (sérialisation JSON) | Explicite (fichier .proto) |
| Streaming | Non supporté | Possible (server streaming) |
| Cas d’usage | La majorité des applications | Systèmes à haut débit, événements volumineux |
En pratique, le mode HTTP convient à la majorité des scénarios. L’utilisation de gRPC se justifie principalement quand :
- Vous traitez un volume élevé de messages et la performance de sérialisation/désérialisation est critique.
- Vous utilisez déjà des services gRPC et souhaitez unifier le protocole.
- Vous avez besoin de contrats Protobuf partagés pour vos événements.
Architecture de l’exemple
On va construire un système Pub/Sub gRPC complet :
- OrderService : publie des événements
OrderCreated(en HTTP ou gRPC — la publication ne change pas). - PaymentService : subscriber gRPC qui reçoit les événements
OrderCreatedet simule un traitement de paiement.
graph LR
OS["OrderService<br/>(Publisher)"]
OSS["Sidecar Order"]
Broker["Broker<br/>(Redis)"]
PSS["Sidecar Payment"]
PS["PaymentService<br/>(Subscriber gRPC)"]
OS -->|PublishEvent<br/>localhost| OSS
OSS -->|Publie| Broker
Broker -->|Distribue| PSS
PSS -->|gRPC<br/>OnTopicEvent| PS
style OS fill:#4A90E2
style OSS fill:#F5A623
style Broker fill:#D0021B
style PSS fill:#F5A623
style PS fill:#7ED321
Étape 1 : Le publisher (identique à HTTP)
La publication d’événements ne change pas, qu’on utilise HTTP ou gRPC côté subscriber. Le publisher utilise toujours DaprClient.PublishEventAsync :
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprClient();
var app = builder.Build();
app.MapPost("/orders", async (CreateOrderRequest request, DaprClient dapr) =>
{
var order = new Order
{
Id = Guid.NewGuid().ToString(),
CustomerId = request.CustomerId,
TotalAmount = request.TotalAmount,
Status = "Created",
CreatedAt = DateTime.UtcNow
};
await dapr.PublishEventAsync("pubsub", "orders", new OrderCreated
{
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.TotalAmount,
CreatedAt = order.CreatedAt
});
return Results.Created($"/orders/{order.Id}", order);
});
app.Run();
Étape 2 : Le subscriber gRPC
Créer le projet
dotnet new grpc -n PaymentService
cd PaymentService
dotnet add package Dapr.AspNetCore
dotnet add package Dapr.Client
Comprendre le contrat Dapr AppCallback
Dapr fournit un fichier proto qui définit le service AppCallback. C’est ce service que votre application doit implémenter pour recevoir les événements via gRPC. Les méthodes clés sont :
ListTopicSubscriptions: appelée par le sidecar au démarrage pour connaître les topics auxquels l’application est abonnée.OnTopicEvent: appelée par le sidecar pour chaque message reçu.
Le fichier proto de Dapr (appcallback.proto) définit ces méthodes :
syntax = "proto3";
package dapr.proto.runtime.v1;
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "google/protobuf/struct.proto";
// AppCallback est le service que l'application doit implémenter
service AppCallback {
// Appelé par le sidecar pour connaître les souscriptions
rpc ListTopicSubscriptions(google.protobuf.Empty)
returns (ListTopicSubscriptionsResponse);
// Appelé par le sidecar pour chaque événement reçu
rpc OnTopicEvent(TopicEventRequest) returns (TopicEventResponse);
}
message TopicEventRequest {
string id = 1; // ID unique de l'événement (CloudEvents)
string source = 2; // Source de l'événement
string type = 3; // Type de l'événement
string spec_version = 4; // Version CloudEvents
string data_content_type = 5; // Content-type du payload
bytes data = 6; // Le payload sérialisé
string topic = 7; // Le topic d'où vient le message
string pubsub_name = 8; // Le nom du composant pub/sub
string path = 9; // Le chemin de routage
map<string, string> extensions = 10; // Extensions CloudEvents
}
message TopicEventResponse {
TopicEventResponseStatus status = 1;
}
enum TopicEventResponseStatus {
SUCCESS = 0; // Message traité avec succès
RETRY = 1; // Demander au sidecar de re-livrer
DROP = 2; // Abandonner le message
}
message TopicSubscription {
string pubsub_name = 1;
string topic = 2;
map<string, string> metadata = 3;
TopicRoutes routes = 4;
string dead_letter_topic = 5;
BulkSubscribeConfig bulk_subscribe = 6;
}
message TopicRoutes {
repeated TopicRule rules = 1;
string default = 2;
}
message TopicRule {
string match = 1;
string path = 2;
}
message BulkSubscribeConfig {
bool enabled = 1;
int32 max_messages_count = 2;
int32 max_await_duration_ms = 3;
}
message ListTopicSubscriptionsResponse {
repeated TopicSubscription subscriptions = 1;
}
Vous n’avez pas besoin de copier ce fichier manuellement : le package NuGet
Dapr.AspNetCoreinclut déjà les protos Dapr et le code généré.
Implémenter le service AppCallback
Le SDK Dapr pour .NET fournit la classe de base AppCallback.AppCallbackBase qu’on peut surcharger :
using System.Text.Json;
using Dapr.AppCallback.Autogen.Grpc.v1;
using Dapr.Client.Autogen.Grpc.v1;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
namespace PaymentService.Services;
public class DaprSubscriberService : AppCallback.AppCallbackBase
{
private readonly ILogger<DaprSubscriberService> _logger;
public DaprSubscriberService(ILogger<DaprSubscriberService> logger)
{
_logger = logger;
}
/// <summary>
/// Déclaration des souscriptions auprès du sidecar.
/// Appelée automatiquement par le sidecar au démarrage.
/// </summary>
public override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(
Empty request, ServerCallContext context)
{
var response = new ListTopicSubscriptionsResponse();
// S'abonner au topic "orders" du composant "pubsub"
response.Subscriptions.Add(new TopicSubscription
{
PubsubName = "pubsub",
Topic = "orders",
DeadLetterTopic = "orders-deadletter"
});
// S'abonner au topic "payments" du composant "pubsub"
response.Subscriptions.Add(new TopicSubscription
{
PubsubName = "pubsub",
Topic = "payments"
});
return Task.FromResult(response);
}
/// <summary>
/// Réception d'un événement. Appelée par le sidecar pour chaque message.
/// </summary>
public override async Task<TopicEventResponse> OnTopicEvent(
TopicEventRequest request, ServerCallContext context)
{
_logger.LogInformation(
"Événement reçu - Topic: {Topic}, Source: {Source}, Id: {Id}",
request.Topic, request.Source, request.Id);
try
{
return request.Topic switch
{
"orders" => await HandleOrderCreatedAsync(request),
"payments" => await HandlePaymentEventAsync(request),
_ => new TopicEventResponse
{
Status = TopicEventResponse.Types.TopicEventResponseStatus.Drop
}
};
}
catch (Exception ex)
{
_logger.LogError(ex, "Erreur lors du traitement de l'événement {Id}", request.Id);
// Demander au sidecar de re-livrer le message
return new TopicEventResponse
{
Status = TopicEventResponse.Types.TopicEventResponseStatus.Retry
};
}
}
private async Task<TopicEventResponse> HandleOrderCreatedAsync(TopicEventRequest request)
{
// Désérialiser le payload depuis les bytes
var orderCreated = JsonSerializer.Deserialize<OrderCreated>(
request.Data.ToStringUtf8(),
new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
if (orderCreated is null)
{
return new TopicEventResponse
{
Status = TopicEventResponse.Types.TopicEventResponseStatus.Drop
};
}
_logger.LogInformation(
"Traitement du paiement pour la commande {OrderId}, montant : {Amount}€",
orderCreated.OrderId, orderCreated.TotalAmount);
// Simuler le traitement du paiement
await Task.Delay(200);
_logger.LogInformation("Paiement validé pour la commande {OrderId}",
orderCreated.OrderId);
return new TopicEventResponse
{
Status = TopicEventResponse.Types.TopicEventResponseStatus.Success
};
}
private async Task<TopicEventResponse> HandlePaymentEventAsync(TopicEventRequest request)
{
_logger.LogInformation("Événement de paiement reçu : {Data}",
request.Data.ToStringUtf8());
await Task.CompletedTask;
return new TopicEventResponse
{
Status = TopicEventResponse.Types.TopicEventResponseStatus.Success
};
}
}
Configurer Program.cs
using PaymentService.Services;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
var app = builder.Build();
// Enregistrer le service AppCallback pour le sidecar Dapr
app.MapGrpcService<DaprSubscriberService>();
app.Run();
Configuration Kestrel
Le service doit écouter en HTTP/2 (requis par gRPC) :
{
"Kestrel": {
"Endpoints": {
"Grpc": {
"Url": "http://localhost:5010",
"Protocols": "Http2"
}
}
}
}
On utilise
http(pashttps) car le sidecar Dapr gère le mTLS entre les services. La communication entre l’application et son propre sidecar reste locale.
Souscriptions avancées en gRPC
Souscription avec routage
On peut ajouter des règles de routage directement dans ListTopicSubscriptions :
public override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(
Empty request, ServerCallContext context)
{
var response = new ListTopicSubscriptionsResponse();
response.Subscriptions.Add(new TopicSubscription
{
PubsubName = "pubsub",
Topic = "orders",
Routes = new TopicRoutes
{
Default = "default",
Rules =
{
new TopicRule
{
Match = "event.data.status == \"created\"",
Path = "created"
},
new TopicRule
{
Match = "event.data.status == \"paid\"",
Path = "paid"
}
}
}
});
return Task.FromResult(response);
}
Le champ Path de la règle est ensuite accessible dans TopicEventRequest.Path, ce qui permet de dispatcher dans OnTopicEvent :
public override async Task<TopicEventResponse> OnTopicEvent(
TopicEventRequest request, ServerCallContext context)
{
return request.Path switch
{
"created" => await HandleOrderCreatedAsync(request),
"paid" => await HandleOrderPaidAsync(request),
"default" => await HandleDefaultAsync(request),
_ => new TopicEventResponse
{
Status = TopicEventResponse.Types.TopicEventResponseStatus.Drop
}
};
}
Souscription avec métadonnées
On peut ajouter des métadonnées à la souscription (par exemple, pour spécifier un consumer group Kafka) :
var subscription = new TopicSubscription
{
PubsubName = "pubsub",
Topic = "orders"
};
subscription.Metadata.Add("consumerGroup", "payment-processors");
subscription.Metadata.Add("maxConcurrency", "10");
response.Subscriptions.Add(subscription);
Souscription avec dead letter topic
response.Subscriptions.Add(new TopicSubscription
{
PubsubName = "pubsub",
Topic = "orders",
DeadLetterTopic = "orders-deadletter"
});
// S'abonner aussi au dead letter topic pour monitoring
response.Subscriptions.Add(new TopicSubscription
{
PubsubName = "pubsub",
Topic = "orders-deadletter"
});
Souscription en bulk
Pour recevoir les messages par lots :
response.Subscriptions.Add(new TopicSubscription
{
PubsubName = "pubsub",
Topic = "orders",
BulkSubscribe = new BulkSubscribeConfig
{
Enabled = true,
MaxMessagesCount = 50,
MaxAwaitDurationMs = 1000
}
});
Pattern : séparation par handlers typés
Pour un code plus maintenable, on peut introduire un pattern de dispatch typé :
Interface de handler
public interface ITopicHandler<TEvent>
{
string PubSubName { get; }
string TopicName { get; }
Task<TopicEventResponse.Types.TopicEventResponseStatus> HandleAsync(
TEvent evt, CancellationToken ct);
}
Implémentation des handlers
public class OrderCreatedHandler : ITopicHandler<OrderCreated>
{
private readonly ILogger<OrderCreatedHandler> _logger;
public OrderCreatedHandler(ILogger<OrderCreatedHandler> logger)
{
_logger = logger;
}
public string PubSubName => "pubsub";
public string TopicName => "orders";
public async Task<TopicEventResponse.Types.TopicEventResponseStatus> HandleAsync(
OrderCreated evt, CancellationToken ct)
{
_logger.LogInformation(
"Traitement du paiement pour {OrderId} ({Amount}€)",
evt.OrderId, evt.TotalAmount);
// Logique de paiement...
await Task.Delay(100, ct);
return TopicEventResponse.Types.TopicEventResponseStatus.Success;
}
}
public class PaymentConfirmedHandler : ITopicHandler<PaymentConfirmed>
{
private readonly ILogger<PaymentConfirmedHandler> _logger;
public PaymentConfirmedHandler(ILogger<PaymentConfirmedHandler> logger)
{
_logger = logger;
}
public string PubSubName => "pubsub";
public string TopicName => "payment-confirmed";
public async Task<TopicEventResponse.Types.TopicEventResponseStatus> HandleAsync(
PaymentConfirmed evt, CancellationToken ct)
{
_logger.LogInformation("Paiement {PaymentId} confirmé", evt.PaymentId);
await Task.CompletedTask;
return TopicEventResponse.Types.TopicEventResponseStatus.Success;
}
}
Service AppCallback avec dispatch
public class DaprSubscriberService : AppCallback.AppCallbackBase
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<DaprSubscriberService> _logger;
// Mapping topic → (type événement, type handler)
private static readonly Dictionary<string, (Type EventType, Type HandlerType)> _topicMap = new()
{
["orders"] = (typeof(OrderCreated), typeof(ITopicHandler<OrderCreated>)),
["payment-confirmed"] = (typeof(PaymentConfirmed), typeof(ITopicHandler<PaymentConfirmed>))
};
public DaprSubscriberService(
IServiceProvider serviceProvider,
ILogger<DaprSubscriberService> logger)
{
_serviceProvider = serviceProvider;
_logger = logger;
}
public override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(
Empty request, ServerCallContext context)
{
var response = new ListTopicSubscriptionsResponse();
foreach (var (topic, _) in _topicMap)
{
response.Subscriptions.Add(new TopicSubscription
{
PubsubName = "pubsub",
Topic = topic
});
}
return Task.FromResult(response);
}
public override async Task<TopicEventResponse> OnTopicEvent(
TopicEventRequest request, ServerCallContext context)
{
if (!_topicMap.TryGetValue(request.Topic, out var mapping))
{
_logger.LogWarning("Topic inconnu : {Topic}", request.Topic);
return new TopicEventResponse
{
Status = TopicEventResponse.Types.TopicEventResponseStatus.Drop
};
}
try
{
// Désérialiser l'événement dans le type attendu
var evt = JsonSerializer.Deserialize(
request.Data.ToStringUtf8(),
mapping.EventType,
new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
if (evt is null)
{
return new TopicEventResponse
{
Status = TopicEventResponse.Types.TopicEventResponseStatus.Drop
};
}
// Résoudre le handler depuis le conteneur DI
var handler = _serviceProvider.GetRequiredService(mapping.HandlerType);
// Appeler HandleAsync via réflexion
var method = mapping.HandlerType.GetMethod("HandleAsync")!;
var resultTask = (Task<TopicEventResponse.Types.TopicEventResponseStatus>)
method.Invoke(handler, [evt, context.CancellationToken])!;
var status = await resultTask;
return new TopicEventResponse { Status = status };
}
catch (Exception ex)
{
_logger.LogError(ex, "Erreur sur le topic {Topic}", request.Topic);
return new TopicEventResponse
{
Status = TopicEventResponse.Types.TopicEventResponseStatus.Retry
};
}
}
}
Enregistrement DI
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
// Enregistrer les handlers
builder.Services.AddScoped<ITopicHandler<OrderCreated>, OrderCreatedHandler>();
builder.Services.AddScoped<ITopicHandler<PaymentConfirmed>, PaymentConfirmedHandler>();
var app = builder.Build();
app.MapGrpcService<DaprSubscriberService>();
app.Run();
Ce pattern sépare proprement les responsabilités : le service AppCallback ne fait que du dispatch, et chaque handler contient la logique métier de son événement.
Événements Protobuf (au lieu de JSON)
Si vous voulez aller au bout de la logique gRPC, vous pouvez définir vos événements en Protobuf :
Définir les événements dans un fichier proto
syntax = "proto3";
option csharp_namespace = "SharedContracts";
package events;
message OrderCreatedEvent {
string order_id = 1;
string customer_id = 2;
double total_amount = 3;
int64 created_at_unix = 4; // Timestamp Unix en secondes
repeated OrderItemEvent items = 5;
}
message OrderItemEvent {
int32 product_id = 1;
string name = 2;
double price = 3;
int32 quantity = 4;
}
message PaymentConfirmedEvent {
string payment_id = 1;
string order_id = 2;
double amount = 3;
string payment_method = 4;
int64 confirmed_at_unix = 5;
}
Publier un événement Protobuf
Pour publier un message Protobuf via Dapr, on le sérialise en bytes et on utilise l’API HTTP du sidecar avec le content-type approprié :
public async Task PublishProtobufEventAsync(OrderCreatedEvent evt)
{
// Sérialiser en bytes Protobuf
var bytes = evt.ToByteArray();
// Publier via l'API HTTP du sidecar avec le content-type Protobuf
using var httpClient = new HttpClient();
var content = new ByteArrayContent(bytes);
content.Headers.ContentType =
new System.Net.Http.Headers.MediaTypeHeaderValue("application/octet-stream");
var daprPort = Environment.GetEnvironmentVariable("DAPR_HTTP_PORT") ?? "3500";
var response = await httpClient.PostAsync(
$"http://localhost:{daprPort}/v1.0/publish/pubsub/orders?metadata.rawPayload=true",
content);
response.EnsureSuccessStatusCode();
}
Recevoir un événement Protobuf
Côté subscriber gRPC, on désérialise les bytes Protobuf depuis TopicEventRequest.Data :
private async Task<TopicEventResponse> HandleOrderCreatedProtobufAsync(
TopicEventRequest request)
{
// Désérialiser les bytes Protobuf
var orderCreated = OrderCreatedEvent.Parser.ParseFrom(request.Data);
_logger.LogInformation(
"Commande {OrderId} reçue (Protobuf), montant : {Amount}€, {ItemCount} articles",
orderCreated.OrderId,
orderCreated.TotalAmount,
orderCreated.Items.Count);
// Traitement...
await Task.Delay(100);
return new TopicEventResponse
{
Status = TopicEventResponse.Types.TopicEventResponseStatus.Success
};
}
L’avantage de Protobuf : la sérialisation/désérialisation est beaucoup plus rapide et les messages sont plus compacts que le JSON, ce qui réduit la bande passante et la latence, surtout à haut débit.
Combiner publication gRPC et souscription gRPC
On peut aussi publier des événements via l’API gRPC du sidecar au lieu de l’API HTTP :
using Dapr.Client;
public class EventPublisher
{
private readonly DaprClient _daprClient;
public EventPublisher(DaprClient daprClient)
{
_daprClient = daprClient;
}
public async Task PublishOrderCreatedAsync(OrderCreated order)
{
// DaprClient utilise gRPC par défaut pour communiquer avec le sidecar
await _daprClient.PublishEventAsync("pubsub", "orders", order);
}
}
Pour forcer l’utilisation de gRPC côté DaprClient :
builder.Services.AddDaprClient(daprBuilder =>
{
var grpcPort = Environment.GetEnvironmentVariable("DAPR_GRPC_PORT") ?? "50001";
daprBuilder.UseGrpcEndpoint($"http://localhost:{grpcPort}");
});
Exemple complet : PaymentService gRPC
Voici le service complet avec handler, publication de réponse, et monitoring :
Program.cs
using PaymentService.Services;
using PaymentService.Handlers;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
builder.Services.AddDaprClient();
// Enregistrer les handlers
builder.Services.AddScoped<ITopicHandler<OrderCreated>, OrderPaymentHandler>();
var app = builder.Build();
app.MapGrpcService<DaprSubscriberService>();
app.Run();
Le handler métier
using Dapr.Client;
namespace PaymentService.Handlers;
public class OrderPaymentHandler : ITopicHandler<OrderCreated>
{
private readonly DaprClient _daprClient;
private readonly ILogger<OrderPaymentHandler> _logger;
public OrderPaymentHandler(DaprClient daprClient, ILogger<OrderPaymentHandler> logger)
{
_daprClient = daprClient;
_logger = logger;
}
public string PubSubName => "pubsub";
public string TopicName => "orders";
public async Task<TopicEventResponse.Types.TopicEventResponseStatus> HandleAsync(
OrderCreated evt, CancellationToken ct)
{
_logger.LogInformation(
"Début du traitement de paiement pour la commande {OrderId}", evt.OrderId);
// Vérifier l'idempotence via le state store
var alreadyProcessed = await _daprClient.GetStateAsync<bool>(
"statestore", $"payment-processed-{evt.OrderId}", cancellationToken: ct);
if (alreadyProcessed)
{
_logger.LogInformation(
"Commande {OrderId} déjà traitée, skip", evt.OrderId);
return TopicEventResponse.Types.TopicEventResponseStatus.Success;
}
// Simuler le traitement du paiement
var paymentResult = await ProcessPaymentAsync(evt, ct);
if (paymentResult.Success)
{
// Marquer comme traité
await _daprClient.SaveStateAsync(
"statestore", $"payment-processed-{evt.OrderId}", true,
cancellationToken: ct);
// Publier un événement de confirmation
await _daprClient.PublishEventAsync("pubsub", "payment-confirmed",
new PaymentConfirmed
{
PaymentId = paymentResult.PaymentId,
OrderId = evt.OrderId,
Amount = evt.TotalAmount,
ConfirmedAt = DateTime.UtcNow
}, ct);
_logger.LogInformation(
"Paiement {PaymentId} confirmé pour la commande {OrderId}",
paymentResult.PaymentId, evt.OrderId);
return TopicEventResponse.Types.TopicEventResponseStatus.Success;
}
_logger.LogWarning(
"Échec du paiement pour la commande {OrderId} : {Reason}",
evt.OrderId, paymentResult.FailureReason);
// Publier un événement d'échec
await _daprClient.PublishEventAsync("pubsub", "payment-failed",
new PaymentFailed
{
OrderId = evt.OrderId,
Reason = paymentResult.FailureReason ?? "Unknown"
}, ct);
// Ne pas retrier : le paiement a échoué pour une raison métier
return TopicEventResponse.Types.TopicEventResponseStatus.Success;
}
private async Task<PaymentResult> ProcessPaymentAsync(
OrderCreated order, CancellationToken ct)
{
// Simulation de traitement
await Task.Delay(500, ct);
return new PaymentResult
{
Success = true,
PaymentId = Guid.NewGuid().ToString()
};
}
}
Les modèles
public record OrderCreated
{
public string OrderId { get; init; } = string.Empty;
public string CustomerId { get; init; } = string.Empty;
public decimal TotalAmount { get; init; }
public DateTime CreatedAt { get; init; }
}
public record PaymentConfirmed
{
public string PaymentId { get; init; } = string.Empty;
public string OrderId { get; init; } = string.Empty;
public decimal Amount { get; init; }
public DateTime ConfirmedAt { get; init; }
}
public record PaymentFailed
{
public string OrderId { get; init; } = string.Empty;
public string Reason { get; init; } = string.Empty;
}
public record PaymentResult
{
public bool Success { get; init; }
public string PaymentId { get; init; } = string.Empty;
public string? FailureReason { get; init; }
}
Lancement en local
# Terminal 1 : OrderService (publisher HTTP)
dapr run --app-id order-service --app-port 5000 \
--resources-path ./components -- dotnet run --project OrderService
# Terminal 2 : PaymentService (subscriber gRPC)
dapr run --app-id payment-service \
--app-port 5010 \
--app-protocol grpc \
--resources-path ./components \
-- dotnet run --project PaymentService
Le flag --app-protocol grpc est essentiel : il indique au sidecar que l’application derrière lui communique en gRPC. Sans ce flag, le sidecar tentera de livrer les messages en HTTP, ce qui échouera.
Tester
# Créer une commande
curl -X POST http://localhost:5000/orders \
-H "Content-Type: application/json" \
-d '{ "customerId": "cust-42", "totalAmount": 149.99 }'
Dans les logs du PaymentService, vous devriez voir :
info: PaymentService.Handlers.OrderPaymentHandler
Début du traitement de paiement pour la commande abc-123
info: PaymentService.Handlers.OrderPaymentHandler
Paiement def-456 confirmé pour la commande abc-123
Lancement avec .NET Aspire
var builder = DistributedApplication.CreateBuilder(args);
var pubSub = builder.AddDaprPubSub("pubsub");
var stateStore = builder.AddDaprStateStore("statestore");
builder.AddProject<Projects.OrderService>("order-service")
.WithDaprSidecar()
.WithReference(pubSub)
.WithReference(stateStore);
builder.AddProject<Projects.PaymentService>("payment-service")
.WithDaprSidecar(new DaprSidecarOptions
{
AppProtocol = "grpc"
})
.WithReference(pubSub)
.WithReference(stateStore);
builder.Build().Run();
HTTP vs gRPC : récapitulatif Pub/Sub
| Aspect | Pub/Sub HTTP | Pub/Sub gRPC |
|---|---|---|
| Souscription | [Topic] attribut, Minimal API |
ListTopicSubscriptions override |
| Réception | Endpoint HTTP POST | OnTopicEvent override |
| Sérialisation | JSON automatique | JSON ou Protobuf (manuel) |
| Dispatch | Routage ASP.NET Core natif | Switch/dispatch dans OnTopicEvent |
| Performance | Bon | Meilleur (HTTP/2 binaire) |
| Simplicité | Plus simple | Plus de code |
| Flag sidecar | --app-protocol http (défaut) |
--app-protocol grpc |
| Cas d’usage | Majorité des applications | Haut débit, écosystème gRPC existant |
Résumé
| Aspect | Détail |
|---|---|
| Protocole | gRPC entre le sidecar et l’application subscriber |
| Service | Implémenter AppCallback.AppCallbackBase (Dapr SDK) |
| Souscriptions | ListTopicSubscriptions : déclare les topics, routes, dead letters |
| Réception | OnTopicEvent : reçoit chaque message avec topic, payload, métadonnées |
| Réponse | Success (ACK), Retry (re-livraison), Drop (abandon) |
| Sérialisation | JSON par défaut, Protobuf possible pour de meilleures performances |
| Flag | --app-protocol grpc obligatoire au lancement du subscriber |
| Routage | Règles de routage via TopicRoutes dans la souscription |
| Publication | Identique (HTTP ou gRPC via DaprClient.PublishEventAsync) |
| Aspire | AppProtocol = "grpc" dans DaprSidecarOptions |
Le Pub/Sub gRPC de Dapr est le choix naturel pour les services déjà construits autour de gRPC ou pour les scénarios à haut débit où la performance de sérialisation et la compacité des messages font la différence, tout en conservant les mêmes garanties de livraison et la même portabilité entre brokers que le mode HTTP.