付録B 最小 ASGI アプリ集

ASGI(Asynchronous Server Gateway Interface)は WSGI の非同期版であり、HTTP だけでなく WebSocket やサーバのライフサイクルイベントも統一的に扱えます。 本付録では、付録 A の WSGI アプリと対比しながら、最小限の ASGI アプリを段階的に構築します。


HTTP

ASGI アプリケーションの最小形は、scope, receive, send の 3 引数を取る非同期 callable です。 scope はリクエストのメタ情報(パス、メソッド、ヘッダーなど)を持つ辞書で、receivesend はそれぞれボディの受信・レスポンスの送信に使う非同期関数です。

async def application(scope, receive, send):
    assert scope["type"] == "http"

    body = b"Hello, ASGI World!\n"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [
            [b"content-type", b"text/plain; charset=utf-8"],
            [b"content-length", str(len(body)).encode()],
        ],
    })
    await send({
        "type": "http.response.body",
        "body": body,
    })

uvicorn app_asgi:application --bind 127.0.0.1:8000 で起動し、curl http://127.0.0.1:8000/ で動作を確認できます。

WSGI との最大の違いは、レスポンスが start_response + return [bytes] ではなく、http.response.starthttp.response.body という 2 段階の send 呼び出しで構成される点です。 この分離により、ストリーミングレスポンス("more_body": True)や Server-Sent Events のような段階的送信も自然に表現できます。

また、scope に含まれる情報は WSGI の environ と対応しますが、キー名が整理されています。 scope["method"] がリクエストメソッド、scope["path"] がパス、scope["query_string"] がクエリ文字列(バイト列)、scope["headers"] がヘッダーのリスト(各要素が [name, value] のバイト列タプル)です。

async def echo_app(scope, receive, send):
    assert scope["type"] == "http"

    method = scope["method"]
    path = scope["path"]
    query = scope.get("query_string", b"").decode()
    body = f"Method: {method}\nPath: {path}\nQuery: {query}\n".encode()

    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [
            [b"content-type", b"text/plain; charset=utf-8"],
            [b"content-length", str(len(body)).encode()],
        ],
    })
    await send({
        "type": "http.response.body",
        "body": body,
    })

body 受信

WSGI では environ["wsgi.input"].read() でリクエストボディを一括取得しましたが、ASGI ではボディは receive を繰り返し呼び出してチャンク単位で受信します。 これは大きなファイルアップロードやストリーミングリクエストに対応するための設計です。

async def read_body(receive):
    """リクエストボディを全て読み取るヘルパー"""
    chunks = []
    while True:
        message = await receive()
        chunks.append(message.get("body", b""))
        if not message.get("more_body", False):
            break
    return b"".join(chunks)

receive() が返す辞書には "type": "http.request""body" (バイト列)、"more_body"(まだ続くかどうかの真偽値)が含まれます。 more_bodyFalse(または欠落)のとき、ボディの受信が完了です。

この仕組みを使った JSON エコーサーバの例を示します。

import json

async def json_echo(scope, receive, send):
    assert scope["type"] == "http"

    if scope["method"] != "POST":
        await send_json(send, 405, {"error": "Method Not Allowed"})
        return

    raw = await read_body(receive)
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError):
        await send_json(send, 400, {"error": "Invalid JSON"})
        return

    response = {"received": data, "path": scope["path"]}
    await send_json(send, 200, response)


async def send_json(send, status, data):
    body = json.dumps(data, ensure_ascii=False).encode("utf-8")
    await send({
        "type": "http.response.start",
        "status": status,
        "headers": [
            [b"content-type", b"application/json; charset=utf-8"],
            [b"content-length", str(len(body)).encode()],
        ],
    })
    await send({
        "type": "http.response.body",
        "body": body,
    })

テストは curl -X POST -H "Content-Type: application/json" -d '{"name":"test"}' http://127.0.0.1:8000/ で行えます。

注意すべき点として、receive は一度しか消費できないため、ミドルウェアでボディを読み取った場合、後段のアプリケーションにボディを渡すには再構築した receive 関数を差し替える必要があります。 Starlette の Request オブジェクトはこの処理を内部で行い、キャッシュしたボディを返すようになっています。


WebSocket

ASGI が WSGI に対して持つ最大の優位性が、WebSocket のネイティブサポートです。 WebSocket 接続も同じ application(scope, receive, send) シグネチャで処理しますが、scope["type"]"websocket" になります。

