Projections

A projection is a representation of an object using a different perspective. In the context of CQRS, projections are queryable models on the "read" side that never manipulate the original data (events in event-sourced systems) in any way. Projections should be designed in a way that is useful and convenient for the reader (API, UI, etc.).

Cronus supports non-event-sourced and event-sourced projections with snapshots.

Defining a projection

To create a projection, create a class that inherits ProjectionDefinition<TState, TId>. The id can be any type that implements the IBlobId interface. All ids provided by Cronus implement this interface, but it is common to create your own for specific business cases. The ProjectionDefinition<TState, TId> base class provides a Subscribe() method that is used to create a projection id from an event. This defines an event-sourced projection with a state that is used to persist snapshots.

Use the IEventHandler<TEvent> interface to indicate that the projection can handle events of the specified event type. Implement this interface for each event type your projection needs to handle.

Create a class for the projection state. The state of the projection gets serialized and deserialized when persisting or restoring a snapshot. That's why it must have a parameterless constructor, a data contract and data members.

There is no guarantee the events will be handled in the order of publishing nor that every event will be handled at most once. That's why you should design projections in a way that solves those problems. Always assign all possible properties from the handled event to the state and make sure the projection is idempotent.

If the projection state contains a collection, make sure it doesn't get populated with duplicates. This can be achieved by using a HashSet<T> and ValueObject.
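The duplicate-free collection described above can be sketched in plain C#. This is a minimal illustration, not Cronus code: TaskItem, TaskListState and Apply are hypothetical stand-ins, and a C# record (which needs C# 9 / .NET 5 or later) is assumed here as a substitute for a value object with value-based equality.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a value object: records compare by value, not by reference.
public sealed record TaskItem(string Id, string Name);

// Hypothetical projection state (a real Cronus state would also carry data contract attributes).
public sealed class TaskListState
{
    // A HashSet silently ignores an item that equals one it already contains.
    public HashSet<TaskItem> Tasks { get; } = new HashSet<TaskItem>();
}

public static class Program
{
    // Simulates an event handler that projects event data into the state.
    public static void Apply(TaskListState state, string id, string name)
    {
        state.Tasks.Add(new TaskItem(id, name));
    }

    public static void Main()
    {
        var state = new TaskListState();

        Apply(state, "task-1", "Write docs");
        Apply(state, "task-1", "Write docs"); // the same event delivered a second time
        Apply(state, "task-2", "Review PR");

        Console.WriteLine(state.Tasks.Count); // prints 2
    }
}
```

Because equality is based on the values, handling the same event twice leaves the state unchanged, which is the behavior an idempotent projection needs.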

You can define a non-event-sourced projection by implementing the IProjection interface. This is useful when you want to persist the state in an external system (e.g. ElasticSearch or a relational database).

By default, the states of all projections are persisted as snapshots. If you want to disable this feature for a specific projection, use the IAmNotSnapshotable interface.

Querying a projection

To query a projection, you need to inject an instance of IProjectionReader in your code and invoke the Get() or GetAsync() method. The returned object will be of type ReadResult or Task<ReadResult> containing the projection and a few properties indicating if the loading was successful.

Use separate models for the API responses from the projection states to ensure you won't introduce breaking changes if the projection gets modified.
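A small sketch of that separation, using plain C# classes. ExampleState and ExampleResponse are hypothetical names chosen for illustration and are not part of Cronus; the point is only that the API contract is copied from the state instead of exposing the state itself.

```csharp
using System;

// Hypothetical projection state; it may change shape as the projection evolves.
public sealed class ExampleState
{
    public string Id { get; set; }
    public string Name { get; set; }
    public string InternalNotes { get; set; } // not meant for API consumers
}

// Hypothetical API model: the only type that API consumers depend on.
public sealed class ExampleResponse
{
    public ExampleResponse(ExampleState state)
    {
        // Copy only the fields that belong to the public contract.
        Id = state.Id;
        DisplayName = state.Name;
    }

    public string Id { get; }
    public string DisplayName { get; }
}

public static class Program
{
    public static void Main()
    {
        var state = new ExampleState { Id = "42", Name = "Demo", InternalNotes = "draft" };
        var response = new ExampleResponse(state);

        Console.WriteLine(response.Id + " " + response.DisplayName); // prints 42 Demo
    }
}
```

If a state property is later renamed or removed, only the mapping in the constructor has to change, and the shape of the response stays the same.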

Projection versioning

TODO

Best Practices

You can/should/must...

  • a projection must be idempotent

You should not...

  • a projection should not query other projections. All the data of a projection must be collected from the Events' data

  • a projection must not issue new commands or events
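using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Threading.Tasks;

[DataContract(Name = "c94513d1-e5ee-4aae-8c0f-6e85b63a4e03")]
public class TaskProjection : ProjectionDefinition<TaskProjectionData, TaskId>,
    IEventHandler<TaskCreated>
{
    public TaskProjection()
    {
        // Map each TaskCreated event to the id of the projection instance it belongs to.
        Subscribe<TaskCreated>(x => new TaskId(x.Id.NID));
    }

    public Task HandleAsync(TaskCreated @event)
    {
        // Data is nested inside TaskProjectionData, so the type name must be qualified.
        var task = new TaskProjectionData.Data();

        task.Id = @event.Id;
        task.UserId = @event.UserId;
        task.Name = @event.Name;
        task.Timestamp = @event.Timestamp;

        // Note: a List<T> does not reject duplicates if the same event is handled twice.
        State.Tasks.Add(task);

        return Task.CompletedTask;
    }

    public IEnumerable<TaskProjectionData.Data> GetTaskByName(string name)
    {
        // string.Equals is null-safe, unlike calling x.Name.Equals on a null name.
        return State.Tasks.Where(x => string.Equals(x.Name, name));
    }
}
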
[DataContract(Name = "c135893e-b9e3-453a-b0e0-53545094ec5d")]
public class TaskProjectionData
{
    public TaskProjectionData()
    {
        Tasks = new List<Data>();
    }

    [DataMember(Order = 1)]
    public List<Data> Tasks { get; set; }

    [DataContract(Name = "317b3cbb-593a-4ffc-8284-d5f5c599d8ae")]
    public class Data
    {
        [DataMember(Order = 1)]
        public TaskId Id { get; set; }

        [DataMember(Order = 2)]
        public UserId UserId { get; set; }

        [DataMember(Order = 3)]
        public string Name { get; set; }

        [DataMember(Order = 4)]
        public DateTimeOffset CreatedAt { get; set; }

        [DataMember(Order = 5)]
        public DateTimeOffset Timestamp { get; set; }
    }
}
// TODO: give a relevant example
[DataContract(Name = "af157a4d-7608-4c9d-8e42-63bd483a8ad4")]
public class ExampleEfProjection : IProjection,
        IEventHandler<ExampleCreated>
{
    public DbContext Context { get; set; }

    public void Handle(ExampleCreated @event)
    {
        var exampleDto = new ExampleDto(@event.Id, @event.Name);
        Context.Examples.Add(exampleDto);
        Context.SaveChanges();
    }
}
// TODO: give a relevant example
[DataContract(Name = "bae8bd10-9903-4960-95c4-b4fa4688a860")]
public class ExampleByIdProjection : ProjectionDefinition<ExampleByIdProjectionState, ExampleId>,
    IEventHandler<ExampleCreated>,
    IAmNotSnapshotable
{
    // ...
}
public class GetExampleController : ControllerBase
{
    private IProjectionReader projectionReader;

    public GetExampleController(IProjectionReader projectionReader)
    {
        this.projectionReader = projectionReader;
    }

    public async Task<IActionResult> GetExample(GetExampleRequest request)
    {
        var id = ExampleId.New(request.Tenant, request.Id);
        var result = await projectionReader.GetAsync<ExampleByIdProjection>(id);
        if (result.IsSuccess)
            return Ok(new GetExampleResponse(result.Data.State));
        else
            return BadRequest(result.Error);
    }

    public class GetExampleResponse
    {
        // ...
    }
}

Ports

In the Cronus framework, Ports facilitate communication between aggregates, enabling one aggregate to react to events triggered by another. This design promotes a decoupled architecture, allowing aggregates to interact through well-defined events without direct dependencies.

Key Characteristics of Ports

  • Event-Driven Communication: Ports listen for domain events—representing business changes that have already occurred—and dispatch corresponding commands to other aggregates that need to respond.

  • Statelessness: Ports do not maintain any persistent state. Their sole responsibility is to handle the routing of events to appropriate command handlers.

When to Use Ports

Ports are ideal for straightforward interactions where an event from one aggregate necessitates a direct response from another. However, for more complex workflows involving multiple steps or requiring state persistence, implementing a Saga is recommended. Sagas provide a transparent view of the business process and manage the state across various interactions, ensuring consistency and reliability.
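The event-in, command-out shape of a Port can be sketched without the framework. Everything below is a hypothetical stand-in written for illustration: ICommandSink, RecordingSink, SendWelcomeAlert and this local UserCreated record are not Cronus types, and the sketch assumes C# 9 or later for records.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event and command, defined locally for this sketch.
public sealed record UserCreated(string UserId, string Name);
public sealed record SendWelcomeAlert(string UserId, string Text);

// Hypothetical stand-in for whatever publishes commands in the real system.
public interface ICommandSink
{
    void Publish(object command);
}

// Test double that records the published commands.
public sealed class RecordingSink : ICommandSink
{
    public List<object> Published { get; } = new List<object>();

    public void Publish(object command) => Published.Add(command);
}

// The port keeps no state of its own: it only translates an event into a command.
public sealed class WelcomePort
{
    private readonly ICommandSink commands;

    public WelcomePort(ICommandSink commands)
    {
        this.commands = commands;
    }

    public void Handle(UserCreated @event)
    {
        commands.Publish(new SendWelcomeAlert(@event.UserId, "Welcome, " + @event.Name));
    }
}

public static class Program
{
    public static void Main()
    {
        var sink = new RecordingSink();
        var port = new WelcomePort(sink);

        port.Handle(new UserCreated("u-1", "Ada"));

        Console.WriteLine(sink.Published.Count); // prints 1
    }
}
```

If the reaction needed to remember earlier events or wait for several of them, that state would belong in a Saga rather than in the port.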

Communication Guide Table

| Triggered by | Description |
| --- | --- |
| Event | Domain events represent business changes that have already happened. |

By utilizing Ports appropriately, developers can design systems that are both modular and maintainable, adhering to the principles of Domain-Driven Design and Event Sourcing.

Port example

[DataContract(Name = "a44e9a38-ab13-4f86-844a-86fefa925b53")]
public class AlertPort : IPort,
    IEventHandler<UserCreated>
{
    public Task HandleAsync(UserCreated @event)
    {
        //Implement your custom logic here
        return Task.CompletedTask;
    }
}

Gateways

https://github.com/Elders/Cronus/issues/260

Like a Port, a Gateway can dispatch commands, but it also has a persistent state. A typical scenario is sending commands to an external bounded context (BC), such as push notifications or emails. There is no need to event source this state, and it is perfectly fine if the state is wiped. Example: the iOS push notification badge. This state should be used only for infrastructure needs and never for business cases.

Unlike a Projection, which tracks events, projects their data, and is not allowed to send any commands, a Gateway can store and track metadata required by external systems. Furthermore, Gateways are restricted and are not touched when events are replayed.

Communication Guide Table

Best Practices

You can/should/must...

  • a gateway can send new commands

Application Services

This is a handler where commands are received and delivered to the addressed aggregate. Such a handler is called an application service. This is the "write" side in CQRS.

An application service is a command handler for a specific aggregate. One aggregate has one application service whose purpose is to orchestrate how commands will be fulfilled. It is the application service's responsibility to invoke the appropriate aggregate methods and pass the command's payload. It mediates between the domain and the infrastructure and shields the domain model from the "outside". Only the application service interacts with the domain model.


You can create an application service with Cronus by using the AggregateRootApplicationService base class. Specifying which commands the application service can handle is done using the ICommandHandler<T> interface.

AggregateRootApplicationService provides a property of type IAggregateRepository that you can use to load and save the aggregate state. There is also a helper method Update(IAggregateRootId id, Action update) that loads an aggregate based on the provided id, invokes the action, and saves the new state if there are any changes.
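The load, invoke, save-if-changed flow described for Update can be sketched with an in-memory stand-in. This is an assumption-laden illustration of the description above, not the Cronus implementation: Counter, InMemoryRepository, HasChanges and this Update method are all hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical aggregate that tracks whether it has unsaved changes.
public sealed class Counter
{
    public int Value { get; private set; }
    public bool HasChanges { get; private set; }

    public void Increment(int by)
    {
        if (by == 0) return; // nothing happened, so nothing to save
        Value += by;
        HasChanges = true;
    }

    public void MarkSaved() => HasChanges = false;
}

// Hypothetical in-memory repository that counts how often Save is called.
public sealed class InMemoryRepository
{
    private readonly Dictionary<string, Counter> store = new Dictionary<string, Counter>();

    public int SaveCount { get; private set; }

    public Counter Load(string id)
    {
        if (store.TryGetValue(id, out var counter)) return counter;
        throw new KeyNotFoundException(id);
    }

    public void Save(string id, Counter counter)
    {
        store[id] = counter;
        counter.MarkSaved();
        SaveCount++;
    }
}

public static class Program
{
    // Load the aggregate, apply the action, and save only when something changed.
    public static void Update(InMemoryRepository repository, string id, Action<Counter> update)
    {
        var aggregate = repository.Load(id);
        update(aggregate);
        if (aggregate.HasChanges)
            repository.Save(id, aggregate);
    }

    public static void Main()
    {
        var repository = new InMemoryRepository();
        repository.Save("c1", new Counter());          // first save

        Update(repository, "c1", c => c.Increment(5)); // changed, so saved again
        Update(repository, "c1", c => c.Increment(0)); // unchanged, so not saved

        Console.WriteLine(repository.Load("c1").Value + " " + repository.SaveCount); // prints 5 2
    }
}
```

Keeping this flow in one helper means each command handler only has to name the aggregate method to call.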

Best Practices

You can/should/must...

  • an application service can load an aggregate root from the event store

  • an application service can save new aggregate root events to the event store

  • an application service can establish calls to the read model (not a common practice but sometimes needed)

  • an application service can establish calls to external services

  • you can do dependency orchestration

  • an application service must be stateless

  • an application service must update only one aggregate root. Yes, you can create one aggregate and update another one but think twice before doing so.

You should not...

  • an application service should not update more than one aggregate root in a single command/handler

  • you should not place domain logic inside an application service

  • you should not use an application service to send emails, push notifications etc. Use a port or a gateway instead

  • an application service should not update the read model

| Triggered by | Description |
| --- | --- |
| Event | Domain events represent business changes which have already happened |


public class ConcertAppService : AggregateRootApplicationService<Concert>,
    ICommandHandler<AnnounceConcert>,
    ICommandHandler<RegisterPerformer>
{
    ...

    public void Handle(AnnounceConcert command)
    {
        if (Repository.TryLoad<Concert>(command.Id, out _))
            return;

        var concert = new Concert(...);
        Repository.Save(concert);
    }

    public void Handle(RegisterPerformer command)
    {
        Update(command.Id, x => x.RegisterPerformer(...));
    }

    ...
}

Sagas

Sometimes called a Process Manager

In the Cronus framework, Sagas (also known as Process Managers) are designed to handle complex workflows that span multiple aggregates. They provide a centralized mechanism to coordinate and manage long-running business processes, ensuring consistency and reliability across the system.

Key Characteristics of Sagas

  • Event-Driven Coordination: Sagas listen for domain events, which represent business changes that have already occurred, and react accordingly to drive the process forward.

  • State Management: Unlike simple event handlers, Sagas maintain state to track the progress of the workflow, enabling them to handle complex scenarios and ensure that all steps are completed successfully.

  • Command Dispatching: Sagas can send new commands to aggregates or other components, orchestrating the necessary actions to achieve the desired business outcome.

When to Use Sagas

Sagas are particularly useful when dealing with processes that:

  • Involve multiple aggregates or bounded contexts.

  • Require coordination of several steps or actions.

  • Need to handle compensating actions in case of failures to maintain consistency.

By encapsulating the workflow logic within a Saga, developers can manage complex business processes more effectively, ensuring that all parts of the system work together harmoniously.

Communication Guide Table

| Triggered by | Description |
| --- | --- |
| Event | Domain events represent business changes that have already happened. |

Best Practices

  • A Saga can send new commands to drive the process forward.

  • Ensure that Sagas are idempotent to handle potential duplicate events gracefully.

  • Maintain clear boundaries for each Saga to prevent unintended side effects.

Saga example

Triggers

https://github.com/Elders/Cronus/issues/261

[DataContract(Name = "d4eb8803-2cc7-48dd-9ca1-4512b8d9b88f")]
public class TaskSaga : Saga,
    IEventHandler<UserCreated>,
    ISagaTimeoutHandler<Message>
{
    public TaskSaga(IPublisher<ICommand> commandPublisher, IPublisher<IScheduledMessage> timeoutRequestPublisher) : base(commandPublisher, timeoutRequestPublisher)
    {
    }

    public Task HandleAsync(UserCreated @event)
    {
        var message = new Message();
        // Leading space added so the name and the text do not run together.
        message.Info = @event.Name + " was created yesterday.";
        // Schedule the timeout one day from now, expressed as a UTC DateTime.
        message.PublishAt = DateTime.UtcNow.AddDays(1);
        message.Timestamp = DateTimeOffset.UtcNow;

        RequestTimeout<Message>(message);

        return Task.CompletedTask;
    }

    // Invoked when the scheduled message becomes due.
    public Task HandleAsync(Message sagaTimeout)
    {
        Console.WriteLine(sagaTimeout.Info);

        return Task.CompletedTask;
    }
}

[DataContract(Name = "543e8e28-0dcb-4d41-98de-f701e403dbb2")]
public class Message : IScheduledMessage
{
    public string Info { get; set; }
    public DateTime PublishAt { get; set; }
    public DateTimeOffset Timestamp { get; set; }
}

Handlers