AWS Cognito MFA 実装完全ガイド

AWS

  1. 目次
  2. 全体アーキテクチャ
    1. よくある誤解を先に解消
    2. 推奨アーキテクチャ図
    3. リクエスト・レスポンスフロー
  3. Cognito の役割と標準機能
    1. Cognito が提供するもの
    2. Cognito が提供しないもの(自分たちで実装)
    3. Cognito のセキュリティ機能
  4. 実装パターン
    1. パターン1: Cognito Hosted UI を使う(最も簡単)
    2. パターン2: カスタム UI + Cognito API(推奨)
  5. セキュリティ設計
    1. 1. 電話番号・メール属性の取り扱い
    2. 2. JWT トークン検証
    3. 3. CORS と CSRF 対策
    4. 4. Rate Limiting(ブルートフォース攻撃対策)
  6. フロントエンド実装
    1. React での例
    2. 認証済みリクエストのラッパー
  7. バックエンド実装
    1. 全体の実装フロー
    2. DB スキーマ設計
  8. インシデント対応
    1. よくあるトラブルと対応
  9. テナント対応アーキテクチャ
    1. シングルテナント vs マルチテナント
    2. マルチテナント実装(推奨)
    3. テナント間のデータアクセス制御
  10. 実装チェックリスト
    1. 事前準備
    2. Cognito 側の設定
    3. バックエンド実装
    4. フロントエンド実装
    5. セキュリティ
    6. テナント対応
    7. テスト
    8. 運用
  11. よくある質問 FAQ
    1. Q1: S3 でログイン画面をホストする?
    2. Q2: 電話番号は Cognito に保管?それとも DB?
    3. Q3: セキュリティ問題はないか?
    4. Q4: テナント対応の場合の実装は?
    5. Q5: UI/UX での考慮点は?
    6. Q6: インシデント対応は?
  12. 参考リソース
    1. AWS ドキュメント
    2. セキュリティベストプラクティス
    3. その他

目次

  1. 全体アーキテクチャ
  2. Cognito の役割と標準機能
  3. 実装パターン
  4. セキュリティ設計
  5. フロントエンド実装
  6. バックエンド実装
  7. インシデント対応
  8. テナント対応アーキテクチャ
  9. 実装チェックリスト

全体アーキテクチャ

よくある誤解を先に解消

Q: ログイン画面は S3 でホストする?
A: いいえ。ただし、静的ファイルは S3 でもよい。

❌ 間違い:
S3 → ログイン画面 HTML → Cognito API を直接呼び出し
   (セキュリティリスク:CORS、認証情報露出)

✅ 正しい:
- フロントエンド: CloudFront + S3(静的ファイル)
- API: API Gateway + Lambda(認証ロジック)
- 認証: Cognito がバックエンド側で管理
- トークン: HttpOnly Cookie で安全に受け渡し

推奨アーキテクチャ図

┌─────────────────────────────────────────────────────────────┐
│                      ユーザーのブラウザ                       │
└──────────────┬────────────────────────┬──────────────────────┘
              │                         │
        ┌─────▼─────┐          ┌────────▼────────┐
        │  CloudFront│          │  Cognito Auth   │
        │  + S3      │          │  UI (Hosted UI) │
        │ (静的ファイル) │        │  or Auth0      │
        └──────┬─────┘          │  (opt: 外部)    │
              │                 └────────┬────────┘
              │                         │
        ┌─────▼──────────────────────────▼──────────┐
        │      API Gateway + Lambda Layer             │
        │  (バックエンド認証ロジック)                │
        │  - Cognito との連携                        │
        │  - MFA チャレンジ処理                      │
        │  - トークン検証                             │
        └──────────────────┬──────────────────────────┘
                          │
        ┌─────────────────┼─────────────────┐
        │                 │                 │
   ┌────▼───┐    ┌───────▼───────┐   ┌───▼────────┐
   │Cognito │    │  RDS/DynamoDB  │   │  KMS      │
   │ User   │    │  (Application  │   │ (暗号化)  │
   │ Pool   │    │   User DB)     │   │           │
   └────┬───┘    └────────────────┘   └───────────┘
        │
   ┌────▼──────────────────┐
   │ SES / SNS              │
   │ (MFA配信: SMS/Email)  │
   └───────────────────────┘

リクエスト・レスポンスフロー

1. ユーザーがログイン画面で認証情報を入力
   │
2. フロントエンド → API Gateway/Lambda にPOST
   (HTTPS, CORS有効)
   │
3. Lambda が Cognito の initiate_auth を呼び出し
   │
4. Cognito がレスポンス:
   - MFA_REQUIRED(MFAが必要)
   - または SUCCESS(認証完了)
   │
5. MFA が必要な場合:
   │
   ├─ Cognito が SMS/Email/TOTP を配信(SES/SNS経由)
   │
   └─ フロントエンドは "MFA入力画面" を表示
      └─ ユーザーが OTP を入力
         │
         └─ Lambda が respond_to_auth_challenge を実行
            │
            └─ Cognito が MFA 検証
               │
               └─ トークン(IdToken, AccessToken)が返却
                  │
                  └─ Lambda が HttpOnly Cookie に格納して返す
                     │
                     └─ フロントエンドが自動的にクッキーを送付
                        (以後、認証が不要)

Cognito の役割と標準機能

Cognito が提供するもの