async def websocket_echo(scope, receive, send):
    assert scope["type"] == "websocket"

    # 1. 接続要求を受け取る
    event = await receive()
    assert event["type"] == "websocket.connect"

    # 2. 接続を受理する
    await send({"type": "websocket.accept"})

    # 3. メッセージのやり取り
    while True:
        event = await receive()

        if event["type"] == "websocket.disconnect":
            break

        if event["type"] == "websocket.receive":
            text = event.get("text", "")
            await send({
                "type": "websocket.send",
                "text": f"Echo: {text}",
            })

WebSocket の ASGI イベントフローは次のようになります。 クライアントが接続すると websocket.connect イベントが receive から届き、サーバは websocket.acceptsend して接続を確立します。 以降は websocket.receive(テキストまたはバイナリ)と websocket.send でメッセージを交換し、切断時に websocket.disconnect が届きます。 サーバ側から切断する場合は websocket.closesend します。

HTTP と WebSocket の両方を扱うアプリケーションは、scope["type"] で分岐します。

async def application(scope, receive, send):
    if scope["type"] == "http":
        await http_handler(scope, receive, send)
    elif scope["type"] == "websocket":
        await websocket_echo(scope, receive, send)
    elif scope["type"] == "lifespan":
        await lifespan_handler(scope, receive, send)

テストには websocat ws://127.0.0.1:8000/ws や Python の websockets ライブラリが便利です。 この分岐構造こそが、Starlette や FastAPI の Router が内部で行っていることの原型です。


lifespan

ASGI には lifespan プロトコルという、サーバの起動・終了時に一度だけ呼ばれるイベントの仕組みがあります。 DB コネクションプールの初期化・破棄、キャッシュのウォームアップ、バックグラウンドタスクの開始・停止など、アプリケーション全体のライフサイクル管理に使います。

async def application(scope, receive, send):
    if scope["type"] == "lifespan":
        while True:
            message = await receive()

            if message["type"] == "lifespan.startup":
                # 初期化処理
                try:
                    await setup_database()
                    await send({"type": "lifespan.startup.complete"})
                except Exception:
                    await send({
                        "type": "lifespan.startup.failed",
                        "message": "Database connection failed",
                    })
                    return

            elif message["type"] == "lifespan.shutdown":
                # 終了処理
                await cleanup_database()
                await send({"type": "lifespan.shutdown.complete"})
                return

    elif scope["type"] == "http":
        await http_handler(scope, receive, send)

lifespan のイベントフローは、サーバ起動時に lifespan.startup が送られ、アプリが初期化に成功すれば lifespan.startup.complete を返し、失敗すれば lifespan.startup.failed を返してサーバが起動を中止します。 サーバ停止時には lifespan.shutdown が送られ、アプリがクリーンアップを終えたら lifespan.shutdown.complete を返します。

実用的な例として、httpx.AsyncClient の共有インスタンスを lifespan で管理するパターンを示します。

# app_state をモジュールレベルで保持
app_state = {}

async def lifespan_handler(scope, receive, send):
    while True:
        message = await receive()
        if message["type"] == "lifespan.startup":
            import httpx
            app_state["http_client"] = httpx.AsyncClient(timeout=10.0)
            await send({"type": "lifespan.startup.complete"})
        elif message["type"] == "lifespan.shutdown":
            await app_state["http_client"].aclose()
            await send({"type": "lifespan.shutdown.complete"})
            return

FastAPI ではこの仕組みが @app.on_event("startup") / @app.on_event("shutdown") デコレータ(旧 API)や、lifespan コンテキストマネージャ(推奨 API)として抽象化されています。

from contextlib import asynccontextmanager
from fastapi import FastAPI
import httpx

@asynccontextmanager
async def lifespan(app):
    app.state.http_client = httpx.AsyncClient(timeout=10.0)
    yield
    await app.state.http_client.aclose()

app = FastAPI(lifespan=lifespan)

この FastAPI のコードが生の ASGI lifespan プロトコルをどれだけ簡潔に包んでいるかが分かると、フレームワークの抽象化の価値と、その裏で何が起きているかの両方を理解できるようになります。


付録 A(WSGI)と付録 B(ASGI)の対比まとめ

WSGI と ASGI の対応関係を示すと、インターフェースは callable(environ, start_response) bytesasync callable(scope, receive, send)、ボディ読み取りは environ["wsgi.input"].read()await receive() のループ、レスポンスは start_response + returnsend の 2 段階呼び出し、ミドルウェアは callable のラップ対 scope/receive/send の差し替え、WebSocket は非対応対 scope["type"] == "websocket" による統一処理、ライフサイクルは非対応対 lifespan プロトコルによる起動・終了管理、という違いがあります。

両付録のコードをすべて手元で動かし、printlogging を挟みながらイベントの流れを追体験することで、フレームワークの「魔法」が具体的なプロトコル操作の積み重ねであることを体感できます。