2. なぜ ASGI が必要になったのか
Vol.1「WSGI が生まれた背景」で WSGI の仕組みを学び、Vol.1「WSGI の上に何が必要になるのか」でその上にフレームワークを構築し、1 章(Django を WSGI 視点で見る)で Django が WSGI をどのように活用しているかを追跡してきました。 WSGI はシンプルで相互運用性が高く、Python Web エコシステムの基盤として20年近く機能してきました。 しかし Web の進化とともに、WSGI の設計では対応できない領域が広がってきています。
重要
本節では、ASGI が必要になった 4つの背景 を整理します。 それぞれが独立した問題ではなく、互いに関連する要因です。
2.1. WSGI の同期モデルの限界
WSGI の application(environ, start_response) は 同期関数 です。
この関数が return するまで、呼び出し元の WSGI サーバはそのワーカーを他のリクエストに使えません。
Vol.1「WSGI が生まれた背景」で確認した通り、これは「1リクエスト = 1ワーカーの占有」を意味します。
この設計は、ビュー関数の処理が CPU バウンド(計算中心)である場合には合理的です。 しかし現代の Web アプリケーションでは、ビュー関数の処理時間の大半を I/O 待ち が占めています。
データベースクエリ
外部 API の呼び出し
ファイルの読み書き
キャッシュサーバへのアクセス
これらの待機中、CPU は何もしていないにもかかわらず、ワーカーは占有されたままです。
# WSGI (同期) — 外部API呼び出し中、ワーカーは何もできない
def application(environ, start_response):
# 外部APIに200ms待たされる → ワーカーは200ms間ブロック
user_data = requests.get("https://api.example.com/users/42").json()
# DBクエリに50ms待たされる → ワーカーはさらに50ms間ブロック
orders = db.execute("SELECT * FROM orders WHERE user_id = 42")
# 合計250msの間、このワーカーは他のリクエストを処理できない
...
この問題に対する WSGI 側の対処法は、ワーカーの数を増やすことです。
Gunicorn であれば --workers 8 --threads 4 のように設定し、最大32の同時リクエストを処理します。
しかし、プロセスもスレッドもメモリを消費します。
一般的な Django アプリケーションはワーカー1つあたり数十MB から数百MBのメモリを使用するため、数百の同時接続を捌くには相応のサーバリソースが必要になります。
非同期 I/O を使えば、一つのプロセスで数千の同時接続を処理できます。
I/O 待ちの間に他のリクエストの処理に切り替え、I/O が完了したら元のリクエストの処理を再開します。
この仕組みにより、少ないリソースで高い並行性を実現できます。
しかし WSGI の application は同期関数であるため、async def で定義することも、内部で await を使うこともできません。
# これは WSGI では不可能
async def application(environ, start_response):
user_data = await httpx.get("https://api.example.com/users/42") # NG
...
2.2. 長時間接続
WSGI は「1リクエスト → 1レスポンス」という HTTP/1.1 の基本モデルに基づいて設計されています。
クライアントがリクエストを送り、サーバがレスポンスを返して完了します。
この往復が WSGI の1回の application 呼び出しに対応します。
しかし現代の Web アプリケーションでは、レスポンスを返した後もクライアントとの接続を維持し続けるユースケースが増えています。
チャットアプリケーションのリアルタイムメッセージ配信
ダッシュボードのライブデータ更新
通知システムのプッシュ配信
LLM の応答をトークン単位でストリーミングする処理
Server-Sent Events(SSE)は HTTP の枠組み内で長時間接続を実現する手法で、サーバからクライアントへ一方向にイベントを送り続けます。
WSGI でも StreamingHttpResponse とジェネレータを使えば形式上は実現可能ですが、ワーカーが接続中ずっと占有される問題は解消されません。
100人のユーザーがダッシュボードを開いていれば、100のワーカーが常時占有されます。
注意
WSGI で長時間接続を実装すると、ワーカーが接続中ずっと占有されます。 100人のユーザーがダッシュボードを開くだけで、100のワーカーが常時ブロックされます。
# WSGI での SSE — 動作はするがワーカーを長時間占有する
def sse_view(request):
def event_stream():
while True:
data = get_latest_data()
yield f"data: {json.dumps(data)}\n\n"
time.sleep(1) # ワーカー全体が1秒ブロック
return StreamingHttpResponse(event_stream(), content_type="text/event-stream")
非同期モデルでは、await asyncio.sleep(1) の間に他の接続の処理を進められるため、少数のワーカーで数千の長時間接続を同時に維持できます。
2.3. WebSocket
WebSocket は HTTP とは根本的に異なる通信パターンを持つプロトコルです。
最初に HTTP でハンドシェイクを行い、101 Switching Protocols レスポンスの後、TCP 接続を WebSocket プロトコルにアップグレードします。
以降はクライアントとサーバの 双方がいつでもメッセージを送受信できる全二重通信 になります。
クライアント → サーバ: GET /ws HTTP/1.1
Upgrade: websocket
Connection: Upgrade
サーバ → クライアント: HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
── 以降、双方向のメッセージ交換 ──
クライアント → サーバ: {"type": "chat", "message": "Hello"}
サーバ → クライアント: {"type": "chat", "message": "Hi there!"}
サーバ → クライアント: {"type": "notification", "count": 3}
クライアント → サーバ: {"type": "typing", "status": true}
WSGI のインタフェースはこの通信パターンを表現できません。
application(environ, start_response)は一度呼ばれて一度返る関数であり、接続を維持してメッセージを双方向にやり取りするという概念が存在しませんenvironには HTTP リクエストの情報しか入らず、WebSocket フレームを受信する手段がありませんstart_responseは一度だけステータスとヘッダーを送信するための仕組みであり、その後に任意のタイミングでメッセージを送るという機能を持ちません
WSGI の時代にリアルタイム通信が必要な場合は、Django とは別に Socket.IO サーバや Tornado を並行して運用し、Nginx でパスごとに振り分けるという構成が取られていました。 しかしこの構成では認証やセッション管理がアプリケーション間で分断され、複雑さが増します。
ASGI は HTTP と WebSocket の 両方を単一のインタフェースで扱える ように設計されています。
同じ application callable が HTTP リクエストも WebSocket 接続も処理でき、フレームワーク側で統一的にルーティングや認証を適用できます。
2.4. async/await の普及
Python 3.5(2015年)で async def と await が言語に正式に導入され、Python 3.6 以降で asyncio エコシステムが急速に成熟しました。
注釈
以下のような非同期 I/O ライブラリが整備され、非同期プログラミングが Python の主流の一部になりました。
aiohttp— 非同期 HTTP クライアント/サーバhttpx— 同期・非同期の両対応 HTTP クライアントasyncpg— PostgreSQL 向け非同期ドライバaioredis— Redis 向け非同期クライアント
import asyncio
import httpx
async def fetch_user_and_orders(user_id):
async with httpx.AsyncClient() as client:
# 2つのリクエストを同時に発行 — 合計時間は遅い方に揃う
user_task = client.get(f"https://api.example.com/users/{user_id}")
orders_task = client.get(f"https://api.example.com/users/{user_id}/orders")
user_resp, orders_resp = await asyncio.gather(user_task, orders_task)
return user_resp.json(), orders_resp.json()
この async/await の力を Web アプリケーションで活用するには、フレームワークとサーバの間のインタフェース自体が非同期に対応している必要があります。
WSGI の同期的な application(environ, start_response) では await を使えないため、言語レベルで用意された非同期の恩恵を Web アプリケーションに持ち込めません。
ASGI は async def application(scope, receive, send) という非同期関数をインタフェースとして定義することで、この問題を解決しました。
receiveで非同期にリクエストボディや WebSocket メッセージを受信できますsendで非同期にレスポンスを送信できますフレームワーク内部でも
awaitを自由に使えます
これにより、非同期 ORM(Django 4.1 以降の async ORM)、非同期 HTTP クライアント、非同期キャッシュアクセスといった機能がフレームワークに統合可能になりました。
# ASGI アプリケーション — async/await が自然に使える
async def application(scope, receive, send):
if scope["type"] == "http":
body = b"Hello, ASGI World!"
await send({
"type": "http.response.start",
"status": 200,
"headers": [[b"content-type", b"text/plain"]],
})
await send({
"type": "http.response.body",
"body": body,
})
Django は 3.0 で ASGI 対応を導入し、4.1 で非同期ビューと非同期 ORM のサポートを拡充しました。 FastAPI は最初から ASGI ネイティブとして設計されています。 WSGI が消えるわけではありませんが、非同期 I/O、長時間接続、WebSocket が求められる場面では ASGI が必須の選択肢になっています。
次節では ASGI の仕様を具体的に読み解きます。
WSGI の environ / start_response / イテラブルに対応する ASGI の scope / receive / send の役割を一つずつ確認し、最小の ASGI アプリケーションを手で書いてみましょう。
2.5. ASGI の基本構造
前節で ASGI が必要になった背景を整理しました。
本節では ASGI の仕様そのものを読み解きます。
WSGI が application(environ, start_response) という同期関数で定義されていたのに対し、ASGI は async def application(scope, receive, send) という非同期関数で定義されます。
引数の数も名前も異なりますが、「サーバとアプリケーションの間のインタフェースを定義する」という目的は同じです。
Tip
WSGI の environ/start_response/イテラブルという3つの概念が、ASGI では scope/receive/send に対応します。
名前は変わりますが、それぞれの役割は対称的です。
2.5.1. application(scope, receive, send)
ASGI アプリケーションの最小形は以下の通りです。
async def application(scope, receive, send):
if scope["type"] == "http":
await receive()
await send({
"type": "http.response.start",
"status": 200,
"headers": [
[b"content-type", b"text/plain; charset=utf-8"],
],
})
await send({
"type": "http.response.body",
"body": b"Hello, ASGI World!",
})
WSGI の最小アプリと並べてみると、構造の違いが明確になります。 両者の対応関係に注目してください。
# WSGI
def application(environ, start_response):
start_response("200 OK", [("Content-Type", "text/plain")])
return [b"Hello, WSGI World!"]
# ASGI
async def application(scope, receive, send):
await receive()
await send({"type": "http.response.start", "status": 200,
"headers": [[b"content-type", b"text/plain"]]})
await send({"type": "http.response.body", "body": b"Hello, ASGI World!"})
3つの引数の役割を見ていきます。
scope は接続に関するメタ情報を格納した辞書です。
WSGI の environ に相当しますが、いくつかの重要な違いがあります。
scope には接続の種類を示す type キーが含まれ、"http"、"websocket"、"lifespan" のいずれかを取ります。
HTTP リクエストの場合は method、path、headers、query_string などが含まれますが、リクエストボディは scope には入りません。
ボディは receive を通じて非同期に取得します。
receive はサーバからアプリケーションへイベントを届ける非同期 callable です。
await receive() を呼ぶとイベント辞書が返されます。
HTTP リクエストであればボディを含む {"type": "http.request", "body": b"..."} が、WebSocket であればメッセージを含む {"type": "websocket.receive", "text": "..."} が届きます。
WSGI の environ["wsgi.input"].read() に相当しますが、以下の点が異なります。
非同期である
複数回呼び出してチャンク単位で受信できる
WebSocket メッセージも同じインタフェースで受信できる
send はアプリケーションからサーバへイベントを送出する非同期 callable です。
await send(event_dict) を呼ぶことでレスポンスをサーバに返します。
WSGI では start_response でステータスとヘッダーを送り、イテラブルの返却でボディを送るという二段構えでしたが、ASGI では send を複数回呼ぶことで同じことを実現します。
HTTP レスポンスの場合、最初の send で http.response.start(ステータスとヘッダー)を、次の send で http.response.body(ボディ)を送ります。
この対応関係をまとめます。
WSGI ASGI
────────────────────── ──────────────────────
environ (dict) scope (dict)
environ["REQUEST_METHOD"] scope["method"]
environ["PATH_INFO"] scope["path"]
environ["QUERY_STRING"] scope["query_string"]
environ["HTTP_HOST"] scope["headers"] 内
environ["wsgi.input"] receive() で取得
start_response(status, headers) send({"type": "http.response.start", ...})
return iterable send({"type": "http.response.body", ...})
2.5.2. connection scope
scope 辞書の内容はプロトコルの種類(type)によって異なります。
HTTP 接続の場合の主要なキーを確認しましょう。
# HTTP リクエストの scope 例
{
"type": "http",
"asgi": {"version": "3.0"},
"http_version": "1.1",
"method": "GET",
"path": "/users/42/",
"root_path": "",
"scheme": "http",
"query_string": b"format=json",
"headers": [
[b"host", b"example.com"],
[b"user-agent", b"curl/8.7.1"],
[b"accept", b"*/*"],
],
"server": ("127.0.0.1", 8000),
"client": ("192.168.1.10", 54321),
}
WSGI の environ と比較すると、いくつかの設計上の違いに気づきます。
ヘッダーの形式が異なります —
HTTP_プレフィックス付きの文字列キーではなく、バイト列タプルのリストとして格納されています。WSGI が CGI の命名規則を引き継いでいたのに対し、ASGI は HTTP の生の表現に近い形を採用しています。Content-Typeを取得する場合、WSGI ではenviron["CONTENT_TYPE"]でしたが、ASGI ではheadersリストを走査してb"content-type"を探す必要があります。query_stringがバイト列です — WSGI では文字列でしたが、ASGI はデコード前のバイト列をそのまま渡します。フレームワーク側でデコードとパースを行う設計です。serverとclientがタプルで提供されます — WSGI のSERVER_NAME/SERVER_PORTやREMOTE_ADDR/REMOTE_PORTに対応しますが、より構造化された形式です。
WebSocket 接続の場合、scope は以下のようになります。
# WebSocket 接続の scope 例
{
"type": "websocket",
"asgi": {"version": "3.0"},
"http_version": "1.1",
"scheme": "ws",
"path": "/ws/chat/room1/",
"query_string": b"token=abc123",
"headers": [...],
"server": ("127.0.0.1", 8000),
"client": ("192.168.1.10", 54322),
}
HTTP と WebSocket で scope の構造が統一されているため、ルーティングや認証の仕組みをプロトコルをまたいで共有できます。
type フィールドを見るだけでどのプロトコルの接続かを判別でき、同じアプリケーション callable 内で分岐処理を書けます。
注釈
lifespan タイプは、アプリケーションの起動時と終了時に一度ずつ呼ばれる特殊な scope です。
データベース接続プールの初期化やクリーンアップ処理に使われます。
async def application(scope, receive, send):
if scope["type"] == "lifespan":
while True:
message = await receive()
if message["type"] == "lifespan.startup":
# DB接続プール初期化など
await send({"type": "lifespan.startup.complete"})
elif message["type"] == "lifespan.shutdown":
# リソース解放
await send({"type": "lifespan.shutdown.complete"})
return
elif scope["type"] == "http":
...
elif scope["type"] == "websocket":
...
2.5.3. event-driven な処理
WSGI と ASGI の 最も本質的な違い は、処理モデルが「呼び出しと返却」から「イベントの送受信」に変わった点です。
WSGI では、サーバが application を呼び出し、アプリケーションが処理を完了してイテラブルを返します。
この1回の往復で1リクエストが完結します。制御の流れは直線的です。
WSGI: サーバ → application() 呼び出し → 処理 → return イテラブル → サーバ
ASGI では、application が呼び出された後、receive と send を通じてサーバとアプリケーションがイベントを交互にやり取りします。
application 関数は処理が完了するまで return しません。
ASGI: サーバ → application() 呼び出し
│
├── await receive() ← サーバからリクエストボディを受信
├── await send(...) → サーバへレスポンスヘッダーを送信
├── await send(...) → サーバへレスポンスボディを送信
│
└── return(application 終了)
HTTP の場合はこのイベントのやり取りが数回で完了しますが、WebSocket の場合は接続が維持される限り receive と send が繰り返されます。
async def websocket_app(scope, receive, send):
# 接続の受け入れ
await send({"type": "websocket.accept"})
while True:
event = await receive()
if event["type"] == "websocket.receive":
text = event.get("text", "")
# エコーバック
await send({"type": "websocket.send", "text": f"Echo: {text}"})
elif event["type"] == "websocket.disconnect":
break
この WebSocket の例を見ると、receive がクライアントからのメッセージを待ち受け、send でサーバからメッセージを送り返すという双方向通信が、同じインタフェースで自然に表現できていることがわかります。
WSGI ではこの通信パターンを表現する手段が存在しませんでした。
イベント駆動モデルの利点は、await のタイミングでイベントループに制御を返せる点です。
await receive()でクライアントのメッセージを待っている間、イベントループは他の接続の処理を進められます100の WebSocket 接続が同時に開いていても、メッセージを待っている間は CPU リソースを消費しません
前節で問題にした「長時間接続がワーカーを占有する」という WSGI の制約は、この非同期イベント駆動モデルによって構造的に解消されています。
HTTP のストリーミングレスポンスも、send を複数回呼ぶことで実現できます。
async def streaming_app(scope, receive, send):
await receive()
await send({
"type": "http.response.start",
"status": 200,
"headers": [[b"content-type", b"text/plain"]],
})
for i in range(5):
await send({
"type": "http.response.body",
"body": f"chunk {i}\n".encode(),
"more_body": True,
})
await asyncio.sleep(1) # 1秒待つ間、他のリクエストを処理可能
await send({
"type": "http.response.body",
"body": b"done\n",
"more_body": False, # これが最後のチャンク
})
more_body フラグが True の間はボディの送信が継続し、False(またはキー省略)で完了します。
await asyncio.sleep(1) の間、イベントループは他のリクエストの処理に使われます。
WSGI の StreamingHttpResponse で time.sleep(1) を書いた場合はワーカー全体がブロックされていましたが、ASGI ではその問題が起きません。
ASGI の基本構造を理解したところで、次節では scope の詳細と receive/send のイベント型を網羅的に確認し、HTTP と WebSocket それぞれのライフサイクルを完全に追跡しましょう。
2.6. scope を理解する
前節で ASGI の3引数(scope, receive, send)の概要を把握しました。
本節では scope 辞書の各キーを一つずつ掘り下げます。
WSGI の environ と対比しながら読み進めることで、同じリクエスト情報がどのように表現形式を変えているかを理解できます。
2.6.1. type
scope["type"] は ASGI アプリケーションが 最初に確認すべきキー です。
この値によって接続のプロトコルが決まり、scope に含まれる他のキーの構成や、receive/send で流れるイベントの種類がすべて変わります。
async def application(scope, receive, send):
if scope["type"] == "http":
await handle_http(scope, receive, send)
elif scope["type"] == "websocket":
await handle_websocket(scope, receive, send)
elif scope["type"] == "lifespan":
await handle_lifespan(scope, receive, send)
else:
raise ValueError(f"Unknown scope type: {scope['type']}")
3つの値があります。
"http"— 通常の HTTP リクエスト/レスポンスです。1回のapplication呼び出しが1つの HTTP トランザクションに対応し、レスポンスの送信が完了するとapplicationはreturnします。"websocket"— WebSocket 接続です。ハンドシェイク完了後にapplicationが呼び出され、接続が切断されるまでreceiveとsendのループが継続します。applicationは接続が閉じられて初めてreturnします。"lifespan"— アプリケーションプロセスの起動時に一度だけ呼び出されます。データベース接続プールの初期化やバックグラウンドタスクの開始など、リクエスト処理の開始前に完了すべき処理を記述します。
WSGI には type に相当するキーがありません。
WSGI は HTTP のリクエスト/レスポンスしか扱えないため、接続の種類を区別する必要がなかったのです。
ASGI が複数のプロトコルを単一のインタフェースで扱えるのは、この type による分岐があるからです。
2.6.2. path
scope["path"] はリクエストのパス部分を 文字列 で格納します。
# リクエスト: GET /api/users/42/?format=json
scope["path"] # → "/api/users/42/"
WSGI の environ["PATH_INFO"] に対応しますが、名前が簡潔になっています。
WSGI では SCRIPT_NAME と PATH_INFO を結合して完全なパスを構築する必要がありましたが、ASGI では scope["path"] が完全なパスを、scope["root_path"] がマウントポイント(WSGI の SCRIPT_NAME 相当)を保持しています。
# WSGI
full_path = environ.get("SCRIPT_NAME", "") + environ.get("PATH_INFO", "/")
# ASGI
full_path = scope.get("root_path", "") + scope["path"]
root_path はアプリケーションがサブパスにマウントされている場合に使われます。
たとえば Nginx が /app/ 配下のリクエストを ASGI サーバに転送する構成では、root_path が "/app" に設定され、path は /app/ を除いた残りのパスになります。
注釈
多くのデプロイ構成では root_path は空文字列です。
2.6.3. method
scope["method"] は HTTP メソッドを大文字の文字列で格納します。
scope["method"] # → "GET", "POST", "PUT", "DELETE", ...
WSGI の environ["REQUEST_METHOD"] と完全に同じ情報です。
名前が短くなっただけで、値の形式に違いはありません。
このキーは scope["type"] == "http" の場合にのみ存在します。
WebSocket の scope には method キーがありません。
WebSocket のハンドシェイクは常に GET で行われるため、明示的に格納する必要がないという設計判断です。
2.6.4. headers
scope["headers"] はリクエストヘッダーを バイト列タプルのリスト として格納します。
ここが WSGI と最も大きく異なる部分です。
scope["headers"]
# → [
# [b"host", b"example.com"],
# [b"user-agent", b"curl/8.7.1"],
# [b"accept", b"application/json"],
# [b"content-type", b"application/json"],
# [b"content-length", b"52"],
# [b"x-request-id", b"abc-123"],
# ]
WSGI では HTTP ヘッダーが CGI 由来の命名規則に従い、HTTP_ プレフィックスの付加、ハイフンからアンダースコアへの変換、大文字化という変換を経て environ に格納されていました。
Content-Type は CONTENT_TYPE、X-Request-Id は HTTP_X_REQUEST_ID になるという、Vol.1「WSGI が生まれた背景」で見た変換規則です。
ASGI ではそうした変換が一切行われず、ヘッダー名は小文字のバイト列としてそのまま格納されます。
この設計にはメリットとデメリットがあります。
観点 |
内容 |
|---|---|
メリット |
HTTP の生の表現に忠実であるため情報の損失がありません。WSGI ではアンダースコアを含む名前( |
デメリット |
特定のヘッダーを取得するためにリストを走査する必要があります。 |
# ASGI でヘッダーを取得するヘルパー関数
def get_header(scope, name):
name_lower = name.lower().encode("latin-1")
for header_name, header_value in scope["headers"]:
if header_name == name_lower:
return header_value.decode("latin-1")
return None
content_type = get_header(scope, "Content-Type")
request_id = get_header(scope, "X-Request-Id")
実際の開発ではフレームワーク(Django の ASGIRequest や Starlette の Request)がこのヘルパーを内部に持っているため、request.headers["content-type"] のような直感的なアクセスが可能です。
しかしフレームワークを介さずに ASGI アプリケーションを書く場合や、ミドルウェアを実装する場合には、この生のデータ構造を理解しておくことが重要です。
2.6.5. query_string
scope["query_string"] はクエリ文字列を バイト列 で格納します。
# リクエスト: GET /users/?name=Taro&page=2
scope["query_string"] # → b"name=Taro&page=2"
WSGI の environ["QUERY_STRING"] が文字列であったのに対し、ASGI ではバイト列です。
URL にはパーセントエンコーディングされた非 ASCII 文字が含まれる可能性があるため、デコード前の生データを渡すという方針です。
パースはアプリケーション側で行います。
Tip
クエリ文字列が空の場合は b"" が格納されます。WSGI と同様に、? 記号自体は含まれません。
from urllib.parse import parse_qs
query_params = parse_qs(scope["query_string"].decode("utf-8"))
# → {"name": ["Taro"], "page": ["2"]}
2.6.6. client / server
scope["client"] と scope["server"] は接続元と接続先のネットワーク情報をタプルで格納します。
scope["client"] # → ("192.168.1.10", 54321) クライアントの IP とポート
scope["server"] # → ("127.0.0.1", 8000) サーバの IP とポート
WSGI ではクライアント情報が environ["REMOTE_ADDR"] と environ["REMOTE_PORT"](存在する場合)に、サーバ情報が environ["SERVER_NAME"] と environ["SERVER_PORT"] に格納されていましたが、いずれも文字列でした。
ASGI ではタプルとして構造化されており、ポート番号は整数です。
警告
リバースプロキシ環境では、Nginx を経由すると scope["client"] は Nginx のアドレスになります。
元のクライアント IP を知るには X-Forwarded-For ヘッダーを参照する必要があります。
Uvicorn の --proxy-headers オプションを有効にすると、X-Forwarded-For の値で scope["client"] を自動的に上書きしてくれます。
scope["client"] は None になる場合もあります。
Unix ソケット経由の接続ではクライアントの IP アドレスという概念が存在しないためです。
アプリケーションコードでは scope.get("client") で安全にアクセスするか、フレームワークの request.client を使うのが安全です。
2.6.7. state
scope["state"] は ASGI 3.0 で追加された比較的新しいキーで、ミドルウェア間やアプリケーション内で リクエストスコープの状態を共有するための辞書 です。
# ミドルウェアで state にデータを設定
class RequestIdMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] == "http":
scope["state"]["request_id"] = generate_request_id()
await self.app(scope, receive, send)
# アプリケーション側で state を参照
async def application(scope, receive, send):
request_id = scope.get("state", {}).get("request_id", "unknown")
...
WSGI では environ 辞書に直接キーを追加することでミドルウェア間のデータ共有を行う慣習がありましたが、これは WSGI の仕様が定める environ のキー命名規則と衝突する可能性がありました。
ASGI の state は明示的にアプリケーション用のデータ領域として用意されているため、サーバが管理するキーとの衝突を避けられます。
state はサーバが空の辞書として初期化し、ミドルウェアやアプリケーションが自由に読み書きします。
HTTP の場合は1リクエストの処理中だけ生存します
WebSocket の場合は接続が維持されている間保持されます
scope の各キーを確認したところで、WSGI の environ との対応関係を改めてまとめます。
WSGI environ ASGI scope
─────────────────────── ───────────────────────
(なし) type ("http" / "websocket" / "lifespan")
REQUEST_METHOD method
PATH_INFO path
SCRIPT_NAME root_path
QUERY_STRING (str) query_string (bytes)
HTTP_* (変換済み文字列) headers (生バイト列タプルのリスト)
CONTENT_TYPE headers 内 (b"content-type")
CONTENT_LENGTH headers 内 (b"content-length")
REMOTE_ADDR + REMOTE_PORT client (tuple or None)
SERVER_NAME + SERVER_PORT server (tuple)
SERVER_PROTOCOL http_version
wsgi.url_scheme scheme
wsgi.input receive() で取得
(慣習的に environ に追加) state (dict)
次節では receive と send のイベント型を詳しく見ていき、HTTP と WebSocket それぞれのライフサイクルを完全に追跡しましょう。
2.7. receive / send を理解する
前節で scope の構造を把握しました。
scope はリクエストの「メタ情報」を格納する静的な辞書でしたが、receive と send はリクエストとレスポンスの「データ」をやり取りする動的なチャネルです。
WSGI では environ["wsgi.input"] と start_response + イテラブル返却でこの役割を分担していましたが、ASGI ではすべてをイベント辞書の送受信として統一的に扱います。
注釈
すべてのイベント辞書には type キーが含まれており、イベントの種類を示します。
scope["type"] がプロトコルを区別するのと同様に、イベントの type が処理の段階を区別します。
2.7.1. イベントを受け取る
receive はサーバからアプリケーションへイベントを届ける非同期 callable です。
await receive() を呼ぶたびに1つのイベント辞書が返されます。
HTTP リクエストの場合、receive が返すイベントは http.request 型です。
event = await receive()
# → {
# "type": "http.request",
# "body": b'{"name": "Taro", "email": "[email protected]"}',
# "more_body": False,
# }
body はリクエストボディのバイト列で、more_body は後続のチャンクが存在するかを示すフラグです。
小さなリクエストでは1回の
receiveで全ボディが届きます大きなリクエスト(ファイルアップロードなど)ではサーバがボディを分割して送信します
# 大きなリクエストボディを完全に受信する
async def read_body(receive):
body = b""
while True:
event = await receive()
body += event.get("body", b"")
if not event.get("more_body", False):
break
return body
このパターンは、Vol.1「HTTP は何をやりとりしているのか」で学んだ TCP ソケットの部分受信と Vol.1「まずは 1 リクエストだけ処理するサーバを作る」で実装したヘッダー終端検出ループの非同期版です。
TCP が「メッセージ境界を持たないバイトストリーム」であったのと同様に、ASGI の receive も「ボディ全体が一度に届く保証はない」という前提で設計されています。
more_body フラグが WSGI には存在しなかった明示的な終端通知であり、Content-Length から自分で計算する必要がなくなっています。
重要
GET リクエストのようにボディが空の場合でも、receive を一度呼び出す必要があります。
サーバは空ボディを示す {"type": "http.request", "body": b"", "more_body": False} を返します。
これを呼び出さずに send を始めると、サーバの実装によってはプロトコル違反とみなされる可能性があります。
WebSocket の場合、receive はクライアントからのメッセージやイベントを返します。
HTTP との違いは、接続が維持される限り何度でも receive を呼び出せる点です。
# WebSocket の receive が返すイベント例
{"type": "websocket.connect"} # 接続要求
{"type": "websocket.receive", "text": "Hello"} # テキストメッセージ
{"type": "websocket.receive", "bytes": b"\x89\x00"} # バイナリメッセージ
{"type": "websocket.disconnect", "code": 1000} # 切断
HTTP では receive が通常1〜数回で完了しますが、WebSocket では接続が維持される限り何度でも receive を呼び出してメッセージを待ち受けます。
この違いが、WSGI の「1回の呼び出しで1リクエスト完結」というモデルでは WebSocket を表現できなかった理由です。
WebSocket の receive イベント一覧
イベント type |
意味 |
|---|---|
|
接続要求 |
|
テキストメッセージ受信 |
|
バイナリメッセージ受信 |
|
切断( |
2.7.2. イベントを送る
send はアプリケーションからサーバへイベントを送出する非同期 callable です。
await send(event_dict) を呼ぶことでレスポンスデータをサーバに返します。
HTTP レスポンスの場合、send は 最低2回 呼び出します。
# 第1回: ステータスとヘッダーの送信
await send({
"type": "http.response.start",
"status": 200,
"headers": [
[b"content-type", b"application/json"],
[b"content-length", b"52"],
],
})
# 第2回: ボディの送信
await send({
"type": "http.response.body",
"body": b'{"id": 42, "name": "Taro", "email": "[email protected]"}',
})
WSGI では start_response("200 OK", headers) でステータスとヘッダーを送り、イテラブルの返却でボディを送るという分離がありました。
ASGI では両方とも send を使いますが、イベントの type によって役割が区別されます。
http.response.start が WSGI の start_response に、http.response.body が WSGI のイテラブル返却に対応します。
警告
send の呼び出し順序には厳格な制約があります。
http.response.startは必ずhttp.response.bodyより前に呼ぶ必要がありますhttp.response.startを2回呼ぶことはできませんhttp.response.bodyの後にhttp.response.startを呼ぶことはできません
これらの制約に違反すると、ASGI サーバは例外を送出するか接続を切断します。
WebSocket の場合、send は以下のイベント型を扱います。
await send({"type": "websocket.accept"}) # 接続受け入れ
await send({"type": "websocket.send", "text": "Hello, client!"}) # テキスト送信
await send({"type": "websocket.send", "bytes": b"\x00\x01"}) # バイナリ送信
await send({"type": "websocket.close", "code": 1000}) # 接続終了
2.7.3. HTTP response start / body
HTTP レスポンスの送信を具体的なコード例で確認しましょう。
http.response.start イベントは以下の構造を持ちます。
{
"type": "http.response.start",
"status": 200, # 整数のステータスコード
"headers": [ # バイト列タプルのリスト
[b"content-type", b"text/html; charset=utf-8"],
[b"content-length", b"45"],
[b"x-custom-header", b"some-value"],
],
}
WSGI の start_response("200 OK", [("Content-Type", "text/html")]) と比較すると、以下の違いがあります。
ステータスコードが理由フレーズを含まない 整数 です
ヘッダーが文字列タプルではなく バイト列タプル です
理由フレーズは HTTP/2 以降では廃止されているため、ASGI では最初から省略されています。
http.response.body イベントは以下の構造を持ちます。
# 一括送信(小さなレスポンス)
{
"type": "http.response.body",
"body": b"<html><body><h1>Hello</h1></body></html>",
}
# ストリーミング送信(大きなレスポンスや段階的な送信)
{
"type": "http.response.body",
"body": b"chunk 1\n",
"more_body": True, # まだ続きがある
}
more_body が True の場合、サーバはクライアントへの接続を維持し、次の http.response.body イベントを待ちます。
more_body が False(デフォルト)の場合、そのイベントが最後のチャンクであり、サーバはレスポンスの送信を完了します。
これを活用したストリーミングレスポンスの実装例を示しましょう。
import asyncio
import json
async def sse_application(scope, receive, send):
await receive()
await send({
"type": "http.response.start",
"status": 200,
"headers": [
[b"content-type", b"text/event-stream"],
[b"cache-control", b"no-cache"],
[b"connection", b"keep-alive"],
],
})
for i in range(10):
data = json.dumps({"count": i, "message": f"Event {i}"})
chunk = f"data: {data}\n\n".encode("utf-8")
await send({
"type": "http.response.body",
"body": chunk,
"more_body": True,
})
await asyncio.sleep(1) # この間、他のリクエストを処理可能
await send({
"type": "http.response.body",
"body": b"data: {\"message\": \"done\"}\n\n",
"more_body": False,
})
WSGI のジェネレータによるストリーミング(Vol.1「WSGI が生まれた背景」)と比較すると、await asyncio.sleep(1) の間にイベントループが他のリクエストを処理できる点が決定的な違いです。
WSGI で time.sleep(1) を使うとワーカー全体がブロックされていました。
2.7.4. disconnect
HTTP 接続の場合、クライアントが途中で接続を切断すると receive が http.disconnect イベントを返します。
Tip
WSGI にはクライアント切断を検知する標準的な手段がありません。
ASGI の http.disconnect イベントにより、アプリケーションはクライアントの離脱を能動的に監視し、不要な処理を中断してリソースの無駄遣いを防げます。
event = await receive()
# → {"type": "http.disconnect"}
このイベントは、たとえばクライアントがブラウザの「戻る」ボタンを押したり、ネットワークが切断されたりした場合に発生します。 長時間かかるレスポンスの生成中にクライアントが離脱したことを検知し、不要な処理を中断するために使います。 以下のコード例では、長い処理とクライアント切断の監視を並行して行う実装を示します。
import asyncio
async def long_processing_app(scope, receive, send):
# ボディを受信
body_event = await receive()
# 長い処理とクライアント切断を並行して監視
async def process():
await asyncio.sleep(10) # 重い処理のシミュレーション
return b"Processing complete!"
async def watch_disconnect():
while True:
event = await receive()
if event["type"] == "http.disconnect":
return None
# 先に完了した方を採用
done, pending = await asyncio.wait(
[asyncio.create_task(process()),
asyncio.create_task(watch_disconnect())],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
result = done.pop().result()
if result is None:
# クライアントが切断済み — レスポンスを送る必要なし
return
await send({
"type": "http.response.start",
"status": 200,
"headers": [[b"content-type", b"text/plain"]],
})
await send({
"type": "http.response.body",
"body": result,
})
WSGI にはクライアント切断を検知する標準的な手段がありません。
ソケットへの書き込み時に BrokenPipeError が発生して初めて気づくか、そもそも気づかないまま処理が完了してしまいます。
WebSocket の切断は websocket.disconnect イベントで通知されます。
code フィールドに WebSocket の終了コードが含まれます。
event = await receive()
# → {"type": "websocket.disconnect", "code": 1000}
終了コードの意味は以下の通りです。
コード |
意味 |
|---|---|
|
正常終了 |
|
クライアントの離脱 |
|
異常切断 |
アプリケーションはこのコードに基づいてクリーンアップ処理を分岐できます。
receive と send のイベント型をまとめます。
HTTP ライフサイクル
──────────────────
receive: http.request → リクエストボディ(1回以上)
receive: http.disconnect → クライアント切断
send: http.response.start → ステータス + ヘッダー(1回)
send: http.response.body → レスポンスボディ(1回以上)
WebSocket ライフサイクル
──────────────────────
receive: websocket.connect → 接続要求
receive: websocket.receive → メッセージ受信(複数回)
receive: websocket.disconnect → 切断
send: websocket.accept → 接続受け入れ
send: websocket.send → メッセージ送信(複数回)
send: websocket.close → 接続終了
WSGI が「1回の関数呼び出しと1回の返却」で完結していたのに対し、ASGI は receive と send の複数回のやり取りでリクエスト/レスポンスのライフサイクルを表現します。
この柔軟性が、HTTP のストリーミングから WebSocket の双方向通信まで、単一のインタフェースで対応できる 理由です。
次節では、これらのイベントを組み合わせて最小の ASGI アプリケーションを実装しましょう。
2.8. ASGI の HTTP モデル
前節で receive と send のイベント型を一通り確認しました。
本節では HTTP に絞り、リクエストボディの受信からレスポンスの送信までのライフサイクルを実装レベルで深掘りします。
WSGI の同期モデルと対比しながら、ASGI の非同期イベント駆動モデルが実際のコードでどう表現されるかを確認していきましょう。
2.8.1. request body の分割受信
Vol.1「WSGI が生まれた背景」で WSGI の environ["wsgi.input"].read(content_length) を使ってリクエストボディを読み取りました。
WSGI ではストリームを一度 read すれば全ボディが得られる前提で書くことが多く、部分受信を意識する必要はサーバ側が吸収していました。
ASGI では、サーバがボディをチャンクに分割してアプリケーションに届ける可能性が 仕様レベルで明示 されています。
小さなリクエスト(たとえば JSON の POST)であれば、1回の receive で全ボディが届くのが一般的です。
async def handle_small_request(scope, receive, send):
event = await receive()
# event = {"type": "http.request", "body": b'{"name": "Taro"}', "more_body": False}
body = event.get("body", b"")
# → b'{"name": "Taro"}'
しかし数メガバイトのファイルアップロードでは、サーバがボディをチャンク単位で送信します。
注釈
Uvicorn のデフォルトでは、リクエストボディは約 65KB ごとに分割されます。
# 1回目の receive
event = await receive()
# → {"type": "http.request", "body": b"<65KB分のデータ>", "more_body": True}
# 2回目の receive
event = await receive()
# → {"type": "http.request", "body": b"<65KB分のデータ>", "more_body": True}
# ...
# 最後の receive
event = await receive()
# → {"type": "http.request", "body": b"<残りのデータ>", "more_body": False}
したがって、あらゆるリクエストに対応するには more_body フラグを監視するループが必要です。
前節で示した read_body ヘルパーを改良し、サイズ上限チェックを加えた堅牢な実装を示しましょう。
class RequestBodyTooLarge(Exception):
pass
async def read_body(receive, max_size=10 * 1024 * 1024):
"""リクエストボディを完全に読み取る。max_size を超えたら例外を送出する。"""
body = b""
while True:
event = await receive()
chunk = event.get("body", b"")
body += chunk
if len(body) > max_size:
raise RequestBodyTooLarge(
f"Request body exceeds {max_size} bytes"
)
if not event.get("more_body", False):
break
return body
この実装は、Vol.1「HTTP は何をやりとりしているのか」で学んだ TCP の部分受信ループ、Vol.1「まずは 1 リクエストだけ処理するサーバを作る」で実装した Content-Length に基づくボディ読み取り、そして Vol.1「HTTP は何をやりとりしているのか」で触れた巨大リクエストへの防御を、ASGI のイベントモデルで表現したものです。
WSGI ではサーバが wsgi.input ストリームとして完全にバッファリングしてくれることが多かったため、アプリケーション側でチャンク処理を意識する機会は少なかったのですが、ASGI ではアプリケーションがこの責務を明示的に引き受けます。
ファイルアップロードのように全体をメモリに蓄積したくない場合は、チャンクごとにディスクに書き出す処理も書けます。
import aiofiles
async def save_upload(receive, dest_path, max_size=50 * 1024 * 1024):
"""受信したボディを逐次ディスクに書き出す。"""
total = 0
async with aiofiles.open(dest_path, "wb") as f:
while True:
event = await receive()
chunk = event.get("body", b"")
total += len(chunk)
if total > max_size:
raise RequestBodyTooLarge(
f"Upload exceeds {max_size} bytes"
)
if chunk:
await f.write(chunk)
if not event.get("more_body", False):
break
return total
全ボディをメモリに保持する read_body と、チャンクごとに処理する save_upload のどちらを使うかは、想定されるボディサイズとユースケースに依存します。
ユースケース |
推奨アプローチ |
|---|---|
JSON API(数KB程度) |
|
ファイルアップロード(数MB以上) |
|
フレームワークはこの選択をリクエストの Content-Type や Content-Length に応じて内部で行っています。
2.8.2. レスポンスの分割送信
レスポンスの送信は http.response.start と http.response.body の2種類のイベントで構成されます。
最もシンプルなケースでは、ステータス/ヘッダーの送信とボディの送信をそれぞれ1回ずつ行います。
まず基本形を確認しましょう。
async def simple_response(scope, receive, send):
await receive() # リクエストボディを消費
body = b'{"message": "Hello, ASGI!"}'
await send({
"type": "http.response.start",
"status": 200,
"headers": [
[b"content-type", b"application/json"],
[b"content-length", str(len(body)).encode()],
],
})
await send({
"type": "http.response.body",
"body": body,
})
レスポンスヘルパーを作ることで、WSGI 時代の make_response と同様にコードの重複を削減できます。
以下のヘルパー関数は、よく使うレスポンス形式をまとめたものです。
async def send_json(send, data, status=200):
body = json.dumps(data, ensure_ascii=False).encode("utf-8")
await send({
"type": "http.response.start",
"status": status,
"headers": [
[b"content-type", b"application/json; charset=utf-8"],
[b"content-length", str(len(body)).encode()],
],
})
await send({
"type": "http.response.body",
"body": body,
})
async def send_text(send, text, status=200):
body = text.encode("utf-8")
await send({
"type": "http.response.start",
"status": status,
"headers": [
[b"content-type", b"text/plain; charset=utf-8"],
[b"content-length", str(len(body)).encode()],
],
})
await send({
"type": "http.response.body",
"body": body,
})
async def send_error(send, status, message):
await send_json(send, {"error": message}, status=status)
これらのヘルパーを使えば、ルーティング付きの ASGI アプリケーションが簡潔に書けます。 先ほどの WSGI ルーティング実装と構造を比べてみましょう。
import json
async def application(scope, receive, send):
if scope["type"] != "http":
return
method = scope["method"]
path = scope["path"]
if path == "/" and method == "GET":
await receive()
await send_text(send, "Welcome to the ASGI app!")
elif path == "/users" and method == "GET":
await receive()
users = [{"id": 1, "name": "Taro"}, {"id": 2, "name": "Hanako"}]
await send_json(send, users)
elif path == "/users" and method == "POST":
body = await read_body(receive)
try:
data = json.loads(body)
except json.JSONDecodeError:
await send_error(send, 400, "Invalid JSON")
return
await send_json(send, {"created": data}, status=201)
else:
await receive()
await send_error(send, 404, f"Not Found: {path}")
Vol.1「WSGI が生まれた背景」で WSGI アプリに書いたルーティング付き実装と構造的にほぼ同じですが、すべての I/O が await で非同期化されている点が異なります。
2.8.3. ストリーミング
ASGI の真価が発揮されるのは、レスポンスを段階的に生成して送信する ストリーミング です。
http.response.body イベントの more_body フラグを True に設定することで、サーバにまだ後続のチャンクがあることを伝えます。
CSV エクスポートの例を示します。 データベースから大量のレコードを取得し、1行ずつ CSV に変換してクライアントに送信します。 メモリ使用量を一定に保てる点が重要なポイントです。
async def csv_export(scope, receive, send):
await receive()
await send({
"type": "http.response.start",
"status": 200,
"headers": [
[b"content-type", b"text/csv; charset=utf-8"],
[b"content-disposition", b'attachment; filename="users.csv"'],
# Content-Length は設定しない(サイズ不明)
],
})
# ヘッダー行
await send({
"type": "http.response.body",
"body": b"id,name,email\n",
"more_body": True,
})
# データ行を逐次送信
async for user in get_users_from_db(): # 非同期イテレータ
line = f"{user.id},{user.name},{user.email}\n".encode("utf-8")
await send({
"type": "http.response.body",
"body": line,
"more_body": True,
})
# 最終チャンク
await send({
"type": "http.response.body",
"body": b"",
"more_body": False,
})
この実装ではレコードの総数に関わらずメモリ使用量が一定です。
100万行の CSV であっても、一度にメモリ上に存在するのは1行分のデータだけです。
WSGI のジェネレータによるストリーミング(Vol.1「WSGI が生まれた背景」の yield パターン)と同じ発想ですが、await を挟めるためデータベースからの非同期読み取りと組み合わせられます。
LLM のトークンストリーミングのように、チャンク間に不定の待ち時間がある場合も ASGI のモデルは自然にフィットします。 次の例を確認してみましょう。
async def llm_stream(scope, receive, send):
body = await read_body(receive)
prompt = json.loads(body).get("prompt", "")
await send({
"type": "http.response.start",
"status": 200,
"headers": [
[b"content-type", b"text/event-stream"],
[b"cache-control", b"no-cache"],
],
})
async for token in generate_tokens(prompt): # LLM からトークンを非同期に受信
chunk = f"data: {json.dumps({'token': token})}\n\n".encode("utf-8")
await send({
"type": "http.response.body",
"body": chunk,
"more_body": True,
})
# await の間、イベントループは他のリクエストを処理可能
await send({
"type": "http.response.body",
"body": b"data: {\"done\": true}\n\n",
"more_body": False,
})
WSGI で同じことを実現しようとすると、ジェネレータ内の time.sleep や同期的な API 呼び出しがワーカーをブロックし、他のリクエストが処理できなくなります。
WSGI |
ASGI |
|
|---|---|---|
10人が同時に LLM ストリーミングを利用 |
10のワーカーが占有される |
1プロセスで10の接続を並行処理 |
I/O 待ち中の挙動 |
ワーカーがブロック |
イベントループが他のリクエストを処理 |
注釈
ストリーミングで Content-Length を設定しない場合、ASGI サーバは自動的に Transfer-Encoding: chunked を適用します。
Vol.1「HTTP は何をやりとりしているのか」で学んだ chunked transfer encoding が、ここで実際に使われています。
more_body: True のチャンクが chunked encoding の各チャンクに、more_body: False が最終チャンク(サイズ0)に対応します。
HTTP のライフサイクル全体を時系列で整理しましょう。
クライアント ASGI サーバ ASGI アプリケーション
│ │ │
│── HTTP リクエスト ──→│ │
│ │── scope 構築 │
│ │── application(scope, receive, send) 呼び出し
│ │ │
│ │ ←── await receive() ────│
│ │── http.request ─────────→│ ボディ受信
│ │ (more_body: True) │
│ │ ←── await receive() ────│
│ │── http.request ─────────→│ 残りのボディ受信
│ │ (more_body: False) │
│ │ │── ビジネスロジック実行
│ │ ←── await send() ───────│
│ │── http.response.start ───│ ステータス+ヘッダー
│ │ ←── await send() ───────│
│ │── http.response.body ────│ ボディチャンク1
│ │ (more_body: True) │
│ │ ←── await send() ───────│
│ │── http.response.body ────│ 最終チャンク
│ │ (more_body: False) │
│ │ │── return
│←── HTTP レスポンス ──│ │
WSGI の「呼び出して返す」直線的なモデルに対し、ASGI は receive と send のイベント交換で非同期にリクエスト/レスポンスを処理します。
この柔軟性が、分割受信、分割送信、ストリーミング、そして次節で扱う WebSocket のすべてを 単一のインタフェースで実現 する基盤になっています。
次節では ASGI の WebSocket モデルを追跡し、HTTP との違いを確認しましょう。
2.9. WebSocket と lifespan
前節まで ASGI の HTTP モデルを追跡してきました。 HTTP は「リクエストを受けてレスポンスを返す」という一方向のやり取りで完結しますが、ASGI が WSGI に対して持つ最大の優位性は、HTTP 以外のプロトコルを同じインタフェースで扱える点にあります。 本節では WebSocket とアプリケーションのライフサイクル管理(lifespan)という、WSGI では不可能だった2つの領域を追跡します。
2.9.1. WebSocket 接続イベント
WebSocket の通信は HTTP のハンドシェイクから始まります。
クライアントが Upgrade: websocket ヘッダー付きの GET リクエストを送信し、サーバが 101 Switching Protocols で応答すると、TCP 接続が WebSocket プロトコルにアップグレードされます。
ASGI サーバはこのハンドシェイクを処理した上で、scope["type"] == "websocket" として application を呼び出します。
WebSocket 接続のライフサイクルは 3つの段階 に分かれます。
接続の確立
メッセージの交換
切断
async def websocket_app(scope, receive, send):
assert scope["type"] == "websocket"
# ── 第1段階: 接続の確立 ──
event = await receive()
# → {"type": "websocket.connect"}
# 接続を受け入れるか拒否するかをここで判断
# 例: 認証トークンの検証
token = dict(scope["headers"]).get(b"authorization", b"").decode()
if not is_valid_token(token):
await send({"type": "websocket.close", "code": 4001})
return
await send({"type": "websocket.accept"})
websocket.connect イベントは、クライアントが WebSocket ハンドシェイクを開始したことを通知します。
この時点ではまだ接続は確立されていません。
アプリケーションは以下のいずれかを選択します。
websocket.acceptを送信して接続を 受け入れるwebsocket.closeを送信して接続を 拒否する
HTTP の場合はリクエストが届いた時点で接続は確立済みですが、WebSocket ではアプリケーションが接続の受け入れを明示的に判断できます。 この設計により、認証が通らないクライアントの接続を WebSocket レベルで拒否でき、不正なクライアントにメッセージを送受信させることを防げます。
websocket.accept を送信する際に、サブプロトコルやレスポンスヘッダーを指定することもできます。
これは GraphQL サブスクリプションなど、特定のプロトコルを使う場合に必要になります。
await send({
"type": "websocket.accept",
"subprotocol": "graphql-ws",
"headers": [
[b"x-server-version", b"1.0"],
],
})
2.9.2. send / receive の継続
接続が確立された後は、receive と send を繰り返し呼び出してメッセージを交換します。
HTTP では receive がボディの受信で終わり、send がレスポンスの送信で終わるという有限のやり取りでしたが、WebSocket では接続が維持される限りメッセージの交換が続きます。
シンプルなエコーサーバを例に、メッセージ交換のループを確認しましょう。
async def echo_server(scope, receive, send):
assert scope["type"] == "websocket"
# 接続受け入れ
connect_event = await receive()
await send({"type": "websocket.accept"})
# ── 第2段階: メッセージ交換ループ ──
while True:
event = await receive()
if event["type"] == "websocket.receive":
# テキストメッセージの場合
if "text" in event:
await send({
"type": "websocket.send",
"text": f"Echo: {event['text']}",
})
# バイナリメッセージの場合
elif "bytes" in event:
await send({
"type": "websocket.send",
"bytes": event["bytes"],
})
elif event["type"] == "websocket.disconnect":
# ── 第3段階: 切断 ──
break
websocket.receive イベントには text キーまたは bytes キーのいずれかが含まれます。
WebSocket プロトコルはテキストフレームとバイナリフレームを区別しており、ASGI もこの区別を維持しています。
JSON メッセージは通常テキストフレームで送受信します
画像やバイナリデータはバイナリフレームを使います
websocket.disconnect イベントはクライアントが接続を閉じたことを通知します。
code フィールドに WebSocket の終了コードが含まれ、正常終了なら 1000、クライアントの離脱なら 1001 です。
このイベントを受け取ったらループを抜け、application 関数を return して終了します。
サーバ側から接続を閉じる場合は websocket.close を送信します。
切断後は application 関数を return して終了します。
# サーバ側から切断
await send({"type": "websocket.close", "code": 1000})
return
より実践的なチャットルームの例を示します。 複数のクライアントが同じルームに接続し、メッセージを共有します。
import json
# ルームごとの接続管理(プロセス内共有)
rooms = {} # {"room1": {send1, send2, ...}, ...}
async def chat_app(scope, receive, send):
assert scope["type"] == "websocket"
# パスからルーム名を取得: /ws/chat/room1/
path_parts = scope["path"].strip("/").split("/")
if len(path_parts) < 3:
connect_event = await receive()
await send({"type": "websocket.close", "code": 4000})
return
room_name = path_parts[2]
# 接続受け入れ
connect_event = await receive()
await send({"type": "websocket.accept"})
# ルームに参加
if room_name not in rooms:
rooms[room_name] = set()
rooms[room_name].add(send)
try:
# 参加通知
await broadcast(room_name, {
"type": "system",
"message": f"New user joined. ({len(rooms[room_name])} online)",
})
while True:
event = await receive()
if event["type"] == "websocket.receive":
data = json.loads(event["text"])
await broadcast(room_name, {
"type": "chat",
"message": data.get("message", ""),
})
elif event["type"] == "websocket.disconnect":
break
finally:
# ルームから退出(切断時に必ず実行)
rooms[room_name].discard(send)
if not rooms[room_name]:
del rooms[room_name]
else:
await broadcast(room_name, {
"type": "system",
"message": f"User left. ({len(rooms.get(room_name, []))} online)",
})
async def broadcast(room_name, message):
"""ルーム内の全クライアントにメッセージを送信"""
text = json.dumps(message)
disconnected = set()
for client_send in rooms.get(room_name, set()):
try:
await client_send({
"type": "websocket.send",
"text": text,
})
except Exception:
disconnected.add(client_send)
# 送信失敗した接続を除去
for s in disconnected:
rooms.get(room_name, set()).discard(s)
このチャットルームの実装で注目すべき点がいくつかあります。
各 WebSocket 接続の
sendcallable を集合に保持することでブロードキャストが可能になっています(sendはサーバが接続ごとに生成する callable であり、それ自体がその接続へのメッセージ送信チャネルです)try ... finallyブロックにより、例外や切断が発生しても必ずルームからの退出処理が実行されますawait receive()でメッセージを待っている間、イベントループは他の接続のメッセージ処理やブロードキャストを進められるため、50人がチャットルームに接続していても、メッセージを待っている接続は CPU リソースを消費しません
WSGI ではこのような双方向通信をフレームワーク内で実現する手段がなく、Django Channels のような外部パッケージや別プロセスの WebSocket サーバが必要でした。 ASGI ではフレームワークの中に自然に統合できます。
2.9.3. アプリ起動・終了イベント
scope["type"] == "lifespan" は、アプリケーションプロセスの起動時と終了時に一度ずつ呼ばれる特殊なスコープです。
HTTP や WebSocket のようなクライアントからの接続ではなく、ASGI サーバ自体がアプリケーションのライフサイクルを管理するために使います。
重要
lifespan イベントは、リソースの初期化と解放の確実性を保証します。
初期化が完了するまでリクエストを受け付けません
終了処理が完了してからプロセスを停止します
async def application(scope, receive, send):
if scope["type"] == "lifespan":
await handle_lifespan(scope, receive, send)
elif scope["type"] == "http":
await handle_http(scope, receive, send)
elif scope["type"] == "websocket":
await handle_websocket(scope, receive, send)
async def handle_lifespan(scope, receive, send):
while True:
event = await receive()
if event["type"] == "lifespan.startup":
try:
# アプリケーション起動時の初期化処理
await initialize_db_pool()
await initialize_cache()
await send({"type": "lifespan.startup.complete"})
except Exception as e:
await send({
"type": "lifespan.startup.failed",
"message": str(e),
})
return
elif event["type"] == "lifespan.shutdown":
# アプリケーション終了時のクリーンアップ
await close_db_pool()
await close_cache()
await send({"type": "lifespan.shutdown.complete"})
return
lifespan イベントのフローは以下の通りです。
ASGI サーバ起動
│
├── application(scope={"type": "lifespan"}, receive, send) 呼び出し
│
├── receive() → {"type": "lifespan.startup"}
│ │
│ ├── 初期化成功 → send({"type": "lifespan.startup.complete"})
│ │ → サーバがリクエストの受け付けを開始
│ │
│ └── 初期化失敗 → send({"type": "lifespan.startup.failed"})
│ → サーバがエラーで停止
│
├── (リクエスト処理が続く...)
│
├── SIGTERM / Ctrl+C 受信
│
├── receive() → {"type": "lifespan.shutdown"}
│ │
│ └── send({"type": "lifespan.shutdown.complete"})
│ → クリーンアップ完了、サーバ停止
│
└── application return
lifespan が解決する問題は、リソースの初期化と解放の確実性です。
WSGI ではデータベース接続プールの初期化がモジュールのインポート時やフレームワークの setup() 内で暗黙的に行われ、クリーンアップ処理は atexit ハンドラやシグナルハンドラに頼っていました。
ASGI の lifespan はこれらを明示的なイベントとして定義することで、起動・停止の順序を確実に保証します。
FastAPI は lifespan を活用するための簡潔な API を提供しています。
asynccontextmanager パターンを使うと、startup と shutdown を一つの関数に書けます。
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app):
# startup: yield の前
db_pool = await create_db_pool()
app.state.db_pool = db_pool
print("Database pool initialized")
yield # アプリケーションが稼働中
# shutdown: yield の後
await db_pool.close()
print("Database pool closed")
app = FastAPI(lifespan=lifespan)
yield の前がスタートアップ処理、後がシャットダウン処理に対応します。
この asynccontextmanager パターンは、内部的には lifespan イベントの送受信に変換されます。
注釈
Django の ASGI モードでは、django.setup() が起動時の初期化を担当します。
Django 自体は lifespan イベントの明示的なフックを公開していませんが、ASGI サーバが lifespan をサポートしていれば、カスタムの ASGI ミドルウェアで lifespan を処理した上で Django の ASGIHandler にその他のイベントを委譲するという構成が可能です。
HTTP、WebSocket、lifespan の3つのプロトコルを表で整理します。
scope type |
ライフサイクル |
receive イベント |
send イベント |
|---|---|---|---|
|
1リクエスト = 1呼び出し |
|
|
|
接続〜切断 |
|
|
|
プロセス起動〜停止 |
|
|
WSGI が HTTP の同期的なリクエスト/レスポンスしか扱えなかったのに対し、ASGI は3つのプロトコルを scope["type"] による分岐と、receive/send のイベント交換という統一的なモデルで表現しています。
次節では ASGI ミドルウェアの実装方法を確認し、WSGI ミドルウェアとの構造の違いを比較しましょう。
2.10. WSGI と ASGI の比較
Vol.1「WSGI が生まれた背景」で WSGI を、本章で ASGI を学んできました。 両者はともに「サーバとアプリケーションの間のインタフェース」を定義するという同じ目的を持ちながら、設計思想が大きく異なります。 本節では 4つの観点 から両者を比較し、どちらを選ぶべきかの判断基準を整理します。
2.10.1. シンプルさ vs 柔軟さ
WSGI の最小アプリケーションと ASGI の最小アプリケーションを改めて並べてみましょう。
# WSGI — 3行
def application(environ, start_response):
start_response("200 OK", [("Content-Type", "text/plain")])
return [b"Hello"]
# ASGI — 8行
async def application(scope, receive, send):
await receive()
await send({
"type": "http.response.start",
"status": 200,
"headers": [[b"content-type", b"text/plain"]],
})
await send({"type": "http.response.body", "body": b"Hello"})
行数の差は些細に見えますが、背後にある設計の違いは本質的です。
WSGI は「呼ばれて返す」という関数の最も基本的なパターンに従っています。
environから入力を読み、start_responseでヘッダーを宣言し、イテラブルでボディを返しますPython プログラマーにとって馴染みのある同期的な制御フローです
printやpdbで簡単にデバッグできます
ASGI は「イベントを送受信する」というメッセージパッシングのパターンを採用しています。
receiveでリクエストデータを受け取り、sendでレスポンスデータを送り出しますイベント辞書の
typeキーでプロトコルや処理段階を区別しますHTTP、WebSocket、lifespan という異なるプロトコルを統一的に扱う柔軟性をもたらします
単純な HTTP レスポンスを返すだけでもイベントの組み立てと送信順序を意識する必要があります
WSGI |
ASGI |
|
|---|---|---|
シンプルさ |
高い(制約の裏返し) |
低い(柔軟さの裏返し) |
設計の問題点 |
レスポンス後に通信を続ける手段がない |
|
この関係は、Vol.1「本書の対象読者とゴール」で触れた「抽象化を一段ずつ剥がす」という学び方の方針そのものです。 WSGI で十分な要件であれば WSGI のシンプルさを享受し、WSGI では対応できない要件が出てきたときに ASGI の柔軟さに移行する。 両方の仕様を理解していれば、この判断を根拠を持って下せます。
2.10.2. sync vs async
WSGI と ASGI の 最も根本的な違い は、同期と非同期の処理モデルです。
WSGI の application は同期関数です。
関数が return するまでの間、その関数を呼び出したワーカー(プロセスまたはスレッド)は他の処理に使えません。
# WSGI — I/O待ち中にワーカーがブロックされる
def application(environ, start_response):
user = requests.get("https://api.example.com/users/1").json() # 200ms ブロック
orders = requests.get("https://api.example.com/orders/1").json() # 150ms ブロック
# 合計 350ms、ワーカーは何もできない
...
ASGI の application は非同期関数です。
await のタイミングでイベントループに制御を返し、I/O の完了を待つ間に 他のリクエストの処理を進められます。
# ASGI — I/O待ち中に他のリクエストを処理可能
async def application(scope, receive, send):
async with httpx.AsyncClient() as client:
user_task = client.get("https://api.example.com/users/1")
orders_task = client.get("https://api.example.com/orders/1")
user_resp, orders_resp = await asyncio.gather(user_task, orders_task)
# 2つのリクエストを並行発行 → 合計時間は遅い方の200msに近づく
...
この違いは同時接続数とリソース効率に直結します。
100の同時リクエストがそれぞれ200msのI/O待ちを持つ場合、WSGI では100のワーカー(プロセスまたはスレッド)が必要です
ASGI では1つのワーカープロセス内のイベントループが100の接続を並行処理できます
メモリ消費量の差は、特にワーカー数が多い場合に顕著になります。
注意
非同期モデルには注意点があります。
CPU バウンドな処理(画像処理、暗号計算、大量のデータ変換など)は await で制御を返せないため、イベントループ全体をブロックします。
WSGI のプロセスモデルでは1つのワーカーがブロックされても他のワーカーは影響を受けませんが、ASGI のイベントループモデルでは1つの CPU バウンド処理がそのイベントループ上の 全接続に影響 します。
# ASGI での CPU バウンド処理 — イベントループをブロックする
async def application(scope, receive, send):
result = heavy_computation() # await がない → イベントループがブロック
...
# 対策: スレッドプールに逃がす
import asyncio
async def application(scope, receive, send):
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, heavy_computation)
...
Django が ASGI モードでも同期ビューをサポートしているのは、この現実を踏まえた設計です。
同期ビューはスレッドプールで実行され、イベントループをブロックしません。
既存の同期コード資産を活かしながら、必要な部分だけを async def で書くという 段階的な移行 が可能です。
2.10.3. 対応プロトコル
WSGI が扱えるのは HTTP のリクエスト/レスポンスだけです。
1回の application 呼び出しが1つの HTTP トランザクションに対応し、それ以外のプロトコルを表現する手段がありません。
ASGI は scope["type"] によるプロトコルの切り替えにより、HTTP、WebSocket、lifespan の3つを標準でサポートしています。
さらに、仕様は拡張可能な設計になっており、将来的に HTTP/2 Server Push や HTTP/3 といったプロトコルが追加される可能性もあります。
機能 |
WSGI |
ASGI |
|---|---|---|
HTTP リクエスト/レスポンス |
○ |
○ |
HTTP ストリーミング |
△(ワーカー占有の問題あり) |
○ |
WebSocket |
✕ |
○ |
Server-Sent Events |
△(非効率) |
○ |
lifespan 管理 |
✕ |
○ |
双方向通信 |
✕ |
○ |
WSGI でも StreamingHttpResponse やジェネレータを使ったストリーミングは形式上可能ですが、2 章(なぜ ASGI が必要になったのか)で解説した通り、ワーカーが接続中ずっと占有される問題があります。
ASGI では await の間にイベントループが他の処理を進められるため、ストリーミングと並行処理が両立します。
この違いが最も顕著になるのは、リアルタイム性が求められるアプリケーションです。
ASGI が必須 — チャット、通知、ライブダッシュボード、LLM のトークンストリーミング
WSGI で十分 — 管理画面、CRUD API、バッチ処理のような伝統的な Web アプリケーション
2.10.4. 学習コスト
WSGI の学習コストは 低い です。
application(environ, start_response)という関数シグネチャを覚えますenvironのキー名とstart_responseの呼び出し方を把握すれば最小のアプリケーションを書けますPEP 3333 の仕様は短く、同期 Python の知識だけで理解できます
ASGI の学習コストは WSGI より明らかに 高い です。
async/awaitとasyncioの基本を理解した上でscope/receive/sendの3引数モデルイベント辞書の
typeによる分岐HTTP と WebSocket で異なるイベントのライフサイクル
more_bodyフラグによるチャンク管理lifespan イベントの処理
といった ASGI 固有の概念を習得する必要があります。
Tip
本書をここまで読んだ読者であれば、この学習コストの大部分はすでに支払い済みです。
Vol.1「HTTP は何をやりとりしているのか」で HTTP のテキスト構造を理解し、Vol.1「まずは 1 リクエストだけ処理するサーバを作る」で自作サーバを書き、Vol.1「WSGI が生まれた背景」で WSGI の仕様を学んだ上で ASGI を見ると、以下の対応関係が見えてきます。
scopeはenvironの再設計receiveはwsgi.inputの非同期化sendはstart_responseとイテラブル返却の統合
低レイヤの理解があるからこそ、抽象度が上がっても「何が起きているか」を追跡できるのです。
実際の開発では、ASGI の生のインタフェースを直接扱うことは稀です。
Django は ASGIHandler が、FastAPI は Starlette がこのインタフェースを隠蔽し、開発者は async def view(request) や @app.get("/users") のようなフレームワークレベルの API を使います。
ASGI の仕様を知ることの価値は、フレームワークを使う際の効率ではなく、問題が起きた際にフレームワークの内側を追跡できる力 にあります。
本章の最後に、WSGI と ASGI の選択基準をまとめます。
WSGI を継続することが合理的な場合:
既存の Django アプリケーションが同期的な処理で問題なく動作している
WebSocket やリアルタイム通信の要件がない
I/O 待ちもワーカー数の増加で対処できる範囲である
デプロイ構成がシンプルで、エコシステムの成熟度も高く、トラブルシューティングの知見も豊富です。
ASGI が適切な場合:
非同期 I/O による効率化が必要である
WebSocket や SSE を使ったリアルタイム通信が求められる
LLM のストリーミング応答のように長時間接続が多い
新規プロジェクトで FastAPI を採用する
どちらか一方だけを知っていれば十分ということはありません。
Django は WSGI と ASGI の両方をサポートしており、本番環境の要件によってどちらを使うかが変わります。
両方の仕様を理解していれば、プロジェクトの要件に応じた正しい選択ができ、問題が起きたときにどの層で何が起きているかを追跡できます。
次節では、ここまで学んだ ASGI の知識を実際の問題解決に応用します。async def の中でのブロッキング呼び出し、イベントループの停滞、WebSocket の切断処理といった ASGI 固有のトラブルを取り上げます。
2.11. トラブルシューティングの観点
2 章(なぜ ASGI が必要になったのか)を通じて ASGI の仕様を追跡してきました。 本節ではその知識を実際の問題解決に応用します。 ASGI 固有のトラブルは 2つのカテゴリ に大別されます。
非同期プログラミングの落とし穴 — ブロッキング処理の混入など
ASGI プロトコルの制約違反 — イベント送信順序の誤りなど
いずれもフレームワークが多くの部分を隠蔽してくれますが、隠蔽の裏側で起きている問題を理解していなければ、原因の特定に苦労します。
2.11.1. async 関数なのに blocking
ASGI アプリケーションで 最も頻出し、かつ最も発見しにくい バグが、async def の中で同期的なブロッキング処理を呼び出してしまうケースです。
# 一見問題なさそうに見えるが、イベントループ全体をブロックする
@app.get("/users/{user_id}")
async def user_detail(user_id: int):
user = requests.get(f"https://api.example.com/users/{user_id}").json() # NG
return user
この関数は async def で定義されているため、ASGI サーバのイベントループ上で直接実行されます。
しかし requests.get は同期ライブラリであり、レスポンスが返るまでスレッド全体をブロックします。
イベントループがブロックされると、そのイベントループ上で処理中の他のすべてのリクエスト、WebSocket 接続、タイマーが停止します。
外部 API が200ms かかる場合、その200msの間に 他の全クライアントが応答を待たされます。
注意
このバグが開発環境では発覚しにくい点に注意が必要です。 同時接続が1つしかなければブロッキングの影響は自分自身だけに閉じます。 本番環境で同時接続数が増えたときに初めて、レスポンスタイムの劣化やタイムアウトとして顕在化します。
原因の特定には、イベントループのブロッキングを検出する asyncio のデバッグモードが有効です。
以下の設定でブロッキング処理を検出できます。
# 環境変数で有効化
# PYTHONASYNCIODEBUG=1 uvicorn main:app
# または起動時に設定
import asyncio
asyncio.get_event_loop().slow_callback_duration = 0.1 # 100ms以上のブロッキングを警告
これを有効にすると、イベントループが100ms以上ブロックされた場合にログに警告が出力されます。
対処法は3つあります。
方法 1: 非同期ライブラリに置き換える
requests を httpx に、psycopg2 を asyncpg に、open() を aiofiles に置き換えます。
import httpx
@app.get("/users/{user_id}")
async def user_detail(user_id: int):
async with httpx.AsyncClient() as client:
resp = await client.get(f"https://api.example.com/users/{user_id}")
return resp.json()
方法 2: スレッドプールに逃がす
非同期版が存在しないライブラリや、変更コストが高い既存コードに対して有効です。
import asyncio
from functools import partial
@app.get("/users/{user_id}")
async def user_detail(user_id: int):
loop = asyncio.get_event_loop()
user = await loop.run_in_executor(
None, # デフォルトのスレッドプール
partial(requests.get, f"https://api.example.com/users/{user_id}")
)
return user.json()
方法 3: ビュー関数を def(同期関数)として定義する
Django と FastAPI はいずれも、async def ではなく def で定義されたビューをスレッドプールで実行する仕組みを持っています。
# FastAPI: def で定義すると自動的にスレッドプールで実行される
@app.get("/users/{user_id}")
def user_detail(user_id: int):
user = requests.get(f"https://api.example.com/users/{user_id}").json()
return user # イベントループをブロックしない
重要
「async def にしたほうが速い」とは限りません。
内部で同期 I/O を使うなら def のままにしておく方が安全です。
async def は非同期ライブラリと await を使う場合にのみ意味があります。
2.11.2. イベントを返し忘れる
ASGI プロトコルでは send の呼び出し順序と回数に厳格な制約があります。
これを守らないと、クライアントにレスポンスが届かなかったり、ASGI サーバがエラーを出したりします。
最も多いのは http.response.start を送らずに http.response.body を送るケースです。
# NG: ステータスとヘッダーが未送信
async def application(scope, receive, send):
await receive()
await send({
"type": "http.response.body",
"body": b"Hello",
})
# → サーバが例外を送出するか、クライアントに空レスポンスが届く
逆に、http.response.start だけ送って http.response.body を送らない場合も問題になります。
# NG: ボディが未送信
async def application(scope, receive, send):
await receive()
await send({
"type": "http.response.start",
"status": 200,
"headers": [[b"content-type", b"text/plain"]],
})
# application が return してしまう → クライアントは応答を待ち続ける
条件分岐の中でレスポンスを送り忘れるパターンも頻出します。
# NG: else 節でレスポンスを送っていない
async def application(scope, receive, send):
await receive()
path = scope["path"]
if path == "/":
await send_text(send, "Welcome!")
elif path == "/about":
await send_text(send, "About page")
# /other にアクセスすると application が何も送らずに return
注釈
フレームワークを使っていればこの問題はほぼ発生しません。
Django の ASGIHandler や FastAPI/Starlette は、ビュー関数の戻り値を必ず http.response.start と http.response.body に変換します。
未処理例外が発生しても500レスポンスが自動生成されます。
しかしカスタムの ASGI ミドルウェアを書く場合や、フレームワークを使わずに ASGI アプリケーションを直接実装する場合には、すべてのコードパスでレスポンスが送信されることを確認する必要があります。
防御的な実装として、レスポンスの送信を保証するラッパーを書く方法があります。
ただし http.response.start 送信後に例外が発生した場合は、http.response.start を二重に送れないため接続を切断するしかありません。
async def safe_application(scope, receive, send):
try:
await application(scope, receive, send)
except Exception:
import traceback
traceback.print_exc()
body = b"Internal Server Error"
await send({
"type": "http.response.start",
"status": 500,
"headers": [
[b"content-type", b"text/plain"],
[b"content-length", str(len(body)).encode()],
],
})
await send({
"type": "http.response.body",
"body": body,
})
この制約は WSGI でも同様で、start_response の後に例外が発生した場合の exc_info 引数による再設定と同じ問題領域です。
2.11.3. body の受信ループ漏れ
2.8 章(ASGI の HTTP モデル)で more_body フラグを監視するループの重要性を解説しましたが、このループを省略してしまうバグは実装時に起きがちです。
# NG: 1回の receive で全ボディが届く前提
async def application(scope, receive, send):
event = await receive()
body = event.get("body", b"")
data = json.loads(body) # ボディが不完全だと JSONDecodeError
...
注意
小さなリクエストボディ(数KB以下の JSON)では1回の receive で全データが届くことが多いため、開発中にこのバグが発覚しないことがあります。
本番環境で大きなリクエスト(画像アップロード、大量のフォームデータ、長い JSON ペイロード)が送信されたときに初めて問題が顕在化します。
json.loads が不完全な JSON を受け取って JSONDecodeError を送出するか、最悪の場合はデータが切り詰められたまま処理が進みます。
正しい実装は前節で示した read_body ヘルパーを使うことです。
more_body を監視するループが内部に含まれているため、チャンク分割を意識せずにボディ全体を安全に取得できます。
async def application(scope, receive, send):
body = await read_body(receive) # more_body を監視して完全に受信
data = json.loads(body)
...
注釈
フレームワークを使う場合、FastAPI の Request.body() や Django の request.body は内部で more_body ループを実装しているため、開発者がこの問題を意識する必要はありません。
しかしカスタムミドルウェアで receive を直接呼ぶ場合は注意が必要です。
もう一つの落とし穴は、ミドルウェアで receive を消費した後にアプリケーションに渡すケースです。
# NG: ミドルウェアが receive を消費してしまう
class LoggingMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] == "http":
body = await read_body(receive) # receive を完全に消費
print(f"Request body: {body}")
await self.app(scope, receive, send)
# → アプリケーションが receive を呼ぶと http.disconnect が返る
WSGI では environ["wsgi.input"] の同じ問題がありました(1.9 章(トラブルシューティングの観点)参照)。
ASGI では receive を再利用可能にするために、消費済みのボディを返す 新しい receive を作成して渡す必要があります。
# OK: 消費済みボディを再利用可能にする
class LoggingMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] == "http":
body = await read_body(receive)
print(f"Request body: {body}")
# ボディを再提供する新しい receive を作成
body_sent = False
async def cached_receive():
nonlocal body_sent
if not body_sent:
body_sent = True
return {"type": "http.request", "body": body, "more_body": False}
return await receive() # disconnect 等は元の receive から取得
await self.app(scope, cached_receive, send)
else:
await self.app(scope, receive, send)
2.11.4. disconnect の扱い
クライアントが接続を切断した後にレスポンスを送信しようとすると、ASGI サーバの実装によって異なる挙動が生じます。
例外が送出される場合
黙って送信が無視される場合
ログにエラーが記録される場合
長時間の処理を伴うエンドポイントでこの問題が顕在化します。
@app.get("/slow-report")
async def slow_report():
data = await generate_heavy_report() # 30秒かかる
return JSONResponse(data)
# → ユーザーが10秒で離脱した場合、残り20秒は無駄な処理
2.7 章(receive / send を理解する)で示した http.disconnect の監視パターンを使えば、クライアントの離脱を検知して処理を中断できます。
しかしフレームワークのビュー関数内で receive を直接呼ぶのは一般的ではないため、実践的にはフレームワークが提供する仕組みを利用します。
FastAPI/Starlette では Request.is_disconnected() メソッドが利用可能です。
ループ内で定期的にチェックすることで、不要な処理を中断できます。
from starlette.requests import Request
@app.get("/slow-report")
async def slow_report(request: Request):
for chunk in processing_steps:
if await request.is_disconnected():
# クライアントが離脱済み → 処理を中断
return Response(status_code=499) # Nginx の慣例的なコード
await process_chunk(chunk)
return JSONResponse(result)
WebSocket の切断処理はより明確です。
websocket.disconnect イベントを受け取ったらループを抜けてリソースを解放します。
ただし receive を待っている最中に切断されるケースだけでなく、send を呼んだ際に切断が検出されるケースもあります。
async def websocket_handler(scope, receive, send):
await receive() # websocket.connect
await send({"type": "websocket.accept"})
try:
while True:
event = await receive()
if event["type"] == "websocket.disconnect":
break
# メッセージ処理
try:
await send({"type": "websocket.send", "text": "response"})
except Exception:
# 送信時に切断を検出 → ループを抜ける
break
finally:
# リソースのクリーンアップ(ルームからの退出など)
await cleanup()
Tip
try ... finally パターンは WebSocket アプリケーションにおける鉄則です。
正常な切断、異常な切断、サーバ側の例外のいずれの場合でも、クリーンアップ処理(チャットルームからの退出、サブスクリプションの解除、一時リソースの解放)が確実に実行されます。
本章で取り上げた4つのトラブルをまとめます。
トラブル |
原因 |
対処法 |
|---|---|---|
async 関数なのに blocking |
同期ライブラリをそのまま使用 |
非同期ライブラリへの置き換え、またはスレッドプールへの逃避 |
イベントを返し忘れる |
|
すべてのコードパスにレスポンス送信を保証 |
body の受信ループ漏れ |
|
|
disconnect の扱い |
長寿命接続のリソース管理不備 |
|
WSGI の同期モデルではこれらの問題の多くが構造的に存在しませんでした。ASGI の柔軟性は、これらの新しい種類のバグと引き換えに得られるものです。
次章では ASGI ミドルウェアの実装を確認し、4 章(FastAPI を ASGI 視点で見る)で FastAPI がこれらの ASGI の複雑さをどのように隠蔽しているかを内部から追跡します。