在快節奏的股市中,實時交易系統讓投資者能即時抓住交易機會。尤其自從2020年台股實施逐筆交易以來,市場的動態更加迅速。在這種環境下,能否迅速分析和反應,直接影響到交易成果。想像一下,如果我們可以在每次價格更新時立即執行交易策略,那麼我們就能最快速的反應市場變化,也就是所謂的「逐筆洗價」的概念。
為了達到這種快速反應,系統就必須使用並行處理(Concurrency)。並行處理讓系統能夠同時進行多個任務,這不僅能提升效率,也確保能夠即時處理來自不同商品的數據。在技術上,這通常涉及三種主要的方式:多線程(Threading)、非同步處理(Asyncio)和多進程處理(Multiprocessing)。初次接觸可能聽起來有點複雜,但別擔心,我們來一一深入淺出地介紹這些概念。
在本教程中,我們將通過比較這三種方法在處理股票逐筆交易數據時的效率和適用情境,來幫助你選擇最適合你交易策略需求的法方。
讓我們開始探索這些並行處理技術如何讓你的交易系統如虎添翼吧!
Disclaimer: 本文僅供教學與參考之用,實務交易應自行評估並承擔相關風險
完整範例程式
完整的範例程式碼在此: https://github.com/phenomenoner/FubonNeo_examples/tree/main/realtime_price_driven_strategy_executor
共同設定
無論是使用多線程(Threading)、非同步處理(Asyncio)還是多進程處理(Multiprocessing),我們的三個範例程式有一些共通的組件。首先,我們使用富邦新一代API(FubonSDK)來連接市 場數據和未來執行交易策略操作。
另外,程式中都使用了環境變數來管理敏感資訊,如登入憑證和使用者資訊。這些資訊被存儲在.env文件中,這是一個簡單的文本文件,讓我們將登入資訊從程式碼中分離出來,以增強安全性和便於管理。使用dotenv庫,我們可以輕鬆地在程式中加載這些環境變數,確保敏感信息不會被記錄在程式碼裡。
每個範例程式的主要執行功能 run() 也有著類似的結構。這個函數負責初始化實時數據連接,訂閱特定股票的行情,並根據接收到的數據執行交易策略。這一設計模式確保了無論使用哪種並行處理技術,程式的核心功能性都能被保留並有效的執行。
通過這種共同的設置,我們能夠更專注於比較不同並行處理技術的實際應用效果,而不用擔心基本功能的差異。接下來的章節將展示每種技術的具體實現和應用場景,幫助你更好地理解並選擇適合自己需求的方式
並行處理(Concurrency)模式概覽
在開始深入各種並行處理技術之前,了解它們的基本概念和常見用途是非常重要的。每種模式都有其獨特的優勢和限制,適合於不同類型的應用場景。
多線程(Threading)
多線程是一種在同一程序中執行多個線程(即輕量級的執行過程),讓每個線程都能夠同時進行操作。這種方法可以有效地提高應用程序的響應性和處理速度,特別適合於I/O密集型的應用,如檔案讀寫、網路操作等。然而,多線程需要管理線程之間的資源共享,尤其在資料一致性和 線程安全方面,需要仔細處理競爭條件(race conditions)和死鎖(deadlocks)的問題。
非同步處理(Asyncio)
非同步處理利用單一執行線程,通過事件循環和協程來管理多個任務。這種模型不是透過多個線程來實現並行,而是將任務分解為可以在等待I/O操作(如網路請求或資料庫查詢)完成時被暫停和恢復的步驟。非同步處理特別適合處理大量的非阻塞I/O操作,能夠在不增加額外硬件資源的情況下,大幅提升應用效能和擴展性。
多進程處理(Multiprocessing)
多進程處理通過創建多個進程,每個進程都運行在自己的記憶體空間內,來達到真正的並行計算。這種模型適合於CPU密集型的任務,如大規模計算和數據分析,因為它可以避免GIL(Global Interpreter Lock)的限制,並充分利用多核處理器。然而,進程間的通訊(Inter-process Communication, IPC)較為複雜,需要使用專門的IPC機制,如隊列(queues)和管道(pipes),來進行數據交換。這些工具幫助進程安全有效地共享信息,儘管它們增加了管理的複雜度。
範例程式詳解
共同的環境設置與初始化
環境設置
在三個範例程式中,我們首先導入了必要的模組和.env文件中的環境變數,這包括用戶的登入憑證等資訊。這樣的設計有助於保護敏感資訊,並使得程式更容易在不同環境中部署 。
import json
import os
from dotenv import load_dotenv
from fubon_neo.sdk import FubonSDK #富邦新一代API
load_dotenv() # Load .env
# 將登入資料載入環境變數
id = os.getenv("ID")
trade_password = os.getenv("TRADEPASS")
cert_filepath = os.getenv("CERTFILEPATH")
cert_password = os.getenv("CERTPASSS")
定義類別 (Class)
在我們的範例程式中,所有的功能都被封裝在對應的的 class 中。使用 class 的設計有幾個好處:
- 封裝性(Encapsulation):類別允許我們將相關的數據(如 accounts)和方法(如 login, run, __handle_message, 和 __execute_strategy)封裝在一起,這有助於保持程式碼的組織性和可讀性。也可以隱藏不必要的細節,只開放必要的操作接口。
- 重用性(Reusability):在類中定義的方法可以被類的任何實例重用。這意味著如果我們需要創建多個策略執行器對象,每個對象都可以使用相同的方法但保持自己的狀態,這在多策略交易系統中特別有用。
- 繼承性(Inheritance):使用類結構還允許我們透過繼承擴展現有的功能。例如,如果未來需要開發一個特殊的策略執行器,它可以繼承這邊的class並修改或擴展其功能,而無需重寫共用的功能。
這些函數被設計為類的方法主要是為了利用這些面向對象程式設計的優勢,從而使得程式碼更加靈活和可維護。
登入函數
Class 中的的 login 函數負責使用提供的憑證資料登入 API,並存儲賬戶信息。這是所有範例程式共有的一部分,核心功能在於通過API建立連接並驗證用戶
def login(self, id, pwd, certpath, certpwd):
self.accounts = self.sdk.login(id, pwd, certpath, certpwd)
return self.accounts
範例程式碼1: 多線程 (Threading)
run函數: 啟動策略執行器
在 run 函數中,首先確保已經登入,然後初始化實時市場數據的連接,並為每個股票設置鎖和最新時間戳,這 些是為了線程安全和數據一致性考量。
def run(self, symbols):
if self.accounts is None:
print("請先登入 ...")
return
self.sdk.init_realtime()
marketdata_ws = self.sdk.marketdata.websocket_client.stock
marketdata_ws.on("message", self.__handle_message)
marketdata_ws.connect()
for symbol in symbols:
self.locks[symbol] = threading.Lock() # 多線程鎖
self.lastest_timestamp[symbol] = None
marketdata_ws.subscribe({'channel': 'trades', 'symbol': symbol}) # 訂閱行情
while True:
time.sleep(5) # Keep the program running
__handle_message函數: 處理來自市場的訊息
這個方法首先解析從 WebSocket 接收到的消息。如果事件類型是"data",它會使用 threading.Thread 來啟動一個新的線程來處理交易策略,這樣做可以確保每次數據更新時策略都能快速響應,而不會被前一次數據處理的延遲所影響。
def __handle_message(self, message):
msg = json.loads(message)
event = msg["event"]
data = msg["data"]
if event == "data":
threading.Thread(target=self.__execute_strategy, args=(data,)).start()
elif event == "pong" or event == "heartbeat":
return # Keep the connection alive
else:
print(f"Unhandled event: {event}")
__execute_strategy函數: 執行交易策略
這個函數是每個新線程的目標函數,其中包含了確保數據一致性和避免重複處理的邏輯。使用 locks[symbol] 鎖定確 保對於同一支股票,我們的交易策略一次只執行一筆報價,避免混亂。
def __execute_strategy(self, data):
symbol = data["symbol"]
timestamp = int(data["time"])
with self.locks[symbol]: # Ensure that the strategy for a single symbol is not executed concurrently
# Check if this price data is the most recent
if self.lastest_timestamp[symbol] is None or self.lastest_timestamp[symbol] < timestamp:
self.lastest_timestamp[symbol] = timestamp
else:
return # Skip if the data is not the latest
# TODO: Implement trading strategy logic
print(f"{symbol}, 報價 {data['price']}, 執行策略 ...")
time.sleep(3) # Dummy sleep for demonstration
在 __execute_strategy 函數中,我們首先使用 with self.locks[symbol] 來獲取對應股票的多線程鎖。這個步驟是關鍵,因為他防止了對同一股票的多個線程同時進行計算和操作,從而避免了潛在的競爭條件( race conditions )。在鎖定期間,我們檢查接收到的數據是否是該股票的最新數據。如果是,我們將更新時間戳並進行策略計算;如果不是,我們將忽略該數據,避免重複處理過時的信息。
範例程式碼2: 非同步 (Async)
此範例程式引入了 asyncio 模組,用於管理非同步操作,適合於高負載的非阻塞I/O場景。
import asyncio
StrategyExecutorAsync Class 解說
StrategyExecutorAsync class 封裝了與市場數據訂閱及消息處理相關的非同步方法。此類設計利用asyncio的事件循環(event loop)來有效管理多個非同步任務。
class StrategyExecutorAsync:
def __init__(self):
self.sdk = FubonSDK()
self.accounts = None
self.event_loop = asyncio.new_event_loop() # 為此 class instance 創建的事件循環(async event loop)
run函數:啟動策略執行
此方法初始化實時數據連接並設置非同步事件處理器,以非阻塞方式處理市場事件和數據。
def run(self, symbols):
if self.accounts is None:
print("請先登入 ...")
return
self.sdk.init_realtime()
marketdata_ws = self.sdk.marketdata.websocket_client.stock
marketdata_ws.on("message", self.__handle_message)
marketdata_ws.connect()
for symbol in symbols:
marketdata_ws.subscribe({'channel': 'trades', 'symbol': symbol})
asyncio.run(self.__keep_running()) # 啟動事件循環
__keep_running 函數:保持事件循環運行
__keep_running 是核心的協程(coroutine),在剛剛 run 函數的最後呼叫,其作用是使得程序能夠持續運行而不退出,同時允許事件循環 (event loop) 持續運行。在 asyncio 框架中,事件循環是處理所有非同步操作的中心機制。如果沒有一個持續運行的事件循環,程式會執行完畢並立即退出,那麼如之後我們新增的非同步任務 (如 __handle_message 函數中的 self.event_loop.create_task 所新增的任務) 將無法被執行。
async def __keep_running(self):
while True:
await asyncio.sleep(5) # 讓事件循環持續運作
__handle_message函數:非同步處理訊息
此方法解析從行情 WebSocket 所接收到的數據,根據事件類型非同步安排任務執行。
def __handle_message(self, message):
msg = json.loads(message)
event = msg["event"]
data = msg["data"]
if event == "data":
# 發送非同步任務
self.event_loop.create_task(self.__execute_strategy(data))
elif event == "pong" or event == "heartbeat":
return # 處理保持連線的事件
else:
print(f"Unhandled event: {event}")
__execute_strategy函數:非同步執行交易策略
這個協程負責非同步執行交易策略,處理市場數據更新,透過安排的任務確保主線程不被阻塞。
async def __execute_strategy(self, data):
symbol = data["symbol"]
print(f"{symbol}, 報價 {data['price']}, 執行策略 ...")
await asyncio.sleep(3) # 模擬處理時間
關於 async def v.s. def
在 asyncio 中,有些函數我們用 async def 來定義,這表示定義一個協程 (coroutine),這是一種特殊類型的函數,它的執行可以在任何 await 表達式的地方被暫停,並在未來某個時點從上次暫停的地方繼續執行。這與傳統的函數(使用 def 定義)不同,後者是同步執行的,一旦開始就會執行到結束,中間無法中斷。
使用 async def 允許函數執行非同步等待(如 I/O 操作),而不會阻塞整個程序的執行。這對於需要高性能 I/O 處理的應用來說至關重要。在我們的情況下,async def 用於定義可以進行非阻塞等待操作的協程,如 await asyncio.sleep(3) 或等待網路回應 (例如使用 aiohttp)。
這些非同步操作讓我們的交易策略可以在不同的任務之間高效地切換,優化資源使用,提高應用的反應速度和整體效率。這種模式是處理大量並發數據和高頻交易策略的理想選擇。
範例程式碼3: 多進程處理 (Multiprocessing)
此範例採用多進程處理(Multiprocessing),適用於執行計算密集型任務,這種方法可以有效地利用多核心處理器的能力。
import multiprocessing
execute_strategy 函數:處理交易策略
在多進程的實現中,execute_strategy 函數沒有放在 class 中,這樣的設計與前兩個範例不同。函數放在類別之外,因為每個進程需要執行完全獨立的任務,並且在不同的記憶體空間中操作。將這個函數放在類別外使得每個進程可以更簡單地啟動和管理其專屬的執行環境。
def execute_strategy(price_queue):
while True:
try:
data = price_queue.get(timeout=5) # 從隊列中獲取價格資訊
except queue.Empty:
continue # 如果隊列空,則繼續等待
symbol = data["symbol"]
print(f"{symbol}, 報價 {data['price']}, 執行策略 ...")
# 此處添加交易策略邏輯
...
此函數從多進程隊列中提取數據,並處理每個股票的交易策略。由於每個進程均在其自己的記憶體空間內運行,這有助於避免全局解釋器鎖(GIL)的問題,並提升執行效率。
StrategyExecutorMultiprocess Class 詳解
此class專為有效處理多進程而設計,透過為每個股票符號創建獨立的進程來執行交易策略。
- 初始化隊列 (queue)字典:
class StrategyExecutorMultiprocess:
def __init__(self):
...
self.price_queue = {} # 初始化一個字典來儲存每個股票的隊列
字典 self.price_queue 以股號作為 key,存儲每個的相對應的隊列 (queue),這些隊列用來在主進程和專門執行各個商品交易策略子進程之間的傳遞數據。此設置確保了個別商品的相關的數據被隔離管理,降低了複雜性和潛在的數據處理錯誤風險。
- 處理進程:
def run(self, symbols):
...
strategy_processes = []
for symbol in symbols:
self.price_queue[symbol] = multiprocessing.Queue() # 為 每個符號創建一個新的隊列
process = multiprocessing.Process(target=execute_strategy, args=(self.price_queue[symbol],))
process.start()
strategy_processes.append(process)
for process in strategy_processes:
process.join() # 等待所有進程完成
- strategy_processes:此列表用於跟蹤創建的所有子進程。 對於每個股票代號,實例化一個新的 multiprocessing.Queue(),並創建一個新進程,目標函數是 - execute_strategy,參數是對應的隊列。這種設計允許每個進程根據通過其指定隊列接收的數據獨立執行策略。
- process.start() :啟動進程,開始執行目標函數。
- process.join():等待進程完成。這通常用於防止程式在所有進程完成執行之前退出,確保所有策略在程序結束前完成。
- 處理成交價數據:
def __handle_message(self, message):
msg = json.loads(message)
event = msg["event"]
data = msg["data"]
if event == "data":
symbol = data["symbol"]
self.price_queue[symbol].put(data) # 將數據放入對應股票的隊列
- 當新的市場數據到來時,將其根據股號推入相應的隊列。這一步至關重要,因為它將數據輸入到策略處理進程中,確保每個進程僅接收到其特定交易策略相關的數據。
- execute_strategy:這個函數在一個獨立的進程中運行,持續從其隊列中檢索數據並處理它。由於數據由主進程輸入到隊列中,並由子進程消費,因此明確了責任分離,增強了性能和可維護性。
此多進程的設計利用了多核處理器的計算資源,每個獨立操作交易策略的進程都有效提高了系統的整體性能。這種方法在需要處理大量計算密集型任務的交易系統中表現尤為有效。
三種並行處理模式比較
每種並行模式在實際場景中的表現都有其利弊,需考量因素包括CPU使用率、記憶體開銷、響應性和錯誤處理:
多線程 (Threading)
- CPU 使用率: 對 I/O-bound 任務效率高,但在計算密集型場景中可能因上下文切換導致較高的 CPU 開銷。
- 記憶體用量:線程共享同一記憶體空間,可能導致較低的記憶體消 耗,但若管理不當則風險較高。
- 響應性能:能夠同時處理多個操作,對 I/O 操作的響應非常快。
- 錯誤處理:由於共享狀態和並行問題(如競態條件和死鎖),錯誤處理可能較為複雜。
優點:適合需要並行處理且非重計算任務的應用。 缺點:處理共享資源時容易出錯,易發生並行錯誤
非同步處理 (Asyncio)
- CPU 使用率:因使用非阻塞操作和單線程事件循環,CPU 開銷最小,非常 適合 I/O-bound 任務。
- 記憶體用量:通常低於多線程,因不需多線程上下文。
- 響應性:在 I/O-bound 场景中響應極高,因其非阻塞本質和有效的任務調度。
- 錯誤處理:相對於多線程簡單,因無鎖或共享記憶體問題,但需妥善處理協程中的異常。
優點:極佳的高 I/O 操作能力,如處理大量同時網絡連接。 缺點:不適合 CPU-bound 任務,非同步編程模式有一定學習曲線。
多進程處理 (Multiprocessing)
- CPU 使用率:非常有效地利用多核心進行 CPU-bound 任務,不受 Python 的全局解釋器鎖(GIL)影響。
- 記憶體用量:由於每個進程有自己的記憶體空間,記憶體使用率較高。
- 響應性能:適合可平行化任務,但維持進程間通訊所需資源可能導致延遲。
- 錯誤處理:進程間隔離,單一進程故障不會影響其他進程,使此模型對進程級錯誤具有較強的韌性,但需小心處理進程間通訊錯誤。
優點:最適合 CPU 密集型操作;進程完全獨立,增強了穩定性和可靠性。 缺點:高記憶體使用率;進程間通訊較為複雜且速度慢於線程。
在 Python 中掌握並行處理的藝術
想像自己踏入一間充 滿咖啡香的精品咖啡館,每種並行處理模型就像是製作咖啡的不同技術,幫助你高效且精確地準備每一杯完美的咖啡。
多線程:繁忙的咖啡吧台
在 Python 的世界裡,使用多線程就好比是在咖啡吧台上有多位咖啡師同時工作。這些咖啡師共享相同的咖啡機和工具,他們需要彼此協調,以避免相互干擾,同時保持服務速度和品質。這種方式非常適合於處理需要等待咖啡機或磨豆機操作的任務,這些操作需要關注但不需持續行動。
然而,當所有咖啡師同時忙於複雜的咖啡訂單時,吧台可能會變得擁擠,這就類似於多線程中的 CPU 開銷增加。由於所有人共享相同的設備和空間,雖然可以節省資源,但如果管理不當,很容易發生錯誤,比如誤用咖啡豆或者配錯奶泡。
非同步處理:單獨的咖啡師與自動化工具
非同步處理就像是一位咖啡師操作多台自動化的咖啡機。這位咖啡師不需要在每台機器旁邊等待,而是設置好程序後就可以處理其他任務,比如準備飲料或服務顧客。這種模式在需要處理大量快速且簡單的咖啡訂單時表現出色,例如快速製作濾泡咖啡或即溶咖啡。
由於只需要一人操作,這種方法對 CPU 資源的使用非常少,記憶體開銷也比多人操作的情況要低。非同步處理的響應性非常高,非常適合在高客流量時保持服務的流暢和快速。
多進程處理:分店經營模式
多進程處理可以比作是開設多家咖啡店,每家店都有自己的咖啡師和設備。這樣可以在不同的地點同時為更多的顧客提供服務,每個進程就像是一個獨立的咖啡店。這種方式適合於需要大量分散處理的複雜咖啡訂單,如同時製作多種手沖單品咖啡。
結語
正如在忙碌的咖啡館中, 每一位咖啡師都需精確快速地應對每一杯咖啡的製作,即時交易系統也必須以同樣的效率和精確度來處理股市的動態。在股市這個快速變化的環境中,如同熟練的咖啡師利用先進的咖啡機與技術一樣,交易系統利用多線程、非同步處理和多進程這三種並行處理技術來確保每次市場價格更新時能迅速且準確地執行交易策略。
多線程如同多位咖啡師在一個吧台上協同工作,適合快速處理多個交易任務;非同步處理則像是使用多台自動咖啡機的咖啡師,能在不阻塞主操作的同時,處理大量的數據請求;而多進程則提供了將各個交易策略分配到不同「咖啡店」的能力,使每個策略都能在最適合其需求的環境中最大化效能。
選擇合適的並行處理模型,就像是為你的咖啡館選擇最合適的咖啡製作技術一樣重要。根據你的交易系統的規模、複雜性、以及對延遲的敏感性來選擇最適合的技術,將確保你在股市的競爭中始終保持領先。
透過本教程,希望能幫助你了解並掌握這些並行處理技術,讓你的交易系統不僅如同精製咖啡般精確無誤,更能在股市的激烈競爭中穩占優勢。就讓我們在這場技術與市場的競賽中,握緊手中的精粹過的策略,享受每一次交易帶來的刺激與成功吧!