Persist First Event

Create Ids, commands and events

First, we need to add the UserId and TaskId classes, which serve as the identifiers of these two entities.

/// <summary>
/// Identifier of a task aggregate.
/// </summary>
[DataContract(Name = "d5e50e1f-5886-4608-9361-9fe0eb440a6b")]
public class TaskId : AggregateRootId
{
    // Non-public parameterless constructor; presumably kept for the serializer — confirm.
    private TaskId()
    {
    }

    /// <summary>
    /// Creates the identifier by passing "tenant", "task" and <paramref name="id"/>
    /// to the base constructor.
    /// </summary>
    /// <param name="id">The task-specific part of the identifier.</param>
    public TaskId(string id)
        : base("tenant", "task", id)
    {
    }
}

Then we need to create a Cronus command for task creation and an event that indicates that the task has been created.

/// <summary>
/// Command that requests the creation of a task for a user.
/// </summary>
[DataContract(Name = "857d960c-4b91-49cc-98fd-fa543906c52d")]
public class CreateTask : ICommand
{
    // Parameterless constructor; presumably required by the serializer — confirm.
    public CreateTask() { }

    /// <summary>
    /// Creates the command and validates that every argument has been supplied.
    /// </summary>
    /// <param name="id">Identifier of the task to create.</param>
    /// <param name="userId">Identifier of the user the task is created for.</param>
    /// <param name="name">Name of the task.</param>
    /// <param name="timestamp">Point in time carried by the command; must not be the default value.</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="id"/>, <paramref name="userId"/> or <paramref name="name"/> is null,
    /// or when <paramref name="timestamp"/> is the default <see cref="DateTimeOffset"/>.
    /// </exception>
    public CreateTask(TaskId id, UserId userId, string name, DateTimeOffset timestamp)
    {
        if (id is null) throw new ArgumentNullException(nameof(id));
        if (userId is null) throw new ArgumentNullException(nameof(userId));
        if (name is null) throw new ArgumentNullException(nameof(name));

        // DateTimeOffset is a value type and can never be null. The exception type is kept so that
        // existing catch blocks keep working, but the message now describes the actual problem
        // instead of the misleading default "Value cannot be null".
        if (timestamp == default) throw new ArgumentNullException(nameof(timestamp), "The timestamp must be set to a non-default value.");

        Id = id;
        UserId = userId;
        Name = name;
        Timestamp = timestamp;
    }

    /// <summary>Identifier of the task to create.</summary>
    [DataMember(Order = 1)]
    public TaskId Id { get; private set; }

    /// <summary>Identifier of the user the task is created for.</summary>
    [DataMember(Order = 2)]
    public UserId UserId { get; private set; }

    /// <summary>Name of the task.</summary>
    [DataMember(Order = 3)]
    public string Name { get; private set; }

    /// <summary>Point in time supplied by the sender of the command.</summary>
    [DataMember(Order = 4)]
    public DateTimeOffset Timestamp { get; private set; }

    /// <summary>Returns a human-readable description of the command.</summary>
    public override string ToString()
    {
        return $"Create a task with id '{Id}' and name '{Name}' for user [{UserId}].";
    }
}

Create an Aggregate and Application Service

Add an aggregate that inherits from AggregateRoot, with its state class as the generic argument.

/// <summary>
/// Aggregate root for a task; its state is held in <see cref="TaskState"/>.
/// </summary>
public class TaskAggregate : AggregateRoot<TaskState>
{
    public TaskAggregate()
    {
    }

    /// <summary>
    /// Builds a <see cref="TaskCreated"/> event from the arguments and applies it to the aggregate.
    /// </summary>
    /// <param name="id">Identifier of the new task.</param>
    /// <param name="userId">Identifier of the user the task belongs to.</param>
    /// <param name="name">Name of the task.</param>
    /// <param name="deadline">Deadline passed on to the event.</param>
    public void CreateTask(TaskId id, UserId userId, string name, DateTimeOffset deadline)
    {
        // The local keeps the IEvent static type so the same Apply overload is selected.
        IEvent taskCreated = new TaskCreated(id, userId, name, deadline);

        Apply(taskCreated);
    }
}

The Apply method passes the event to the aggregate's state, which updates itself accordingly.

/// <summary>
/// State of <see cref="TaskAggregate"/>, populated from the events applied to the aggregate.
/// </summary>
public class TaskState : AggregateRootState<TaskAggregate, TaskId>
{
    /// <summary>Identifier of the task.</summary>
    public override TaskId Id { get; set; }

    /// <summary>Identifier of the user the task belongs to.</summary>
    public UserId UserId { get; set; }

    /// <summary>Name of the task.</summary>
    public string Name { get; set; }

    /// <summary>Creation time copied from the event.</summary>
    public DateTimeOffset CreatedAt { get; set; }

    /// <summary>Deadline copied from the event.</summary>
    public DateTimeOffset Deadline { get; set; }

    /// <summary>
    /// Copies the data carried by a <see cref="TaskCreated"/> event into this state.
    /// </summary>
    /// <param name="event">The event to read from.</param>
    public void When(TaskCreated @event)
    {
        // Identity and ownership.
        this.Id = @event.Id;
        this.UserId = @event.UserId;

        // Descriptive data.
        this.Name = @event.Name;

        // NOTE(review): the deadline is read from the event's Timestamp property while
        // CreatedAt is read from its CreatedAt property — confirm against TaskCreated,
        // which is not shown here.
        this.CreatedAt = @event.CreatedAt;
        this.Deadline = @event.Timestamp;
    }
}

Finally, we can create an Application Service for command handling.

/// <summary>
/// Application service that handles <see cref="CreateTask"/> commands for <see cref="TaskAggregate"/>.
/// </summary>
[DataContract(Name = "ef669879-5d35-4cb7-baea-39a7c46c9e13")]
public class TaskService : ApplicationService<TaskAggregate>,
ICommandHandler<CreateTask>
{
    public TaskService(IAggregateRepository repository) : base(repository) { }

    /// <summary>
    /// Creates and saves a new task aggregate when none exists yet for the command's id.
    /// Nothing is done when the aggregate is already present.
    /// </summary>
    /// <param name="command">The command describing the task to create.</param>
    public async Task HandleAsync(CreateTask command)
    {
        ReadResult<TaskAggregate> taskResult = await repository.LoadAsync<TaskAggregate>(command.Id).ConfigureAwait(false);
        if (taskResult.NotFound)
        {
            var task = new TaskAggregate();

            // Forward the timestamp carried by the command. Passing DateTimeOffset.UtcNow here
            // discarded the value computed by the sender, so the deadline was always the handling time.
            task.CreateTask(command.Id, command.UserId, command.Name, command.Timestamp);
            await repository.SaveAsync(task).ConfigureAwait(false);
        }
    }
}

We register a handler by implementing ICommandHandler<>. When the command arrives, we try to load the aggregate; if it is not found, we create a new one and call SaveAsync to save its state to the database.

Create Controller and send a request

Now we need a controller to publish our commands and create tasks.

/// <summary>
/// API controller that publishes task-related commands.
/// </summary>
[ApiController]
[Route("[controller]/[action]")]
public class TaskController : ControllerBase
{
    private readonly IPublisher<ICommand> _publisher;

    public TaskController(IPublisher<ICommand> publisher)
    {
        _publisher = publisher;
    }

    /// <summary>
    /// Generates new task and user ids, builds a <see cref="CreateTask"/> command whose timestamp is
    /// the current UTC time plus <c>request.DaysActive</c> days, and publishes it.
    /// </summary>
    /// <param name="request">The request carrying the task name and the number of active days.</param>
    /// <returns>
    /// An OK result with the generated task id when the command was published;
    /// otherwise a problem result.
    /// </returns>
    [HttpPost]
    public IActionResult CreateTask(CreateTaskRequest request)
    {
        string id = Guid.NewGuid().ToString();
        string rawUserId = Guid.NewGuid().ToString();
        TaskId taskId = new TaskId(id);
        UserId userId = new UserId(rawUserId);

        // DateTimeOffset is immutable: AddDays returns a new value, so its result must be kept.
        // Previously the result was discarded and the expiry date was always "now".
        DateTimeOffset expireDate = DateTimeOffset.UtcNow.AddDays(request.DaysActive);

        CreateTask command = new CreateTask(taskId, userId, request.Name, expireDate);

        if (_publisher.Publish(command) == false)
        {
            return Problem($"Unable to publish command. {command.Id}: {command.Name}");
        }

        return Ok(id);
    }
}

Here we create TaskId and UserId and inject IPublisher<ICommand> to publish the command. After this, the command will be sent to RabbitMQ and then handled in the Application Service.

Now let's start our Service and API. We should be able to make POST requests to our Controller through Swagger and create our first Task in the system. It must be persisted in the Event Store.

Inspection of the Event Store

Download DevCenter or any other UI tool for Cassandra.

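Let's take the Id from the response, build the full aggregate URN from it (for example urn:tenant:task:e5205507-f26e-4a13-9958-3c35ec00cb5c), and encode that URN to Base64. Then try: select * from taskmanagerevents where id = 'dXJuOnRlbmFudDp0YXNrOmU1MjA1NTA3LWYyNmUtNGExMy05OTU4LTNjMzVlYzAwY2I1Yw=='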

Last updated