Python领域驱动编程实践-第三章:FlaskAPI及S

在上一章中,我们介绍了,我们通过Repository的方式来驱动我们的应用程序,在本章中我们将讨论如何编排业务逻辑,以及业务逻辑和接口代码的区别。并介绍一些服务层的职责。我们还要讨论一下如何通过服务层与Repository的抽象来让我们快速编写测试。

我们将添加一个Flask API与服务层进行交互,它将作为我们领域模型的入口点。我们服务层依赖于AbstractRepository,所以我们可以使用FakeRepository来进行单元测试,而在生产时我们使用SqlAlchemyRepository运行

apwp_0402.png

将应用程序连接到外部

我们希望我们程序快速的在用户那里得到快速反馈,现在我们有一个领域模型核心部分和分配订单所需的领域服务函数,还有可以用于持久化的Repository接口。那么现在让我们把这些东西链接到一起,然后建立一个整洁的架构。下面是我们的计划

  1. 使用Flask将我们的API放置在我们分配领域服务的前面。然后连接数据库Session与我们的Repository。之后我们进行一些简单的端到端测试。

  2. 构建一个服务层,位于我们Flask和领域模型之间。构建一些服务层的测试。然后我们看看怎么使用我们的FakeRepository

  3. 为我们服务层添加一个参数,能让与我们服务层与API层解耦

第一个端到端测试

对于什么时端到端测试,什么是功能测试,什么是验收测试,什么是集成测试,什么是单元测试。我觉得我们没有必要对这种进行论述,也没有必要为这个为陷入争吵。我们只将测试分为快速测试和慢速测试。现在我们希望编写一些测试。他们将运行一个真正的客户端(使用HTTP)与真正的数据库进行测试。我们称之为端到端测试。下面展示一个例子

@pytest.mark.usefixtures('restart_api')
def test_api_returns_allocation(add_stock):
    sku, othersku = random_sku(), random_sku('other')  #(1)
    earlybatch = random_batchref(1)
    laterbatch = random_batchref(2)
    otherbatch = random_batchref(3)
    add_stock([  #(2)
        (laterbatch, sku, 100, '2011-01-02'),
        (earlybatch, sku, 100, '2011-01-01'),
        (otherbatch, othersku, 100, None),
    ])
    data = {'orderid': random_orderid(), 'sku': sku, 'qty': 3}
    url = config.get_api_url()  #(3)
    r = requests.post(f'{url}/allocate', json=data)
    assert r.status_code == 201
    assert r.json()['batchref'] == earlybatch
复制代码
  1. random_sku(),random_batchref()等都是小帮助行数,它们使用uuid模块生成随机字符。因为我们现在是针对实际的数据库运行,所以这是防止各种测试和运行相互干扰的一种方法。

  2. add_stock是一个helper fixture(帮助测试的夹具),它知识隐藏了使用Sql插入数据库的方法。我们将再后面的文章中介绍一个更好的办法。

  3. config.py是配置文件

构建API

现在,我们用简单的方式构建

from flask import Flask, jsonify, request
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

import config
import model
import orm
import repository


orm.start_mappers()
get_session = sessionmaker(bind=create_engine(config.get_postgres_uri()))
app = Flask(__name__)

@app.route("/allocate", methods=['POST'])
def allocate_endpoint():
    session = get_session()
    batches = repository.SqlAlchemyRepository(session).list()
    line = model.OrderLine(
        request.json['orderid'],
        request.json['sku'],
        request.json['qty'],
    )

    batchref = model.allocate(line, batches)

    return jsonify({'batchref': batchref}), 201
复制代码

到目前为止,我们一切顺利。但是我们还没有commit.我们实际上并没有将数据保存到数据库中。所以我们需要第二个测试,来检如果第一个订单行已经被分配完了。我们能不能正确的分配第二个订单行

