AWS Cognito MFA 実装 – よくある質問(FAQ)

AWS

Q1: ログイン画面は S3 でホストする?それとも?

❌ 誤解

S3 でログイン HTML をホスト
  ↓
HTML 内で Cognito API を直接呼び出し
  ↓
レスポンスをフロントエンドで処理

問題点:

  • CORS エラーが頻発
  • Cognito Client Secret が露出する危険
  • トークンをフロントエンドで直接管理するとセキュリティリスク

✅ 正しい構成

【フロントエンド】
S3 + CloudFront(静的ファイル配信)
  ├─ HTML(ログイン画面)
  ├─ CSS(スタイル)
  └─ JavaScript(ロジック)
         ↓
      API Gateway へ HTTPS リクエスト

【バックエンド】
API Gateway + Lambda/EC2
  ├─ Cognito SDK を使用(boto3 等)
  ├─ 認証ロジック実装
  └─ トークン取得・検証
         ↓
      HttpOnly Cookie でトークン返却
         ↓
フロントエンド自動的に Cookie を送付

具体的な実装フロー

// フロントエンド(S3 ホスト)
const loginForm = document.getElementById('login-form');
loginForm.addEventListener('submit', async (e) => {
  e.preventDefault();
  
  const email = document.getElementById('email').value;
  const password = document.getElementById('password').value;
  
  // ① バックエンド API を呼び出し
  const response = await fetch('https://api.yourcompany.com/auth/login', {
    method: 'POST',
    credentials: 'include',  // Cookie を送付
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ email, password })
  });
  
  // ② バックエンドが HttpOnly Cookie にトークンを格納している
  // ③ ブラウザが自動的にクッキーを保管
  
  if (response.ok) {
    window.location.href = '/dashboard';  // リダイレクト
  }
});
# バックエンド(API Gateway + Lambda)
@app.route('/api/auth/login', methods=['POST'])
def login():
    email = request.json.get('email')
    password = request.json.get('password')
    
    # Cognito を呼び出し
    client = boto3.client('cognito-idp')
    response = client.initiate_auth(
        ClientId='your-client-id',
        AuthFlow='USER_PASSWORD_AUTH',
        AuthParameters={
            'USERNAME': email,
            'PASSWORD': password
        }
    )
    
    # トークンを取得
    id_token = response['AuthenticationResult']['IdToken']
    
    # HttpOnly Cookie に格納して返却
    resp = make_response(jsonify({'success': True}))
    resp.set_cookie(
        'id_token',
        id_token,
        httponly=True,     # JavaScript でアクセス不可
        secure=True,       # HTTPS のみ送付
        samesite='Strict', # CSRF 対策
        max_age=3600       # 1時間
    )
    
    return resp

Q2: 電話番号・メールアドレスはどこに保管?セキュリティ問題は?

保管場所の選択肢

オプション1: Cognito のみに保管

✓ メリット:
  - AWS の暗号化で保護
  - ユーザー属性管理が簡単

✗ デメリット:
  - AWS IAM 権限が強すぎると露出リスク
  - Cognito ユーザープール全体が漏洩すると個人情報も漏洩

オプション2: DB のみに保管(推奨)

✓ メリット:
  - 暗号化・復号化のキーを自社で管理
  - Cognito で管理する情報を最小化
  - GDPR「忘れられる権利」対応が容易

✗ デメリット:
  - 実装量が増加

オプション3: ハイブリッド(最も推奨)

┌─────────────────────────────────┐
│      Cognito ユーザープール      │
├─────────────────────────────────┤
│ ✓ Email(認証用、必須)         │
│ ✓ Sub(ユーザー一意ID)         │
│ ✓ Phone_number(MFA用)        │
│ × パスワード(Cognito が暗号化)│
└─────────────────────────────────┘
           ↓ Cognito Sub でリンク
         
┌─────────────────────────────────┐
│      アプリケーション DB        │
├─────────────────────────────────┤
│ ✓ Cognito Sub                   │
│ ✓ Phone Number(暗号化)        │
│ ✓ Email(暗号化)               │
│ ✓ Tenant ID                    │
│ ✓ ビジネス情報                  │
│ ✓ ロール・パーミッション        │
└─────────────────────────────────┘

ハイブリッド実装例(推奨)

from cryptography.fernet import Fernet
import os

# 環境変数から暗号化キーを読み込む(AWS Secrets Manager推奨)
ENCRYPTION_KEY = os.environ.get('ENCRYPTION_KEY').encode()
cipher = Fernet(ENCRYPTION_KEY)

