# 付録B 最小 ASGI アプリ集 ASGI(Asynchronous Server Gateway Interface)は WSGI の非同期版であり、HTTP だけでなく WebSocket やサーバのライフサイクルイベントも統一的に扱えます。 本付録では、付録 A の WSGI アプリと対比しながら、最小限の ASGI アプリを段階的に構築します。 --- ## HTTP ASGI アプリケーションの最小形は、`scope`, `receive`, `send` の 3 引数を取る非同期 callable です。 `scope` はリクエストのメタ情報(パス、メソッド、ヘッダーなど)を持つ辞書で、`receive` と `send` はそれぞれボディの受信・レスポンスの送信に使う非同期関数です。 ```python async def application(scope, receive, send): assert scope["type"] == "http" body = b"Hello, ASGI World!\n" await send({ "type": "http.response.start", "status": 200, "headers": [ [b"content-type", b"text/plain; charset=utf-8"], [b"content-length", str(len(body)).encode()], ], }) await send({ "type": "http.response.body", "body": body, }) ``` `uvicorn app_asgi:application --bind 127.0.0.1:8000` で起動し、`curl http://127.0.0.1:8000/` で動作を確認できます。 WSGI との最大の違いは、レスポンスが `start_response` + `return [bytes]` ではなく、`http.response.start` と `http.response.body` という 2 段階の `send` 呼び出しで構成される点です。 この分離により、ストリーミングレスポンス(`"more_body": True`)や Server-Sent Events のような段階的送信も自然に表現できます。 また、`scope` に含まれる情報は WSGI の `environ` と対応しますが、キー名が整理されています。 `scope["method"]` がリクエストメソッド、`scope["path"]` がパス、`scope["query_string"]` がクエリ文字列(バイト列)、`scope["headers"]` がヘッダーのリスト(各要素が `[name, value]` のバイト列タプル)です。 ```python async def echo_app(scope, receive, send): assert scope["type"] == "http" method = scope["method"] path = scope["path"] query = scope.get("query_string", b"").decode() body = f"Method: {method}\nPath: {path}\nQuery: {query}\n".encode() await send({ "type": "http.response.start", "status": 200, "headers": [ [b"content-type", b"text/plain; charset=utf-8"], [b"content-length", str(len(body)).encode()], ], }) await send({ "type": "http.response.body", "body": body, }) ``` --- ## body 受信 WSGI では `environ["wsgi.input"].read()` でリクエストボディを一括取得しましたが、ASGI ではボディは `receive` を繰り返し呼び出してチャンク単位で受信します。 これは大きなファイルアップロードやストリーミングリクエストに対応するための設計です。 ```python async def read_body(receive): """リクエストボディを全て読み取るヘルパー""" chunks = [] while True: message = await receive() chunks.append(message.get("body", b"")) if not message.get("more_body", False): break return b"".join(chunks) ``` `receive()` が返す辞書には `"type": "http.request"`、`"body"` (バイト列)、`"more_body"`(まだ続くかどうかの真偽値)が含まれます。 `more_body` が `False`(または欠落)のとき、ボディの受信が完了です。 この仕組みを使った JSON エコーサーバの例を示します。 ```python import json async def json_echo(scope, receive, send): assert scope["type"] == "http" if scope["method"] != "POST": await send_json(send, 405, {"error": "Method Not Allowed"}) return raw = await read_body(receive) try: data = json.loads(raw) except (json.JSONDecodeError, UnicodeDecodeError): await send_json(send, 400, {"error": "Invalid JSON"}) return response = {"received": data, "path": scope["path"]} await send_json(send, 200, response) async def send_json(send, status, data): body = json.dumps(data, ensure_ascii=False).encode("utf-8") await send({ "type": "http.response.start", "status": status, "headers": [ [b"content-type", b"application/json; charset=utf-8"], [b"content-length", str(len(body)).encode()], ], }) await send({ "type": "http.response.body", "body": body, }) ``` テストは `curl -X POST -H "Content-Type: application/json" -d '{"name":"test"}' http://127.0.0.1:8000/` で行えます。 注意すべき点として、`receive` は一度しか消費できないため、ミドルウェアでボディを読み取った場合、後段のアプリケーションにボディを渡すには再構築した `receive` 関数を差し替える必要があります。 Starlette の `Request` オブジェクトはこの処理を内部で行い、キャッシュしたボディを返すようになっています。 --- ## WebSocket ASGI が WSGI に対して持つ最大の優位性が、WebSocket のネイティブサポートです。 WebSocket 接続も同じ `application(scope, receive, send)` シグネチャで処理しますが、`scope["type"]` が `"websocket"` になります。 ```python async def websocket_echo(scope, receive, send): assert scope["type"] == "websocket" # 1. 接続要求を受け取る event = await receive() assert event["type"] == "websocket.connect" # 2. 接続を受理する await send({"type": "websocket.accept"}) # 3. メッセージのやり取り while True: event = await receive() if event["type"] == "websocket.disconnect": break if event["type"] == "websocket.receive": text = event.get("text", "") await send({ "type": "websocket.send", "text": f"Echo: {text}", }) ``` WebSocket の ASGI イベントフローは次のようになります。 クライアントが接続すると `websocket.connect` イベントが `receive` から届き、サーバは `websocket.accept` を `send` して接続を確立します。 以降は `websocket.receive`(テキストまたはバイナリ)と `websocket.send` でメッセージを交換し、切断時に `websocket.disconnect` が届きます。 サーバ側から切断する場合は `websocket.close` を `send` します。 HTTP と WebSocket の両方を扱うアプリケーションは、`scope["type"]` で分岐します。 ```python async def application(scope, receive, send): if scope["type"] == "http": await http_handler(scope, receive, send) elif scope["type"] == "websocket": await websocket_echo(scope, receive, send) elif scope["type"] == "lifespan": await lifespan_handler(scope, receive, send) ``` テストには `websocat ws://127.0.0.1:8000/ws` や Python の `websockets` ライブラリが便利です。 この分岐構造こそが、Starlette や FastAPI の `Router` が内部で行っていることの原型です。 --- ## lifespan ASGI には lifespan プロトコルという、サーバの起動・終了時に一度だけ呼ばれるイベントの仕組みがあります。 DB コネクションプールの初期化・破棄、キャッシュのウォームアップ、バックグラウンドタスクの開始・停止など、アプリケーション全体のライフサイクル管理に使います。 ```python async def application(scope, receive, send): if scope["type"] == "lifespan": while True: message = await receive() if message["type"] == "lifespan.startup": # 初期化処理 try: await setup_database() await send({"type": "lifespan.startup.complete"}) except Exception: await send({ "type": "lifespan.startup.failed", "message": "Database connection failed", }) return elif message["type"] == "lifespan.shutdown": # 終了処理 await cleanup_database() await send({"type": "lifespan.shutdown.complete"}) return elif scope["type"] == "http": await http_handler(scope, receive, send) ``` lifespan のイベントフローは、サーバ起動時に `lifespan.startup` が送られ、アプリが初期化に成功すれば `lifespan.startup.complete` を返し、失敗すれば `lifespan.startup.failed` を返してサーバが起動を中止します。 サーバ停止時には `lifespan.shutdown` が送られ、アプリがクリーンアップを終えたら `lifespan.shutdown.complete` を返します。 実用的な例として、`httpx.AsyncClient` の共有インスタンスを lifespan で管理するパターンを示します。 ```python # app_state をモジュールレベルで保持 app_state = {} async def lifespan_handler(scope, receive, send): while True: message = await receive() if message["type"] == "lifespan.startup": import httpx app_state["http_client"] = httpx.AsyncClient(timeout=10.0) await send({"type": "lifespan.startup.complete"}) elif message["type"] == "lifespan.shutdown": await app_state["http_client"].aclose() await send({"type": "lifespan.shutdown.complete"}) return ``` FastAPI ではこの仕組みが `@app.on_event("startup")` / `@app.on_event("shutdown")` デコレータ(旧 API)や、`lifespan` コンテキストマネージャ(推奨 API)として抽象化されています。 ```python from contextlib import asynccontextmanager from fastapi import FastAPI import httpx @asynccontextmanager async def lifespan(app): app.state.http_client = httpx.AsyncClient(timeout=10.0) yield await app.state.http_client.aclose() app = FastAPI(lifespan=lifespan) ``` この FastAPI のコードが生の ASGI lifespan プロトコルをどれだけ簡潔に包んでいるかが分かると、フレームワークの抽象化の価値と、その裏で何が起きているかの両方を理解できるようになります。 --- **付録 A(WSGI)と付録 B(ASGI)の対比まとめ** WSGI と ASGI の対応関係を示すと、インターフェースは `callable(environ, start_response) → bytes` 対 `async callable(scope, receive, send)`、ボディ読み取りは `environ["wsgi.input"].read()` 対 `await receive()` のループ、レスポンスは `start_response` + `return` 対 `send` の 2 段階呼び出し、ミドルウェアは callable のラップ対 `scope/receive/send` の差し替え、WebSocket は非対応対 `scope["type"] == "websocket"` による統一処理、ライフサイクルは非対応対 `lifespan` プロトコルによる起動・終了管理、という違いがあります。 両付録のコードをすべて手元で動かし、`print` や `logging` を挟みながらイベントの流れを追体験することで、フレームワークの「魔法」が具体的なプロトコル操作の積み重ねであることを体感できます。