@pytest.mark.usefixtures('restart_api')
def test_allocations_are_persisted(add_stock):
    sku = random_sku()
    batch1, batch2 = random_batchref(1), random_batchref(2)
    order1, order2 = random_orderid(1), random_orderid(2)
    add_stock([
        (batch1, sku, 10, '2011-01-01'),
        (batch2, sku, 10, '2011-01-02'),
    ])
    line1 = {'orderid': order1, 'sku': sku, 'qty': 10}
    line2 = {'orderid': order2, 'sku': sku, 'qty': 10}
    url = config.get_api_url()

    # first order uses up all stock in batch 1
    r = requests.post(f'{url}/allocate', json=line1)
    assert r.status_code == 201
    assert r.json()['batchref'] == batch1

    # second order should go to batch 2
    r = requests.post(f'{url}/allocate', json=line2)
    assert r.status_code == 201
    assert r.json()['batchref'] == batch2
复制代码

emmm,不是那么优雅,因为这逼迫我们必须进行commit

需要使用真实的数据库检查错误条件

如果继续这么下去,事情会越来越糟。

假设我们想添加一些错误处理,如果我们的领域层发生错误,比如产品发生缺货,该怎么办?或者传入一个根本不存在的产品又怎么办?我们现在的领域甚至不知道,也不应该知道。在调用领域服务之前,我们应该在数据库层实现更多的安全性检查。

所以我们写以下两个端到端测试

@pytest.mark.usefixtures('restart_api')
def test_400_message_for_out_of_stock(add_stock):  #(1)
    sku, smalL_batch, large_order = random_sku(), random_batchref(), random_orderid()
    add_stock([
        (smalL_batch, sku, 10, '2011-01-01'),
    ])
    data = {'orderid': large_order, 'sku': sku, 'qty': 20}
    url = config.get_api_url()
    r = requests.post(f'{url}/allocate', json=data)
    assert r.status_code == 400
    assert r.json()['message'] == f'Out of stock for sku {sku}'


@pytest.mark.usefixtures('restart_api')
def test_400_message_for_invalid_sku():  #(2)
    unknown_sku, orderid = random_sku(), random_orderid()
    data = {'orderid': orderid, 'sku': unknown_sku, 'qty': 20}
    url = config.get_api_url()
    r = requests.post(f'{url}/allocate', json=data)
    assert r.status_code == 400
    assert r.json()['message'] == f'Invalid sku {unknown_sku}'
复制代码
  1. 在第一个测试中,我们试图分配比库存更多的产品数量

  2. 第二中情况下,SKU根本不存在,对我们的程序而言,它应该是无效的。

接下来我们应该在我们的API处实现它

def is_valid_sku(sku, batches):
    return sku in {b.sku for b in batches}

@app.route("/allocate", methods=['POST'])
def allocate_endpoint():
    session = get_session()
    batches = repository.SqlAlchemyRepository(session).list()
    line = model.OrderLine(
        request.json['orderid'],
        request.json['sku'],
        request.json['qty'],
    )

    if not is_valid_sku(line.sku, batches):
        return jsonify({'message': f'Invalid sku {line.sku}'}), 400

    try:
        batchref = model.allocate(line, batches)
    except model.OutOfStock as e:
        return jsonify({'message': str(e)}), 400

    session.commit()
    return jsonify({'batchref': batchref}), 201
复制代码

现在我们的应用看起来已经有些重了。我们的端到端测试数量已经开始失控,很快我们就会得到一个倒置的测试金字塔模型,也就是说我们端到端的测试远远大于我们的单元测试。

引入服务层,并使用FakeRepository进行单元测试

现在我们看看我们的API正在做什么,我们发现我们做了太多与我们API没有任何关系的东西,比如我们从数据库提取东西,针对数据库状态验证我们的输入,处理错误,然后开心的commit。他们实际上根本用不着进行端到端测试,这也是我们划分服务层的意义所在。

还记得我们第二章构建的FakeRepository吗?

class FakeRepository(repository.AbstractRepository):

    def __init__(self, batches):
        self._batches = set(batches)

    def add(self, batch):
        self._batches.add(batch)

    def get(self, reference):
        return next(b for b in self._batches if b.reference == reference)

    def list(self):
        return list(self._batches)
