(なぜ ASGI が必要になったのか)= # なぜ ASGI が必要になったのか Vol.1「WSGI が生まれた背景」で WSGI の仕組みを学び、Vol.1「WSGI の上に何が必要になるのか」でその上にフレームワークを構築し、{numref}`Django を WSGI 視点で見る`({ref}`Django を WSGI 視点で見る`)で Django が WSGI をどのように活用しているかを追跡してきました。 WSGI はシンプルで相互運用性が高く、Python Web エコシステムの基盤として20年近く機能してきました。 しかし Web の進化とともに、WSGI の設計では対応できない領域が広がってきています。 ```{important} 本節では、ASGI が必要になった **4つの背景** を整理します。 それぞれが独立した問題ではなく、互いに関連する要因です。 ``` ## WSGI の同期モデルの限界 WSGI の `application(environ, start_response)` は **同期関数** です。 この関数が `return` するまで、呼び出し元の WSGI サーバはそのワーカーを他のリクエストに使えません。 Vol.1「WSGI が生まれた背景」で確認した通り、これは「1リクエスト = 1ワーカーの占有」を意味します。 ```{mermaid} flowchart TD subgraph WSGI同期モデル W1[ワーカー1] -->|DB待ち 200ms| B1[ブロック中
他のリクエスト処理不可] W2[ワーカー2] -->|API待ち 150ms| B2[ブロック中] end subgraph ASGI非同期モデル EL[イベントループ] -->|await DB| DB[DB処理
他のコルーチンが動作] EL -->|await API| API[API呼び出し
待機中は別リクエスト処理] end ``` この設計は、ビュー関数の処理が CPU バウンド(計算中心)である場合には合理的です。 しかし現代の Web アプリケーションでは、ビュー関数の処理時間の大半を **I/O 待ち** が占めています。 - データベースクエリ - 外部 API の呼び出し - ファイルの読み書き - キャッシュサーバへのアクセス これらの待機中、CPU は何もしていないにもかかわらず、ワーカーは占有されたままです。 ```text # WSGI (同期) — 外部API呼び出し中、ワーカーは何もできない def application(environ, start_response): # 外部APIに200ms待たされる → ワーカーは200ms間ブロック user_data = requests.get("https://api.example.com/users/42").json() # DBクエリに50ms待たされる → ワーカーはさらに50ms間ブロック orders = db.execute("SELECT * FROM orders WHERE user_id = 42") # 合計250msの間、このワーカーは他のリクエストを処理できない ... ``` この問題に対する WSGI 側の対処法は、ワーカーの数を増やすことです。 Gunicorn であれば `--workers 8 --threads 4` のように設定し、最大32の同時リクエストを処理します。 しかし、プロセスもスレッドもメモリを消費します。 一般的な Django アプリケーションはワーカー1つあたり数十MB から数百MBのメモリを使用するため、数百の同時接続を捌くには相応のサーバリソースが必要になります。 非同期 I/O を使えば、一つのプロセスで数千の同時接続を処理できます。 I/O 待ちの間に他のリクエストの処理に切り替え、I/O が完了したら元のリクエストの処理を再開します。 この仕組みにより、**少ないリソースで高い並行性を実現**できます。 しかし WSGI の `application` は同期関数であるため、`async def` で定義することも、内部で `await` を使うこともできません。 ```python # これは WSGI では不可能 async def application(environ, start_response): user_data = await httpx.get("https://api.example.com/users/42") # NG ... ``` ## 長時間接続 WSGI は「1リクエスト → 1レスポンス」という HTTP/1.1 の基本モデルに基づいて設計されています。 クライアントがリクエストを送り、サーバがレスポンスを返して完了します。 この往復が WSGI の1回の `application` 呼び出しに対応します。 しかし現代の Web アプリケーションでは、レスポンスを返した後もクライアントとの接続を維持し続けるユースケースが増えています。 - チャットアプリケーションのリアルタイムメッセージ配信 - ダッシュボードのライブデータ更新 - 通知システムのプッシュ配信 - LLM の応答をトークン単位でストリーミングする処理 Server-Sent Events(SSE)は HTTP の枠組み内で長時間接続を実現する手法で、サーバからクライアントへ一方向にイベントを送り続けます。 WSGI でも `StreamingHttpResponse` とジェネレータを使えば形式上は実現可能ですが、ワーカーが接続中ずっと占有される問題は解消されません。 100人のユーザーがダッシュボードを開いていれば、100のワーカーが常時占有されます。 ```{caution} WSGI で長時間接続を実装すると、ワーカーが接続中ずっと占有されます。 100人のユーザーがダッシュボードを開くだけで、100のワーカーが常時ブロックされます。 ``` ```python # WSGI での SSE — 動作はするがワーカーを長時間占有する def sse_view(request): def event_stream(): while True: data = get_latest_data() yield f"data: {json.dumps(data)}\n\n" time.sleep(1) # ワーカー全体が1秒ブロック return StreamingHttpResponse(event_stream(), content_type="text/event-stream") ``` 非同期モデルでは、`await asyncio.sleep(1)` の間に他の接続の処理を進められるため、少数のワーカーで数千の長時間接続を同時に維持できます。 ## WebSocket WebSocket は HTTP とは根本的に異なる通信パターンを持つプロトコルです。 最初に HTTP でハンドシェイクを行い、`101 Switching Protocols` レスポンスの後、TCP 接続を WebSocket プロトコルにアップグレードします。 以降はクライアントとサーバの **双方がいつでもメッセージを送受信できる全二重通信** になります。 ``` クライアント → サーバ: GET /ws HTTP/1.1 Upgrade: websocket Connection: Upgrade サーバ → クライアント: HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade ── 以降、双方向のメッセージ交換 ── クライアント → サーバ: {"type": "chat", "message": "Hello"} サーバ → クライアント: {"type": "chat", "message": "Hi there!"} サーバ → クライアント: {"type": "notification", "count": 3} クライアント → サーバ: {"type": "typing", "status": true} ``` WSGI のインタフェースはこの通信パターンを表現できません。 - `application(environ, start_response)` は一度呼ばれて一度返る関数であり、接続を維持してメッセージを双方向にやり取りするという概念が存在しません - `environ` には HTTP リクエストの情報しか入らず、WebSocket フレームを受信する手段がありません - `start_response` は一度だけステータスとヘッダーを送信するための仕組みであり、その後に任意のタイミングでメッセージを送るという機能を持ちません WSGI の時代にリアルタイム通信が必要な場合は、Django とは別に Socket.IO サーバや Tornado を並行して運用し、Nginx でパスごとに振り分けるという構成が取られていました。 しかしこの構成では認証やセッション管理がアプリケーション間で分断され、複雑さが増します。 ASGI は HTTP と WebSocket の **両方を単一のインタフェースで扱える** ように設計されています。 同じ `application` callable が HTTP リクエストも WebSocket 接続も処理でき、フレームワーク側で統一的にルーティングや認証を適用できます。 ## async/await の普及 Python 3.5(2015年)で `async def` と `await` が言語に正式に導入され、Python 3.6 以降で asyncio エコシステムが急速に成熟しました。 ```{note} 以下のような非同期 I/O ライブラリが整備され、非同期プログラミングが Python の主流の一部になりました。 - `aiohttp` — 非同期 HTTP クライアント/サーバ - `httpx` — 同期・非同期の両対応 HTTP クライアント - `asyncpg` — PostgreSQL 向け非同期ドライバ - `aioredis` — Redis 向け非同期クライアント ``` ```python import asyncio import httpx async def fetch_user_and_orders(user_id): async with httpx.AsyncClient() as client: # 2つのリクエストを同時に発行 — 合計時間は遅い方に揃う user_task = client.get(f"https://api.example.com/users/{user_id}") orders_task = client.get(f"https://api.example.com/users/{user_id}/orders") user_resp, orders_resp = await asyncio.gather(user_task, orders_task) return user_resp.json(), orders_resp.json() ``` この `async/await` の力を Web アプリケーションで活用するには、フレームワークとサーバの間のインタフェース自体が非同期に対応している必要があります。 WSGI の同期的な `application(environ, start_response)` では `await` を使えないため、言語レベルで用意された非同期の恩恵を Web アプリケーションに持ち込めません。 ASGI は `async def application(scope, receive, send)` という非同期関数をインタフェースとして定義することで、この問題を解決しました。 - `receive` で非同期にリクエストボディや WebSocket メッセージを受信できます - `send` で非同期にレスポンスを送信できます - フレームワーク内部でも `await` を自由に使えます これにより、非同期 ORM(Django 4.1 以降の async ORM)、非同期 HTTP クライアント、非同期キャッシュアクセスといった機能がフレームワークに統合可能になりました。 ```python # ASGI アプリケーション — async/await が自然に使える async def application(scope, receive, send): if scope["type"] == "http": body = b"Hello, ASGI World!" await send({ "type": "http.response.start", "status": 200, "headers": [[b"content-type", b"text/plain"]], }) await send({ "type": "http.response.body", "body": body, }) ``` Django は 3.0 で ASGI 対応を導入し、4.1 で非同期ビューと非同期 ORM のサポートを拡充しました。 FastAPI は最初から ASGI ネイティブとして設計されています。 WSGI が消えるわけではありませんが、非同期 I/O、長時間接続、WebSocket が求められる場面では ASGI が必須の選択肢になっています。 --- 次節では ASGI の仕様を具体的に読み解きます。 WSGI の `environ` / `start_response` / イテラブルに対応する ASGI の `scope` / `receive` / `send` の役割を一つずつ確認し、最小の ASGI アプリケーションを手で書いてみましょう。 (ASGI の基本構造)= ## ASGI の基本構造 前節で ASGI が必要になった背景を整理しました。 本節では ASGI の仕様そのものを読み解きます。 WSGI が `application(environ, start_response)` という同期関数で定義されていたのに対し、ASGI は `async def application(scope, receive, send)` という非同期関数で定義されます。 引数の数も名前も異なりますが、「サーバとアプリケーションの間のインタフェースを定義する」という目的は同じです。 ```{tip} WSGI の `environ`/`start_response`/イテラブルという3つの概念が、ASGI では `scope`/`receive`/`send` に対応します。 名前は変わりますが、それぞれの役割は対称的です。 ``` ### application(scope, receive, send) ASGI アプリケーションの最小形は以下の通りです。 ```python async def application(scope, receive, send): if scope["type"] == "http": await receive() await send({ "type": "http.response.start", "status": 200, "headers": [ [b"content-type", b"text/plain; charset=utf-8"], ], }) await send({ "type": "http.response.body", "body": b"Hello, ASGI World!", }) ``` WSGI の最小アプリと並べてみると、構造の違いが明確になります。 両者の対応関係に注目してください。 ```python # WSGI def application(environ, start_response): start_response("200 OK", [("Content-Type", "text/plain")]) return [b"Hello, WSGI World!"] # ASGI async def application(scope, receive, send): await receive() await send({"type": "http.response.start", "status": 200, "headers": [[b"content-type", b"text/plain"]]}) await send({"type": "http.response.body", "body": b"Hello, ASGI World!"}) ``` 3つの引数の役割を見ていきます。 **`scope`** は接続に関するメタ情報を格納した辞書です。 WSGI の `environ` に相当しますが、いくつかの重要な違いがあります。 `scope` には接続の種類を示す `type` キーが含まれ、`"http"`、`"websocket"`、`"lifespan"` のいずれかを取ります。 HTTP リクエストの場合は `method`、`path`、`headers`、`query_string` などが含まれますが、リクエストボディは `scope` には入りません。 ボディは `receive` を通じて非同期に取得します。 **`receive`** はサーバからアプリケーションへイベントを届ける非同期 callable です。 `await receive()` を呼ぶとイベント辞書が返されます。 HTTP リクエストであればボディを含む `{"type": "http.request", "body": b"..."}` が、WebSocket であればメッセージを含む `{"type": "websocket.receive", "text": "..."}` が届きます。 WSGI の `environ["wsgi.input"].read()` に相当しますが、以下の点が異なります。 - 非同期である - 複数回呼び出してチャンク単位で受信できる - WebSocket メッセージも同じインタフェースで受信できる **`send`** はアプリケーションからサーバへイベントを送出する非同期 callable です。 `await send(event_dict)` を呼ぶことでレスポンスをサーバに返します。 WSGI では `start_response` でステータスとヘッダーを送り、イテラブルの返却でボディを送るという二段構えでしたが、ASGI では `send` を複数回呼ぶことで同じことを実現します。 HTTP レスポンスの場合、最初の `send` で `http.response.start`(ステータスとヘッダー)を、次の `send` で `http.response.body`(ボディ)を送ります。 この対応関係をまとめます。 ``` WSGI ASGI ────────────────────── ────────────────────── environ (dict) scope (dict) environ["REQUEST_METHOD"] scope["method"] environ["PATH_INFO"] scope["path"] environ["QUERY_STRING"] scope["query_string"] environ["HTTP_HOST"] scope["headers"] 内 environ["wsgi.input"] receive() で取得 start_response(status, headers) send({"type": "http.response.start", ...}) return iterable send({"type": "http.response.body", ...}) ``` ```{mermaid} sequenceDiagram participant S as ASGI サーバ participant A as application S->>A: application(scope, receive, send) A->>S: await receive() S->>A: http.request イベント A->>S: await send(http.response.start) A->>S: await send(http.response.body) A-->>S: return ``` ### connection scope `scope` 辞書の内容はプロトコルの種類(`type`)によって異なります。 HTTP 接続の場合の主要なキーを確認しましょう。 ```python # HTTP リクエストの scope 例 { "type": "http", "asgi": {"version": "3.0"}, "http_version": "1.1", "method": "GET", "path": "/users/42/", "root_path": "", "scheme": "http", "query_string": b"format=json", "headers": [ [b"host", b"example.com"], [b"user-agent", b"curl/8.7.1"], [b"accept", b"*/*"], ], "server": ("127.0.0.1", 8000), "client": ("192.168.1.10", 54321), } ``` WSGI の `environ` と比較すると、いくつかの設計上の違いに気づきます。 1. **ヘッダーの形式が異なります** — `HTTP_` プレフィックス付きの文字列キーではなく、バイト列タプルのリストとして格納されています。WSGI が CGI の命名規則を引き継いでいたのに対し、ASGI は HTTP の生の表現に近い形を採用しています。`Content-Type` を取得する場合、WSGI では `environ["CONTENT_TYPE"]` でしたが、ASGI では `headers` リストを走査して `b"content-type"` を探す必要があります。 2. **`query_string` がバイト列です** — WSGI では文字列でしたが、ASGI はデコード前のバイト列をそのまま渡します。フレームワーク側でデコードとパースを行う設計です。 3. **`server` と `client` がタプルで提供されます** — WSGI の `SERVER_NAME`/`SERVER_PORT` や `REMOTE_ADDR`/`REMOTE_PORT` に対応しますが、より構造化された形式です。 WebSocket 接続の場合、`scope` は以下のようになります。 ```python # WebSocket 接続の scope 例 { "type": "websocket", "asgi": {"version": "3.0"}, "http_version": "1.1", "scheme": "ws", "path": "/ws/chat/room1/", "query_string": b"token=abc123", "headers": [...], "server": ("127.0.0.1", 8000), "client": ("192.168.1.10", 54322), } ``` HTTP と WebSocket で `scope` の構造が統一されているため、ルーティングや認証の仕組みをプロトコルをまたいで共有できます。 `type` フィールドを見るだけでどのプロトコルの接続かを判別でき、同じアプリケーション callable 内で分岐処理を書けます。 ```{note} `lifespan` タイプは、アプリケーションの起動時と終了時に一度ずつ呼ばれる特殊な scope です。 データベース接続プールの初期化やクリーンアップ処理に使われます。 ``` ```python async def application(scope, receive, send): if scope["type"] == "lifespan": while True: message = await receive() if message["type"] == "lifespan.startup": # DB接続プール初期化など await send({"type": "lifespan.startup.complete"}) elif message["type"] == "lifespan.shutdown": # リソース解放 await send({"type": "lifespan.shutdown.complete"}) return elif scope["type"] == "http": ... elif scope["type"] == "websocket": ... ``` ### event-driven な処理 WSGI と ASGI の **最も本質的な違い** は、処理モデルが「呼び出しと返却」から「イベントの送受信」に変わった点です。 WSGI では、サーバが `application` を呼び出し、アプリケーションが処理を完了してイテラブルを返します。 この1回の往復で1リクエストが完結します。制御の流れは直線的です。 ``` WSGI: サーバ → application() 呼び出し → 処理 → return イテラブル → サーバ ``` ASGI では、`application` が呼び出された後、`receive` と `send` を通じてサーバとアプリケーションがイベントを交互にやり取りします。 `application` 関数は処理が完了するまで `return` しません。 ``` ASGI: サーバ → application() 呼び出し │ ├── await receive() ← サーバからリクエストボディを受信 ├── await send(...) → サーバへレスポンスヘッダーを送信 ├── await send(...) → サーバへレスポンスボディを送信 │ └── return(application 終了) ``` HTTP の場合はこのイベントのやり取りが数回で完了しますが、WebSocket の場合は接続が維持される限り `receive` と `send` が繰り返されます。 ```python async def websocket_app(scope, receive, send): # 接続の受け入れ await send({"type": "websocket.accept"}) while True: event = await receive() if event["type"] == "websocket.receive": text = event.get("text", "") # エコーバック await send({"type": "websocket.send", "text": f"Echo: {text}"}) elif event["type"] == "websocket.disconnect": break ``` この WebSocket の例を見ると、`receive` がクライアントからのメッセージを待ち受け、`send` でサーバからメッセージを送り返すという双方向通信が、同じインタフェースで自然に表現できていることがわかります。 WSGI ではこの通信パターンを表現する手段が存在しませんでした。 イベント駆動モデルの利点は、`await` のタイミングでイベントループに制御を返せる点です。 - `await receive()` でクライアントのメッセージを待っている間、イベントループは他の接続の処理を進められます - 100の WebSocket 接続が同時に開いていても、メッセージを待っている間は CPU リソースを消費しません 前節で問題にした「長時間接続がワーカーを占有する」という WSGI の制約は、この非同期イベント駆動モデルによって構造的に解消されています。 HTTP のストリーミングレスポンスも、`send` を複数回呼ぶことで実現できます。 ```python async def streaming_app(scope, receive, send): await receive() await send({ "type": "http.response.start", "status": 200, "headers": [[b"content-type", b"text/plain"]], }) for i in range(5): await send({ "type": "http.response.body", "body": f"chunk {i}\n".encode(), "more_body": True, }) await asyncio.sleep(1) # 1秒待つ間、他のリクエストを処理可能 await send({ "type": "http.response.body", "body": b"done\n", "more_body": False, # これが最後のチャンク }) ``` `more_body` フラグが `True` の間はボディの送信が継続し、`False`(またはキー省略)で完了します。 `await asyncio.sleep(1)` の間、イベントループは他のリクエストの処理に使われます。 WSGI の `StreamingHttpResponse` で `time.sleep(1)` を書いた場合はワーカー全体がブロックされていましたが、ASGI ではその問題が起きません。 --- ASGI の基本構造を理解したところで、次節では `scope` の詳細と `receive`/`send` のイベント型を網羅的に確認し、HTTP と WebSocket それぞれのライフサイクルを完全に追跡しましょう。 (scope を理解する)= ## scope を理解する 前節で ASGI の3引数(`scope`, `receive`, `send`)の概要を把握しました。 本節では `scope` 辞書の各キーを一つずつ掘り下げます。 WSGI の `environ` と対比しながら読み進めることで、同じリクエスト情報がどのように表現形式を変えているかを理解できます。 ### type `scope["type"]` は ASGI アプリケーションが **最初に確認すべきキー** です。 この値によって接続のプロトコルが決まり、`scope` に含まれる他のキーの構成や、`receive`/`send` で流れるイベントの種類がすべて変わります。 ```python async def application(scope, receive, send): if scope["type"] == "http": await handle_http(scope, receive, send) elif scope["type"] == "websocket": await handle_websocket(scope, receive, send) elif scope["type"] == "lifespan": await handle_lifespan(scope, receive, send) else: raise ValueError(f"Unknown scope type: {scope['type']}") ``` 3つの値があります。 - **`"http"`** — 通常の HTTP リクエスト/レスポンスです。1回の `application` 呼び出しが1つの HTTP トランザクションに対応し、レスポンスの送信が完了すると `application` は `return` します。 - **`"websocket"`** — WebSocket 接続です。ハンドシェイク完了後に `application` が呼び出され、接続が切断されるまで `receive` と `send` のループが継続します。`application` は接続が閉じられて初めて `return` します。 - **`"lifespan"`** — アプリケーションプロセスの起動時に一度だけ呼び出されます。データベース接続プールの初期化やバックグラウンドタスクの開始など、リクエスト処理の開始前に完了すべき処理を記述します。 WSGI には `type` に相当するキーがありません。 WSGI は HTTP のリクエスト/レスポンスしか扱えないため、接続の種類を区別する必要がなかったのです。 ASGI が複数のプロトコルを単一のインタフェースで扱えるのは、この `type` による分岐があるからです。 ### path `scope["path"]` はリクエストのパス部分を **文字列** で格納します。 ```text # リクエスト: GET /api/users/42/?format=json scope["path"] # → "/api/users/42/" ``` WSGI の `environ["PATH_INFO"]` に対応しますが、名前が簡潔になっています。 WSGI では `SCRIPT_NAME` と `PATH_INFO` を結合して完全なパスを構築する必要がありましたが、ASGI では `scope["path"]` が完全なパスを、`scope["root_path"]` がマウントポイント(WSGI の `SCRIPT_NAME` 相当)を保持しています。 ```python # WSGI full_path = environ.get("SCRIPT_NAME", "") + environ.get("PATH_INFO", "/") # ASGI full_path = scope.get("root_path", "") + scope["path"] ``` `root_path` はアプリケーションがサブパスにマウントされている場合に使われます。 たとえば Nginx が `/app/` 配下のリクエストを ASGI サーバに転送する構成では、`root_path` が `"/app"` に設定され、`path` は `/app/` を除いた残りのパスになります。 ```{note} 多くのデプロイ構成では `root_path` は空文字列です。 ``` ### method `scope["method"]` は HTTP メソッドを大文字の文字列で格納します。 ```text scope["method"] # → "GET", "POST", "PUT", "DELETE", ... ``` WSGI の `environ["REQUEST_METHOD"]` と完全に同じ情報です。 名前が短くなっただけで、値の形式に違いはありません。 このキーは `scope["type"] == "http"` の場合にのみ存在します。 WebSocket の `scope` には `method` キーがありません。 WebSocket のハンドシェイクは常に GET で行われるため、明示的に格納する必要がないという設計判断です。 ### headers `scope["headers"]` はリクエストヘッダーを **バイト列タプルのリスト** として格納します。 ここが WSGI と最も大きく異なる部分です。 ```text scope["headers"] # → [ # [b"host", b"example.com"], # [b"user-agent", b"curl/8.7.1"], # [b"accept", b"application/json"], # [b"content-type", b"application/json"], # [b"content-length", b"52"], # [b"x-request-id", b"abc-123"], # ] ``` WSGI では HTTP ヘッダーが CGI 由来の命名規則に従い、`HTTP_` プレフィックスの付加、ハイフンからアンダースコアへの変換、大文字化という変換を経て `environ` に格納されていました。 `Content-Type` は `CONTENT_TYPE`、`X-Request-Id` は `HTTP_X_REQUEST_ID` になるという、Vol.1「WSGI が生まれた背景」で見た変換規則です。 ASGI ではそうした変換が一切行われず、ヘッダー名は小文字のバイト列としてそのまま格納されます。 この設計にはメリットとデメリットがあります。 | 観点 | 内容 | |------|------| | メリット | HTTP の生の表現に忠実であるため情報の損失がありません。WSGI ではアンダースコアを含む名前(`X_Custom_Header`)とハイフンを含む名前(`X-Custom-Header`)が同じキーに変換されてしまう曖昧さがありましたが、ASGI ではこの問題が起きません。 | | デメリット | 特定のヘッダーを取得するためにリストを走査する必要があります。 | ```python # ASGI でヘッダーを取得するヘルパー関数 def get_header(scope, name): name_lower = name.lower().encode("latin-1") for header_name, header_value in scope["headers"]: if header_name == name_lower: return header_value.decode("latin-1") return None content_type = get_header(scope, "Content-Type") request_id = get_header(scope, "X-Request-Id") ``` 実際の開発ではフレームワーク(Django の `ASGIRequest` や Starlette の `Request`)がこのヘルパーを内部に持っているため、`request.headers["content-type"]` のような直感的なアクセスが可能です。 しかしフレームワークを介さずに ASGI アプリケーションを書く場合や、ミドルウェアを実装する場合には、この生のデータ構造を理解しておくことが重要です。 ### query_string `scope["query_string"]` はクエリ文字列を **バイト列** で格納します。 ```text # リクエスト: GET /users/?name=Taro&page=2 scope["query_string"] # → b"name=Taro&page=2" ``` WSGI の `environ["QUERY_STRING"]` が文字列であったのに対し、ASGI ではバイト列です。 URL にはパーセントエンコーディングされた非 ASCII 文字が含まれる可能性があるため、デコード前の生データを渡すという方針です。 パースはアプリケーション側で行います。 ```{tip} クエリ文字列が空の場合は `b""` が格納されます。WSGI と同様に、`?` 記号自体は含まれません。 ``` ```text from urllib.parse import parse_qs query_params = parse_qs(scope["query_string"].decode("utf-8")) # → {"name": ["Taro"], "page": ["2"]} ``` ### client / server `scope["client"]` と `scope["server"]` は接続元と接続先のネットワーク情報をタプルで格納します。 ```text scope["client"] # → ("192.168.1.10", 54321) クライアントの IP とポート scope["server"] # → ("127.0.0.1", 8000) サーバの IP とポート ``` WSGI ではクライアント情報が `environ["REMOTE_ADDR"]` と `environ["REMOTE_PORT"]`(存在する場合)に、サーバ情報が `environ["SERVER_NAME"]` と `environ["SERVER_PORT"]` に格納されていましたが、いずれも文字列でした。 ASGI ではタプルとして構造化されており、ポート番号は整数です。 ```{warning} リバースプロキシ環境では、Nginx を経由すると `scope["client"]` は Nginx のアドレスになります。 元のクライアント IP を知るには `X-Forwarded-For` ヘッダーを参照する必要があります。 Uvicorn の `--proxy-headers` オプションを有効にすると、`X-Forwarded-For` の値で `scope["client"]` を自動的に上書きしてくれます。 ``` `scope["client"]` は `None` になる場合もあります。 Unix ソケット経由の接続ではクライアントの IP アドレスという概念が存在しないためです。 アプリケーションコードでは `scope.get("client")` で安全にアクセスするか、フレームワークの `request.client` を使うのが安全です。 ### state `scope["state"]` は ASGI 3.0 で追加された比較的新しいキーで、ミドルウェア間やアプリケーション内で **リクエストスコープの状態を共有するための辞書** です。 ```python # ミドルウェアで state にデータを設定 class RequestIdMiddleware: def __init__(self, app): self.app = app async def __call__(self, scope, receive, send): if scope["type"] == "http": scope["state"]["request_id"] = generate_request_id() await self.app(scope, receive, send) # アプリケーション側で state を参照 async def application(scope, receive, send): request_id = scope.get("state", {}).get("request_id", "unknown") ... ``` WSGI では `environ` 辞書に直接キーを追加することでミドルウェア間のデータ共有を行う慣習がありましたが、これは WSGI の仕様が定める `environ` のキー命名規則と衝突する可能性がありました。 ASGI の `state` は明示的にアプリケーション用のデータ領域として用意されているため、サーバが管理するキーとの衝突を避けられます。 `state` はサーバが空の辞書として初期化し、ミドルウェアやアプリケーションが自由に読み書きします。 - HTTP の場合は1リクエストの処理中だけ生存します - WebSocket の場合は接続が維持されている間保持されます --- `scope` の各キーを確認したところで、WSGI の `environ` との対応関係を改めてまとめます。 ``` WSGI environ ASGI scope ─────────────────────── ─────────────────────── (なし) type ("http" / "websocket" / "lifespan") REQUEST_METHOD method PATH_INFO path SCRIPT_NAME root_path QUERY_STRING (str) query_string (bytes) HTTP_* (変換済み文字列) headers (生バイト列タプルのリスト) CONTENT_TYPE headers 内 (b"content-type") CONTENT_LENGTH headers 内 (b"content-length") REMOTE_ADDR + REMOTE_PORT client (tuple or None) SERVER_NAME + SERVER_PORT server (tuple) SERVER_PROTOCOL http_version wsgi.url_scheme scheme wsgi.input receive() で取得 (慣習的に environ に追加) state (dict) ``` ```{mermaid} flowchart TD APP[application
scope, receive, send] --> TY{scope type} TY -->|http| HTTP[HTTP リクエスト処理
receive → send × 2] TY -->|websocket| WS[WebSocket 接続処理
accept → receive/send ループ] TY -->|lifespan| LS[プロセス管理
startup → shutdown] ``` 次節では `receive` と `send` のイベント型を詳しく見ていき、HTTP と WebSocket それぞれのライフサイクルを完全に追跡しましょう。 (receive / send を理解する)= ## receive / send を理解する 前節で `scope` の構造を把握しました。 `scope` はリクエストの「メタ情報」を格納する静的な辞書でしたが、`receive` と `send` はリクエストとレスポンスの「データ」をやり取りする動的なチャネルです。 WSGI では `environ["wsgi.input"]` と `start_response` + イテラブル返却でこの役割を分担していましたが、ASGI ではすべてをイベント辞書の送受信として統一的に扱います。 ```{note} すべてのイベント辞書には `type` キーが含まれており、イベントの種類を示します。 `scope["type"]` がプロトコルを区別するのと同様に、イベントの `type` が処理の段階を区別します。 ``` ### イベントを受け取る `receive` はサーバからアプリケーションへイベントを届ける非同期 callable です。 `await receive()` を呼ぶたびに1つのイベント辞書が返されます。 HTTP リクエストの場合、`receive` が返すイベントは `http.request` 型です。 ```text event = await receive() # → { # "type": "http.request", # "body": b'{"name": "Taro", "email": "taro@example.com"}', # "more_body": False, # } ``` `body` はリクエストボディのバイト列で、`more_body` は後続のチャンクが存在するかを示すフラグです。 - 小さなリクエストでは1回の `receive` で全ボディが届きます - 大きなリクエスト(ファイルアップロードなど)ではサーバがボディを分割して送信します ```python # 大きなリクエストボディを完全に受信する async def read_body(receive): body = b"" while True: event = await receive() body += event.get("body", b"") if not event.get("more_body", False): break return body ``` このパターンは、Vol.1「HTTP は何をやりとりしているのか」で学んだ TCP ソケットの部分受信と Vol.1「まずは 1 リクエストだけ処理するサーバを作る」で実装したヘッダー終端検出ループの非同期版です。 TCP が「メッセージ境界を持たないバイトストリーム」であったのと同様に、ASGI の `receive` も「ボディ全体が一度に届く保証はない」という前提で設計されています。 `more_body` フラグが WSGI には存在しなかった明示的な終端通知であり、`Content-Length` から自分で計算する必要がなくなっています。 ```{important} GET リクエストのようにボディが空の場合でも、`receive` を一度呼び出す必要があります。 サーバは空ボディを示す `{"type": "http.request", "body": b"", "more_body": False}` を返します。 これを呼び出さずに `send` を始めると、サーバの実装によってはプロトコル違反とみなされる可能性があります。 ``` WebSocket の場合、`receive` はクライアントからのメッセージやイベントを返します。 HTTP との違いは、接続が維持される限り何度でも `receive` を呼び出せる点です。 ```python # WebSocket の receive が返すイベント例 {"type": "websocket.connect"} # 接続要求 {"type": "websocket.receive", "text": "Hello"} # テキストメッセージ {"type": "websocket.receive", "bytes": b"\x89\x00"} # バイナリメッセージ {"type": "websocket.disconnect", "code": 1000} # 切断 ``` HTTP では `receive` が通常1〜数回で完了しますが、WebSocket では接続が維持される限り何度でも `receive` を呼び出してメッセージを待ち受けます。 この違いが、WSGI の「1回の呼び出しで1リクエスト完結」というモデルでは WebSocket を表現できなかった理由です。 ```{admonition} WebSocket の receive イベント一覧 :class: note | イベント type | 意味 | |---|---| | `websocket.connect` | 接続要求 | | `websocket.receive` + `text` | テキストメッセージ受信 | | `websocket.receive` + `bytes` | バイナリメッセージ受信 | | `websocket.disconnect` | 切断(`code` フィールドに終了コード) | ``` ### イベントを送る `send` はアプリケーションからサーバへイベントを送出する非同期 callable です。 `await send(event_dict)` を呼ぶことでレスポンスデータをサーバに返します。 HTTP レスポンスの場合、`send` は **最低2回** 呼び出します。 ```python # 第1回: ステータスとヘッダーの送信 await send({ "type": "http.response.start", "status": 200, "headers": [ [b"content-type", b"application/json"], [b"content-length", b"52"], ], }) # 第2回: ボディの送信 await send({ "type": "http.response.body", "body": b'{"id": 42, "name": "Taro", "email": "taro@example.com"}', }) ``` WSGI では `start_response("200 OK", headers)` でステータスとヘッダーを送り、イテラブルの返却でボディを送るという分離がありました。 ASGI では両方とも `send` を使いますが、イベントの `type` によって役割が区別されます。 `http.response.start` が WSGI の `start_response` に、`http.response.body` が WSGI のイテラブル返却に対応します。 ```{warning} `send` の呼び出し順序には厳格な制約があります。 - `http.response.start` は必ず `http.response.body` より前に呼ぶ必要があります - `http.response.start` を2回呼ぶことはできません - `http.response.body` の後に `http.response.start` を呼ぶことはできません これらの制約に違反すると、ASGI サーバは例外を送出するか接続を切断します。 ``` WebSocket の場合、`send` は以下のイベント型を扱います。 ```python await send({"type": "websocket.accept"}) # 接続受け入れ await send({"type": "websocket.send", "text": "Hello, client!"}) # テキスト送信 await send({"type": "websocket.send", "bytes": b"\x00\x01"}) # バイナリ送信 await send({"type": "websocket.close", "code": 1000}) # 接続終了 ``` ### HTTP response start / body HTTP レスポンスの送信を具体的なコード例で確認しましょう。 `http.response.start` イベントは以下の構造を持ちます。 ```python { "type": "http.response.start", "status": 200, # 整数のステータスコード "headers": [ # バイト列タプルのリスト [b"content-type", b"text/html; charset=utf-8"], [b"content-length", b"45"], [b"x-custom-header", b"some-value"], ], } ``` WSGI の `start_response("200 OK", [("Content-Type", "text/html")])` と比較すると、以下の違いがあります。 - ステータスコードが理由フレーズを含まない **整数** です - ヘッダーが文字列タプルではなく **バイト列タプル** です 理由フレーズは HTTP/2 以降では廃止されているため、ASGI では最初から省略されています。 `http.response.body` イベントは以下の構造を持ちます。 ```python # 一括送信(小さなレスポンス) { "type": "http.response.body", "body": b"

