浅谈查询职责分离(CQRS)模式#
作者 周蕾 (ArcBlock 后端工程师)
最近几年,在DDD的领域,我们经常会看到CQRS架构的概念, CQRS 是查询职责分离模式(Command Query Responsibility Segregation)的缩写。正好这些日子Arcblock的后端的服务有考虑使用CQRS 的架构,所以今天和大家一起分享一下最近的研读收获。今天文章会从Event Sourcing出发 介绍CQRS,以及通过Commanded (Elixir 的库),一起看一看如何遵循ES/CQRS 的概念开发应用程序。
什么是Event Sourcing (事件溯源)?#
Event Sourcing本质来说是保存了发生事件的本身,而不是当前的事物的状态。在Event Sourcing的概念里,Event作为既定的发生之后的事情,也是最小的单位。
比如:
Event Sourcing 的工作模式:在下面这条数据流里面,由4个发生的事件(event)组成,进而每一次改变当前的状态,同时事件们的相对顺序也是我们需要保证的。
我们会得到这样的总结:
Sn = apply(Sn-1, En) 或者 Sn = reduce(E, S0, apply)
其中: (S: state, E: Event)
现在我们可以发现Event Sourcing 的优点:
- 每个状态发生的改变都有完备的日志记录,可追溯
- 优化了的写的操作,提高了性能
我们身边的Event Sourcing#
- 每个程序员的每天都离不开的 Github。 在Git 的世界里, Events(事件) 是Commits, State(状态) 是文件系统。
- Blockchain 每次保存的是 transaction 而不是一个现在的状态,从这个角度出发, Events(事件) 是transaction, State(状态) 是用户的账户信息。
- WAL: 是Write-ahead logging, 在数据写入到数据库之前,先写入到日志, 再将日志记录变更到存储器中。Events(事件) 是每一个操作, State(状态) 是数据库。
对于Event Sourcing 来说,想做查询(query) 怎么办?#
试想一下,在一个银行系统里面,如果我们想要查询账户余额在1000块以上的用户,那我们难道需要把每个账户的按照 Sn = reduce(E, S0, apply) 这个公式在重新计算一遍吗? 如果我们考虑用一个DataStore 来保存 Event,再用另外一个DataStore 去专门为Query 提供数据,同时两个Datastore 通过发送消息 进行信息同步,如何? CQRS 某种程度上就解决了这样的问题。
CQRS 是什么?#
CQRS 全称是 Command Query Rsponsibility Segregation,将应用程序分为两部分:命令端(Command) 和查询端 (Query)。命令端处理程序创建,更新和删除请求,并在数据更改时发出事件。查询端通过执行查询来处理查询,并且通过订阅数据更改时发出的事件流而保持最新。CQRS使用分离的接口将数据查询操作(Queries) 和数据修改操作(Commands) 分离开来,这也意味着在查询和更新过程中使用的数据模型也是不一样的。这样读和写逻辑就隔离开来了。
CQRS 里面的一些概念:
- Command (命令): 不返回任何结果,被校验成功后但会改变对象的状态。
- Query (查询): 有返回结果,但是不会改变对象的状态。
- Aggregate (聚合): 保存状态, 处理command,和改变状态。
- Event Store: 存储Events。
怎么遵循CQRS 模式建立应用程序?#
首先我们会基于一个Commanded, 一个Elixir 遵循CQRS/ES 模式 实现 Command side 的库。
1. Commands
Commands 是用户发送给应用程序的指令,表示用户的一种请求,当然请求是有可能失败的,如果想在余额有10的账户里面取出1000块这样的操作。每一个指令对应是一个module,然后使用defstruct定义域,命名方式是MineCoin,动名结构。
defmodule MineCoin do
defstruct [
:account_id,
:nonce
]
end
2. Events
Events 是由Command产生,最终导致状态改变。会最终在eventstore 里面序列化存储,可以用于日后想要恢复状态。命名方式相比于Command 来说发生了变化,CoinMined, 过去式表达一种过去发生的事实。
defmodule CoinMined do
defstruct [
:account_id,
:nonce
]
end
3. Aggregates
Aggregates 作为接受处理Command,产生或者引起对应事件的发生,以及一些改变状态的处理器。
里面包含两个函数:execute方法使用来添加我们的校验Command的一些逻辑,输入时状态和command,如果成功输出就是Event。
apply 函数用来更改状态,注意这里的对象是已经是生成出来的event。
@spec execute(state, command)
::{ok, [event]}
| {:error, term()}
@spec apply(state, event) ::state
现在我们有了Command, Event, Aggregates ...#
那我们还需要一个派遣的角色帮助我们把Command 走向对应的Aggregates。Commanded 库提供了Router:
defmodule Coins.Router do
use Commanded.Commands.Router
alias Coins.Account
alias Coins.Commands, as: C
dispatch(
[
C.MineCoin
],
to: Account
)
end
最后我们使用Commanded 推荐的 EventStore,它是基于PostgreSQL 作为存储引擎, 来保存 Events。
现在可以发现我们构建了如下的整个流程:这样我们就可以愉快发布Commands 和生成对应的Events。
最后怎么进行数据同步到读取的DataStore里呢?#
在这里Commaned 库推荐了Commanded Ecto projections 来做Event handler, 或者也可以采用Kafka同步信息,可以基于不同的应用场景选择适合的Event handler。
了解更多的ArcBlock 系列讲座#
我们的讲座信息都将同步在: https://www.arcblock.io/zh/learning/
最后,如果您想要加入高质量高效率的团队,请加入ArcBlock吧!