复制代码

现在就是它大显身手的时候了,它可以让我们快速简单的测试我们的服务层。

def test_returns_allocation():
    line = model.OrderLine("o1", "COMPLICATED-LAMP", 10)
    batch = model.Batch("b1", "COMPLICATED-LAMP", 100, eta=None)
    repo = FakeRepository([batch])  #(1)

    result = services.allocate(line, repo, FakeSession())  #(2)(3)
    assert result == "b1"


def test_error_for_invalid_sku():
    line = model.OrderLine("o1", "NONEXISTENTSKU", 10)
    batch = model.Batch("b1", "AREALSKU", 100, eta=None)
    repo = FakeRepository([batch])  #(1)

    with pytest.raises(services.InvalidSku, match="Invalid sku NONEXISTENTSKU"):
        services.allocate(line, repo, FakeSession())  #(2)(3)
复制代码
  1. FakeRepository保存着我们的测试将要使用的Batch对象

  2. 我们服务模块(service.py)将定义一个allocate()服务层函数。它位于我们API层与我们领域模型中的allocate()服务函数之间

  3. 我们还要一个Fakesession来伪造数据库会话。

class FakeSession():
    committed = False

    def commit(self):
        self.committed = True
复制代码

这个FakeSession是一个临时解决方法。我们将在后面介绍另一种模式来处理。它会更加优雅。现在我们将我们的端到端层测试迁移过来

def test_commits():
    line = model.OrderLine('o1', 'OMINOUS-MIRROR', 10)
    batch = model.Batch('b1', 'OMINOUS-MIRROR', 100, eta=None)
    repo = FakeRepository([batch])
    session = FakeSession()

    services.allocate(line, repo, session)
    assert session.committed is True
复制代码

一个典型的服务函数

我们将编写一个类似下面这样的服务函数

class InvalidSku(Exception):
    pass


def is_valid_sku(sku, batches):
    return sku in {b.sku for b in batches}

def allocate(line: OrderLine, repo: AbstractRepository, session) -> str:
    batches = repo.list()  #(1)
    if not is_valid_sku(line.sku, batches):  #(2)
        raise InvalidSku(f'Invalid sku {line.sku}')
    batchref = model.allocate(line, batches)  #(3)
    session.commit()  #(4)
    return batchref
复制代码

典型的服务层函数有以下

类似的步骤

  1. 我们从Repository中提取一些对象

  2. 我们针对当前状态进行一些检查和断言

  3. 我们称之为服务层

  4. 如果一切正常,我们将保存/更新所有已更改的状态

最后一个步骤有点不太爽,因为我们的服务层与数据库进行了耦合。我们将在后面介绍一种叫unit work的模式来解决它。让它依赖于抽象

关于我们的服务层函数还有一点需要注意

def allocate(line: OrderLine, repo: AbstractRepository, session) -> str:
复制代码

它显式的依赖于一个Repository,并且用一个type hint来提示我们依赖的是一个AbstractRepository。所以当我们测试的时候,无论给他一个FakeRepository还是给它一个SqlalchemyRepository时,他都可以工作。

如果你还记得我们的DIP(依赖倒置原则),现在就可以看到我们所说的我们应该依赖于抽象的意思 我们高级模块(Service层)依赖于Repository的抽象。我们一些别的Repository的实现细节也应该依赖于相同的抽象。比如我们加一个什么Mongodb/Redis/CSV等等。

现在我们的应用看起来就干净多了

@app.route("/allocate", methods=['POST'])
def allocate_endpoint():
    session = get_session()  #(1)
    repo = repository.SqlAlchemyRepository(session)  #(1)
    line = model.OrderLine(
        request.json['orderid'],  #(2)
        request.json['sku'],  #(2)
        request.json['qty'],  #(2)
    )
    try:
        batchref = services.allocate(line, repo, session)  #(2)
    except (model.OutOfStock, services.InvalidSku) as e:
        return jsonify({'message': str(e)}), 400  (3)

    return jsonify({'batchref': batchref}), 201  (3)
