付録B 最小 ASGI アプリ集
ASGI(Asynchronous Server Gateway Interface)は WSGI の非同期版であり、HTTP だけでなく WebSocket やサーバのライフサイクルイベントも統一的に扱えます。 本付録では、付録 A の WSGI アプリと対比しながら、最小限の ASGI アプリを段階的に構築します。
HTTP
ASGI アプリケーションの最小形は、scope, receive, send の 3 引数を取る非同期 callable です。
scope はリクエストのメタ情報(パス、メソッド、ヘッダーなど)を持つ辞書で、receive と send はそれぞれボディの受信・レスポンスの送信に使う非同期関数です。
async def application(scope, receive, send):
assert scope["type"] == "http"
body = b"Hello, ASGI World!\n"
await send({
"type": "http.response.start",
"status": 200,
"headers": [
[b"content-type", b"text/plain; charset=utf-8"],
[b"content-length", str(len(body)).encode()],
],
})
await send({
"type": "http.response.body",
"body": body,
})
uvicorn app_asgi:application --bind 127.0.0.1:8000 で起動し、curl http://127.0.0.1:8000/ で動作を確認できます。
WSGI との最大の違いは、レスポンスが start_response + return [bytes] ではなく、http.response.start と http.response.body という 2 段階の send 呼び出しで構成される点です。
この分離により、ストリーミングレスポンス("more_body": True)や Server-Sent Events のような段階的送信も自然に表現できます。
また、scope に含まれる情報は WSGI の environ と対応しますが、キー名が整理されています。
scope["method"] がリクエストメソッド、scope["path"] がパス、scope["query_string"] がクエリ文字列(バイト列)、scope["headers"] がヘッダーのリスト(各要素が [name, value] のバイト列タプル)です。
async def echo_app(scope, receive, send):
assert scope["type"] == "http"
method = scope["method"]
path = scope["path"]
query = scope.get("query_string", b"").decode()
body = f"Method: {method}\nPath: {path}\nQuery: {query}\n".encode()
await send({
"type": "http.response.start",
"status": 200,
"headers": [
[b"content-type", b"text/plain; charset=utf-8"],
[b"content-length", str(len(body)).encode()],
],
})
await send({
"type": "http.response.body",
"body": body,
})
body 受信
WSGI では environ["wsgi.input"].read() でリクエストボディを一括取得しましたが、ASGI ではボディは receive を繰り返し呼び出してチャンク単位で受信します。
これは大きなファイルアップロードやストリーミングリクエストに対応するための設計です。
async def read_body(receive):
"""リクエストボディを全て読み取るヘルパー"""
chunks = []
while True:
message = await receive()
chunks.append(message.get("body", b""))
if not message.get("more_body", False):
break
return b"".join(chunks)
receive() が返す辞書には "type": "http.request"、"body" (バイト列)、"more_body"(まだ続くかどうかの真偽値)が含まれます。
more_body が False(または欠落)のとき、ボディの受信が完了です。
この仕組みを使った JSON エコーサーバの例を示します。
import json
async def json_echo(scope, receive, send):
assert scope["type"] == "http"
if scope["method"] != "POST":
await send_json(send, 405, {"error": "Method Not Allowed"})
return
raw = await read_body(receive)
try:
data = json.loads(raw)
except (json.JSONDecodeError, UnicodeDecodeError):
await send_json(send, 400, {"error": "Invalid JSON"})
return
response = {"received": data, "path": scope["path"]}
await send_json(send, 200, response)
async def send_json(send, status, data):
body = json.dumps(data, ensure_ascii=False).encode("utf-8")
await send({
"type": "http.response.start",
"status": status,
"headers": [
[b"content-type", b"application/json; charset=utf-8"],
[b"content-length", str(len(body)).encode()],
],
})
await send({
"type": "http.response.body",
"body": body,
})
テストは curl -X POST -H "Content-Type: application/json" -d '{"name":"test"}' http://127.0.0.1:8000/ で行えます。
注意すべき点として、receive は一度しか消費できないため、ミドルウェアでボディを読み取った場合、後段のアプリケーションにボディを渡すには再構築した receive 関数を差し替える必要があります。
Starlette の Request オブジェクトはこの処理を内部で行い、キャッシュしたボディを返すようになっています。
WebSocket
ASGI が WSGI に対して持つ最大の優位性が、WebSocket のネイティブサポートです。
WebSocket 接続も同じ application(scope, receive, send) シグネチャで処理しますが、scope["type"] が "websocket" になります。
async def websocket_echo(scope, receive, send):
assert scope["type"] == "websocket"
# 1. 接続要求を受け取る
event = await receive()
assert event["type"] == "websocket.connect"
# 2. 接続を受理する
await send({"type": "websocket.accept"})
# 3. メッセージのやり取り
while True:
event = await receive()
if event["type"] == "websocket.disconnect":
break
if event["type"] == "websocket.receive":
text = event.get("text", "")
await send({
"type": "websocket.send",
"text": f"Echo: {text}",
})
WebSocket の ASGI イベントフローは次のようになります。
クライアントが接続すると websocket.connect イベントが receive から届き、サーバは websocket.accept を send して接続を確立します。
以降は websocket.receive(テキストまたはバイナリ)と websocket.send でメッセージを交換し、切断時に websocket.disconnect が届きます。
サーバ側から切断する場合は websocket.close を send します。
HTTP と WebSocket の両方を扱うアプリケーションは、scope["type"] で分岐します。
async def application(scope, receive, send):
if scope["type"] == "http":
await http_handler(scope, receive, send)
elif scope["type"] == "websocket":
await websocket_echo(scope, receive, send)
elif scope["type"] == "lifespan":
await lifespan_handler(scope, receive, send)
テストには websocat ws://127.0.0.1:8000/ws や Python の websockets ライブラリが便利です。
この分岐構造こそが、Starlette や FastAPI の Router が内部で行っていることの原型です。
lifespan
ASGI には lifespan プロトコルという、サーバの起動・終了時に一度だけ呼ばれるイベントの仕組みがあります。 DB コネクションプールの初期化・破棄、キャッシュのウォームアップ、バックグラウンドタスクの開始・停止など、アプリケーション全体のライフサイクル管理に使います。
async def application(scope, receive, send):
if scope["type"] == "lifespan":
while True:
message = await receive()
if message["type"] == "lifespan.startup":
# 初期化処理
try:
await setup_database()
await send({"type": "lifespan.startup.complete"})
except Exception:
await send({
"type": "lifespan.startup.failed",
"message": "Database connection failed",
})
return
elif message["type"] == "lifespan.shutdown":
# 終了処理
await cleanup_database()
await send({"type": "lifespan.shutdown.complete"})
return
elif scope["type"] == "http":
await http_handler(scope, receive, send)
lifespan のイベントフローは、サーバ起動時に lifespan.startup が送られ、アプリが初期化に成功すれば lifespan.startup.complete を返し、失敗すれば lifespan.startup.failed を返してサーバが起動を中止します。
サーバ停止時には lifespan.shutdown が送られ、アプリがクリーンアップを終えたら lifespan.shutdown.complete を返します。
実用的な例として、httpx.AsyncClient の共有インスタンスを lifespan で管理するパターンを示します。
# app_state をモジュールレベルで保持
app_state = {}
async def lifespan_handler(scope, receive, send):
while True:
message = await receive()
if message["type"] == "lifespan.startup":
import httpx
app_state["http_client"] = httpx.AsyncClient(timeout=10.0)
await send({"type": "lifespan.startup.complete"})
elif message["type"] == "lifespan.shutdown":
await app_state["http_client"].aclose()
await send({"type": "lifespan.shutdown.complete"})
return
FastAPI ではこの仕組みが @app.on_event("startup") / @app.on_event("shutdown") デコレータ(旧 API)や、lifespan コンテキストマネージャ(推奨 API)として抽象化されています。
from contextlib import asynccontextmanager
from fastapi import FastAPI
import httpx
@asynccontextmanager
async def lifespan(app):
app.state.http_client = httpx.AsyncClient(timeout=10.0)
yield
await app.state.http_client.aclose()
app = FastAPI(lifespan=lifespan)
この FastAPI のコードが生の ASGI lifespan プロトコルをどれだけ簡潔に包んでいるかが分かると、フレームワークの抽象化の価値と、その裏で何が起きているかの両方を理解できるようになります。
付録 A(WSGI)と付録 B(ASGI)の対比まとめ
WSGI と ASGI の対応関係を示すと、インターフェースは callable(environ, start_response) → bytes 対 async callable(scope, receive, send)、ボディ読み取りは environ["wsgi.input"].read() 対 await receive() のループ、レスポンスは start_response + return 対 send の 2 段階呼び出し、ミドルウェアは callable のラップ対 scope/receive/send の差し替え、WebSocket は非対応対 scope["type"] == "websocket" による統一処理、ライフサイクルは非対応対 lifespan プロトコルによる起動・終了管理、という違いがあります。
両付録のコードをすべて手元で動かし、print や logging を挟みながらイベントの流れを追体験することで、フレームワークの「魔法」が具体的なプロトコル操作の積み重ねであることを体感できます。