機能説明実装側で必要な対応
ユーザー登録email, phone_number 属性管理フロントエンド: 登録フォーム
ログイン認証パスワード検証バックエンド: initiate_auth
MFA配信SMS/Email OTP 生成・送信バックエンド: SES/SNS設定
トークン発行IdToken, AccessToken, RefreshTokenバックエンド: 後続処理で使用
ユーザー属性管理email, phone_number 等の更新バックエンド API: update_user_attributes
パスワードリセットForgot Password フローフロントエンド + バックエンド
セッション管理トークン検証、リフレッシュバックエンド: 全リクエストで検証

Cognito が提供しないもの(自分たちで実装)

機能実装場所詳細
カスタムUIフロントエンド企業ブランドに合わせたログイン画面
ビジネスロジックバックエンドユーザー権限、テナント管理等
監査ログバックエンド + CloudTrailセキュリティ監視用ログ
高度な MFA ルールバックエンド「IP変わったら強制MFA」等のカスタムロジック
複雑なワークフローバックエンド「初回ログイン時は強制パスワード変更」等
テナント管理バックエンド + DBマルチテナント対応

Cognito のセキュリティ機能

Cognito が自動で保護するもの

  • パスワードの暗号化(bcrypt相当)
  • トークンの署名検証(JWT RS256)
  • トークンの有効期限チェック
  • デフォルトで HTTPS のみ
  • XSS対策:ID Token に署名含む
  • CSRF対策:State パラメータ(OAuth 2.0)

⚠️ 開発者が対応する必要があるもの

  • HTTPS 強制(API Gateway)
  • CORS 設定(フロントエンドドメイン指定)
  • HttpOnly Cookie 設定(XSS対策)
  • Rate Limiting(ブルートフォース攻撃対策)
  • ログ監視(異常アクセス検知)
  • 電話番号の個人情報保護(GDPR等)

実装パターン

パターン1: Cognito Hosted UI を使う(最も簡単)

メリット

  • UI/UX は Cognito が提供
  • セキュリティベストプラクティス自動適用
  • メンテナンス少ない
  • OAuth 2.0 標準準拠

デメリット

  • ブランドカスタマイズに制限あり
  • 複雑なワークフロー実装困難

アーキテクチャ

フロントエンド
  │
  └─ Cognito Hosted UI
      │
      └─ ユーザー登録/ログイン/MFA(Cognito が担当)
         │
         └─ Authorization Code → Callback URI
            │
            └─ フロントエンド: Code を取得
               │
               └─ バックエンド API: Code を Token に交換
                  (initiate_auth ではなく AdminInitiateAuth 系を使用)
                  │
                  └─ トークン返却(HttpOnly Cookie)
                     │
                     └─ フロントエンド: 自動的にクッキー送付

実装例(バックエンド)

# Django / Flask での実装例

@app.route('/api/auth/callback', methods=['POST'])
def auth_callback():
    """Authorization Code を Token に交換"""
    code = request.json.get('code')
    
    client = boto3.client('cognito-idp')
    
    # Code を Token に交換
    response = client.initiate_auth(
        ClientId=COGNITO_CLIENT_ID,
        AuthFlow='ALLOW_REFRESH_TOKEN_AUTH',  # 実際には Admin権限でCode処理
        AuthParameters={
            'AUTHORIZATION_CODE': code
        }
    )
    
    id_token = response['AuthenticationResult']['IdToken']
    access_token = response['AuthenticationResult']['AccessToken']
    refresh_token = response['AuthenticationResult']['RefreshToken']
    
    # HttpOnly Cookie に格納
    resp = jsonify({'success': True})
    resp.set_cookie(
        'id_token',
        id_token,
        httponly=True,
        secure=True,
        samesite='Strict',
        max_age=3600  # 1時間
    )
    resp.set_cookie(
        'refresh_token',
        refresh_token,
        httponly=True,
        secure=True,
        samesite='Strict',
        max_age=604800  # 7日
    )
    
    return resp

パターン2: カスタム UI + Cognito API(推奨)

メリット

  • ブランド完全カスタマイズ
  • 複雑なワークフロー実装可能
  • ビジネスロジック統合容易

デメリット

  • セキュリティ責任は開発者側
  • 実装量多い

フロー

フロントエンド(カスタムUI)
  │
  ├─ ログイン画面
  │  │
  │  └─ ユーザー入力(Email + パスワード)
  │     │
  │     └─ POST /api/auth/login
  │        │
  │        └─ バックエンド: initiate_auth
  │           (Cognito への認証リクエスト)
  │           │
  │           ├─ レスポンス1: チャレンジ = NEW_PASSWORD_REQUIRED
  │           │  (初回ログイン時、強制パスワード変更が必要)
  │           │
  │           ├─ レスポンス2: チャレンジ = MFA_REQUIRED
  │           │  (MFA入力画面へ遷移)
  │           │
  │           └─ レスポンス3: 成功
  │              (トークン返却)
  │
  ├─ MFA 入力画面(条件付き)
  │  │
  │  └─ ユーザーが OTP 入力
  │     │
  │     └─ POST /api/auth/verify-mfa
  │        │
  │        └─ バックエンド: respond_to_auth_challenge
  │           │
  │           └─ トークン返却
  │              │
  │              └─ HttpOnly Cookie に格納
  │
  └─ ダッシュボード表示

実装例(バックエンド – Python)

import boto3
import hashlib
import hmac
import base64
from flask import Flask, request, jsonify, make_response

app = Flask(__name__)

COGNITO_CLIENT_ID = 'your-client-id'
COGNITO_CLIENT_SECRET = 'your-client-secret'  # 秘密キーが設定されている場合
AWS_REGION = 'ap-northeast-1'