复制代码
  1. 我们实例化一个数据库会话和一些Repository对象

  2. 我们从web请求中提取了用户的参数,并且传递给领域服务

  3. 我们返回适当的状态代码和一些JSON响应

Flask的职责仅仅包含了标准web的东西:对Web Session进行管理、从请求中解析参数,返回状态码和JSON。我们所有的业务流程逻辑都位于我们的Service层。

最后呢,我们把我们多余的端到端测试干掉。只包含两个,一个正确的一个错误的

@pytest.mark.usefixtures('restart_api')
def test_happy_path_returns_201_and_allocated_batch(add_stock):
    sku, othersku = random_sku(), random_sku('other')
    earlybatch = random_batchref(1)
    laterbatch = random_batchref(2)
    otherbatch = random_batchref(3)
    add_stock([
        (laterbatch, sku, 100, '2011-01-02'),
        (earlybatch, sku, 100, '2011-01-01'),
        (otherbatch, othersku, 100, None),
    ])
    data = {'orderid': random_orderid(), 'sku': sku, 'qty': 3}
    url = config.get_api_url()
    r = requests.post(f'{url}/allocate', json=data)
    assert r.status_code == 201
    assert r.json()['batchref'] == earlybatch


@pytest.mark.usefixtures('restart_api')
def test_unhappy_path_returns_400_and_error_message():
    unknown_sku, orderid = random_sku(), random_orderid()
    data = {'orderid': orderid, 'sku': unknown_sku, 'qty': 20}
    url = config.get_api_url()
    r = requests.post(f'{url}/allocate', json=data)
    assert r.status_code == 400
    assert r.json()['message'] == f'Invalid sku {unknown_sku}'
复制代码

这样我们成功的将我们的测试分成了两类:一类是关于web的测试,我们实现端到端测试。另一类是关于我们领域内容的单元测试。

为什么所有都叫做Service

现在,我估计很多人已经有些挠头,试图弄明白,领域服务与服务层有啥不同。在本章中,我们使用了两种称为Service的东西。第一个是应用程序服务(服务层)。它的职责是处理来自外部的请求并安排操作。服务层一般做下面简单的步骤来驱动应用程序

  1. 从数据库中获取一些数据

  2. 更新领域模型

  3. 持久化更改

对于系统中的每个操作,这是一个非常枯燥无聊的活,但是将其与业务逻辑分离有助于保持我们应用程序干净整洁。

第二类服务是领域服务。这是个服务属于领域服务,它主要职责是操作实体与值对象之间的逻辑,比如:你做了一个购物者程序,您可能会选择将优惠券构建为一个领域服务。计算优惠与更新购物车是不同的工作。也是模型组成的重要部分。但是为这项工作设置一个实体其实也不太合适。取而代之的是一个CouponCalculator类或者一个calculator_conpon的函数就完事了。

组织我们的应用程序

现在随着我们的应用程序越来越大,我们需要整理我们的目录结构。下面介绍一下项目布局参考

.
├── config.py
├── domain  #(1)
│   ├── __init__.py
│   └── model.py
├── service_layer  #(2)
│   ├── __init__.py
│   └── services.py
├── adapters  #(3)
│   ├── __init__.py
│   ├── orm.py
│   └── repository.py
├── entrypoints  (4)
│   ├── __init__.py
│   └── flask_app.py
└── tests
    ├── __init__.py
    ├── conftest.py
    ├── unit
    │   ├── test_allocate.py
    │   ├── test_batches.py
    │   └── test_services.py
    ├── integration
    │   ├── test_orm.py
    │   └── test_repository.py
    └── e2e
        └── test_api.py
复制代码
  1. 为我们的领域模型设置一个文件夹

  2. 将我们应用服务设立一个文件夹,这里我们也可以添加一些我们服务层的异常错误

  3. adapters是我们围绕外部IO建立的抽象,比如redis_client等等

  4. Entrypoints是我们API入口的地方,经典的MVC架构中,我们也可以称之为View层。

总结

