Practical Applications of the Inbox-Outbox Pattern

Here is a simple example of the outbox side of the pattern in C#. It shows how to create an outbox table, write messages to it, and process the stored messages so that each one stays pending until it has been handled.


Database Schema

First, create an OutboxMessages table in your database:

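CREATE TABLE OutboxMessages (
    Id UNIQUEIDENTIFIER PRIMARY KEY,   -- set by the application (Guid.NewGuid())
    EventType NVARCHAR(256),           -- e.g. 'com.example.event'
    Source NVARCHAR(256),              -- origin of the event
    Time DATETIMEOFFSET,               -- creation time (UTC)
    DataContentType NVARCHAR(256),     -- e.g. 'application/json'
    Data NVARCHAR(MAX),                -- serialized payload
    Processed BIT NOT NULL DEFAULT 0   -- 0 = pending, 1 = processed
);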

OutboxMessage Class

Next, create a class to represent the outbox message:

using System;

public class OutboxMessage
{
    public Guid Id { get; set; }
    public string EventType { get; set; }
    public string Source { get; set; }
    public DateTimeOffset Time { get; set; }
    public string DataContentType { get; set; }
    public string Data { get; set; }
    public bool Processed { get; set; }
    
    public OutboxMessage()
    {
        Id = Guid.NewGuid();
        Time = DateTimeOffset.UtcNow;
        Processed = false;
    }
}

Repository Class

Create a repository class to handle database operations:

using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using Dapper;

public class OutboxRepository
{
    private readonly string _connectionString;

    public OutboxRepository(string connectionString)
    {
        _connectionString = connectionString;
    }

    public void Save(OutboxMessage message)
    {
        using var connection = new SqlConnection(_connectionString);
        var query = "INSERT INTO OutboxMessages (Id, EventType, Source, Time, DataContentType, Data, Processed) VALUES (@Id, @EventType, @Source, @Time, @DataContentType, @Data, @Processed)";
        connection.Execute(query, message);
    }

    public IEnumerable<OutboxMessage> GetUnprocessedMessages()
    {
        using var connection = new SqlConnection(_connectionString);
        // Oldest first, so messages are handled in the order they were created
        var query = "SELECT * FROM OutboxMessages WHERE Processed = 0 ORDER BY Time";
        return connection.Query<OutboxMessage>(query);
    }

    public void MarkAsProcessed(Guid id)
    {
        using var connection = new SqlConnection(_connectionString);
        var query = "UPDATE OutboxMessages SET Processed = 1 WHERE Id = @Id";
        connection.Execute(query, new { Id = id });
    }
}
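
The repository above opens its own connection for each call, so the outbox insert commits independently of any business data. The outbox pattern depends on the business write and the outbox write committing together. A minimal sketch of that idea follows, using the transaction parameter of Dapper's Execute. The OrderWriter class, the Orders table, and the event values are hypothetical and are not part of the original example.

```csharp
using System;
using System.Data.SqlClient;
using Dapper;

// Hypothetical helper: writes a business row and its outbox message atomically.
public static class OrderWriter
{
    public static void PlaceOrder(string connectionString, Guid orderId, string orderJson)
    {
        using var connection = new SqlConnection(connectionString);
        connection.Open();

        // Both inserts share this transaction, so they commit or roll back together.
        using var transaction = connection.BeginTransaction();

        // Assumption: an Orders table with an Id column exists.
        connection.Execute(
            "INSERT INTO Orders (Id) VALUES (@Id)",
            new { Id = orderId },
            transaction);

        connection.Execute(
            "INSERT INTO OutboxMessages (Id, EventType, Source, Time, DataContentType, Data, Processed) " +
            "VALUES (@Id, @EventType, @Source, @Time, @DataContentType, @Data, 0)",
            new
            {
                Id = Guid.NewGuid(),
                EventType = "com.example.order.placed",   // illustrative value
                Source = "https://example.com/orders",    // illustrative value
                Time = DateTimeOffset.UtcNow,
                DataContentType = "application/json",
                Data = orderJson
            },
            transaction);

        // If an exception is thrown before this line, disposing the
        // uncommitted transaction rolls both inserts back.
        transaction.Commit();
    }
}
```

With this shape, an order can never be stored without its outbox message, and an outbox message can never exist for an order that was rolled back.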

Message Processor

Create a message processor to handle and process the outbox messages:

using System;

public class OutboxMessageProcessor
{
    private readonly OutboxRepository _repository;

    public OutboxMessageProcessor(OutboxRepository repository)
    {
        _repository = repository;
    }

    public void ProcessMessages()
    {
        var messages = _repository.GetUnprocessedMessages();
        
        foreach (var message in messages)
        {
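            // Deliver the message (e.g., send it to an external service).
            // Console output stands in for the real delivery call here.
            Console.WriteLine($"Processing message: {message.Id}, EventType: {message.EventType}");
            
            // Mark the message as processed only after delivery succeeds. If the process
            // stops between these two steps, the message is delivered again on the next
            // run, so receivers should tolerate duplicates.
            _repository.MarkAsProcessed(message.Id);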
        }
    }
}
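
The loop above stops at the first exception, which leaves every later message waiting behind the one that failed. One possible variation catches failures per message and leaves the failed message unprocessed for the next run. This is a sketch rather than part of the original example: PendingMessage, RetryingProcessor, and the send delegate are hypothetical names, and an in-memory list stands in for the OutboxMessages table.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a row in the OutboxMessages table.
public class PendingMessage
{
    public Guid Id { get; } = Guid.NewGuid();
    public string EventType { get; set; } = "";
    public bool Processed { get; set; }
}

public static class RetryingProcessor
{
    // Tries to deliver every pending message once and returns how many succeeded.
    // 'send' is a hypothetical delegate standing in for the external call.
    public static int ProcessPending(IEnumerable<PendingMessage> outbox, Action<PendingMessage> send)
    {
        var delivered = 0;

        foreach (var message in outbox.Where(m => !m.Processed).ToList())
        {
            try
            {
                send(message);

                // Mark as processed only after delivery succeeded.
                message.Processed = true;
                delivered++;
            }
            catch (Exception ex)
            {
                // Leave the message pending so the next run retries it.
                Console.WriteLine($"Delivery failed for {message.Id}: {ex.Message}");
            }
        }

        return delivered;
    }
}
```

Calling ProcessPending again later picks up only the messages that are still pending, so a temporary outage delays delivery instead of losing messages.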

Usage Example

Finally, here’s how you can use the classes to save and process messages:

class Program
{
    static void Main()
    {
        string connectionString = "YourConnectionStringHere";
        var repository = new OutboxRepository(connectionString);
        
        // Create a new outbox message
        var message = new OutboxMessage
        {
            EventType = "com.example.event",
            Source = "https://example.com/source",
            DataContentType = "application/json",
            Data = "{\"example\":\"data\"}"
        };
        
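        // Save the message to the outbox table. In a real application, this insert
        // should run in the same database transaction as the business change that
        // produced the event.
        repository.Save(message);
        
        // Process the outbox messages. This is usually done by a separate
        // background worker; it is called inline here to keep the example short.
        var processor = new OutboxMessageProcessor(repository);
        processor.ProcessMessages();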
    }
}
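
Because a message is marked as processed only after it has been handled, a crash between those two steps causes the same message to be delivered again on the next run. The inbox side of the pattern deals with this on the receiver by remembering which message IDs it has already handled. A minimal in-memory sketch follows, assuming a hypothetical InMemoryInbox class. A real inbox would normally store the IDs in a database table, written in the same transaction as the handler's own changes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical receiver-side guard against duplicate deliveries.
public class InMemoryInbox
{
    private readonly HashSet<Guid> _handledIds = new HashSet<Guid>();

    // Runs the handler unless this message ID was already handled.
    // Returns true if the handler ran, false if the message was a duplicate.
    public bool Handle(Guid messageId, Action handler)
    {
        if (_handledIds.Contains(messageId))
        {
            return false;
        }

        handler();

        // Record the ID only after the handler succeeded, so a failed
        // attempt can be retried when the message is delivered again.
        _handledIds.Add(messageId);
        return true;
    }
}
```

A short usage example, in which the second delivery of the same ID is ignored:

```csharp
var inbox = new InMemoryInbox();
var id = Guid.NewGuid();
var handledCount = 0;

inbox.Handle(id, () => handledCount++);  // handler runs
inbox.Handle(id, () => handledCount++);  // duplicate, handler is skipped
```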

Conclusion

In this example:

  • The OutboxMessage class represents the outbox messages with common properties.
  • The OutboxRepository class handles database operations for saving, retrieving, and updating outbox messages.
  • The OutboxMessageProcessor class reads the unprocessed messages, handles each one, and then marks it as processed.
  • The Program class demonstrates how to create, save, and process outbox messages using the above classes.

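This approach illustrates the outbox half of the inbox-outbox pattern: each message is stored in the database before delivery is attempted and stays marked as unprocessed until it has been handled, so a message that was not handled is picked up again on the next run. This is how the pattern improves delivery reliability in distributed systems.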

