FIREして市場に落ちている金を拾うブログ

30代でFIREし、プログラムを勉強して米国株式と仮想通貨で落ちている金をせっせと拾う記録。

Websocket実装に向けて非同期処理を勉強する(WebSocket)その3

前回に引き続き、まちゅけんさん記事を見ながら勉強。

RESTベースで作っていたロジックを移植

どうやらBalanceについては実装されてないらしい。
また、Positionについても、Store.Positionは自炊よりも遅れる模様(RESTで取得しているため?)。
よって、Balance, Positionいずれも自炊することにした。Storeで使うのはOrderbook、Fills、Ordersのみ。

そういうわけなので、RESTベースで過去に作っていたBotからロジックを移植。
過去ロジックでは、Balance, PositionについてループのはじめにRESTで取得していたが、
Bot起動時に初期値を取得したあと、Store.Fillsのデータを使って自炊するように変更した。

なんとか動く感じになった。

Place Orderの待機ロジックを変更

公式サンプルでは、高頻度botにおいてはRESTよりWebSocketが早い可能性があるので、
下記コードでイベント待機することになっている。

            # オーダー執行
            if cond:
                # 高頻度では重複オーダーしないようにオーダー後WebSocketでデータ受信するまで待機させる
                # RESTの応答よりWebSocketのイベントの方が速い可能性があるので先にイベント待機タスクをスケジュールする
                event = asyncio.create_task(store.order.wait())
                await client.post('/v2/private/order/create', data={
                    'symbol': 'BTCUSD',
                    'side': side,
                    'order_type': 'Limit',
                    'qty': qty,
                    'price': price,
                    'time_in_force': 'GoodTillCancel',
                })
                await event

やってみたんだが、うまく動作しないようであった。

今回は両建てbotを移植したので、出しておいたMaker注文が約定したら、反対側のTaker注文を発するロジックになってるが、
上記コードでは

1. Maker注文約定認識(store.fills.findで把握)
2. Balance/ Position自炊(未ヘッジ分を把握)
3. 未ヘッジ分のTaker注文発注
4. Taker発注認識(store.order.waitイベント発生)
5. Taker注文約定認識(store.fills.findで把握)
6. Balance/ Position自炊(全量ヘッジを確認)

となるはずが、

1. Maker注文約定認識(store.fills.findで把握)
2. Balance/ Position自炊(未ヘッジ分を把握)
3. 未ヘッジ分の①Taker注文発注
4. ①Taker発注認識(store.order.waitイベント発生)
5. Balance/ Position自炊(3のTaker注文約定未認識のため、一部未ヘッジのままと認識)
6. 誤認識の未ヘッジ分②Taker注文発注

7. ①Taker注文約定認識(store.order.waitイベント発生)
8. ②Taker注文約定認識(時間経過)
9. Balance/ Position自炊(二回ヘッジオーダーをしたため、反対側へのヘッジが必要と認識)
10. 3~9をエンドレスループ

となり、ひたすら往復ビンタでOIを積み上げることになってしまった。
ロットを絞ってなかったらえらい目に合うとこだった。

対応策としては、シンプルだが

                event = asyncio.create_task(store.fills.wait())

として、ヘッジオーダーの約定まで待機することで解決。

Websocket実装に向けて非同期処理を勉強する(WebSocket)その2

前回に引き続き、まちゅけんさん記事を見ながら勉強。
Pybottersを触る。

自分のドキュメント読めてなさに絶望

