Here’s an example implementation of the Inbox-Outbox pattern in C# using ASP.NET Core and Entity Framework Core.

Inbox Pattern Example

  1. Inbox Entity: Define an entity to represent the inbox table in the database.
/// <summary>
/// Entity representing one row of the inbox table: a received message plus
/// the bookkeeping needed to track whether it has been processed.
/// </summary>
public class Inbox
{
    /// <summary>Identifier of the inbox row.</summary>
    // NOTE(review): presumably the database-generated primary key — confirm in the EF model configuration.
    public long Id { get; set; }

    /// <summary>The message content as a string.</summary>
    // NOTE(review): the serialization format is not defined in this snippet — confirm with the producer.
    public string Message { get; set; }

    /// <summary>Set to <c>true</c> by the inbox service once the message has been handled.</summary>
    public bool Processed { get; set; }

    /// <summary>Timestamp recorded for when the message was received.</summary>
    // NOTE(review): no DateTimeKind is enforced here — presumably UTC; confirm at the write site.
    public DateTime ReceivedAt { get; set; }
}
  2. Inbox Repository: Create a repository to interact with the inbox table.
/// <summary>
/// Data-access abstraction over the inbox table.
/// </summary>
public interface IInboxRepository
{
    /// <summary>Asynchronously loads the inbox messages that have not been processed yet.</summary>
    /// <returns>A task producing the list of unprocessed <see cref="Inbox"/> rows.</returns>
    Task<List<Inbox>> GetUnprocessedMessagesAsync();

    /// <summary>Asynchronously persists the given inbox message.</summary>
    /// <param name="inbox">The inbox row to save.</param>
    /// <returns>A task that completes when the save has finished.</returns>
    Task SaveAsync(Inbox inbox);
}

/// <summary>
/// Entity Framework Core implementation of <see cref="IInboxRepository"/>
/// backed by <c>ApplicationDbContext.Inboxes</c>.
/// </summary>
public class InboxRepository : IInboxRepository
{
    private readonly ApplicationDbContext _context;

    /// <summary>Creates the repository over the given database context.</summary>
    /// <param name="context">The EF Core context that exposes the inbox set.</param>
    public InboxRepository(ApplicationDbContext context)
    {
        _context = context;
    }

    /// <summary>
    /// Loads every inbox message whose <see cref="Inbox.Processed"/> flag is still
    /// <c>false</c>, oldest first.
    /// </summary>
    /// <returns>The unprocessed messages ordered by receive time, then by id.</returns>
    public async Task<List<Inbox>> GetUnprocessedMessagesAsync()
    {
        // Without an explicit ORDER BY the database may return rows in any order,
        // which would make the processing order nondeterministic. Id breaks ties
        // between messages that share the same timestamp.
        return await _context.Inboxes
            .Where(i => !i.Processed)
            .OrderBy(i => i.ReceivedAt)
            .ThenBy(i => i.Id)
            .ToListAsync();
    }

    /// <summary>
    /// Passes the message to <c>DbSet.Update</c> and writes the change to the
    /// database with <c>SaveChangesAsync</c>.
    /// </summary>
    /// <param name="inbox">The inbox row to persist.</param>
    public async Task SaveAsync(Inbox inbox)
    {
        _context.Inboxes.Update(inbox);
        await _context.SaveChangesAsync();
    }
}
  3. Inbox Service: Implement a service to process the inbox messages.
/// <summary>
/// Drains the inbox: fetches the unprocessed messages from the repository and
/// flags each one as processed.
/// </summary>
public class InboxService
{
    private readonly IInboxRepository _inboxRepository;

    /// <summary>Creates the service on top of the given inbox repository.</summary>
    /// <param name="inboxRepository">Repository used to read and save inbox rows.</param>
    public InboxService(IInboxRepository inboxRepository) => _inboxRepository = inboxRepository;

    /// <summary>
    /// Handles all currently unprocessed inbox messages one after another,
    /// saving each message right after its flag is set.
    /// </summary>
    /// <returns>A task that completes when every fetched message has been saved.</returns>
    public async Task ProcessInboxMessagesAsync()
    {
        List<Inbox> pending = await _inboxRepository.GetUnprocessedMessagesAsync();

        foreach (Inbox message in pending)
        {
            // Placeholder: the actual message handling belongs here, before the flag is set.
            message.Processed = true;

            // Saved per message, so earlier messages stay flagged if a later one fails.
            await _inboxRepository.SaveAsync(message);
        }
    }
}

Outbox Pattern Example

  1. Outbox Entity: Define an entity to represent the outbox table in the database.
/// <summary>
/// Entity representing one row of the outbox table: a message waiting to be
/// sent plus the bookkeeping needed to track whether it has been sent.
/// </summary>
public class Outbox
{
    /// <summary>Identifier of the outbox row.</summary>
    // NOTE(review): presumably the database-generated primary key — confirm in the EF model configuration.
    public long Id { get; set; }

    /// <summary>The message content as a string.</summary>
    // NOTE(review): the serialization format is not defined in this snippet — confirm with the consumer.
    public string Message { get; set; }

    /// <summary>Set to <c>true</c> by the outbox service once the message has been sent.</summary>
    public bool Sent { get; set; }

    /// <summary>Timestamp recorded for when the outbox row was created.</summary>
    // NOTE(review): no DateTimeKind is enforced here — presumably UTC; confirm at the write site.
    public DateTime CreatedAt { get; set; }
}
  2. Outbox Repository: Create a repository to interact with the outbox table.
/// <summary>
/// Data-access abstraction over the outbox table.
/// </summary>
public interface IOutboxRepository
{
    /// <summary>Asynchronously loads the outbox messages that have not been sent yet.</summary>
    /// <returns>A task producing the list of unsent <see cref="Outbox"/> rows.</returns>
    Task<List<Outbox>> GetUnsentMessagesAsync();

    /// <summary>Asynchronously persists the given outbox message.</summary>
    /// <param name="outbox">The outbox row to save.</param>
    /// <returns>A task that completes when the save has finished.</returns>
    Task SaveAsync(Outbox outbox);
}

/// <summary>
/// Entity Framework Core implementation of <see cref="IOutboxRepository"/>
/// backed by <c>ApplicationDbContext.Outboxes</c>.
/// </summary>
public class OutboxRepository : IOutboxRepository
{
    private readonly ApplicationDbContext _context;

    /// <summary>Creates the repository over the given database context.</summary>
    /// <param name="context">The EF Core context that exposes the outbox set.</param>
    public OutboxRepository(ApplicationDbContext context)
    {
        _context = context;
    }

    /// <summary>
    /// Loads every outbox message whose <see cref="Outbox.Sent"/> flag is still
    /// <c>false</c>, oldest first.
    /// </summary>
    /// <returns>The unsent messages ordered by creation time, then by id.</returns>
    public async Task<List<Outbox>> GetUnsentMessagesAsync()
    {
        // Without an explicit ORDER BY the database may return rows in any order,
        // so messages could be sent out of creation order. Id breaks ties between
        // messages that share the same timestamp.
        return await _context.Outboxes
            .Where(o => !o.Sent)
            .OrderBy(o => o.CreatedAt)
            .ThenBy(o => o.Id)
            .ToListAsync();
    }

    /// <summary>
    /// Passes the message to <c>DbSet.Update</c> and writes the change to the
    /// database with <c>SaveChangesAsync</c>.
    /// </summary>
    /// <param name="outbox">The outbox row to persist.</param>
    public async Task SaveAsync(Outbox outbox)
    {
        _context.Outboxes.Update(outbox);
        await _context.SaveChangesAsync();
    }
}
  3. Outbox Service: Implement a service to send the outbox messages.
/// <summary>
/// Drains the outbox: fetches the unsent messages from the repository and
/// flags each one as sent.
/// </summary>
public class OutboxService
{
    private readonly IOutboxRepository _outboxRepository;

    /// <summary>Creates the service on top of the given outbox repository.</summary>
    /// <param name="outboxRepository">Repository used to read and save outbox rows.</param>
    public OutboxService(IOutboxRepository outboxRepository) => _outboxRepository = outboxRepository;

    /// <summary>
    /// Handles all currently unsent outbox messages one after another,
    /// saving each message right after its flag is set.
    /// </summary>
    /// <returns>A task that completes when every fetched message has been saved.</returns>
    public async Task ProcessOutboxMessagesAsync()
    {
        List<Outbox> pending = await _outboxRepository.GetUnsentMessagesAsync();

        foreach (Outbox message in pending)
        {
            // Placeholder: the actual send (e.g. publishing to a broker) belongs here, before the flag is set.
            message.Sent = true;

            // Saved per message, so earlier messages stay flagged if a later one fails.
            await _outboxRepository.SaveAsync(message);
        }
    }
}

Integration

You can use a hosted service to periodically call the inbox and outbox service methods to process the messages.

/// <summary>
/// Hosted service that periodically runs the inbox and outbox services.
/// A timer triggers a processing pass immediately on start and then every
/// five seconds; passes never overlap and a failing pass does not stop
/// later ones.
/// </summary>
/// <remarks>
/// NOTE(review): hosted services are normally registered as singletons, while
/// services built on an EF Core DbContext are usually scoped. Confirm the DI
/// registrations of <see cref="InboxService"/> and <see cref="OutboxService"/>;
/// if they are scoped, they should be resolved from a fresh scope per pass.
/// </remarks>
public class MessageProcessingService : IHostedService, IDisposable
{
    // Time between the starts of two consecutive timer ticks.
    private static readonly TimeSpan PollingInterval = TimeSpan.FromSeconds(5);

    private Timer _timer;
    private readonly InboxService _inboxService;
    private readonly OutboxService _outboxService;

    // 1 while a processing pass is running, 0 otherwise (accessed via Interlocked).
    private int _isProcessing;

    // The most recently started pass, so StopAsync can wait for it.
    private volatile Task _currentPass = Task.CompletedTask;

    /// <summary>Creates the hosted service.</summary>
    /// <param name="inboxService">Service that processes received messages.</param>
    /// <param name="outboxService">Service that sends pending messages.</param>
    public MessageProcessingService(InboxService inboxService, OutboxService outboxService)
    {
        _inboxService = inboxService;
        _outboxService = outboxService;
    }

    /// <summary>Starts the polling timer; the first pass is triggered immediately.</summary>
    /// <param name="cancellationToken">Not used; starting the timer completes synchronously.</param>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        _timer = new Timer(ProcessMessages, null, TimeSpan.Zero, PollingInterval);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Timer callback. Starts a new processing pass unless the previous one
    /// is still running.
    /// </summary>
    /// <param name="state">Unused timer state.</param>
    private void ProcessMessages(object state)
    {
        // System.Threading.Timer fires on schedule even if the previous callback
        // has not finished; skip this tick rather than run two passes concurrently.
        if (Interlocked.CompareExchange(ref _isProcessing, 1, 0) != 0)
        {
            return;
        }

        _currentPass = RunPassAsync();
    }

    /// <summary>
    /// Runs one inbox pass followed by one outbox pass. Never throws: an
    /// exception escaping a timer callback would be unobserved and could
    /// terminate the process, so failures are traced and the next tick retries.
    /// </summary>
    private async Task RunPassAsync()
    {
        try
        {
            await _inboxService.ProcessInboxMessagesAsync();
            await _outboxService.ProcessOutboxMessagesAsync();
        }
        catch (Exception ex)
        {
            System.Diagnostics.Trace.TraceError("Message processing pass failed: {0}", ex);
        }
        finally
        {
            Interlocked.Exchange(ref _isProcessing, 0);
        }
    }

    /// <summary>
    /// Stops the timer and waits (best effort) for a pass that is still running,
    /// giving up when the host signals that shutdown must no longer be graceful.
    /// </summary>
    /// <param name="cancellationToken">Signals that the wait should be abandoned.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _timer?.Change(Timeout.Infinite, 0);

        Task runningPass = _currentPass;
        await Task.WhenAny(runningPass, Task.Delay(Timeout.Infinite, cancellationToken));
    }

    /// <summary>Releases the polling timer.</summary>
    public void Dispose()
    {
        _timer?.Dispose();
    }
}

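This implementation lets messages be received and sent reliably by persisting them in the inbox and outbox tables, which helps keep data consistent across your microservices. Note that the message handling and sending steps are placeholders you still need to fill in, and that a message can be handled more than once if a failure occurs before its flag is saved, so handlers should be idempotent.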

Related Posts