client = boto3.client('cognito-idp', region_name=AWS_REGION)

def get_secret_hash(username):
    """CLIENT_SECRET を使った署名(設定されている場合)"""
    if not COGNITO_CLIENT_SECRET:
        return None
    
    message = bytes(username + COGNITO_CLIENT_ID, 'utf-8')
    secret = bytes(COGNITO_CLIENT_SECRET, 'utf-8')
    dig = hmac.new(secret, message, hashlib.sha256).digest()
    return base64.b64encode(dig).decode()

@app.route('/api/auth/login', methods=['POST'])
def login():
    """ステップ 1: ログイン認証"""
    email = request.json.get('email')
    password = request.json.get('password')
    
    secret_hash = get_secret_hash(email)
    
    auth_params = {
        'USERNAME': email,
        'PASSWORD': password,
    }
    
    if secret_hash:
        auth_params['SECRET_HASH'] = secret_hash
    
    try:
        response = client.initiate_auth(
            ClientId=COGNITO_CLIENT_ID,
            AuthFlow='USER_PASSWORD_AUTH',  # ユーザー名とパスワードで認証
            AuthParameters=auth_params
        )
        
        # ケース1: 認証成功、MFA 不要
        if 'AuthenticationResult' in response:
            tokens = response['AuthenticationResult']
            return handle_successful_auth(tokens)
        
        # ケース2: MFA が必要
        if response.get('ChallengeName') == 'MFA_REQUIRED':
            session = response.get('Session')
            return jsonify({
                'challenge': 'MFA_REQUIRED',
                'session': session,
                'mfa_type': response.get('ChallengeParameters', {}).get('MFA_OPTION')
            }), 200
        
        # ケース3: 初回ログイン時、強制パスワード変更
        if response.get('ChallengeName') == 'NEW_PASSWORD_REQUIRED':
            session = response.get('Session')
            return jsonify({
                'challenge': 'NEW_PASSWORD_REQUIRED',
                'session': session
            }), 200
        
        return jsonify({'error': 'Unexpected challenge'}), 400
    
    except client.exceptions.UserNotConfirmedException:
        return jsonify({'error': 'User not confirmed'}), 401
    except client.exceptions.NotAuthorizedException:
        return jsonify({'error': 'Invalid credentials'}), 401
    except Exception as e:
        print(f"Auth error: {str(e)}")
        return jsonify({'error': 'Authentication failed'}), 500

@app.route('/api/auth/verify-mfa', methods=['POST'])
def verify_mfa():
    """ステップ 2: MFA 検証"""
    session = request.json.get('session')
    mfa_code = request.json.get('mfa_code')
    
    secret_hash = get_secret_hash(request.json.get('username', ''))
    
    auth_params = {
        'SOFTWARE_TOKEN_MFA_CODE': mfa_code,  # TOTP の場合
        # or 'SMS_MFA_CODE': mfa_code  # SMS の場合
    }
    
    if secret_hash:
        auth_params['SECRET_HASH'] = secret_hash
    
    try:
        response = client.respond_to_auth_challenge(
            ClientId=COGNITO_CLIENT_ID,
            ChallengeName='SOFTWARE_TOKEN_MFA',  # or 'SMS_MFA' / 'MFA_REQUIRED'
            Session=session,
            ChallengeResponses=auth_params
        )
        
        if 'AuthenticationResult' in response:
            tokens = response['AuthenticationResult']
            return handle_successful_auth(tokens)
        
        return jsonify({'error': 'MFA verification failed'}), 401
    
    except client.exceptions.CodeMismatchException:
        return jsonify({'error': 'Invalid MFA code'}), 401
    except Exception as e:
        print(f"MFA error: {str(e)}")
        return jsonify({'error': 'MFA verification failed'}), 500

def handle_successful_auth(tokens):
    """認証成功時の処理"""
    id_token = tokens['IdToken']
    access_token = tokens['AccessToken']
    refresh_token = tokens.get('RefreshToken')
    expires_in = tokens['ExpiresIn']
    
    # トークンのデコード(署名検証なし、ペイロード確認のみ)
    import json
    import base64
    
    def decode_token(token):
        # JWT の第2部分(ペイロード)をデコード
        parts = token.split('.')
        payload = parts[1]
        # パディングを補足
        payload += '=' * (4 - len(payload) % 4)
        return json.loads(base64.b64decode(payload))
    
    id_payload = decode_token(id_token)
    user_sub = id_payload.get('sub')  # Cognito の一意なユーザーID
    user_email = id_payload.get('email')
    
    # DB にユーザーが存在するか確認
    # (テナント対応の場合は tenant_id も確認)
    db_user = check_or_create_user(user_sub, user_email)
    
    if not db_user:
        return jsonify({'error': 'User not found in database'}), 401
    
    # HttpOnly Cookie に格納
    resp = jsonify({
        'success': True,
        'user_id': db_user.id,
        'email': user_email
    })
    
    resp.set_cookie(
        'id_token',
        id_token,
        httponly=True,
        secure=True,
        samesite='Strict',
        max_age=expires_in
    )
    
    if refresh_token:
        resp.set_cookie(
            'refresh_token',
            refresh_token,
            httponly=True,
            secure=True,
            samesite='Strict',
            max_age=2592000  # 30日
        )
    
    return resp

def check_or_create_user(cognito_sub, email):
    """DB にユーザーレコードを確認/作成"""
    # ここは Django ORM や SQLAlchemy を使用
    from myapp.models import User
    
    user = User.query.filter_by(cognito_sub=cognito_sub).first()
    
    if not user:
        user = User(
            cognito_sub=cognito_sub,
            email=email,
            is_active=True
        )
        db.session.add(user)
        db.session.commit()
    
    return user