class User(db.Model):
    __tablename__ = 'users'
    
    id = db.Column(db.Integer, primary_key=True)
    
    # Cognito の Sub を保管(ユーザー特定用)
    cognito_sub = db.Column(db.String(128), unique=True, not_null=True, index=True)
    
    # Email は Cognito の verified_email を使用
    # DB では暗号化して保管(オプション)
    _email_encrypted = db.Column(db.String(256))
    
    # 電話番号は DB で暗号化して保管
    _phone_number_encrypted = db.Column(db.String(256))
    
    tenant_id = db.Column(db.Integer, db.ForeignKey('tenants.id'), not_null=True)
    
    @property
    def phone_number(self):
        """復号化して返却"""
        if self._phone_number_encrypted:
            return cipher.decrypt(self._phone_number_encrypted.encode()).decode()
        return None
    
    @phone_number.setter
    def phone_number(self, value):
        """暗号化して保管"""
        if value:
            self._phone_number_encrypted = cipher.encrypt(value.encode()).decode()
    
    @property
    def email(self):
        """Cognito の email 属性を優先使用"""
        # API からは Cognito の email を使用
        # DB 側での保管は不要(ただし必要に応じて暗号化)
        return None  # Cognito 側から取得
    
    def to_dict(self):
        """API レスポンス用(個人情報を含めない)"""
        return {
            'id': self.id,
            'cognito_sub': self.cognito_sub,
            'tenant_id': self.tenant_id,
            # phone_number は含めない(外部API は不要)
        }

# 使用例
user = User.query.get(user_id)
user.phone_number = '+819012345678'  # 自動的に暗号化
db.session.add(user)
db.session.commit()

# 取得時は自動的に復号化
print(user.phone_number)  # '+819012345678' と表示されるが、DB には暗号化された状態で保管

セキュリティ設定チェックリスト

☐ 暗号化キー(ENCRYPTION_KEY)は AWS Secrets Manager に保管
☐ 復号化は必要な時だけ(ただし表示なし)
☐ API レスポンスに phone_number を含めない
☐ ログに個人情報を含めない
☐ GDPR 対応: テナント削除時は個人情報も削除
☐ IAM ロール: 必要最小限の権限のみ
☐ VPC エンドポイント: AWS リソースへの通信を VPC 内で閉じる

Q3: Cognito Sub と DB ユーザー ID の関係

設計パターン

パターン1: Sub を主キーにする(シンプル)

CREATE TABLE users (
    cognito_sub VARCHAR(128) PRIMARY KEY,
    email VARCHAR(255) NOT NULL,
    tenant_id INTEGER NOT NULL,
    created_at TIMESTAMP
);

メリット: 参照フロー単純 デメリット: Sub は UUID 形式で長い、他テーブルの FK も長くなる

パターン2: 整数 ID を主キー、Sub を別カラムに(推奨)

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    cognito_sub VARCHAR(128) UNIQUE NOT NULL,
    email VARCHAR(255) NOT NULL,
    tenant_id INTEGER NOT NULL,
    created_at TIMESTAMP
);

CREATE INDEX idx_users_cognito_sub ON users(cognito_sub);
CREATE INDEX idx_users_tenant ON users(tenant_id);

メリット: 参照が高速、他テーブルの FK が小さい デメリット: Sub で検索する必要あり

パターン3: 両方保管(最も安全)

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    cognito_sub VARCHAR(128) UNIQUE NOT NULL,
    email VARCHAR(255) NOT NULL,
    tenant_id INTEGER NOT NULL,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);

-- 関連テーブル
CREATE TABLE user_sessions (
    id BIGSERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
    cognito_sub VARCHAR(128) NOT NULL,  -- 検証用
    token_hash VARCHAR(256) NOT NULL,
    created_at TIMESTAMP
);

ログイン時の処理フロー

# ステップ1: Cognito 認証
response = client.initiate_auth(
    ClientId=CLIENT_ID,
    AuthFlow='USER_PASSWORD_AUTH',
    AuthParameters={
        'USERNAME': email,
        'PASSWORD': password
    }
)

# ステップ2: ID Token をデコード
id_token = response['AuthenticationResult']['IdToken']
payload = jwt.decode(id_token, options={"verify_signature": False})
user_sub = payload['sub']

# ステップ3: DB でユーザーを検索
user = User.query.filter_by(cognito_sub=user_sub).first()

