一、概述
这里是应用的最后一部分,这部分整完,接下来就会把全部应用部署到kubernetes上,敬请期待。
订单系统框架如图:
文件内容持续更新在GitHub上,可自行查看。
二、思路
从上面的框架图中可以看出开,主要是在各个状态下来进行,分为无、排队、待支付、完成,简单划分为这四种状态,当然,实际的订单系统复杂很多,还有确认收货,待收货等等;本项目是先提交预购信息,在后端生成订单并返回。
- 模拟限时限量热卖场景
- 通过中间件(防止脚本高频请求,令牌桶限流)之后(还可以加入验证码等等,防止控制肉鸡作弊)
- 在Kafka中进行排队,购买人的状态变为排队中,同一个user同一个物品只能排一个队
- 库存不够则堵塞直到排队超时,状态变为无
- 库存足够则生成订单,库存-1,购买人的状态变为待支付
- 设置未完成未支付订单过期时间,超时则删除订单,库存+1,购买人的状态变为无
- 取消订单则删除订单,库存+1,购买人的状态变为无
- 等待支付订单,支付成功,关闭订单,购买人的状态变为无
三、难点
1.消息有序性
本项目使用同一个topic,同一个key来保证消息进入是放在kafka同一个分区中,在同一个分区中,消息使有序的
2.防止kafka消息丢失
a.生产者发送消息采用producer在follower副本确认接收到数据后才算一次发送完成
kConfig := sarama.NewConfig()
kConfig.Producer.RequiredAcks = sarama.WaitForAll
复制代码
b.broke采用集群的方式,防止某一个突然暴毙
c.消费者,防止还没有真正消费,就已经自动提交,导致丢失。在sarama中,就算设置的自动提交,也需要给了标记才会使offset移动
type msgConsumerGroupHandler struct {
channel chan struct{}
}
func (msgConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (msgConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h msgConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("Message topic:%q partition:%d offset:%d value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
sess.MarkMessage(msg, "")
}
return nil
}
复制代码
3.状态的变更
最好不要在数据库上做频繁的操作,只做最后的改动,所以,中间的变化都使用redis缓存
4.防止超卖
使用的是redis,因为redis具有原子性的特点,而且速度足够快,所以事先把需要售卖的物品库存量存入redis中,下订单就使库存减少,取消订单或者超时等等再增加库存。
四、代码
1.订单操作
type OrderHandle interface {
GetUserStatus(order *OrderInfo) (int, error)
SetUserStatus(order *OrderInfo, status int8) error
OrderLineUp(orderJson string) (partition int32, offset int64, err error)
CreateOrder(ctx context.Context, orderJson string) (orderNum string, err error)
PayHandle(ctx context.Context, orderNum string) error
GetNotPaidOrder(ctx context.Context, buyerId int64, bookId int64) (*order.OrderInfoResp, error)
DeleteOrder(ctx context.Context, orderNum string) error
DecrInventory(bookId int64) (bool, error)
IncrInventory(bookId int64) (bool, error)
ParseOrder(orderJson string) (*OrderInfo, error)
StartOrderHandle(ctx context.Context, h sarama.ConsumerGroupHandler, ch chan struct{})
}
复制代码
2.启动kafka队列发送到生成订单,其实就是开始启动kafka接收消息
func HotSaleHandler() {
ch := make(chan struct{}, MANOEUVRABLE)
go Svc.SvcContext.Model.Order.StartOrderHandle(context.Background(), msgConsumerGroupHandler{channel: ch}, ch)
}
复制代码
3.查询库存,创建订单, 设置订单过期时间, 排队超时,取消排队
func (h msgConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("Message topic:%q partition:%d offset:%d value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
h.channel <- struct{}{}
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*1)
go func(ctx context.Context, v string) {
defer cancel()
ticker := time.NewTicker(1 * time.Second)
for _ = range ticker.C {
select {
case <-ctx.Done():
fmt.Println("order timeout")
ord, err := Svc.SvcContext.Model.Order.ParseOrder(v)
if err != nil {
fmt.Println(err)
}
err = Svc.SvcContext.Model.Order.SetUserStatus(ord, 0)
if err != nil {
fmt.Println(err)
}
return
default:
if orderNum, err := Svc.SvcContext.Model.Order.CreateOrder(context.Background(), v); err == nil {
fmt.Println("Create Order Ok: " + orderNum)
<-h.channel
return
} else {
fmt.Println(err)
}
}
}
}(ctx, string(msg.Value))
sess.MarkMessage(msg, "")
}
return nil
}
复制代码
4.不断的查询订单情况
func GetNotPaidOrderInfoHandler(c *gin.Context) {
buyerId, err := strconv.ParseInt(c.Query("buyerId"), 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, err)
return
}
bookId, err := strconv.ParseInt(c.Query("bookId"), 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, err)
return
}
res, err := Svc.SvcContext.Model.Order.GetNotPaidOrder(context.Background(), buyerId, bookId)
if err != nil {
if err.Error() == "无状态" {
c.JSON(http.StatusOK, gin.H{"code": 2001, "message": "库存不足,排队已超时"})
return
} else if err.Error() == "排队" {
c.JSON(http.StatusOK, gin.H{"code": 2002, "message": err.Error()})
return
} else if err.Error() == "订单超时未处理" {
c.JSON(http.StatusOK, gin.H{"code": 2003, "message": err.Error()})
return
} else if err.Error() == "redis出错" {
c.JSON(http.StatusOK, gin.H{"code": 2005, "message": err.Error()})
return
} else {
c.JSON(http.StatusBadRequest, gin.H{"code": 1001, "message": err.Error()})
return
}
}
bookInfo, err := Svc.SvcContext.Grpc.BookGrpc.FindOneBookById(context.Background(), &book.BookBasicInfoReq{Id: bookId})
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"code": 1001, "message": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"code": 2000, "message": res, "bookName": bookInfo.Name})
}
复制代码
五、效果展示
1.先开启热卖,只有书店老板有这个功能,为了测试,再点击开始热卖的时候,我放入了《星辰变》3的库存;
func StartOrderHandler(c *gin.Context) {
_, _ = Svc.SvcContext.Redis.Get().Do("SET", "Inventory:BookId:4", 3)
ALLOCATING = true
c.JSON(http.StatusOK, gin.H{"message": "start"})
}
复制代码
2.点击购买,会在后台排队(随机时间,个人加来测试的)生成订单并返回
3.支付成功(这里我也是为了测试,随机会失败)
4.当卖完了,就会提示没有库存,然后在特定的时间把库存更新到数据库中
5.排队太久或者订单创建之后太久不支付,也引发超时处理
六、Tips
接下来会进行容器技术docker和容器编排技术kubernetes的编写,请多多支持,点个赞,谢谢各位。代码已经上传GitHub上。
近期评论