@app.route('/api/auth/refresh', methods=['POST'])
def refresh_token():
    """リフレッシュトークンで新しい AccessToken を取得"""
    refresh_token = request.cookies.get('refresh_token')
    
    if not refresh_token:
        return jsonify({'error': 'No refresh token'}), 401
    
    secret_hash = get_secret_hash('')  # ユーザー名不要な場合
    
    auth_params = {'REFRESH_TOKEN': refresh_token}
    if secret_hash:
        auth_params['SECRET_HASH'] = secret_hash
    
    try:
        response = client.initiate_auth(
            ClientId=COGNITO_CLIENT_ID,
            AuthFlow='REFRESH_TOKEN_AUTH',
            AuthParameters=auth_params
        )
        
        new_tokens = response['AuthenticationResult']
        new_access_token = new_tokens['AccessToken']
        
        resp = jsonify({'success': True})
        resp.set_cookie(
            'access_token',
            new_access_token,
            httponly=True,
            secure=True,
            samesite='Strict',
            max_age=new_tokens['ExpiresIn']
        )
        
        return resp
    
    except Exception as e:
        return jsonify({'error': 'Token refresh failed'}), 401

@app.route('/api/protected', methods=['GET'])
def protected_route():
    """認証が必要なエンドポイント"""
    id_token = request.cookies.get('id_token')
    
    if not id_token:
        return jsonify({'error': 'Not authenticated'}), 401
    
    try:
        # トークン署名検証(実装例は後述)
        payload = verify_jwt_token(id_token)
        user_sub = payload.get('sub')
        
        # ビジネスロジック処理
        user = User.query.filter_by(cognito_sub=user_sub).first()
        
        return jsonify({
            'message': f'Hello, {user.email}',
            'user_id': user.id
        })
    
    except Exception as e:
        return jsonify({'error': 'Invalid token'}), 401

def verify_jwt_token(token):
    """JWT トークンの署名を検証"""
    import json
    import base64
    from jose import jwt, JWTError
    
    try:
        # AWS Cognito の公開鍵を使用して署名検証
        # (実装例は後述のセキュリティ設計セクション)
        payload = jwt.get_unverified_claims(token)
        # 実際には jwt.decode(token, key=...) で署名検証
        return payload
    except JWTError:
        raise Exception("Invalid token")

セキュリティ設計

1. 電話番号・メール属性の取り扱い

Cognito での保管

✅ Cognito が自動で暗号化:
  - ユーザープール内の user 属性(phone_number, email)
  - バックアップ暗号化(KMS キー)

⚠️ 懸念点:
  - Cognito ユーザープール内に個人識別情報が保管
  - GDPR「忘れられる権利」対応時の削除
  - AWS IAM で不正アクセスされると露出

推奨: ハイブリッドアプローチ

┌──────────────────────────────────┐
│      Cognito ユーザープール       │
│  ✅ Email(認証用)              │
│  ✅ Sub(一意のID)              │
└──────────────────────────────────┘
         │
         └─ Cognito Sub を使用してリンク
                │
         ┌──────▼──────────────────┐
         │   Application DB         │
         │  ✅ Cognito Sub          │
         │  ✅ Phone Number (暗号化)│
         │  ✅ Email (暗号化)       │
         │  ✅ Tenant ID            │
         │  ✅ User Permissions     │
         └─────────────────────────┘

実装例(DB側)

from cryptography.fernet import Fernet
import os

# 環境変数から暗号化キーを読み込む
ENCRYPTION_KEY = os.environ.get('ENCRYPTION_KEY').encode()
cipher = Fernet(ENCRYPTION_KEY)

class User(db.Model):
    __tablename__ = 'users'
    
    id = db.Column(db.Integer, primary_key=True)
    cognito_sub = db.Column(db.String(128), unique=True, not_null=True, index=True)
    # Cognito側でメール管理
    # email は Cognito の verified_email を使用
    
    # 暗号化で保管
    _phone_number_encrypted = db.Column(db.String(256), nullable=True)
    
    tenant_id = db.Column(db.Integer, db.ForeignKey('tenants.id'), not_null=True)
    
    @property
    def phone_number(self):
        """復号化して返却"""
        if self._phone_number_encrypted:
            return cipher.decrypt(self._phone_number_encrypted.encode()).decode()
        return None
    
    @phone_number.setter
    def phone_number(self, value):
        """暗号化して保管"""
        if value:
            self._phone_number_encrypted = cipher.encrypt(value.encode()).decode()
        else:
            self._phone_number_encrypted = None
    
    def to_dict(self):
        """API レスポンスで個人情報を除外"""
        return {
            'id': self.id,
            'cognito_sub': self.cognito_sub,
            # phone_number は含めない(外部APIでは不要)
        }

2. JWT トークン検証

Cognito の公開鍵で署名検証

import json
import base64
from urllib.request import urlopen
from jose import jwt, JWTError

AWS_REGION = 'ap-northeast-1'
COGNITO_USER_POOL_ID = 'ap-northeast-1_xxxxxxxxx'

# Cognito の公開鍵を取得(キャッシュ推奨)
def get_jwks():
    """AWS Cognito の JWKS エンドポイントから公開鍵を取得"""
    url = f'https://cognito-idp.{AWS_REGION}.amazonaws.com/{COGNITO_USER_POOL_ID}/.well-known/jwks.json'
    
    # 本来はキャッシュすべき(Redis等)
    with urlopen(url) as response:
        return json.loads(response.read())