# ステップ4: ユーザーが存在しなければ作成
if not user:
    user = User(
        cognito_sub=user_sub,
        email=payload['email'],
        tenant_id=payload.get('custom:tenant_id')
    )
    db.session.add(user)
    db.session.commit()

# ステップ5: Cookie に格納
resp = jsonify({'user_id': user.id, 'success': True})
resp.set_cookie('id_token', id_token, httponly=True, secure=True)

return resp

Q4: テナント対応の実装方法

全体フロー

1. Cognito ユーザープール登録時
   ↓
   Email + Password + Custom: tenant_id を保管
   
2. ログイン時
   ↓
   Cognito 認証 → ID Token に custom:tenant_id を含める
   
3. API リクエスト時
   ↓
   ID Token から tenant_id を抽出
   ↓
   すべてのクエリに WHERE tenant_id = ? を追加

ステップ1: Cognito にカスタム属性を追加

# AWS CLI で追加
aws cognito-idp update-user-pool \
    --user-pool-id ap-northeast-1_xxxxxxxxx \
    --schema Name=tenant_id,AttributeDataType=Number,Mutable=true \
    --region ap-northeast-1

ステップ2: ユーザー登録時に tenant_id を設定

@app.route('/api/auth/register', methods=['POST'])
def register():
    email = request.json.get('email')
    password = request.json.get('password')
    tenant_id = request.json.get('tenant_id')  # フロントエンドから受け取る
    
    client = boto3.client('cognito-idp')
    
    # ユーザー作成
    client.admin_create_user(
        UserPoolId=COGNITO_USER_POOL_ID,
        Username=email,
        TemporaryPassword=password,
        MessageAction='SUPPRESS'  # メール送信しない
    )
    
    # tenant_id をカスタム属性として設定
    client.admin_update_user_attributes(
        UserPoolId=COGNITO_USER_POOL_ID,
        Username=email,
        UserAttributes=[
            {
                'Name': 'custom:tenant_id',
                'Value': str(tenant_id)
            }
        ]
    )
    
    return jsonify({'success': True})

ステップ3: ログイン時に tenant_id を ID Token に含める

Cognito コンソール
  → User Pool → App clients → Token Customization
  → Read attributes: custom:tenant_id を追加
  → Include in ID Token チェック

ステップ4: バックエンド で tenant_id を検証

from functools import wraps

def get_tenant_from_token():
    """ID Token から tenant_id を抽出"""
    id_token = request.cookies.get('id_token')
    if not id_token:
        return None
    
    try:
        payload = jwt.decode(id_token, options={"verify_signature": False})
        tenant_id = payload.get('custom:tenant_id')
        return int(tenant_id) if tenant_id else None
    except:
        return None