增加应用服务层带给我们很多好处

  1. 我们的API变得非常薄,且容易编写:他们唯一的职责就是处理Web相关的事情

  2. 我们为领域定义了一个清晰的接口。一组用例或入口点,任何适配器都可以使用这些入口点,无论是cli api 还是什么东西

  3. 使用我们服务层我们可以快速的编写测试,我们也可以非常大胆的重构我们的领域模型。我们就可以尝试新的设计,而不会因为这样重写大量的测试。

  4. 我们的测试看起来也不错,我们大部分的测试是针对的服务的单元测试,只有极少数的端到端测试和集成测试。单元测试不连接真实的数据库,所以速度非常快。这也对我们的CI/CD有极大的好处。

DIP在行动

服务层的抽象依赖表达了我们服务层与领域服务层的依赖:领域模型和AbstractRepository。当我们运行测试时,测试提供了一个抽象依赖的实现也就是我们的FakeRepository,当我们运行实际的应用程序时,我们又把他替换成了真实的在我们的例子中是SqlalchemyRepository

        +-----------------------------+
        |         Service Layer       |
        +-----------------------------+
           |                   |
           |                   | depends on abstraction
           V                   V
+------------------+     +--------------------+
|   Domain Model   |     | AbstractRepository |
|                  |     |       (Port)       |
+------------------+     +--------------------+
复制代码
        +-----------------------------+
        |           Tests             |-------------\
        +-----------------------------+             |
                       |                            |
                       V                            |
        +-----------------------------+             |
        |         Service Layer       |    provides |
        +-----------------------------+             |
           |                     |                  |
           V                     V                  |
+------------------+     +--------------------+     |
|   Domain Model   |     | AbstractRepository |     |
+------------------+     +--------------------+     |
                                    ^               |
                         implements |               |
                                    |               |
                         +----------------------+   |
                         |    FakeRepository    |<--/
                         |     (in–memory)      |
                         +----------------------+
复制代码
       +--------------------------------+
       | Flask API (Presentation Layer) |-----------\
       +--------------------------------+           |
                       |                            |
                       V                            |
        +-----------------------------+             |
        |         Service Layer       |             |
        +-----------------------------+             |
           |                     |                  |
           V                     V                  |
+------------------+     +--------------------+     |
|   Domain Model   |     | AbstractRepository |     |
+------------------+     +--------------------+     |
              ^                     ^               |
              |                     |               |
       gets   |          +----------------------+   |
       model  |          | SqlAlchemyRepository |<--/
   definitions|          +----------------------+
       from   |                | uses
              |                V
           +-----------------------+
           |          ORM          |
           | (another abstraction) |
           +-----------------------+
                       |
                       | talks to
                       V
           +------------------------+
           |       Database         |
           +------------------------+
复制代码

现在我们暂停讨论服务层了,该到我们权衡利弊的时候了。

优点 缺点
我们只有一个地方可以找到应用程序的业务逻辑 如果是一个MVC架构的应用程序,那么你的controllers/view就是捕获所有业务逻辑的唯一位置
我们已经把领域逻辑放到了API之后,这样我们就可以持续重构了 这是另一个抽象层
我们干净的将Web相关的东西与业务逻辑的东西区分开了 将太多的业务逻辑放入服务层会导致一个贫血领域的反模式。最好时在业务逻辑已经渗透进控制器的时候引入这一层
结合了RepositoryFakeRepository后,我们可以比领域层更高的层次上编写测试,我们可单元测试很多工作流,而不是集成测试这种 你本来可以从富模型中获得更多益处,只需要将逻辑从控制器推到模型层就行了,而不是再添加一层,这样增大了局部复杂度。即(胖模型,瘦控制器)

当然我们仍有一些不好的地方需要收拾

  1. 服务层仍然与领域层紧密耦合,因为API是用OrderLine这个领域模型进行表示的。在下章中我们将解决这个问题,并讨论服务层如何提高TDD的效率。

  2. 服务层与数据库session紧密耦合。在我们讲解unit of work模式中,我们会介绍另一个存储库和服务层协作的新模式。