def get_key_from_jwks(kid):
    """キーID から公開鍵を取得"""
    jwks = get_jwks()
    for key in jwks['keys']:
        if key['kid'] == kid:
            return key
    return None

def verify_jwt_token(token):
    """JWT トークンの署名を検証"""
    try:
        # ヘッダーをデコード(署名検証なし)
        header = jwt.get_unverified_header(token)
        kid = header['kid']
        
        # 公開鍵を取得
        jwk = get_key_from_jwks(kid)
        if not jwk:
            raise JWTError("Key not found")
        
        # 署名検証付きでデコード
        payload = jwt.decode(
            token,
            jwk,
            algorithms=['RS256'],
            audience=COGNITO_CLIENT_ID,  # ClientID との一致確認
            options={
                'verify_aud': True,
                'verify_exp': True,  # 有効期限確認
            }
        )
        
        return payload
    
    except JWTError as e:
        raise Exception(f"Token validation failed: {str(e)}")

3. CORS と CSRF 対策

from flask_cors import CORS
from flask_wtf.csrf import CSRFProtect

app = Flask(__name__)

# CORS 設定(フロントエンドドメイン指定)
CORS(app, resources={
    r"/api/*": {
        "origins": ["https://yourdomain.com"],
        "supports_credentials": True,  # Cookie を含める
        "allowed_methods": ["GET", "POST", "PUT", "DELETE"],
        "allowed_headers": ["Content-Type"],
    }
})

# CSRF 保護(トークンベース)
csrf = CSRFProtect(app)

@app.route('/api/auth/login', methods=['POST'])
@csrf.exempt  # 認証前は CSRF 除外
def login():
    # ログイン処理
    pass

4. Rate Limiting(ブルートフォース攻撃対策)

from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"]
)

@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute")  # ログイン: 1分間に5回まで
def login():
    # ログイン処理
    pass

@app.route('/api/auth/verify-mfa', methods=['POST'])
@limiter.limit("10 per minute")  # MFA検証: 1分間に10回まで
def verify_mfa():
    # MFA検証処理
    pass

フロントエンド実装

React での例

// pages/Login.jsx
import React, { useState } from 'react';
import { useNavigate } from 'react-router-dom';

export default function Login() {
  const [email, setEmail] = useState('');
  const [password, setPassword] = useState('');
  const [mfaRequired, setMfaRequired] = useState(false);
  const [mfaCode, setMfaCode] = useState('');
  const [session, setSession] = useState('');
  const [error, setError] = useState('');
  const [loading, setLoading] = useState(false);
  const navigate = useNavigate();

  const handleLogin = async (e) => {
    e.preventDefault();
    setLoading(true);
    setError('');

    try {
      const response = await fetch('/api/auth/login', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        credentials: 'include', // Cookie を含める
        body: JSON.stringify({ email, password }),
      });

      const data = await response.json();

      if (response.ok) {
        // ケース1: 認証成功
        if (data.success) {
          navigate('/dashboard');
        }
        // ケース2: MFA が必要
        else if (data.challenge === 'MFA_REQUIRED') {
          setSession(data.session);
          setMfaRequired(true);
        }
      } else {
        setError(data.error || 'Login failed');
      }
    } catch (err) {
      setError('Network error');
    } finally {
      setLoading(false);
    }
  };

  const handleMFAVerify = async (e) => {
    e.preventDefault();
    setLoading(true);
    setError('');

    try {
      const response = await fetch('/api/auth/verify-mfa', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        credentials: 'include',
        body: JSON.stringify({
          session,
          mfa_code: mfaCode,
          username: email,
        }),
      });

      const data = await response.json();

      if (response.ok && data.success) {
        navigate('/dashboard');
      } else {
        setError(data.error || 'MFA verification failed');
      }
    } catch (err) {
      setError('Network error');
    } finally {
      setLoading(false);
    }
  };

  if (mfaRequired) {
    return (
      <div className="login-container">
        <h2>MFA 認証</h2>
        <p>認証アプリまたはメールで受け取った6桁のコードを入力してください</p>
        <form onSubmit={handleMFAVerify}>
          <input
            type="text"
            placeholder="000000"
            value={mfaCode}
            onChange={(e) => setMfaCode(e.target.value)}
            maxLength="6"
            pattern="[0-9]{6}"
            required
            disabled={loading}
          />
          <button type="submit" disabled={loading}>
            {loading ? '検証中...' : '確認'}
          </button>
        </form>
        {error && <p className="error">{error}</p>}
      </div>
    );
  }

  return (
    <div className="login-container">
      <h2>ログイン</h2>
      <form onSubmit={handleLogin}>
        <input
          type="email"
          placeholder="メールアドレス"
          value={email}
          onChange={(e) => setEmail(e.target.value)}
          required
          disabled={loading}
        />
        <input
          type="password"
          placeholder="パスワード"
          value={password}
          onChange={(e) => setPassword(e.target.value)}
          required
          disabled={loading}
        />
        <button type="submit" disabled={loading}>
          {loading ? 'ログイン中...' : 'ログイン'}
        </button>
      </form>
      {error && <p className="error">{error}</p>}
    </div>
  );
}

認証済みリクエストのラッパー

