Přeskočit na hlavní obsah

The 1st method - AMQP Message Publisher / Consumer

Overview

  • This is the first way of communication:
    • To publish events of application domain services using message broker
    • To register subscriber and consume events of application domain services using message broker
    • AMQP API is defined by AMQP protocol
    • The event messages are supported by C# Contracts nuget package
  • Native backend service can publish messages by AMQP exchanges
  • Native backend service can consume messages by AMQP queues

See more: OIDC/OAuth2 authentication

Publish domain entity released event using message broker

amqp-message-publisher.png

  • Example - how to publish AMQP message(s):
    • Your AMQP API is defined by AMQP protocol
    • Publish event(s) to publish AMQP message(s)
    • See in example:
      • MessagePublishers/Example1_MyEntityEventMessagePublisher.cs
      • Controllers/Examples12Controller.cs
        • Example1_Publish_MyEntityReleased

Usage example:

using ASOL.Core.Messaging.Extensions;
using ASOL.Core.Multitenancy.Messaging.Extensions;
using MassTransit;

private readonly IRuntimeContext _runtimeContext;
private readonly ITenantContext _tenantContext;
private readonly IBus _bus;

/// <summary>
/// Example of common domain event publisher against platform message broker.
/// This example doesn't contain real implementation, it's only for demonstration purposes.
/// </summary>
public Task PublishMyEntityReleasedAsync(string myEntityId, CancellationToken ct = default)
{
//create event message
var message = new { MyEntityId = myEntityId };

// *** 1. publish message ***
//publish event message
return _bus.Publish<Example3_MyEntityReleased>(message, context =>
{
//set tenant context to message headers (optional)
context.Headers.SetTenant(_tenantContext);

//set user context to message headers (optional)
var user = _runtimeContext.Security.User;
if (ClaimsPrincipalExtensions.IsAuthenticated(user))
{
context.Headers.SetIdentity(user);
}
}, ct);

Consume domain entity released event using message broker

amqp-message-consumer.png

  • Example - how to consume AMQP message(s):
    • Register consumer(s) for event(s) to create AMQP message queue and to consume messages
    • Use HTTP client to obtain payload when a message is delivered
    • See in example:
      • MessageConsumers/Example1_OrderReleasedEventConsumer.cs

Registration example (Startup.cs):

//data-agent consumers configuration
services.AddDataAgentMessagingSubscriber(Configuration, RabbitMqConfigurationSectionName, cb =>
{
//registration of DataService events for data-agent/data-source management
// ...

//registration of assemblies with consumers
cb.RegisterConsumerAssemblies(typeof(Startup).Assembly);
}, sp =>
{
//configure data-agent queue(s)
var configureQueues = new List<(string ServiceName, Action<IRabbitMqReceiveEndpointConfigurator> Configure)>();

//data-agent data-events queue(s)
// ...

// *** 0. register consumer ***
//register queue and consumer(s) for domain services examples
var queueName = $"{dataAgentCode}-Examples";
Action<IRabbitMqReceiveEndpointConfigurator> configure = ep =>
{
ep.Consumer<Example1_MyEntityReleasedEventConsumer>(sp);
};
configureQueues.Add((queueName, configure));

return configureQueues;
});
services.AddMassTransitHostedService();

Usage example:

using ASOL.Core.Identity;
using MassTransit;

private readonly IRuntimeContext _runtimeContext;

/// <summary>
/// Example of common domain event consumer against platform message broker.
/// This example doesn't contain real implementation, it's only for demonstration purposes.
/// </summary>
public async Task Consume(ConsumeContext<Example1_MyEntityReleased> context)
{
//(optional) check tenant context of received message (exception is thrown if not available)
_ = _runtimeContext.Security.TenantId;

//(optional) check user context of received message (exception is thrown if not available)
_ = _runtimeContext.Security.UserId;

// *** 1. consume message ***
var myEntityReleased = context.Message;

//... evaluate if event should be processed or ignored

// *** 2. [authenticate +] call endpoint ***
//... get details from domain service or persistence storage using MyEntity identifier

//... process delivered message with retrieved payload
}