def tenant_required(f):
    """デコレータ: 全エンドポイントで tenant_id を検証"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        tenant_id = get_tenant_from_token()
        
        if not tenant_id:
            return jsonify({'error': 'Tenant not found'}), 401
        
        # tenant_id をリクエストコンテキストに保存
        request.tenant_id = tenant_id
        
        return f(*args, **kwargs)
    
    return decorated_function

# 使用例
@app.route('/api/resources', methods=['GET'])
@tenant_required
def list_resources():
    """テナント内のリソース一覧"""
    tenant_id = request.tenant_id
    
    # ❌ 危険: tenant_id チェックなし
    # resources = Resource.query.all()
    
    # ✅ 正しい: tenant_id で必ずフィルタ
    resources = Resource.query.filter_by(tenant_id=tenant_id).all()
    
    return jsonify([r.to_dict() for r in resources])

@app.route('/api/resources/<resource_id>', methods=['GET'])
@tenant_required
def get_resource(resource_id):
    """リソース取得(テナント検証必須)"""
    tenant_id = request.tenant_id
    
    # 複合条件で検索: リソースIDとテナントID両方
    resource = Resource.query.filter_by(
        id=resource_id,
        tenant_id=tenant_id
    ).first()
    
    if not resource:
        return jsonify({'error': 'Resource not found'}), 404
    
    return jsonify(resource.to_dict())

ステップ5: テナント間のデータリーク防止

# ❌ ダメな例: 他テナントのデータにアクセスされる
@app.route('/api/resources/<resource_id>')
def get_resource(resource_id):
    resource = Resource.query.get(resource_id)  # tenant_id チェックなし
    return jsonify(resource.to_dict())  # 他テナントでも取得可能

# ✅ 正しい例: tenant_id チェックあり
@app.route('/api/resources/<resource_id>')
@tenant_required
def get_resource(resource_id):
    resource = Resource.query.filter_by(
        id=resource_id,
        tenant_id=request.tenant_id  # 必須チェック
    ).first()
    
    if not resource:
        return jsonify({'error': 'Not found'}), 404
    
    return jsonify(resource.to_dict())

# テスト: Company A ユーザーが Company B のリソースにアクセス試行
def test_tenant_isolation():
    # Company A のトークンでログイン
    token_a = login('user@companyA.com', 'password')
    
    # Company B のリソース ID を指定
    resource_id = 202  # Company B のリソース
    
    # アクセス試行
    response = client.get(
        f'/api/resources/{resource_id}',
        headers={'Cookie': f'id_token={token_a}'}
    )
    
    # ✓ 403 Forbidden または 404 Not Found を返すべき
    assert response.status_code in [403, 404]
    assert response.json.get('error') is not None

Q5: UI/UX での実装ポイント

ログイン画面

// ライフサイクル: ログイン → (MFA必須?) → ダッシュボード

export default function LoginPage() {
  const [step, setStep] = useState('login');  // login | mfa | success
  const [email, setEmail] = useState('');
  const [password, setPassword] = useState('');
  const [mfaCode, setMfaCode] = useState('');
  const [error, setError] = useState('');
  const [session, setSession] = useState('');
  
  const handleLogin = async (e) => {
    e.preventDefault();
    setError('');
    
    try {
      const response = await fetch('/api/auth/login', {
        method: 'POST',
        credentials: 'include',
        body: JSON.stringify({ email, password })
      });
      
      const data = await response.json();
      
      if (response.ok) {
        if (data.success) {
          // MFA 不要: ダッシュボード表示
          setStep('success');
          setTimeout(() => {
            window.location.href = '/dashboard';
          }, 1500);
        } else if (data.challenge === 'MFA_REQUIRED') {
          // MFA 必須: 入力画面へ遷移
          setSession(data.session);
          setStep('mfa');
        }
      } else {
        // エラー: 具体的なメッセージを表示
        setError(data.error || 'ログインに失敗しました');
      }
    } catch (err) {
      setError('ネットワークエラーが発生しました');
    }
  };
  
  const handleMFAVerify = async (e) => {
    e.preventDefault();
    setError('');
    
    if (mfaCode.length !== 6) {
      setError('6桁のコードを入力してください');
      return;
    }
    
    try {
      const response = await fetch('/api/auth/verify-mfa', {
        method: 'POST',
        credentials: 'include',
        body: JSON.stringify({
          session,
          mfa_code: mfaCode,
          username: email
        })
      });
      
      const data = await response.json();
      
      if (response.ok && data.success) {
        setStep('success');
        setTimeout(() => {
          window.location.href = '/dashboard';
        }, 1500);
      } else {
        setError(data.error || 'MFA 認証に失敗しました');
        setMfaCode('');  // コードをクリア
      }
    } catch (err) {
      setError('ネットワークエラーが発生しました');
    }
  };
  
  if (step === 'success') {
    return (
      <div className="success-message">
        <i className="icon-checkmark"></i>
        <h2>ログインしました</h2>
        <p>ダッシュボードへ移動中...</p>
      </div>
    );
  }
  
  if (step === 'mfa') {
    return (
      <div className="mfa-form">
        <h2>認証コードを入力</h2>
        <p>認証アプリまたはメールで受け取った6桁のコードを入力してください</p>
        
        <form onSubmit={handleMFAVerify}>
          <input
            type="text"
            placeholder="000000"
            value={mfaCode}
            onChange={(e) => setMfaCode(e.target.value.replace(/\D/g, '').slice(0, 6))}
            maxLength="6"
            pattern="[0-9]{6}"
            required
            autoComplete="one-time-code"
            inputMode="numeric"
          />
          <button type="submit">確認</button>
        </form>
        
        {error && <div className="error-message">{error}</div>}
        
        <button type="button" onClick={() => setStep('login')} className="back-button">
          ← 戻る
        </button>
      </div>
    );
  }
  
  return (
    <div className="login-form">
      <h2>ログイン</h2>
      
      <form onSubmit={handleLogin}>
        <input
          type="email"
          placeholder="メールアドレス"
          value={email}
          onChange={(e) => setEmail(e.target.value)}
          required
        />
        
        <input
          type="password"
          placeholder="パスワード"
          value={password}
          onChange={(e) => setPassword(e.target.value)}
          required
        />
        
        <button type="submit">ログイン</button>
      </form>
      
      {error && <div className="error-message">{error}</div>}
      
      <a href="/forgot-password">パスワードを忘れた</a>
    </div>
  );
}

Q6: インシデント対応

よくあるトラブル

1. MFA コードが届かない

症状: ユーザーが「メールを受け取っていない」と報告

対応ステップ:
1. メールアドレスが正しいか確認
2. 迷惑メールフォルダを確認するよう誘導
3. 再送信機能を提供
4. 問題が継続する場合は SES 送信ログを確認

実装:
@app.route('/api/auth/resend-mfa-code', methods=['POST'])
def resend_mfa_code():
    email = request.json.get('email')
    
    # ユーザーが存在するか確認
    user = User.query.filter_by(email=email).first()
    if not user:
        return jsonify({'error': 'User not found'}), 401
    
    # Rate limiting: 1分間に1回まで
    # (実装省略)
    
    # SES でメール送信
    # (実装省略)
    
    return jsonify({'success': True, 'message': 'コードを送信しました'})

2. トークン期限切れ

症状: API が 401 Unauthorized を返す

原因: ID Token が 1 時間で期限切れ

対応:
フロントエンド側で自動リフレッシュ

// fetch wrapper
async function authenticatedFetch(url, options = {}) {
  let response = await fetch(url, {
    ...options,
    credentials: 'include'
  });
  
  if (response.status === 401) {
    // トークンをリフレッシュ
    const refreshResponse = await fetch('/api/auth/refresh', {
      method: 'POST',
      credentials: 'include'
    });
    
    if (refreshResponse.ok) {
      // リトライ
      response = await fetch(url, {
        ...options,
        credentials: 'include'
      });
    } else {
      // ログインページへリダイレクト
      window.location.href = '/login';
    }
  }
  
  return response;
}

3. メールが迷惑メールフォルダに入る

原因: SPF/DKIM/DMARC 設定が不完全

対応:
1. SES から Send Test Email
2. SPF レコード確認
   v=spf1 include:amazonses.com ~all
3. DKIM 署名有効化
4. DMARC ポリシー設定
   _dmarc.yourdomain.com TXT "v=DMARC1;p=none;rua=mailto:..."

テスト:
$ dig _acme-challenge.yourdomain.com TXT
$ nslookup yourdomain.com

Q7: Cognito 標準機能だけで実装できるか?

Cognito の標準機能でカバーされる部分

✅ ユーザー管理
  - ユーザー登録
  - ログイン
  - MFA(SMS/Email/TOTP)
  - パスワード管理
  - トークン発行

✅ セキュリティ
  - パスワード暗号化
  - JWT署名
  - トークン有効期限

❌ ビジネスロジック(自分たちで実装)
  - テナント管理
  - ロール・パーミッション
  - 監査ログ
  - アクセス制御

テナント対応は Cognito 標準ではできない

❌ Cognito は複数のテナントの概念を持たない
   → アプリケーション側で実装

✅ ただし Cognito + アプリケーション DB で実現可能

まとめ

項目場所実装
ログイン画面フロントエンド(S3)HTML/CSS/JavaScript
認証ロジックバックエンド(Lambda)Cognito SDK(boto3)
トークン管理HttpOnly Cookieバックエンド側で設定
電話番号保管DB暗号化(推奨)
Email 保管Cognito + DBCognito 主、DB は暗号化
テナント管理DBCustom attribute + DB フィルタ
セキュリティ多層防御HTTPS + CORS + Rate Limit + 暗号化

最終的な推奨構成:

┌─ フロントエンド(S3 + CloudFront)
│  ├─ ログイン画面(HTML)
│  ├─ MFA 入力画面(JavaScript で動的表示)
│  └─ ダッシュボード
│
├─ API Gateway + Lambda(バックエンド)
│  ├─ Cognito SDK 統合
│  ├─ JWT 検証
│  ├─ 暗号化・復号化
│  ├─ テナント検証
│  └─ エラーハンドリング
│
├─ Cognito(認証)
│  ├─ ユーザープール
│  ├─ MFA 配信(SES/SNS)
│  └─ トークン発行
│
├─ RDS/DynamoDB(アプリケーション DB)
│  ├─ Cognito Sub ← FK
│  ├─ Tenant ID
│  ├─ 電話番号(暗号化)
│  └─ ビジネスデータ
│
└─ KMS(暗号化鍵管理)
   └─ Fernet キーで個人情報を暗号化

この構成で、セキュア、スケーラブル、マルチテナント対応の認証システムが実現できます。

コメント