// utils/api.js
export async function authenticatedFetch(url, options = {}) {
  const response = await fetch(url, {
    ...options,
    credentials: 'include', // Cookie を含める
    headers: {
      'Content-Type': 'application/json',
      ...options.headers,
    },
  });

  // トークン期限切れ時は自動リフレッシュ
  if (response.status === 401) {
    const refreshResponse = await fetch('/api/auth/refresh', {
      method: 'POST',
      credentials: 'include',
    });

    if (refreshResponse.ok) {
      // リトライ
      return fetch(url, {
        ...options,
        credentials: 'include',
      });
    } else {
      // ログインページへリダイレクト
      window.location.href = '/login';
    }
  }

  return response;
}

// ページでの使用
const response = await authenticatedFetch('/api/protected');
const data = await response.json();

バックエンド実装

全体の実装フロー

1. 依存関係のインストール
   pip install boto3 flask python-jose pyjwt cryptography flask-cors flask-limiter

2. 環境変数設定
   .env ファイルに Cognito クライアント情報を記載

3. Cognito ユーザープール設定
   - MFA 有効化
   - SES/SNS 設定

4. API エンドポイント実装
   - /api/auth/login
   - /api/auth/verify-mfa
   - /api/auth/refresh
   - /api/protected (認証が必要)

5. トークン検証
   - JWT の署名検証
   - 有効期限確認
   - リフレッシュトークン処理

DB スキーマ設計

-- ユーザーテーブル
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    cognito_sub VARCHAR(128) UNIQUE NOT NULL,
    -- メール: Cognito の email 属性を使用(連携)
    phone_number_encrypted VARCHAR(256),  -- 暗号化済み
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    is_active BOOLEAN DEFAULT TRUE,
    tenant_id INTEGER NOT NULL REFERENCES tenants(id)
);

-- Cognito Sub インデックス(高速検索)
CREATE INDEX idx_users_cognito_sub ON users(cognito_sub);
CREATE INDEX idx_users_tenant_id ON users(tenant_id);

-- テナントテーブル
CREATE TABLE tenants (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- ロール・パーミッションテーブル
CREATE TABLE user_roles (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    role_name VARCHAR(50) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

インシデント対応

よくあるトラブルと対応

1. ユーザーが MFA コードを入力できない

原因:
  a) SMS/Email が届かない
  b) コード有効期限が切れた(デフォルト24時間)
  c) TOTP の時刻がズレている

対応:
  a) 再送信機能を提供
     - POST /api/auth/resend-mfa-code
  
  b) コード有効期限を延長(ただしセキュリティリスク)
     - Cognito 設定で変更可能
  
  c) TOTP の場合、バックアップコード提供
     - バックアップコードは一度限り、1回限りの使用

実装例:

@app.route('/api/auth/resend-mfa-code', methods=['POST'])
def resend_mfa_code():
    """MFA コードの再送信"""
    email = request.json.get('email')
    
    # Cognito から admin 権限で再送信
    try:
        response = client.admin_initiate_auth(
            UserPoolId=COGNITO_USER_POOL_ID,
            ClientId=COGNITO_CLIENT_ID,
            AuthFlow='ADMIN_NO_SRP_AUTH',
            AuthParameters={
                'USERNAME': email,
                'PASSWORD': '***',  # 既に検証済みのため不要(実装により異なる)
            }
        )
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'error': 'Failed to resend code'}), 500

2. ユーザーがバックアップコードを紛失

原因: TOTP 設定時にバックアップコードを保管していない

対応:
  - ヘルプデスク: admin_delete_user_attributes で TOTP を削除
  - ユーザーに再セットアップをさせる
  - または管理画面からセキュリティチャレンジ質問で確認後、復旧

実装例:
  client.admin_delete_user_attributes(
      UserPoolId=COGNITO_USER_POOL_ID,
      Username=email,
      UserAttributeNames=['custom:totp_secret']
  )

3. IP アドレス変更後に MFA が求められない

原因: Cognito のデフォルトは IP 変更を検知しない

対応:
  - 高度なセキュリティ設定で "リスク検知" を有効化
  - IP が変わった場合、Cognito が自動的に MFA を要求
  
設定:
  Cognito コンソール → User Pool
  → Security → Risk configuration
  → Sign-in risk → Require MFA if risk is detected

4. メール配信が多数の迷惑メール扱い

原因: SES の認証設定が不完全

対応:
  1. SPF レコード設定確認
     v=spf1 include:amazonses.com ~all
  
  2. DKIM 署名有効化
     Cognito → Security → Email → SES から確認
  
  3. DMARC ポリシー設定
     _dmarc.yourdomain.com の TXT レコード設定
  
  4. From アドレスを verified domain にする
     SES コンソール → Verified Identities で確認

5. セッションタイムアウト後のトークン更新

ユーザー体験:
  1. ユーザーが長時間操作しない
  2. AccessToken が 1 時間で期限切れ
  3. API リクエスト時に 401 返却
  4. フロントエンド自動リフレッシュ(ユーザーに無知)
  5. RefreshToken で新しい AccessToken 取得
  6. 操作続行

注意: RefreshToken も 30 日で期限切れ
      → 再度ログインが必要

テナント対応アーキテクチャ

シングルテナント vs マルチテナント

シングルテナント(1つの Cognito ユーザープール = 1つの顧客)

企業A
├─ Cognito User Pool A
│  └─ users: user1@a.com, user2@a.com
└─ DB Tenant A
   └─ users: (cognito_sub と同じ)

企業B
├─ Cognito User Pool B
│  └─ users: user1@b.com, user2@b.com
└─ DB Tenant B
   └─ users: (cognito_sub と同じ)

メリット:
  - 完全に独立(セキュリティ最強)
  - 個別スケーリング可能
  
