撮合交易引擎:工作原理及实际实施
交易撮合引擎(Matching/Trading Engine),顾名思义是用来撮合交易的软件,广泛地应用在金融、证券、加密货币交易等领域。交易引擎负责管理加密资产市场中所有的开口订单(Open Orders),并在发现匹配的订单对(Trading Pair)时自动执行交易。本文将首先介绍有关加密资产交易撮合引擎的基本概念,例如委托单、交易委托账本等,然后使用Golang实现一个原理性的撮合引擎。如果你正在考虑实现类似交易所(Exchange)这样的产品,相信本文会对你有很大的帮助。
可以这样先思考一下:如果你要实现一个供人们将以太币兑换为比特币的市场,那么你就需要跟踪一些信息,例如以太币的买/卖价格(以比特币计算)、哪些买单或卖单还没有执行等等,同时还要处理新进来的委托单。将这一思路扩展到多个交易对,然后再集成钱包功能,你就实现了一个完整的交易引擎,就像币安一样。
本文的完整源码下载地址:https://github.com/ezpod/crypto-exchange-engine
1、基本概念与术语
在开始打造交易撮合引擎之前,让我们首先熟悉相关的基本概念与术语。
撮合/交易引擎
正如前面所述,交易撮合引擎是用来撮合交易的软件,可以先把交易撮合引擎看作一个黑盒子,它有一些输入和输出。
例如,可能的输入包括:
- 创建新的委托单(NewOrder):一个新的委托单可以作为交易撮合引擎的输入,引擎会尝试将其与已有的
委托单进行撮合。 - 取消已有的委托单(CancelOrder):用户也可以取消一个之前输入的委托单,如果它还没有执行的话,即开口订单。
当然你可以定义其他的输入,出于简化考虑,我们现在只定义上述两个输入。
交易撮合引擎的输出是一些事件,以便及时通知其他应用处理。例如,当引擎撮合了一笔交易后,就会触发一个TradesGenerated事件;而当取消了一个已有的委托单后,引擎就会触发rderCancelled。同样,你可以根据自己的需求来定义引擎的输出,这里我们还是简单点,只定义这两个输出事件。
交易委托账本
交易委托账本(Order Book)就是一个买方委托单或买方委托单的列表,通常按照价格和时间排序。
当一个新的买方(买方)委托单进入引擎后,引擎就会将尝试其与现有的卖方(买方)委托账本
进行匹配,看是否存在执行交易的可能。如果找到了匹配的对手单,引擎就可以执行这两个委托单了,也就是撮合成功了。
委托单
在任何交易引擎中,都可能有多种类型的委托单供用户选择。其中常见的类型包括:
- 限价委托单
限价委托单是在当前的加密货币交易环境中最常用的委托类型。这种委托单允许用户指定一个价格,只有当撮合引擎找到同样价格甚至更好价格的对手单时才执行交易。
对于一个买方委托单而言,这意味着如果你的委托价格是¥100,那么该委托单将会在任何不高于¥100的价格成交 —— 买到指定的价格或者更便宜的价格;而对于一个卖方委托单而言,同样的委托价格意味着该委托单将在任何不低于¥100的价格成交—— 卖出指定的价格或者更高的价格。
- 市价委托单
市价委托单的撮合会完全忽略价格因素,而致力于有限完成指定数量的成交。市价委托单在交易委托账本中有较高的优先级,在流动性充足的市场中市价单可以保证成交。
例如,当用户委托购买2个以太币时,该委托单可以在¥900、¥1000、¥2000或任何其他价位成交,这依赖于市场中当前的敞口委托单的情况。
- 止损委托单
止损委托单尽在市场价格到达指定价位时才被激活,因此它的执行方式与市价委托单相反。一旦止损委托单激活,它们可以自动转化为市价委托单或限价委托单。
如果你希望打造一个高级的交易所,那么还有其他一些需要了解的概念,例如流动性、多/空交易、FIX/FAST协议等等,但是同样出于简化考虑,我们将这些内容留给你自己去发现。
2、系统架构
现在,对于交易撮合引擎的构成我们已经有了一些了解,那么让我们看一下整个系统的架构,以及我们将要使用的技术:
正如你上面看到的,我们的系统将包含引擎的多个客户端,这些客户端可以是交易所系统中的其他组件,例如接收终端用户委托请求的App等等。
在客户端和引擎之间的通信是使用Apache Kafka作为消息总线来实现的,每个交易对都对应Kafka的一个主题,这样我们可以确保当消息队列接收到用户委托单时,引擎将以同样的先后顺序处理委托单。这保证了即使引擎崩溃重启我们也可以重建交易委托账本。
引擎将监听Kafka主题,执行委托账本命令并将引擎的输出事件发布到消息队列中。当然如果能够监测委托单的处理速度以及交易的执行情况会更酷。我们可以使用Prometheus来采集性能指标,使用grafana来实现一个监视仪表盘。
3、开发语言选择
可以选择你熟悉的开发语言,不过由于交易撮合引擎计算量巨大,通常我们应当选择底层系列的语言,例如:C/C++、GoLang、Rust、Java等等。在这个教程中,我们使用Golang,因为它很快、容易理解、并发实现简单,而且我也有好久没有用C++了。
4、开发交易撮合引擎
我们将按照以下的步骤来开发交易撮合引擎:
- 基础类型定义
- Consumer实现
- Order Book实现
- Producer实现
- Monitoring实现
4.1 基础类型定义
我们需要首先定义一些基础类型,这包括Order、OrderBook和Trade,分别表示委托单、交易委托账本和交易:
下面是engine/order.go
文件的内容:
package engine
import "encoding/json"
type Order struct {
Amount uint64 `json:"amount"`
Price uint64 `json:"price"`
ID string `json:"id"`
Side int8 `json:"side"`
}
func (order *Order) FromJSON(msg []byte) error {
return json.Unmarshal(msg, order)
}
func (order *Order) ToJSON() []byte {
str, _ := json.Marshal(order)
return str
}
这里我们就是简单地创建了一个结构用来记录订单的主要信息,然后添加了一个方法用于快速的JSON转换。
类似地engine/trade.go
文件的内容:
package engine
import "encoding/json"
type Trade struct {
TakerOrderID string `json:"taker_order_id"`
MakerOrderID string `json:"maker_order_id"`
Amount uint64 `json:"amount"`
Price uint64 `json:"price"`
}
func (trade *Trade) FromJSON(msg []byte) error {
return json.Unmarshal(msg, trade)
}
func (trade *Trade) ToJSON() []byte {
str, _ := json.Marshal(trade)
return str
}
现在我们已经定义了基本的输入和输出类型,现在看看交易委托账本engine/order_book.go
文件的内容:
package engine
// OrderBook type
type OrderBook struct {
BuyOrders []Order
SellOrders []Order
}
// Add a buy order to the order book
func (book *OrderBook) addBuyOrder(order Order) {
n := len(book.BuyOrders)
var i int
for i := n - 1; i >= 0; i-- {
buyOrder := book.BuyOrders[i]
if buyOrder.Price < order.Price {
break
}
}
if i == n-1 {
book.BuyOrders = append(book.BuyOrders, order)
} else {
copy(book.BuyOrders[i+1:], book.BuyOrders[i:])
book.BuyOrders[i] = order
}
}
// Add a sell order to the order book
func (book *OrderBook) addSellOrder(order Order) {
n := len(book.SellOrders)
var i int
for i := n - 1; i >= 0; i-- {
sellOrder := book.SellOrders[i]
if sellOrder.Price > order.Price {
break
}
}
if i == n-1 {
book.SellOrders = append(book.SellOrders, order)
} else {
copy(book.SellOrders[i+1:], book.SellOrders[i:])
book.SellOrders[i] = order
}
}
// Remove a buy order from the order book at a given index
func (book *OrderBook) removeBuyOrder(index int) {
book.BuyOrders = append(book.BuyOrders[:index], book.BuyOrders[index+1:]...)
}
// Remove a sell order from the order book at a given index
func (book *OrderBook) removeSellOrder(index int) {
book.SellOrders = append(book.SellOrders[:index], book.SellOrders[index+1:]...)
}
在交易委托账本中,除了创建保存买/卖方委托单的列表外,我们还需要定义添加新委托单的方法。
委托单列表应当根据其类型按升序或降序排列:卖方委托单是按降序排列的,这样在列表中序号最大的委托单价格最低;买方委托单是按升序排列的,因此在其列表中最后的委托单价格最高。
由于绝大多数交易会在市场价格附近成交,我们可以轻松地从这些数组中插入或移除成员。
4.2 委托单处理
现在让我们来处理委托单。
在下面的代码中我们添加了一个命令来实现对限价委托单的处理。
文件engine/order_book_limit_order.go
的内容:
package engine
// Process an order and return the trades generated before adding the remaining amount to the market
func (book *OrderBook) Process(order Order) []Trade {
if order.Side == 1 {
return book.processLimitBuy(order)
}
return book.processLimitSell(order)
}
// Process a limit buy order
func (book *OrderBook) processLimitBuy(order Order) []Trade {
trades := make([]Trade, 0, 1)
n := len(book.SellOrders)
// check if we have at least one matching order
if n != 0 || book.SellOrders[n-1].Price <= order.Price {
// traverse all orders that match
for i := n - 1; i >= 0; i-- {
sellOrder := book.SellOrders[i]
if sellOrder.Price > order.Price {
break
}
// fill the entire order
if sellOrder.Amount >= order.Amount {
trades = append(trades, Trade{order.ID, sellOrder.ID, order.Amount, sellOrder.Price})
sellOrder.Amount -= order.Amount
if sellOrder.Amount == 0 {
book.removeSellOrder(i)
}
return trades
}
// fill a partial order and continue
if sellOrder.Amount < order.Amount {
trades = append(trades, Trade{order.ID, sellOrder.ID, sellOrder.Amount, sellOrder.Price})
order.Amount -= sellOrder.Amount
book.removeSellOrder(i)
continue
}
}
}
// finally add the remaining order to the list
book.addBuyOrder(order)
return trades
}
// Process a limit sell order
func (book *OrderBook) processLimitSell(order Order) []Trade {
trades := make([]Trade, 0, 1)
n := len(book.BuyOrders)
// check if we have at least one matching order
if n != 0 || book.BuyOrders[n-1].Price >= order.Price {
// traverse all orders that match
for i := n - 1; i >= 0; i-- {
buyOrder := book.BuyOrders[i]
if buyOrder.Price < order.Price {
break
}
// fill the entire order
if buyOrder.Amount >= order.Amount {
trades = append(trades, Trade{order.ID, buyOrder.ID, order.Amount, buyOrder.Price})
buyOrder.Amount -= order.Amount
if buyOrder.Amount == 0 {
book.removeBuyOrder(i)
}
return trades
}
// fill a partial order and continue
if buyOrder.Amount < order.Amount {
trades = append(trades, Trade{order.ID, buyOrder.ID, buyOrder.Amount, buyOrder.Price})
order.Amount -= buyOrder.Amount
book.removeBuyOrder(i)
continue
}
}
}
// finally add the remaining order to the list
book.addSellOrder(order)
return trades
}
看起来我们将一个方法变成了两个,分别处理买方委托单和卖方委托单。这两个方法在每个方面
都很相似,除了处理的市场侧不同。
算法非常简单。我们将一个买方委托单与所有的卖方委托单进行匹配,找出任何与买方委托价格
一致甚至更低的卖方委托单。当这一条件不能满足时,或者该买方委托单完成后,我们返会撮合
的交易。
4.3 接入Kafka
现在就快完成我们的交易引擎了,还需要接入Apache Kafka服务器,然后开始监听委托单。
main.go
文件的内容:
package main
import (
"engine/engine"
"log"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
)
func main() {
// create the consumer and listen for new order messages
consumer := createConsumer()
// create the producer of trade messages
producer := createProducer()
// create the order book
book := engine.OrderBook{
BuyOrders: make([]engine.Order, 0, 100),
SellOrders: make([]engine.Order, 0, 100),
}
// create a signal channel to know when we are done
done := make(chan bool)
// start processing orders
go func() {
for msg := range consumer.Messages() {
var order engine.Order
// decode the message
order.FromJSON(msg.Value)
// process the order
trades := book.Process(order)
// send trades to message queue
for _, trade := range trades {
rawTrade := trade.ToJSON()
producer.Input() <- &sarama.ProducerMessage{
Topic: "trades",
Value: sarama.ByteEncoder(rawTrade),
}
}
// mark the message as processed
consumer.MarkOffset(msg, "")
}
done <- true
}()
// wait until we are done
<-done
}
//
// Create the consumer
//
func createConsumer() *cluster.Consumer {
// define our configuration to the cluster
config := cluster.NewConfig()
config.Consumer.Return.Errors = false
config.Group.Return.Notifications = false
config.Consumer.Offsets.Initial = sarama.OffsetOldest
// create the consumer
consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "myconsumer", []string{"orders"}, config)
if err != nil {
log.Fatal("Unable to connect consumer to kafka cluster")
}
go handleErrors(consumer)
go handleNotifications(consumer)
return consumer
}
func handleErrors(consumer *cluster.Consumer) {
for err := range consumer.Errors() {
log.Printf("Error: %s\n", err.Error())
}
}
func handleNotifications(consumer *cluster.Consumer) {
for ntf := range consumer.Notifications() {
log.Printf("Rebalanced: %+v\n", ntf)
}
}
//
// Create the producer
//
func createProducer() sarama.AsyncProducer {
config := sarama.NewConfig()
config.Producer.Return.Successes = false
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = sarama.WaitForAll
producer, err := sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
log.Fatal("Unable to connect producer to kafka server")
}
return producer
}
利用Golang的Sarama Kafka客户端开发库,我们可以分别创建一个接入Kafka的消费者和生产者。
消费者将在指定的Kafka主题上等待新的委托单,然后进行撮合处理。生成的交易接下来使用生产者发送到指定的交易主题。
Kafka消息采用字节数组编码,因此我们需要将其解码。反之,将交易传入消息队列时,我们还需要进行必要的编码。
5、结语
现在你有了一个可伸缩的交易引擎!完整的代码可以在GITHUB下载:crypto-exchange-engine。
不过这个引擎的目的是教学,另外代码还支持很多进一步的优化,例如:
- 使用一种更高效的匹配算法
- 添加取消订单的功能
- 增强通信能力
- 委托账本的备份与恢复
- 添加监视功能
如果你想学习区块链并在Blockchain Technologies建立职业生涯,那么请查看我们分享的一些以太坊、比特币、EOS、Fabric等区块链相关的交互式在线编程实战教程
原文链接:交易撮合引擎原理与实现 — 汇智网
上一篇: 问题说明:ffmpeg中调用avformat_close_input函数释放结构体时发生崩溃 :解决ffmpeg中avformat_close_input函数释放结构体时发生崩溃的问题
下一篇: 使用FFmpeg实现H.264解码
推荐阅读
-
【2022新手指南】Java编程进阶之路 - 六、技术架构篇 ### MySQL索引底层解析与优化实战 - 你会讲解MySQL索引的数据结构吗?性能调优技巧知多少? - Redis深度揭秘:你知道多少?从基础到哨兵、主从复制全梳理 - Redis持久化及哨兵模式详解,还有集群搭建和Leader选举黑箱打开 - Zookeeper是个啥?特性和应用场景大公开 - ZooKeeper集群搭建攻略及 Leader选举、读写一致性、共享锁实现细节 - 探究ZooKeeper中的Leader选举机制及其在分布式环境中的作用 - Zab协议深入剖析:原理、功能与在Zookeeper中的核心地位 - RabbitMQ全方位解读:工作模式、消费限流、可靠投递与配置策略 - 设计者视角:RabbitMQ过期时间、死信队列与延时队列实践指南 - RocketMQ特性和应用场景揭示:理解其精髓与差异化优势 - Kafka详细介绍:特性及广泛应用于实时数据处理的场景解析 - ElasticSearch实力揭秘:特性概述与作为搜索引擎的广泛应用 - MongoDB认知升级:非关系型数据库的优势阐述,安装与使用实战教学 - BIO/NIO/AIO网络模型对比:掌握它们的区别与在网络编程中的实际应用 - Netty带你飞:理解其超快速度背后的秘密,包括线程模型分析 - 网络通信黑科技:Netty编解码原理与常用编解码器的应用,Protostuff实战演示 - 解密Netty粘包与拆包现象,怎样有效应对这一常见问题 - 自定义Netty心跳检测机制,轻松调整检测间隔时间的艺术 - Dubbo轻骑兵介绍:核心特性概览,服务降级实战与其实现益处 - Dubbo三大神器解读:本地存根与本地伪装的实战运用与优势呈现 ----------------------- 七、结语与回顾
-
撮合交易引擎:工作原理及实际实施