Exampleで書かれているBybitの例を適当にFTXに書き換えて入れてみるが動作しない。
しょうがないので、Data storeのInitializeで使われているclient.get('orders)などで何が吐き出されているのか見ようとするが、そもそもこの吐き出しがうまくいかない。デバッグのためのコードすらかけないことに絶望。

色々いじっていると、元のドキュメントに書いてある。。。

戻り値はライブラリ aiohttp.ClientResponse のインターフェースです。 status プロパティでHTTPステータスを取得できます。
json, text メソッドでレスポンスボディを取得できます。

そりゃclient.getで取得したオブジェクトをprintしてもちゃんと出ないわ。.json()して出力してようやく中身が見えた。

さらにちゃんと見ると、FTXのDatastore関連の話が書いてあるじゃん。
この時間は一体なんだったのか。
コピペしてポジション取得して、FillsをWebSocketで捕捉できるようになった。

しかしBalanceがないな。もしかしてここは自分で書かないといけない感じだろうか。。。

Websocket実装に向けて非同期処理を勉強する(WebSocket)

前回に引き継ぎ、まちゅけんさんの記事を見ながら勉強。今回はWebSocket。
WebSocketの記事は流し読みして、pybotterライブラリを使うところにいきなり突入。

これまでWebSocketがうまく動作しなかった理由がようやくわかる

これまで、いろんなラッパーを使おうとするも、どうしてもエラーが出て使えなかったが、ようやくわかった。
Jupyter NotebookとPythonでは、微妙に非同期処理の使い方が違うらしい。

import asyncio
import pybotters

apis = {"bybit": ["YOUR_API_KEY", "YOUR_API_SECRET"]}

async def main():
    async with pybotters.Client(base_url="https://api.bybit.com", apis=apis) as client:
        r = await client.get("/v2/public/tickers", params={"symbol": "BTCUSD"})
        data = await r.json()
        price = round(float(data["result"][0]["bid_price"]) - 300.0, 1)

        r = await client.post(
            "/v2/private/order/create",
            data={"symbol": "BTCUSD", "side": "Buy", "order_type": "Limit", "qty": 1, "price": price, "time_in_force": "PostOnly"},
        )
        data = await r.json()
        print(data)
 
asyncio.run(main())

上記コードをFTX用に書き換えて実行したら、下記エラーが出た。

asyncio.run() cannot be called from a running event loop

なんでか調べたところ、こちらに行き着いた。

The asyncio.run() documentation says:

This function cannot be called when another asyncio event loop is running in the same thread.

The problem in your case is that jupyter (IPython) is already running an event loop (for IPython ≥ 7.0):

You can now use async/await at the top level in the IPython terminal and in the notebook, it should — in most of the cases — “just work”. Update IPython to version 7+, IPykernel to version 5+, and you’re off to the races.

That's the reason why you don't need to start the event loop yourself in jupyter and you can directly call await main(url) even outside asynchronous functions.

In jupyter

async def main():
print(1)

await main()
In plain Python (≥3.7)

import asyncio

async def main():
print(1)

asyncio.run(main())

Jupyter Notebookでは、プログラムとは別にEvent Loopを回しているようで、
それとは別にEvent Loopにタスクを突っ込んで回そうとしてもだめらしい。
今回は最終的にPythonとして実行するため、Jupyter NotebookではなくPythonで開発するように変更した。

エンドポイント設定でハマる

Pythonでサンプルコードを変えて入れていたが、ずっとNot Logged Inとエラーが出てハマる。
FTX側のホワイトリスト設定も見てみるが、特に問題はない。
どうもわからなかったので、もしかしてPybottersのAuthentificationにミスが有るのでは?と思って見るがバグを発見できず。

しばらく試行錯誤して、base_url = 'https://ftx.com/api/'となっており、r = await client.get('/positions')としていたことに気づく。
もしかして"/"が重複してるのでは?と思い片方削ると無事成功。
えー、、、URL間違えてもNot Logged Inって出るの・・・。Authentificationミスのときだけそのエラーが出てほしかった。

self設定でハマる

なんか引数が足りんと言われるなぁ・・・と思ってると、Classの中の関数で、selfを引数に設定し忘れていた。

そんなわけて一応WebSocket動作

import time
import asyncio
import pybotters

class FTX():
    # 定数
    URLS = {'REST': 'https://ftx.com/api/',
           'WebSocket': 'wss://ftx.com/ws/',
          } 

    def __init__(self, api_key = None, api_secret = None, subaccount = None):
        # APIキー、Secretをセット
        self.api_key = api_key
        self.api_secret = api_secret
        self.subaccount = subaccount    
        self.apis = {
            'ftx': [self.api_key, self.api_secret],
        }

    # ------------------------------------------------ #
    # WebSocket
    # ------------------------------------------------ #
    async def main(self):
        async with pybotters.Client(base_url = self.URLS['REST'], apis = self.apis, headers={'FTX-SUBACCOUNT': self.subaccount}) as client:
            symbol = 'BTC-PERP'
            await client.ws_connect(
                self.URLS['WebSocket'],
                send_json=[
                    {'op': 'subscribe', 'channel': 'orders'},
                    {'op': 'subscribe', 'channel': 'fills'},
                ],
                hdlr_json=store.onmessage,)

            while True:
                start = time.time()
                await store.orders.wait()
                orders = store.orders.find()
                print(orders)
                await asyncio.sleep(5)
                end = time.time()
                elpsed_time = end - start
                print(elpsed_time)

try:
    _api_key = ''
    _api_secret = ''
    _subaccount = ''
    
    ftx = FTX(_api_key, _api_secret, _subaccount)
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())  # need windowsOS python3.8
    asyncio.run(ftx.main())