デメリット:
  - 管理コストが増加(ユーザープール数が増える)
  - コスト増加

マルチテナント(1つの Cognito ユーザープール = 複数の顧客)推奨

┌──────────────────────────────────┐
│   Cognito User Pool (共有)        │
│  users:                          │
│  - user1@a.com (tenant_id=A)     │
│  - user2@b.com (tenant_id=B)     │
└──────┬───────────────────────────┘
       │
   ┌───▼────────────────────────────┐
   │   Application DB                │
   │                                 │
   │ tenants table:                  │
   │  - id=A, name="企業A"           │
   │  - id=B, name="企業B"           │
   │                                 │
   │ users table:                    │
   │  - id=1, cognito_sub=..., tenant_id=A
   │  - id=2, cognito_sub=..., tenant_id=B
   │                                 │
   │ resources table:                │
   │  - id=1, tenant_id=A, data=... │
   │  - id=2, tenant_id=B, data=... │
   └─────────────────────────────────┘

マルチテナント実装(推奨)

Cognito 側

1. カスタム属性を追加(Cognito コンソール)
   Cognito → User attributes
   → Add custom attribute: tenant_id (Number)

2. ユーザー登録時に tenant_id を設定
   - フロントエンド: 登録フォームで tenant_id を入力
   - バックエンド: AdminCreateUser 時に tenant_id を設定

3. ID Token に tenant_id を含める
   - Cognito → App clients → Token Customization
   - Read attributes: tenant_id
   - Include → ID token

DB 側

class User(db.Model):
    __tablename__ = 'users'
    
    id = db.Column(db.Integer, primary_key=True)
    cognito_sub = db.Column(db.String(128), unique=True, not_null=True)
    tenant_id = db.Column(db.Integer, db.ForeignKey('tenants.id'), not_null=True)
    email = db.Column(db.String(255), not_null=True)
    
    # インデックス: tenant_id と cognito_sub の複合インデックス
    __table_args__ = (
        db.Index('idx_user_tenant_cognito', 'tenant_id', 'cognito_sub'),
    )

class Tenant(db.Model):
    __tablename__ = 'tenants'
    
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255), not_null=True)
    users = db.relationship('User', backref='tenant')

バックエンド(テナント分離)

from functools import wraps
from flask import request, jsonify

def get_tenant_from_token():
    """トークンから tenant_id を抽出"""
    id_token = request.cookies.get('id_token')
    if not id_token:
        return None
    
    try:
        payload = verify_jwt_token(id_token)
        return int(payload.get('custom:tenant_id'))
    except:
        return None

