Cloud/GCP

Model Armor 실시간 알림 설정 방안

달빛궁전- 2026. 4. 1. 16:09

 

개요

Google Cloud의 Model Armor는 LLM(대규모 언어 모델) 서비스 이용 시 발생할 수 있는 보안 위협을 실시간으로 탐지하고 차단하는 핵심 보안 솔루션입니다.

프롬프트 인젝션, 탈옥(Jailbreak) 시도, 개인정보(PII) 유출 및 부적절한 콘텐츠 생성 등의 위협은 기업의 데이터 자산과 서비스 신뢰도에 직접적인 타격을 줄 수 있습니다.

본 가이드는 Model Armor를 통해 생성되는 SanitizeOperation 로그를 실시간으로 탐지하고 , 복잡한 JSON 형식의 감사 로그를 파싱하여 운영자가 즉시 위협 상황을 인지할 수 있도록 알림 체계를 구축하는 방안을 제시합니다.

 

구성도

Log Explorer 이동 Log 검색

resource.type="modelarmor.googleapis.com/SanitizeOperation"

Model Armor 로 차단, 허용된 내용이 출력됩니다. 추가적으로 차단된 내용만 보고싶다면 아래 필터를 추가합니다.

resource.type="modelarmor.googleapis.com/SanitizeOperation"
jsonPayload.sanitizationResult.sanitizationVerdict="MODEL_ARMOR_SANITIZATION_VERDICT_BLOCK"

  • 로그싱크 생성 싱크 이름과 설명작성

  • Pub/Sub 을 생성 합니다.topic만 옵션없이 생성해도 됩니다.

 

  • 로그 라우팅 싱크, Pub/Sub Topic을 제대로 만들었는지 체크합니다.

 

  • Cloud Run으로 이동합니다.

  • 서비스 이름, 리전 선택후 런타임은 Python 으로 선택합니다.
    트리거 설정은 아래에 있으며, Log 발생시에만 필요하기 때문에 “요청 기반"과 수동 확장 (1대)로 설정했습니다.
    이는 서비스 사용량에 따르게 설정하면 됩니다.

  • 트리거 설정은 처음 만들 Pub/Sub을 연결하여 줍니다.

  • requirements.txt
functions-framework==3.*
requests==2.31.0

 

import base64
import json

def model_armor_notification_handler(request):
    try:
        request_json = request.get_json(silent=True)
        if not request_json: return "No Data", 200

        # Pub/Sub 데이터 추출
        raw_data = request_json.get('message', {}).get('data') or request_json.get('data')
        if not raw_data: return "No Data", 200

        pubsub_message = base64.b64decode(raw_data).decode('utf-8')

        try:
            log_data = json.loads(pubsub_message)
            payload = log_data.get('jsonPayload', {})
            result = payload.get('sanitizationResult', {})

            # 1. 기본 정보 추출
            project = log_data.get('resource', {}).get('labels', {}).get('resource_container', 'N/A')
            template = log_data.get('resource', {}).get('labels', {}).get('template_id', 'default')

            # 2. 사용자 입력 텍스트 (Input)
            user_input = payload.get('sanitizationInput', {}).get('text', 'N/A')

            # 3. 차단 결과 및 사유 (Verdict)
            verdict = result.get('sanitizationVerdict', 'UNKNOWN').replace('MODEL_ARMOR_SANITIZATION_VERDICT_', '')

            # 4. 탐지된 필터 상세 (Filter Results) 파싱
            filter_results = result.get('filterResults', {})
            detected = []

            for f_name, f_data in filter_results.items():
                # 필터 구조가 조금씩 다르므로 동적 접근 (pi_and_jailbreak, rai, sdp 등)
                for key in f_data.keys():
                    if 'Result' in key:
                        inner = f_data[key]
                        # matchState가 MATCH_FOUND인 것만 추출
                        if inner.get('matchState') == 'MATCH_FOUND':
                            conf = inner.get('confidenceLevel', 'N/A')
                            detected.append(f"{f_name}({conf})")

            detection_str = ", ".join(detected) if detected else "None"

            # 최종 요약 로그 출력 (IAM_ALERT_SUMMARY 스타일 계승)
            # 이 형식을 Cloud Logging에서 다시 잡아내어 알림으로 보냅니다.
            summary = f"MA_ALERT_SUMMARY | 프로젝트: {project} | 템플릿: {template} | 판결: {verdict} | 탐지필터: {detection_str} | 입력문구: {user_input}"
            print(summary)

        except json.JSONDecodeError:
            print(f"MA_ALERT_SUMMARY | 테스트 메시지 수신: {pubsub_message}")

        return "OK", 200

    except Exception as e:
        print(f"ERROR: {str(e)}")
        return "Internal Error", 500

 

 

  • 결과 
    Log Explorer 이동하여 아래와 같이 로그를 찾습니다.
"textpayload: "MA_ALERT_SUMMARY" 

해당 로그는 앞서 만든 Cloud Run으로 Log을 파싱하여 발생되는 로그이며, 아래와 같이
Model Armor 템플릿 이름, 판결 (허용, 차단), 탐지된 필터 내용, 입력된 문구를 정리하여 관리자가 한 눈에 내용을 파악할 수 있게 합니다.



로그의 상세내용은 아래와 같습니다.