except Exception as e:
    print('=== エラー内容 ===')
    print(f'{str(e)}')

except KeyboardInterrupt:
    pass

あとはこいつを使ってラッパー化していくつもり。

Websocket実装に向けて非同期処理を勉強する(REST API)

REST APIだけで実装しているbotにWebsocket入れたーい。
そんな思いでまちゅけんさんの記事を勉強したメモです。元記事はこちら。

zenn.dev

import asyncio
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.bybit.com/v2/public/tickers?symbol=BTCUSD") as resp:
            data = await resp.text()
        print(data)

asyncio.run(main())

5行目で非同期コンテキストマネージャーを用いて HTTPクライアントセッション [3] を作成しています

いきなり用語がわからない。非同期コンテキストマネージャーとはなんぞや?
調べてみると、withがコンテキストマネージャというらしい。それをasyncで使ってるので非同期コンテキストマネージャ。
調べてないが、ClientSessionをwithで開くと、with構文終了後、セッションが勝手に閉じるようになってると思われる。

6行目でまた非同期コンテキストマネージャーを用いて HTTPリクエスト(GET)を行いレスポンスを待機 ( HTTP通信 [4] )します ( HTTPヘッダ が取得されます)

書いてあることはわかるが、必要性が理解できない。
5行目で非同期コンテキストマネージャー使ってるので、セッションは閉じるようになってるはず。6行目でも非同期コンテキストマネージャーを使う必要があるのか?なにか閉じるものがあるのか?
7行目のawaitとはどう使い分けているのか?根底コネクタってなんだ?

なお、コルーチン関数で実装されているメソッドはawait文で実行する必要があるらしい。へー。

そう考えていると、次にもう一つ違う書き方が。

import asyncio
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        r = await session.get("https://api.bybit.com/v2/public/tickers", params={"symbol": "BTCUSD"})
        data = await r.json()
        print(data["result"])
 
asyncio.run(main())

やっぱasync withでgetする必要はないらしい。だよねー。

import asyncio
import aiohttp
import time

async def fetch(url, params={}):
    r = await session.get(url, params=params)
    data = await r.json()
    return data

async def main():
    global session
    async with aiohttp.ClientSession() as session:
        stime = time.time()

        results = []
        coro1 = fetch("https://api.bybit.com/v2/public/tickers", params={"symbol": "BTCUSD"})
        coro2 = fetch("https://api.bybit.com/v2/public/tickers", params={"symbol": "ETHUSD"})
        coro3 = fetch("https://api.bybit.com/v2/public/tickers", params={"symbol": "EOSUSD"})
        coro4 = fetch("https://api.bybit.com/v2/public/tickers", params={"symbol": "XRPUSD"})
        task1 = asyncio.create_task(coro1)
        task2 = asyncio.create_task(coro2)
        task3 = asyncio.create_task(coro3)
        task4 = asyncio.create_task(coro4)
        await task1
        await task2
        await task3
        await task4
        results.append(task1.result())
        results.append(task2.result())
        results.append(task3.result())
        results.append(task4.result())
        print(results)

        etime = time.time()
        print(f"Total {etime - stime:.4f} sec")

asyncio.run(main())

前節との最も重要な違いは20-23行目でコルーチンをTask化し実行を イベントループ にスケジュールし、それを24-27行目で各Taskが終了するのを await文 で待機しているところです。
前節ではイベントループにスケジュールせずに直接await文で実行していました。ここで分かるのは await文は "実行する" というよりも、awaitを行った行で制御を離してイベントループに任せて "awaitしたオブジェクトが終了するまで待機する" という命令 であることです。

すごくよくわかった。
非同期処理の本質的なメリットは、1つ目のリクエストへのレスポンスが返ってくる前に、2つ目のリクエストを投げられること。
そのため重要なのは、イベントループにタスクを入れきったあと、awaitで終了するまで待機すること!

Websocketは記事を分けます。