def tenant_required(f):
    """テナント検証デコレータ"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        tenant_id = get_tenant_from_token()
        if not tenant_id:
            return jsonify({'error': 'Tenant not found'}), 401
        
        # テナント ID をコンテキストに保存(後続処理で使用)
        request.tenant_id = tenant_id
        return f(*args, **kwargs)
    
    return decorated_function

@app.route('/api/resources', methods=['GET'])
@tenant_required
def get_resources():
    """テナント分離されたリソース取得"""
    tenant_id = request.tenant_id
    
    # 現在のテナントのリソースのみ取得
    resources = Resource.query.filter_by(tenant_id=tenant_id).all()
    
    return jsonify([r.to_dict() for r in resources])

@app.route('/api/resources', methods=['POST'])
@tenant_required
def create_resource():
    """テナント分離されたリソース作成"""
    tenant_id = request.tenant_id
    
    resource = Resource(
        tenant_id=tenant_id,
        data=request.json.get('data')
    )
    db.session.add(resource)
    db.session.commit()
    
    return jsonify(resource.to_dict()), 201

テナント間のデータ漏洩防止

class BaseQuery(db.Query):
    """自動的にテナント フィルタを適用"""
    
    def filter_by_tenant(self, tenant_id):
        """明示的なテナントフィルタ"""
        return self.filter_by(tenant_id=tenant_id)

db.session.query = BaseQuery

# SQLAlchemy イベントで自動フィルタ
@event.listens_for(Session, 'before_flush')
def receive_before_flush(session, flush_context, instances):
    """データ保存時の tenant_id チェック"""
    for instance in session.new:
        if hasattr(instance, 'tenant_id') and not instance.tenant_id:
            raise ValueError(f"{instance.__class__.__name__} requires tenant_id")

テナント間のデータアクセス制御

# ❌ ダメな例: tenant_id チェックなし
@app.route('/api/resources/<resource_id>', methods=['GET'])
def get_resource(resource_id):
    resource = Resource.query.get(resource_id)
    return jsonify(resource.to_dict())
    # 他のテナントのリソースにアクセスされる可能性

# ✅ 正しい例: tenant_id チェックあり
@app.route('/api/resources/<resource_id>', methods=['GET'])
@tenant_required
def get_resource(resource_id):
    resource = Resource.query.filter_by(
        id=resource_id,
        tenant_id=request.tenant_id  # テナント検証
    ).first()
    
    if not resource:
        return jsonify({'error': 'Resource not found'}), 404
    
    return jsonify(resource.to_dict())

実装チェックリスト

事前準備

☐ AWS アカウント及び IAM ユーザー作成
☐ Cognito ユーザープール作成
☐ App Client 設定(User Password Authentication 有効)
☐ SES/SNS セットアップ(MFA 配信用)
☐ RDS/DynamoDB テーブル設計・作成
☐ KMS キー作成(暗号化用)
☐ API Gateway セットアップ(HTTPS有効化)

Cognito 側の設定

☐ MFA 有効化(SMS / Email / TOTP 選択)
☐ SMS 設定:
   ☐ IAM ロール設定(SNS へのアクセス権)
   ☐ テスト SMS 送信
☐ Email 設定:
   ☐ SES ドメイン認証(SPF/DKIM/DMARC)
   ☐ SES Verified Identity 設定
   ☐ Bounce/Complaint 監視設定
☐ パスワードポリシー設定
☐ セッションタイムアウト設定(デフォルト: AccessToken 1時間)
☐ MFA コード有効期限設定(デフォルト: 24時間)
☐ ユーザーレジストレーション設定(Self-registration 許可か管理者作成か)

バックエンド実装

☐ boto3 を使った Cognito API 連携
☐ initiate_auth(ログイン認証)
☐ respond_to_auth_challenge(MFA検証)
☐ admin_initiate_auth(管理者による認証)
☐ JWT トークン検証(署名確認)
☐ トークンリフレッシュ処理
☐ ユーザー属性管理(update_user_attributes)
☐ 暗号化・復号化(個人情報保護)
☐ Rate Limiting(ブルートフォース対策)
☐ ロギング・監査ログ
☐ エラーハンドリング(Cognito例外処理)

フロントエンド実装

☐ ログインフォーム
☐ MFA 入力画面(動的表示)
☐ パスワード変更画面
☐ プロフィール管理画面
☐ エラーメッセージ表示
☐ ローディング状態管理
☐ トークン自動リフレッシュ
☐ アウトログ機能
☐ セッションタイムアウト警告
☐ メール確認フロー(新規登録時)

セキュリティ

☐ HTTPS 強制(すべてのエンドポイント)
☐ CORS 設定(許可ドメイン限定)
☐ CSRF トークン実装
☐ HttpOnly Cookie(トークン保管)
☐ Secure フラグ(HTTPS のみ送信)
☐ SameSite フラグ(CSRF 対策)
☐ CSP(Content Security Policy)ヘッダ設定
☐ X-Frame-Options(クリックジャッキング対策)
☐ WAF(Web Application Firewall)設定
☐ VPC エンドポイント設定(AWS 内部通信)
☐ KMS による暗号化(個人情報)
☐ CloudTrail ログ有効化(監査)

テナント対応

☐ Cognito カスタム属性(tenant_id)追加
☐ DB スキーマに tenant_id カラム追加
☐ テナント分離ロジック実装
☐ すべてのクエリに tenant_id フィルタ追加
☐ テナント間のデータリーク防止テスト
☐ テナント削除時のデータ削除ロジック
☐ テナント間のリソースアクセステスト

テスト

☐ ユニットテスト(個別機能)
☐ 統合テスト(Cognito + DB)
☐ セキュリティテスト(OWASP Top 10)
☐ MFA フロー テスト(SMS / Email / TOTP)
☐ トークン期限切れテスト
☐ リフレッシュトークン テスト
☐ Rate Limiting テスト
☐ テナント分離テスト
☐ エラーハンドリング テスト
☐ パフォーマンステスト(負荷テスト)

運用

☐ ログ監視体制構築(CloudWatch)
☐ アラート設定(異常ログイン、MFA失敗率等)
☐ 定期的なセキュリティパッチ適用
☐ バックアップ戦略(ユーザープール、DB)
☐ 災害復旧計画(Cognito User Pool バックアップ)
☐ ドキュメント整備(操作マニュアル)
☐ ヘルプデスク体制構築
☐ インシデント対応ガイド作成
☐ コスト監視(Cognito, SES, Lambda)
☐ 定期監査(3ヶ月ごと)

よくある質問 FAQ

Q1: S3 でログイン画面をホストする?

A: いいえ。S3 は静的ファイルのみ。認証ロジックはバックエンド(Lambda/EC2)で実装。

正しい構成:
S3(HTML/CSS/JS) → CloudFront
                  ↓
            フロントエンド
                  ↓
            API Gateway + Lambda
                  ↓
            Cognito(認証)

Q2: 電話番号は Cognito に保管?それとも DB?

A: ハイブリッドアプローチ推奨:

  • Cognito: Email, Sub(必須)
  • DB: 電話番号(暗号化)、テナント ID、ビジネス情報

Q3: セキュリティ問題はないか?

A: Cognito は個人識別情報を暗号化で保護。さらに DB でも暗号化推奨。

多層防御:
Cognito 暗号化 + DB 暗号化 + KMS 鍵管理 + IAM 権限制限

Q4: テナント対応の場合の実装は?

A: Cognito カスタム属性 + DB テナントテーブル

Cognito の ID Token
  ↓
{
  "sub": "xxxxxxxx",
  "email": "user@example.com",
  "custom:tenant_id": "123"
}
  ↓
DB クエリ時に tenant_id でフィルタ

Q5: UI/UX での考慮点は?

A:

  • ログイン画面から MFA 画面への遷移を自然に
  • エラーメッセージは具体的に(「ログイン失敗」ではなく「メールアドレスが間違っています」)
  • MFA コード自動入力(スマートフォン)対応
  • パスワード忘れ時のリセットフロー設計

Q6: インシデント対応は?

A: CloudWatch ログ監視 + アラート設定

監視項目:
- ログイン失敗回数(IP別)
- MFA コード失敗回数
- 異常なリフレッシュトークン使用
- 権限昇格の試み

参考リソース

AWS ドキュメント

セキュリティベストプラクティス

その他


最終更新: 2026年6月
対象バージョン: AWS Cognito (2026年版)

コメント