Hello

", } # ストリーミング送信(大きなレスポンスや段階的な送信) { "type": "http.response.body", "body": b"chunk 1\n", "more_body": True, # まだ続きがある } ``` `more_body` が `True` の場合、サーバはクライアントへの接続を維持し、次の `http.response.body` イベントを待ちます。 `more_body` が `False`(デフォルト)の場合、そのイベントが最後のチャンクであり、サーバはレスポンスの送信を完了します。 これを活用したストリーミングレスポンスの実装例を示しましょう。 ```python import asyncio import json async def sse_application(scope, receive, send): await receive() await send({ "type": "http.response.start", "status": 200, "headers": [ [b"content-type", b"text/event-stream"], [b"cache-control", b"no-cache"], [b"connection", b"keep-alive"], ], }) for i in range(10): data = json.dumps({"count": i, "message": f"Event {i}"}) chunk = f"data: {data}\n\n".encode("utf-8") await send({ "type": "http.response.body", "body": chunk, "more_body": True, }) await asyncio.sleep(1) # この間、他のリクエストを処理可能 await send({ "type": "http.response.body", "body": b"data: {\"message\": \"done\"}\n\n", "more_body": False, }) ``` WSGI のジェネレータによるストリーミング(Vol.1「WSGI が生まれた背景」)と比較すると、`await asyncio.sleep(1)` の間にイベントループが他のリクエストを処理できる点が決定的な違いです。 WSGI で `time.sleep(1)` を使うとワーカー全体がブロックされていました。 ### disconnect HTTP 接続の場合、クライアントが途中で接続を切断すると `receive` が `http.disconnect` イベントを返します。 ```{tip} WSGI にはクライアント切断を検知する標準的な手段がありません。 ASGI の `http.disconnect` イベントにより、アプリケーションはクライアントの離脱を能動的に監視し、不要な処理を中断してリソースの無駄遣いを防げます。 ``` ```text event = await receive() # → {"type": "http.disconnect"} ``` このイベントは、たとえばクライアントがブラウザの「戻る」ボタンを押したり、ネットワークが切断されたりした場合に発生します。 長時間かかるレスポンスの生成中にクライアントが離脱したことを検知し、不要な処理を中断するために使います。 以下のコード例では、長い処理とクライアント切断の監視を並行して行う実装を示します。 ```python import asyncio async def long_processing_app(scope, receive, send): # ボディを受信 body_event = await receive() # 長い処理とクライアント切断を並行して監視 async def process(): await asyncio.sleep(10) # 重い処理のシミュレーション return b"Processing complete!" async def watch_disconnect(): while True: event = await receive() if event["type"] == "http.disconnect": return None # 先に完了した方を採用 done, pending = await asyncio.wait( [asyncio.create_task(process()), asyncio.create_task(watch_disconnect())], return_when=asyncio.FIRST_COMPLETED, ) for task in pending: task.cancel() result = done.pop().result() if result is None: # クライアントが切断済み — レスポンスを送る必要なし return await send({ "type": "http.response.start", "status": 200, "headers": [[b"content-type", b"text/plain"]], }) await send({ "type": "http.response.body", "body": result, }) ``` WSGI にはクライアント切断を検知する標準的な手段がありません。 ソケットへの書き込み時に `BrokenPipeError` が発生して初めて気づくか、そもそも気づかないまま処理が完了してしまいます。 WebSocket の切断は `websocket.disconnect` イベントで通知されます。 `code` フィールドに WebSocket の終了コードが含まれます。 ```text event = await receive() # → {"type": "websocket.disconnect", "code": 1000} ``` 終了コードの意味は以下の通りです。 | コード | 意味 | |--------|------| | `1000` | 正常終了 | | `1001` | クライアントの離脱 | | `1006` | 異常切断 | アプリケーションはこのコードに基づいてクリーンアップ処理を分岐できます。 --- `receive` と `send` のイベント型をまとめます。 ``` HTTP ライフサイクル ────────────────── receive: http.request → リクエストボディ(1回以上) receive: http.disconnect → クライアント切断 send: http.response.start → ステータス + ヘッダー(1回) send: http.response.body → レスポンスボディ(1回以上) WebSocket ライフサイクル ────────────────────── receive: websocket.connect → 接続要求 receive: websocket.receive → メッセージ受信(複数回) receive: websocket.disconnect → 切断 send: websocket.accept → 接続受け入れ send: websocket.send → メッセージ送信(複数回) send: websocket.close → 接続終了 ``` WSGI が「1回の関数呼び出しと1回の返却」で完結していたのに対し、ASGI は `receive` と `send` の複数回のやり取りでリクエスト/レスポンスのライフサイクルを表現します。 この柔軟性が、HTTP のストリーミングから WebSocket の双方向通信まで、**単一のインタフェースで対応できる** 理由です。 ```{mermaid} flowchart LR subgraph HTTP HR[receive
http.request] --> HS1[send
http.response.start] HS1 --> HS2[send
http.response.body] end subgraph WebSocket WC[receive
websocket.connect] --> WA[send
websocket.accept] WA --> WR[receive
websocket.receive] WR --> WS2[send
websocket.send] WS2 --> WR WR --> WD[receive
websocket.disconnect] end ``` 次節では、これらのイベントを組み合わせて最小の ASGI アプリケーションを実装しましょう。 (ASGI の HTTP モデル)= ## ASGI の HTTP モデル 前節で `receive` と `send` のイベント型を一通り確認しました。 本節では HTTP に絞り、リクエストボディの受信からレスポンスの送信までのライフサイクルを実装レベルで深掘りします。 WSGI の同期モデルと対比しながら、ASGI の非同期イベント駆動モデルが実際のコードでどう表現されるかを確認していきましょう。 ### request body の分割受信 Vol.1「WSGI が生まれた背景」で WSGI の `environ["wsgi.input"].read(content_length)` を使ってリクエストボディを読み取りました。 WSGI ではストリームを一度 `read` すれば全ボディが得られる前提で書くことが多く、部分受信を意識する必要はサーバ側が吸収していました。 ASGI では、サーバがボディをチャンクに分割してアプリケーションに届ける可能性が **仕様レベルで明示** されています。 小さなリクエスト(たとえば JSON の POST)であれば、1回の `receive` で全ボディが届くのが一般的です。 ```text async def handle_small_request(scope, receive, send): event = await receive() # event = {"type": "http.request", "body": b'{"name": "Taro"}', "more_body": False} body = event.get("body", b"") # → b'{"name": "Taro"}' ``` しかし数メガバイトのファイルアップロードでは、サーバがボディをチャンク単位で送信します。 ```{note} Uvicorn のデフォルトでは、リクエストボディは約 65KB ごとに分割されます。 ``` ```text # 1回目の receive event = await receive() # → {"type": "http.request", "body": b"<65KB分のデータ>", "more_body": True} # 2回目の receive event = await receive() # → {"type": "http.request", "body": b"<65KB分のデータ>", "more_body": True} # ... # 最後の receive event = await receive() # → {"type": "http.request", "body": b"<残りのデータ>", "more_body": False} ``` したがって、あらゆるリクエストに対応するには `more_body` フラグを監視するループが必要です。 前節で示した `read_body` ヘルパーを改良し、サイズ上限チェックを加えた堅牢な実装を示しましょう。 ```python class RequestBodyTooLarge(Exception): pass async def read_body(receive, max_size=10 * 1024 * 1024): """リクエストボディを完全に読み取る。max_size を超えたら例外を送出する。""" body = b"" while True: event = await receive() chunk = event.get("body", b"") body += chunk if len(body) > max_size: raise RequestBodyTooLarge( f"Request body exceeds {max_size} bytes" ) if not event.get("more_body", False): break return body ``` この実装は、Vol.1「HTTP は何をやりとりしているのか」で学んだ TCP の部分受信ループ、Vol.1「まずは 1 リクエストだけ処理するサーバを作る」で実装した `Content-Length` に基づくボディ読み取り、そして Vol.1「HTTP は何をやりとりしているのか」で触れた巨大リクエストへの防御を、ASGI のイベントモデルで表現したものです。 WSGI ではサーバが `wsgi.input` ストリームとして完全にバッファリングしてくれることが多かったため、アプリケーション側でチャンク処理を意識する機会は少なかったのですが、ASGI ではアプリケーションがこの責務を明示的に引き受けます。 ファイルアップロードのように全体をメモリに蓄積したくない場合は、チャンクごとにディスクに書き出す処理も書けます。 ```python import aiofiles async def save_upload(receive, dest_path, max_size=50 * 1024 * 1024): """受信したボディを逐次ディスクに書き出す。""" total = 0 async with aiofiles.open(dest_path, "wb") as f: while True: event = await receive() chunk = event.get("body", b"") total += len(chunk) if total > max_size: raise RequestBodyTooLarge( f"Upload exceeds {max_size} bytes" ) if chunk: await f.write(chunk) if not event.get("more_body", False): break return total ``` 全ボディをメモリに保持する `read_body` と、チャンクごとに処理する `save_upload` のどちらを使うかは、想定されるボディサイズとユースケースに依存します。 | ユースケース | 推奨アプローチ | |---|---| | JSON API(数KB程度) | `read_body` でメモリに全受信 | | ファイルアップロード(数MB以上) | `save_upload` でチャンクごとにディスクへ書き出す | フレームワークはこの選択をリクエストの `Content-Type` や `Content-Length` に応じて内部で行っています。 ### レスポンスの分割送信 レスポンスの送信は `http.response.start` と `http.response.body` の2種類のイベントで構成されます。 最もシンプルなケースでは、ステータス/ヘッダーの送信とボディの送信をそれぞれ1回ずつ行います。 まず基本形を確認しましょう。 ```python async def simple_response(scope, receive, send): await receive() # リクエストボディを消費 body = b'{"message": "Hello, ASGI!"}' await send({ "type": "http.response.start", "status": 200, "headers": [ [b"content-type", b"application/json"], [b"content-length", str(len(body)).encode()], ], }) await send({ "type": "http.response.body", "body": body, }) ``` レスポンスヘルパーを作ることで、WSGI 時代の `make_response` と同様にコードの重複を削減できます。 以下のヘルパー関数は、よく使うレスポンス形式をまとめたものです。 ```python async def send_json(send, data, status=200): body = json.dumps(data, ensure_ascii=False).encode("utf-8") await send({ "type": "http.response.start", "status": status, "headers": [ [b"content-type", b"application/json; charset=utf-8"], [b"content-length", str(len(body)).encode()], ], }) await send({ "type": "http.response.body", "body": body, }) async def send_text(send, text, status=200): body = text.encode("utf-8") await send({ "type": "http.response.start", "status": status, "headers": [ [b"content-type", b"text/plain; charset=utf-8"], [b"content-length", str(len(body)).encode()], ], }) await send({ "type": "http.response.body", "body": body, }) async def send_error(send, status, message): await send_json(send, {"error": message}, status=status) ``` これらのヘルパーを使えば、ルーティング付きの ASGI アプリケーションが簡潔に書けます。 先ほどの WSGI ルーティング実装と構造を比べてみましょう。 ```python import json async def application(scope, receive, send): if scope["type"] != "http": return method = scope["method"] path = scope["path"] if path == "/" and method == "GET": await receive() await send_text(send, "Welcome to the ASGI app!") elif path == "/users" and method == "GET": await receive() users = [{"id": 1, "name": "Taro"}, {"id": 2, "name": "Hanako"}] await send_json(send, users) elif path == "/users" and method == "POST": body = await read_body(receive) try: data = json.loads(body) except json.JSONDecodeError: await send_error(send, 400, "Invalid JSON") return await send_json(send, {"created": data}, status=201) else: await receive() await send_error(send, 404, f"Not Found: {path}") ``` Vol.1「WSGI が生まれた背景」で WSGI アプリに書いたルーティング付き実装と構造的にほぼ同じですが、すべての I/O が `await` で非同期化されている点が異なります。 ### ストリーミング ASGI の真価が発揮されるのは、レスポンスを段階的に生成して送信する **ストリーミング** です。 `http.response.body` イベントの `more_body` フラグを `True` に設定することで、サーバにまだ後続のチャンクがあることを伝えます。 CSV エクスポートの例を示します。 データベースから大量のレコードを取得し、1行ずつ CSV に変換してクライアントに送信します。 メモリ使用量を一定に保てる点が重要なポイントです。 ```python async def csv_export(scope, receive, send): await receive() await send({ "type": "http.response.start", "status": 200, "headers": [ [b"content-type", b"text/csv; charset=utf-8"], [b"content-disposition", b'attachment; filename="users.csv"'], # Content-Length は設定しない(サイズ不明) ], }) # ヘッダー行 await send({ "type": "http.response.body", "body": b"id,name,email\n", "more_body": True, }) # データ行を逐次送信 async for user in get_users_from_db(): # 非同期イテレータ line = f"{user.id},{user.name},{user.email}\n".encode("utf-8") await send({ "type": "http.response.body", "body": line, "more_body": True, }) # 最終チャンク await send({ "type": "http.response.body", "body": b"", "more_body": False, }) ``` この実装ではレコードの総数に関わらずメモリ使用量が一定です。 100万行の CSV であっても、一度にメモリ上に存在するのは1行分のデータだけです。 WSGI のジェネレータによるストリーミング(Vol.1「WSGI が生まれた背景」の `yield` パターン)と同じ発想ですが、`await` を挟めるためデータベースからの非同期読み取りと組み合わせられます。 LLM のトークンストリーミングのように、チャンク間に不定の待ち時間がある場合も ASGI のモデルは自然にフィットします。 次の例を確認してみましょう。 ```python async def llm_stream(scope, receive, send): body = await read_body(receive) prompt = json.loads(body).get("prompt", "") await send({ "type": "http.response.start", "status": 200, "headers": [ [b"content-type", b"text/event-stream"], [b"cache-control", b"no-cache"], ], }) async for token in generate_tokens(prompt): # LLM からトークンを非同期に受信 chunk = f"data: {json.dumps({'token': token})}\n\n".encode("utf-8") await send({ "type": "http.response.body", "body": chunk, "more_body": True, }) # await の間、イベントループは他のリクエストを処理可能 await send({ "type": "http.response.body", "body": b"data: {\"done\": true}\n\n", "more_body": False, }) ``` WSGI で同じことを実現しようとすると、ジェネレータ内の `time.sleep` や同期的な API 呼び出しがワーカーをブロックし、他のリクエストが処理できなくなります。 | | WSGI | ASGI | |---|---|---| | 10人が同時に LLM ストリーミングを利用 | 10のワーカーが占有される | 1プロセスで10の接続を並行処理 | | I/O 待ち中の挙動 | ワーカーがブロック | イベントループが他のリクエストを処理 | ```{note} ストリーミングで `Content-Length` を設定しない場合、ASGI サーバは自動的に `Transfer-Encoding: chunked` を適用します。 Vol.1「HTTP は何をやりとりしているのか」で学んだ chunked transfer encoding が、ここで実際に使われています。 `more_body: True` のチャンクが chunked encoding の各チャンクに、`more_body: False` が最終チャンク(サイズ0)に対応します。 ``` --- HTTP のライフサイクル全体を時系列で整理しましょう。 ``` クライアント ASGI サーバ ASGI アプリケーション │ │ │ │── HTTP リクエスト ──→│ │ │ │── scope 構築 │ │ │── application(scope, receive, send) 呼び出し │ │ │ │ │ ←── await receive() ────│ │ │── http.request ─────────→│ ボディ受信 │ │ (more_body: True) │ │ │ ←── await receive() ────│ │ │── http.request ─────────→│ 残りのボディ受信 │ │ (more_body: False) │ │ │ │── ビジネスロジック実行 │ │ ←── await send() ───────│ │ │── http.response.start ───│ ステータス+ヘッダー │ │ ←── await send() ───────│ │ │── http.response.body ────│ ボディチャンク1 │ │ (more_body: True) │ │ │ ←── await send() ───────│ │ │── http.response.body ────│ 最終チャンク │ │ (more_body: False) │ │ │ │── return │←── HTTP レスポンス ──│ │ ``` WSGI の「呼び出して返す」直線的なモデルに対し、ASGI は `receive` と `send` のイベント交換で非同期にリクエスト/レスポンスを処理します。 この柔軟性が、分割受信、分割送信、ストリーミング、そして次節で扱う WebSocket のすべてを **単一のインタフェースで実現** する基盤になっています。 ```{mermaid} flowchart TD RB[リクエストボディ受信
more_body ループ] --> BL{ビジネスロジック} BL --> SS[send
http.response.start
ステータス+ヘッダー] SS --> SB1[send
http.response.body
more_body: True] SB1 -->|チャンク追加| SB1 SB1 --> SBF[send
http.response.body
more_body: False] ``` 次節では ASGI の WebSocket モデルを追跡し、HTTP との違いを確認しましょう。 (WebSocket と lifespan)= ## WebSocket と lifespan 前節まで ASGI の HTTP モデルを追跡してきました。 HTTP は「リクエストを受けてレスポンスを返す」という一方向のやり取りで完結しますが、ASGI が WSGI に対して持つ最大の優位性は、HTTP 以外のプロトコルを同じインタフェースで扱える点にあります。 本節では WebSocket とアプリケーションのライフサイクル管理(lifespan)という、WSGI では不可能だった2つの領域を追跡します。 ### WebSocket 接続イベント WebSocket の通信は HTTP のハンドシェイクから始まります。 クライアントが `Upgrade: websocket` ヘッダー付きの GET リクエストを送信し、サーバが `101 Switching Protocols` で応答すると、TCP 接続が WebSocket プロトコルにアップグレードされます。 ASGI サーバはこのハンドシェイクを処理した上で、`scope["type"] == "websocket"` として `application` を呼び出します。 WebSocket 接続のライフサイクルは **3つの段階** に分かれます。 1. 接続の確立 2. メッセージの交換 3. 切断 ```text async def websocket_app(scope, receive, send): assert scope["type"] == "websocket" # ── 第1段階: 接続の確立 ── event = await receive() # → {"type": "websocket.connect"} # 接続を受け入れるか拒否するかをここで判断 # 例: 認証トークンの検証 token = dict(scope["headers"]).get(b"authorization", b"").decode() if not is_valid_token(token): await send({"type": "websocket.close", "code": 4001}) return await send({"type": "websocket.accept"}) ``` `websocket.connect` イベントは、クライアントが WebSocket ハンドシェイクを開始したことを通知します。 この時点ではまだ接続は確立されていません。 アプリケーションは以下のいずれかを選択します。 - `websocket.accept` を送信して接続を **受け入れる** - `websocket.close` を送信して接続を **拒否する** HTTP の場合はリクエストが届いた時点で接続は確立済みですが、WebSocket ではアプリケーションが接続の受け入れを明示的に判断できます。 この設計により、認証が通らないクライアントの接続を WebSocket レベルで拒否でき、不正なクライアントにメッセージを送受信させることを防げます。 `websocket.accept` を送信する際に、サブプロトコルやレスポンスヘッダーを指定することもできます。 これは GraphQL サブスクリプションなど、特定のプロトコルを使う場合に必要になります。 ```python await send({ "type": "websocket.accept", "subprotocol": "graphql-ws", "headers": [ [b"x-server-version", b"1.0"], ], }) ``` ### send / receive の継続 接続が確立された後は、`receive` と `send` を繰り返し呼び出してメッセージを交換します。 HTTP では `receive` がボディの受信で終わり、`send` がレスポンスの送信で終わるという有限のやり取りでしたが、WebSocket では接続が維持される限りメッセージの交換が続きます。 シンプルなエコーサーバを例に、メッセージ交換のループを確認しましょう。 ```python async def echo_server(scope, receive, send): assert scope["type"] == "websocket" # 接続受け入れ connect_event = await receive() await send({"type": "websocket.accept"}) # ── 第2段階: メッセージ交換ループ ── while True: event = await receive() if event["type"] == "websocket.receive": # テキストメッセージの場合 if "text" in event: await send({ "type": "websocket.send", "text": f"Echo: {event['text']}", }) # バイナリメッセージの場合 elif "bytes" in event: await send({ "type": "websocket.send", "bytes": event["bytes"], }) elif event["type"] == "websocket.disconnect": # ── 第3段階: 切断 ── break ``` `websocket.receive` イベントには `text` キーまたは `bytes` キーのいずれかが含まれます。 WebSocket プロトコルはテキストフレームとバイナリフレームを区別しており、ASGI もこの区別を維持しています。 - JSON メッセージは通常テキストフレームで送受信します - 画像やバイナリデータはバイナリフレームを使います `websocket.disconnect` イベントはクライアントが接続を閉じたことを通知します。 `code` フィールドに WebSocket の終了コードが含まれ、正常終了なら `1000`、クライアントの離脱なら `1001` です。 このイベントを受け取ったらループを抜け、`application` 関数を `return` して終了します。 サーバ側から接続を閉じる場合は `websocket.close` を送信します。 切断後は `application` 関数を `return` して終了します。 ```python # サーバ側から切断 await send({"type": "websocket.close", "code": 1000}) return ``` より実践的なチャットルームの例を示します。 複数のクライアントが同じルームに接続し、メッセージを共有します。 ```python import json # ルームごとの接続管理(プロセス内共有) rooms = {} # {"room1": {send1, send2, ...}, ...} async def chat_app(scope, receive, send): assert scope["type"] == "websocket" # パスからルーム名を取得: /ws/chat/room1/ path_parts = scope["path"].strip("/").split("/") if len(path_parts) < 3: connect_event = await receive() await send({"type": "websocket.close", "code": 4000}) return room_name = path_parts[2] # 接続受け入れ connect_event = await receive() await send({"type": "websocket.accept"}) # ルームに参加 if room_name not in rooms: rooms[room_name] = set() rooms[room_name].add(send) try: # 参加通知 await broadcast(room_name, { "type": "system", "message": f"New user joined. ({len(rooms[room_name])} online)", }) while True: event = await receive() if event["type"] == "websocket.receive": data = json.loads(event["text"]) await broadcast(room_name, { "type": "chat", "message": data.get("message", ""), }) elif event["type"] == "websocket.disconnect": break finally: # ルームから退出(切断時に必ず実行) rooms[room_name].discard(send) if not rooms[room_name]: del rooms[room_name] else: await broadcast(room_name, { "type": "system", "message": f"User left. ({len(rooms.get(room_name, []))} online)", }) async def broadcast(room_name, message): """ルーム内の全クライアントにメッセージを送信""" text = json.dumps(message) disconnected = set() for client_send in rooms.get(room_name, set()): try: await client_send({ "type": "websocket.send", "text": text, }) except Exception: disconnected.add(client_send) # 送信失敗した接続を除去 for s in disconnected: rooms.get(room_name, set()).discard(s) ``` このチャットルームの実装で注目すべき点がいくつかあります。 - 各 WebSocket 接続の `send` callable を集合に保持することでブロードキャストが可能になっています(`send` はサーバが接続ごとに生成する callable であり、それ自体がその接続へのメッセージ送信チャネルです) - `try ... finally` ブロックにより、例外や切断が発生しても必ずルームからの退出処理が実行されます - `await receive()` でメッセージを待っている間、イベントループは他の接続のメッセージ処理やブロードキャストを進められるため、50人がチャットルームに接続していても、メッセージを待っている接続は CPU リソースを消費しません WSGI ではこのような双方向通信をフレームワーク内で実現する手段がなく、Django Channels のような外部パッケージや別プロセスの WebSocket サーバが必要でした。 ASGI ではフレームワークの中に自然に統合できます。 ### アプリ起動・終了イベント `scope["type"] == "lifespan"` は、アプリケーションプロセスの起動時と終了時に一度ずつ呼ばれる特殊なスコープです。 HTTP や WebSocket のようなクライアントからの接続ではなく、ASGI サーバ自体がアプリケーションのライフサイクルを管理するために使います。 ```{important} lifespan イベントは、リソースの初期化と解放の確実性を保証します。 - 初期化が完了するまでリクエストを受け付けません - 終了処理が完了してからプロセスを停止します ``` ```python async def application(scope, receive, send): if scope["type"] == "lifespan": await handle_lifespan(scope, receive, send) elif scope["type"] == "http": await handle_http(scope, receive, send) elif scope["type"] == "websocket": await handle_websocket(scope, receive, send) async def handle_lifespan(scope, receive, send): while True: event = await receive() if event["type"] == "lifespan.startup": try: # アプリケーション起動時の初期化処理 await initialize_db_pool() await initialize_cache() await send({"type": "lifespan.startup.complete"}) except Exception as e: await send({ "type": "lifespan.startup.failed", "message": str(e), }) return elif event["type"] == "lifespan.shutdown": # アプリケーション終了時のクリーンアップ await close_db_pool() await close_cache() await send({"type": "lifespan.shutdown.complete"}) return ``` lifespan イベントのフローは以下の通りです。 ``` ASGI サーバ起動 │ ├── application(scope={"type": "lifespan"}, receive, send) 呼び出し │ ├── receive() → {"type": "lifespan.startup"} │ │ │ ├── 初期化成功 → send({"type": "lifespan.startup.complete"}) │ │ → サーバがリクエストの受け付けを開始 │ │ │ └── 初期化失敗 → send({"type": "lifespan.startup.failed"}) │ → サーバがエラーで停止 │ ├── (リクエスト処理が続く...) │ ├── SIGTERM / Ctrl+C 受信 │ ├── receive() → {"type": "lifespan.shutdown"} │ │ │ └── send({"type": "lifespan.shutdown.complete"}) │ → クリーンアップ完了、サーバ停止 │ └── application return ``` lifespan が解決する問題は、リソースの初期化と解放の確実性です。 WSGI ではデータベース接続プールの初期化がモジュールのインポート時やフレームワークの `setup()` 内で暗黙的に行われ、クリーンアップ処理は `atexit` ハンドラやシグナルハンドラに頼っていました。 ASGI の lifespan はこれらを明示的なイベントとして定義することで、起動・停止の順序を確実に保証します。 ```{mermaid} stateDiagram-v2 [*] --> 起動中: ASGI サーバ起動 起動中 --> 初期化: lifespan.startup 初期化 --> リクエスト受付中: startup.complete 初期化 --> 停止: startup.failed リクエスト受付中 --> クリーンアップ: SIGTERM / Ctrl+C クリーンアップ --> 停止: shutdown.complete 停止 --> [*] ``` FastAPI は lifespan を活用するための簡潔な API を提供しています。 `asynccontextmanager` パターンを使うと、startup と shutdown を一つの関数に書けます。 ```python from contextlib import asynccontextmanager from fastapi import FastAPI @asynccontextmanager async def lifespan(app): # startup: yield の前 db_pool = await create_db_pool() app.state.db_pool = db_pool print("Database pool initialized") yield # アプリケーションが稼働中 # shutdown: yield の後 await db_pool.close() print("Database pool closed") app = FastAPI(lifespan=lifespan) ``` `yield` の前がスタートアップ処理、後がシャットダウン処理に対応します。 この `asynccontextmanager` パターンは、内部的には lifespan イベントの送受信に変換されます。 ```{note} Django の ASGI モードでは、`django.setup()` が起動時の初期化を担当します。 Django 自体は lifespan イベントの明示的なフックを公開していませんが、ASGI サーバが lifespan をサポートしていれば、カスタムの ASGI ミドルウェアで lifespan を処理した上で Django の `ASGIHandler` にその他のイベントを委譲するという構成が可能です。 ``` --- HTTP、WebSocket、lifespan の3つのプロトコルを表で整理します。 | scope type | ライフサイクル | receive イベント | send イベント | |---|---|---|---| | `http` | 1リクエスト = 1呼び出し | `http.request`
`http.disconnect` | `http.response.start`
`http.response.body` | | `websocket` | 接続〜切断 | `websocket.connect`
`websocket.receive`
`websocket.disconnect` | `websocket.accept`
`websocket.send`
`websocket.close` | | `lifespan` | プロセス起動〜停止 | `lifespan.startup`
`lifespan.shutdown` | `lifespan.startup.complete`
`lifespan.startup.failed`
`lifespan.shutdown.complete` | WSGI が HTTP の同期的なリクエスト/レスポンスしか扱えなかったのに対し、ASGI は3つのプロトコルを `scope["type"]` による分岐と、`receive`/`send` のイベント交換という統一的なモデルで表現しています。 次節では ASGI ミドルウェアの実装方法を確認し、WSGI ミドルウェアとの構造の違いを比較しましょう。 (WSGI と ASGI の比較)= ## WSGI と ASGI の比較 Vol.1「WSGI が生まれた背景」で WSGI を、本章で ASGI を学んできました。 両者はともに「サーバとアプリケーションの間のインタフェース」を定義するという同じ目的を持ちながら、設計思想が大きく異なります。 本節では **4つの観点** から両者を比較し、どちらを選ぶべきかの判断基準を整理します。 ### シンプルさ vs 柔軟さ WSGI の最小アプリケーションと ASGI の最小アプリケーションを改めて並べてみましょう。 ```python # WSGI — 3行 def application(environ, start_response): start_response("200 OK", [("Content-Type", "text/plain")]) return [b"Hello"] # ASGI — 8行 async def application(scope, receive, send): await receive() await send({ "type": "http.response.start", "status": 200, "headers": [[b"content-type", b"text/plain"]], }) await send({"type": "http.response.body", "body": b"Hello"}) ``` 行数の差は些細に見えますが、背後にある設計の違いは本質的です。 **WSGI** は「呼ばれて返す」という関数の最も基本的なパターンに従っています。 - `environ` から入力を読み、`start_response` でヘッダーを宣言し、イテラブルでボディを返します - Python プログラマーにとって馴染みのある同期的な制御フローです - `print` や `pdb` で簡単にデバッグできます **ASGI** は「イベントを送受信する」というメッセージパッシングのパターンを採用しています。 - `receive` でリクエストデータを受け取り、`send` でレスポンスデータを送り出します - イベント辞書の `type` キーでプロトコルや処理段階を区別します - HTTP、WebSocket、lifespan という異なるプロトコルを統一的に扱う柔軟性をもたらします - 単純な HTTP レスポンスを返すだけでもイベントの組み立てと送信順序を意識する必要があります | | WSGI | ASGI | |---|---|---| | シンプルさ | 高い(制約の裏返し) | 低い(柔軟さの裏返し) | | 設計の問題点 | レスポンス後に通信を続ける手段がない | `send` の順序違反がプロトコルエラーになる | この関係は、Vol.1「本書の対象読者とゴール」で触れた「抽象化を一段ずつ剥がす」という学び方の方針そのものです。 WSGI で十分な要件であれば WSGI のシンプルさを享受し、WSGI では対応できない要件が出てきたときに ASGI の柔軟さに移行する。 両方の仕様を理解していれば、この判断を根拠を持って下せます。 ### sync vs async WSGI と ASGI の **最も根本的な違い** は、同期と非同期の処理モデルです。 WSGI の `application` は同期関数です。 関数が `return` するまでの間、その関数を呼び出したワーカー(プロセスまたはスレッド)は他の処理に使えません。 ```python # WSGI — I/O待ち中にワーカーがブロックされる def application(environ, start_response): user = requests.get("https://api.example.com/users/1").json() # 200ms ブロック orders = requests.get("https://api.example.com/orders/1").json() # 150ms ブロック # 合計 350ms、ワーカーは何もできない ... ``` ASGI の `application` は非同期関数です。 `await` のタイミングでイベントループに制御を返し、I/O の完了を待つ間に **他のリクエストの処理を進められます**。 ```text # ASGI — I/O待ち中に他のリクエストを処理可能 async def application(scope, receive, send): async with httpx.AsyncClient() as client: user_task = client.get("https://api.example.com/users/1") orders_task = client.get("https://api.example.com/orders/1") user_resp, orders_resp = await asyncio.gather(user_task, orders_task) # 2つのリクエストを並行発行 → 合計時間は遅い方の200msに近づく ... ``` この違いは同時接続数とリソース効率に直結します。 - 100の同時リクエストがそれぞれ200msのI/O待ちを持つ場合、WSGI では100のワーカー(プロセスまたはスレッド)が必要です - ASGI では1つのワーカープロセス内のイベントループが100の接続を並行処理できます メモリ消費量の差は、特にワーカー数が多い場合に顕著になります。 ```{mermaid} flowchart LR subgraph WSGI_100同時接続 W1[ワーカー1
専有] --- W2[ワーカー2
専有] --- WN[...
ワーカー100
専有] end subgraph ASGI_100同時接続 EL[イベントループ
1プロセス] --> C1[接続1
await中] EL --> C2[接続2
await中] EL --> CN[接続100
await中] end ``` ```{caution} 非同期モデルには注意点があります。 CPU バウンドな処理(画像処理、暗号計算、大量のデータ変換など)は `await` で制御を返せないため、イベントループ全体をブロックします。 WSGI のプロセスモデルでは1つのワーカーがブロックされても他のワーカーは影響を受けませんが、ASGI のイベントループモデルでは1つの CPU バウンド処理がそのイベントループ上の **全接続に影響** します。 ``` ```text # ASGI での CPU バウンド処理 — イベントループをブロックする async def application(scope, receive, send): result = heavy_computation() # await がない → イベントループがブロック ... # 対策: スレッドプールに逃がす import asyncio async def application(scope, receive, send): loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, heavy_computation) ... ``` Django が ASGI モードでも同期ビューをサポートしているのは、この現実を踏まえた設計です。 同期ビューはスレッドプールで実行され、イベントループをブロックしません。 既存の同期コード資産を活かしながら、必要な部分だけを `async def` で書くという **段階的な移行** が可能です。 ### 対応プロトコル WSGI が扱えるのは HTTP のリクエスト/レスポンスだけです。 1回の `application` 呼び出しが1つの HTTP トランザクションに対応し、それ以外のプロトコルを表現する手段がありません。 ASGI は `scope["type"]` によるプロトコルの切り替えにより、HTTP、WebSocket、lifespan の3つを標準でサポートしています。 さらに、仕様は拡張可能な設計になっており、将来的に HTTP/2 Server Push や HTTP/3 といったプロトコルが追加される可能性もあります。 | 機能 | WSGI | ASGI | |---|---|---| | HTTP リクエスト/レスポンス | ○ | ○ | | HTTP ストリーミング | △(ワーカー占有の問題あり) | ○ | | WebSocket | ✕ | ○ | | Server-Sent Events | △(非効率) | ○ | | lifespan 管理 | ✕ | ○ | | 双方向通信 | ✕ | ○ | WSGI でも `StreamingHttpResponse` やジェネレータを使ったストリーミングは形式上可能ですが、{numref}`なぜ ASGI が必要になったのか`({ref}`なぜ ASGI が必要になったのか`)で解説した通り、ワーカーが接続中ずっと占有される問題があります。 ASGI では `await` の間にイベントループが他の処理を進められるため、ストリーミングと並行処理が両立します。 この違いが最も顕著になるのは、リアルタイム性が求められるアプリケーションです。 - **ASGI が必須** — チャット、通知、ライブダッシュボード、LLM のトークンストリーミング - **WSGI で十分** — 管理画面、CRUD API、バッチ処理のような伝統的な Web アプリケーション ### 学習コスト WSGI の学習コストは **低い** です。 - `application(environ, start_response)` という関数シグネチャを覚えます - `environ` のキー名と `start_response` の呼び出し方を把握すれば最小のアプリケーションを書けます - PEP 3333 の仕様は短く、同期 Python の知識だけで理解できます ASGI の学習コストは WSGI より明らかに **高い** です。 - `async/await` と `asyncio` の基本を理解した上で - `scope`/`receive`/`send` の3引数モデル - イベント辞書の `type` による分岐 - HTTP と WebSocket で異なるイベントのライフサイクル - `more_body` フラグによるチャンク管理 - lifespan イベントの処理 といった ASGI 固有の概念を習得する必要があります。 ```{tip} 本書をここまで読んだ読者であれば、この学習コストの大部分はすでに支払い済みです。 Vol.1「HTTP は何をやりとりしているのか」で HTTP のテキスト構造を理解し、Vol.1「まずは 1 リクエストだけ処理するサーバを作る」で自作サーバを書き、Vol.1「WSGI が生まれた背景」で WSGI の仕様を学んだ上で ASGI を見ると、以下の対応関係が見えてきます。 - `scope` は `environ` の再設計 - `receive` は `wsgi.input` の非同期化 - `send` は `start_response` とイテラブル返却の統合 低レイヤの理解があるからこそ、抽象度が上がっても「何が起きているか」を追跡できるのです。 ``` 実際の開発では、ASGI の生のインタフェースを直接扱うことは稀です。 Django は `ASGIHandler` が、FastAPI は Starlette がこのインタフェースを隠蔽し、開発者は `async def view(request)` や `@app.get("/users")` のようなフレームワークレベルの API を使います。 ASGI の仕様を知ることの価値は、フレームワークを使う際の効率ではなく、**問題が起きた際にフレームワークの内側を追跡できる力** にあります。 --- 本章の最後に、WSGI と ASGI の選択基準をまとめます。 **WSGI を継続することが合理的な場合:** - 既存の Django アプリケーションが同期的な処理で問題なく動作している - WebSocket やリアルタイム通信の要件がない - I/O 待ちもワーカー数の増加で対処できる範囲である デプロイ構成がシンプルで、エコシステムの成熟度も高く、トラブルシューティングの知見も豊富です。 **ASGI が適切な場合:** - 非同期 I/O による効率化が必要である - WebSocket や SSE を使ったリアルタイム通信が求められる - LLM のストリーミング応答のように長時間接続が多い - 新規プロジェクトで FastAPI を採用する どちらか一方だけを知っていれば十分ということはありません。 Django は WSGI と ASGI の両方をサポートしており、本番環境の要件によってどちらを使うかが変わります。 両方の仕様を理解していれば、プロジェクトの要件に応じた正しい選択ができ、問題が起きたときにどの層で何が起きているかを追跡できます。 次節では、ここまで学んだ ASGI の知識を実際の問題解決に応用します。`async def` の中でのブロッキング呼び出し、イベントループの停滞、WebSocket の切断処理といった ASGI 固有のトラブルを取り上げます。 (ch07-トラブルシューティングの観点)= ## トラブルシューティングの観点 {numref}`なぜ ASGI が必要になったのか`({ref}`なぜ ASGI が必要になったのか`)を通じて ASGI の仕様を追跡してきました。 本節ではその知識を実際の問題解決に応用します。 ASGI 固有のトラブルは **2つのカテゴリ** に大別されます。 - **非同期プログラミングの落とし穴** — ブロッキング処理の混入など - **ASGI プロトコルの制約違反** — イベント送信順序の誤りなど いずれもフレームワークが多くの部分を隠蔽してくれますが、隠蔽の裏側で起きている問題を理解していなければ、原因の特定に苦労します。 ### async 関数なのに blocking ASGI アプリケーションで **最も頻出し、かつ最も発見しにくい** バグが、`async def` の中で同期的なブロッキング処理を呼び出してしまうケースです。 ```python # 一見問題なさそうに見えるが、イベントループ全体をブロックする @app.get("/users/{user_id}") async def user_detail(user_id: int): user = requests.get(f"https://api.example.com/users/{user_id}").json() # NG return user ``` この関数は `async def` で定義されているため、ASGI サーバのイベントループ上で直接実行されます。 しかし `requests.get` は同期ライブラリであり、レスポンスが返るまでスレッド全体をブロックします。 イベントループがブロックされると、そのイベントループ上で処理中の他のすべてのリクエスト、WebSocket 接続、タイマーが停止します。 外部 API が200ms かかる場合、その200msの間に **他の全クライアントが応答を待たされます**。 ```{caution} このバグが開発環境では発覚しにくい点に注意が必要です。 同時接続が1つしかなければブロッキングの影響は自分自身だけに閉じます。 本番環境で同時接続数が増えたときに初めて、レスポンスタイムの劣化やタイムアウトとして顕在化します。 ``` 原因の特定には、イベントループのブロッキングを検出する `asyncio` のデバッグモードが有効です。 以下の設定でブロッキング処理を検出できます。 ```python # 環境変数で有効化 # PYTHONASYNCIODEBUG=1 uvicorn main:app # または起動時に設定 import asyncio asyncio.get_event_loop().slow_callback_duration = 0.1 # 100ms以上のブロッキングを警告 ``` これを有効にすると、イベントループが100ms以上ブロックされた場合にログに警告が出力されます。 対処法は3つあります。 **方法 1: 非同期ライブラリに置き換える** `requests` を `httpx` に、`psycopg2` を `asyncpg` に、`open()` を `aiofiles` に置き換えます。 ```python import httpx @app.get("/users/{user_id}") async def user_detail(user_id: int): async with httpx.AsyncClient() as client: resp = await client.get(f"https://api.example.com/users/{user_id}") return resp.json() ``` **方法 2: スレッドプールに逃がす** 非同期版が存在しないライブラリや、変更コストが高い既存コードに対して有効です。 ```python import asyncio from functools import partial @app.get("/users/{user_id}") async def user_detail(user_id: int): loop = asyncio.get_event_loop() user = await loop.run_in_executor( None, # デフォルトのスレッドプール partial(requests.get, f"https://api.example.com/users/{user_id}") ) return user.json() ``` **方法 3: ビュー関数を `def`(同期関数)として定義する** Django と FastAPI はいずれも、`async def` ではなく `def` で定義されたビューをスレッドプールで実行する仕組みを持っています。 ```python # FastAPI: def で定義すると自動的にスレッドプールで実行される @app.get("/users/{user_id}") def user_detail(user_id: int): user = requests.get(f"https://api.example.com/users/{user_id}").json() return user # イベントループをブロックしない ``` ```{important} 「`async def` にしたほうが速い」とは限りません。 内部で同期 I/O を使うなら `def` のままにしておく方が安全です。 `async def` は非同期ライブラリと `await` を使う場合にのみ意味があります。 ``` ### イベントを返し忘れる ASGI プロトコルでは `send` の呼び出し順序と回数に厳格な制約があります。 これを守らないと、クライアントにレスポンスが届かなかったり、ASGI サーバがエラーを出したりします。 最も多いのは `http.response.start` を送らずに `http.response.body` を送るケースです。 ```text # NG: ステータスとヘッダーが未送信 async def application(scope, receive, send): await receive() await send({ "type": "http.response.body", "body": b"Hello", }) # → サーバが例外を送出するか、クライアントに空レスポンスが届く ``` 逆に、`http.response.start` だけ送って `http.response.body` を送らない場合も問題になります。 ```text # NG: ボディが未送信 async def application(scope, receive, send): await receive() await send({ "type": "http.response.start", "status": 200, "headers": [[b"content-type", b"text/plain"]], }) # application が return してしまう → クライアントは応答を待ち続ける ``` 条件分岐の中でレスポンスを送り忘れるパターンも頻出します。 ```python # NG: else 節でレスポンスを送っていない async def application(scope, receive, send): await receive() path = scope["path"] if path == "/": await send_text(send, "Welcome!") elif path == "/about": await send_text(send, "About page") # /other にアクセスすると application が何も送らずに return ``` ```{note} フレームワークを使っていればこの問題はほぼ発生しません。 Django の `ASGIHandler` や FastAPI/Starlette は、ビュー関数の戻り値を必ず `http.response.start` と `http.response.body` に変換します。 未処理例外が発生しても500レスポンスが自動生成されます。 ``` しかしカスタムの ASGI ミドルウェアを書く場合や、フレームワークを使わずに ASGI アプリケーションを直接実装する場合には、すべてのコードパスでレスポンスが送信されることを確認する必要があります。 防御的な実装として、レスポンスの送信を保証するラッパーを書く方法があります。 ただし `http.response.start` 送信後に例外が発生した場合は、`http.response.start` を二重に送れないため接続を切断するしかありません。 ```python async def safe_application(scope, receive, send): try: await application(scope, receive, send) except Exception: import traceback traceback.print_exc() body = b"Internal Server Error" await send({ "type": "http.response.start", "status": 500, "headers": [ [b"content-type", b"text/plain"], [b"content-length", str(len(body)).encode()], ], }) await send({ "type": "http.response.body", "body": body, }) ``` この制約は WSGI でも同様で、`start_response` の後に例外が発生した場合の `exc_info` 引数による再設定と同じ問題領域です。 ### body の受信ループ漏れ {numref}`ASGI の HTTP モデル`({ref}`ASGI の HTTP モデル`)で `more_body` フラグを監視するループの重要性を解説しましたが、このループを省略してしまうバグは実装時に起きがちです。 ```python # NG: 1回の receive で全ボディが届く前提 async def application(scope, receive, send): event = await receive() body = event.get("body", b"") data = json.loads(body) # ボディが不完全だと JSONDecodeError ... ``` ```{caution} 小さなリクエストボディ(数KB以下の JSON)では1回の `receive` で全データが届くことが多いため、開発中にこのバグが発覚しないことがあります。 本番環境で大きなリクエスト(画像アップロード、大量のフォームデータ、長い JSON ペイロード)が送信されたときに初めて問題が顕在化します。 `json.loads` が不完全な JSON を受け取って `JSONDecodeError` を送出するか、最悪の場合はデータが切り詰められたまま処理が進みます。 ``` 正しい実装は前節で示した `read_body` ヘルパーを使うことです。 `more_body` を監視するループが内部に含まれているため、チャンク分割を意識せずにボディ全体を安全に取得できます。 ```python async def application(scope, receive, send): body = await read_body(receive) # more_body を監視して完全に受信 data = json.loads(body) ... ``` ```{note} フレームワークを使う場合、FastAPI の `Request.body()` や Django の `request.body` は内部で `more_body` ループを実装しているため、開発者がこの問題を意識する必要はありません。 しかしカスタムミドルウェアで `receive` を直接呼ぶ場合は注意が必要です。 ``` もう一つの落とし穴は、ミドルウェアで `receive` を消費した後にアプリケーションに渡すケースです。 ```text # NG: ミドルウェアが receive を消費してしまう class LoggingMiddleware: def __init__(self, app): self.app = app async def __call__(self, scope, receive, send): if scope["type"] == "http": body = await read_body(receive) # receive を完全に消費 print(f"Request body: {body}") await self.app(scope, receive, send) # → アプリケーションが receive を呼ぶと http.disconnect が返る ``` WSGI では `environ["wsgi.input"]` の同じ問題がありました({numref}`ch06-トラブルシューティングの観点`({ref}`ch06-トラブルシューティングの観点`)参照)。 ASGI では `receive` を再利用可能にするために、消費済みのボディを返す **新しい `receive`** を作成して渡す必要があります。 ```python # OK: 消費済みボディを再利用可能にする class LoggingMiddleware: def __init__(self, app): self.app = app async def __call__(self, scope, receive, send): if scope["type"] == "http": body = await read_body(receive) print(f"Request body: {body}") # ボディを再提供する新しい receive を作成 body_sent = False async def cached_receive(): nonlocal body_sent if not body_sent: body_sent = True return {"type": "http.request", "body": body, "more_body": False} return await receive() # disconnect 等は元の receive から取得 await self.app(scope, cached_receive, send) else: await self.app(scope, receive, send) ``` ### disconnect の扱い クライアントが接続を切断した後にレスポンスを送信しようとすると、ASGI サーバの実装によって異なる挙動が生じます。 - 例外が送出される場合 - 黙って送信が無視される場合 - ログにエラーが記録される場合 長時間の処理を伴うエンドポイントでこの問題が顕在化します。 ```text @app.get("/slow-report") async def slow_report(): data = await generate_heavy_report() # 30秒かかる return JSONResponse(data) # → ユーザーが10秒で離脱した場合、残り20秒は無駄な処理 ``` {numref}`receive / send を理解する`({ref}`receive / send を理解する`)で示した `http.disconnect` の監視パターンを使えば、クライアントの離脱を検知して処理を中断できます。 しかしフレームワークのビュー関数内で `receive` を直接呼ぶのは一般的ではないため、実践的にはフレームワークが提供する仕組みを利用します。 FastAPI/Starlette では `Request.is_disconnected()` メソッドが利用可能です。 ループ内で定期的にチェックすることで、不要な処理を中断できます。 ```text from starlette.requests import Request @app.get("/slow-report") async def slow_report(request: Request): for chunk in processing_steps: if await request.is_disconnected(): # クライアントが離脱済み → 処理を中断 return Response(status_code=499) # Nginx の慣例的なコード await process_chunk(chunk) return JSONResponse(result) ``` WebSocket の切断処理はより明確です。 `websocket.disconnect` イベントを受け取ったらループを抜けてリソースを解放します。 ただし `receive` を待っている最中に切断されるケースだけでなく、`send` を呼んだ際に切断が検出されるケースもあります。 ```text async def websocket_handler(scope, receive, send): await receive() # websocket.connect await send({"type": "websocket.accept"}) try: while True: event = await receive() if event["type"] == "websocket.disconnect": break # メッセージ処理 try: await send({"type": "websocket.send", "text": "response"}) except Exception: # 送信時に切断を検出 → ループを抜ける break finally: # リソースのクリーンアップ(ルームからの退出など) await cleanup() ``` ```{tip} `try ... finally` パターンは WebSocket アプリケーションにおける鉄則です。 正常な切断、異常な切断、サーバ側の例外のいずれの場合でも、クリーンアップ処理(チャットルームからの退出、サブスクリプションの解除、一時リソースの解放)が確実に実行されます。 ``` --- 本章で取り上げた4つのトラブルをまとめます。 | トラブル | 原因 | 対処法 | |---|---|---| | async 関数なのに blocking | 同期ライブラリをそのまま使用 | 非同期ライブラリへの置き換え、またはスレッドプールへの逃避 | | イベントを返し忘れる | `send` の呼び出し順序・回数の誤り | すべてのコードパスにレスポンス送信を保証 | | body の受信ループ漏れ | `more_body` ループの省略 | `read_body` ヘルパーの使用 | | disconnect の扱い | 長寿命接続のリソース管理不備 | `try...finally` パターンで確実なクリーンアップ | WSGI の同期モデルではこれらの問題の多くが構造的に存在しませんでした。ASGI の柔軟性は、これらの新しい種類のバグと引き換えに得られるものです。 次章では ASGI ミドルウェアの実装を確認し、{numref}`FastAPI を ASGI 視点で見る`({ref}`FastAPI を ASGI 視点で見る`)で FastAPI がこれらの ASGI の複雑さをどのように隠蔽しているかを内部から追跡します。