개요
Google Cloud의 Model Armor는 LLM(대규모 언어 모델) 서비스 이용 시 발생할 수 있는 보안 위협을 실시간으로 탐지하고 차단하는 핵심 보안 솔루션입니다.
프롬프트 인젝션, 탈옥(Jailbreak) 시도, 개인정보(PII) 유출 및 부적절한 콘텐츠 생성 등의 위협은 기업의 데이터 자산과 서비스 신뢰도에 직접적인 타격을 줄 수 있습니다.
본 가이드는 Model Armor를 통해 생성되는 SanitizeOperation 로그를 실시간으로 탐지하고 , 복잡한 JSON 형식의 감사 로그를 파싱하여 운영자가 즉시 위협 상황을 인지할 수 있도록 알림 체계를 구축하는 방안을 제시합니다.
구성도

Log Explorer 이동 Log 검색
resource.type="modelarmor.googleapis.com/SanitizeOperation"
Model Armor 로 차단, 허용된 내용이 출력됩니다. 추가적으로 차단된 내용만 보고싶다면 아래 필터를 추가합니다.
resource.type="modelarmor.googleapis.com/SanitizeOperation"
jsonPayload.sanitizationResult.sanitizationVerdict="MODEL_ARMOR_SANITIZATION_VERDICT_BLOCK"
- 로그싱크 생성 싱크 이름과 설명작성
- Pub/Sub 을 생성 합니다.topic만 옵션없이 생성해도 됩니다.
- 로그 라우팅 싱크, Pub/Sub Topic을 제대로 만들었는지 체크합니다.
- Cloud Run으로 이동합니다.
- 서비스 이름, 리전 선택후 런타임은 Python 으로 선택합니다.
트리거 설정은 아래에 있으며, Log 발생시에만 필요하기 때문에 “요청 기반"과 수동 확장 (1대)로 설정했습니다.
이는 서비스 사용량에 따르게 설정하면 됩니다.
-
- 트리거 설정은 처음 만들 Pub/Sub을 연결하여 줍니다.
- requirements.txt
functions-framework==3.*
requests==2.31.0
import base64
import json
def model_armor_notification_handler(request):
try:
request_json = request.get_json(silent=True)
if not request_json: return "No Data", 200
# Pub/Sub 데이터 추출
raw_data = request_json.get('message', {}).get('data') or request_json.get('data')
if not raw_data: return "No Data", 200
pubsub_message = base64.b64decode(raw_data).decode('utf-8')
try:
log_data = json.loads(pubsub_message)
payload = log_data.get('jsonPayload', {})
result = payload.get('sanitizationResult', {})
# 1. 기본 정보 추출
project = log_data.get('resource', {}).get('labels', {}).get('resource_container', 'N/A')
template = log_data.get('resource', {}).get('labels', {}).get('template_id', 'default')
# 2. 사용자 입력 텍스트 (Input)
user_input = payload.get('sanitizationInput', {}).get('text', 'N/A')
# 3. 차단 결과 및 사유 (Verdict)
verdict = result.get('sanitizationVerdict', 'UNKNOWN').replace('MODEL_ARMOR_SANITIZATION_VERDICT_', '')
# 4. 탐지된 필터 상세 (Filter Results) 파싱
filter_results = result.get('filterResults', {})
detected = []
for f_name, f_data in filter_results.items():
# 필터 구조가 조금씩 다르므로 동적 접근 (pi_and_jailbreak, rai, sdp 등)
for key in f_data.keys():
if 'Result' in key:
inner = f_data[key]
# matchState가 MATCH_FOUND인 것만 추출
if inner.get('matchState') == 'MATCH_FOUND':
conf = inner.get('confidenceLevel', 'N/A')
detected.append(f"{f_name}({conf})")
detection_str = ", ".join(detected) if detected else "None"
# 최종 요약 로그 출력 (IAM_ALERT_SUMMARY 스타일 계승)
# 이 형식을 Cloud Logging에서 다시 잡아내어 알림으로 보냅니다.
summary = f"MA_ALERT_SUMMARY | 프로젝트: {project} | 템플릿: {template} | 판결: {verdict} | 탐지필터: {detection_str} | 입력문구: {user_input}"
print(summary)
except json.JSONDecodeError:
print(f"MA_ALERT_SUMMARY | 테스트 메시지 수신: {pubsub_message}")
return "OK", 200
except Exception as e:
print(f"ERROR: {str(e)}")
return "Internal Error", 500
- 결과
Log Explorer 이동하여 아래와 같이 로그를 찾습니다.
"textpayload: "MA_ALERT_SUMMARY"
해당 로그는 앞서 만든 Cloud Run으로 Log을 파싱하여 발생되는 로그이며, 아래와 같이
Model Armor 템플릿 이름, 판결 (허용, 차단), 탐지된 필터 내용, 입력된 문구를 정리하여 관리자가 한 눈에 내용을 파악할 수 있게 합니다.
로그의 상세내용은 아래와 같습니다.
'Cloud > GCP' 카테고리의 다른 글
| GCP OAuth(인증)을 PSC (Private Service Connect)를 통해 사설망으로 연결하는 방안 (0) | 2026.03.31 |
|---|---|
| GCP 권한 변경시 실시간 알림 설정방안 (0) | 2026.02.28 |
| GCP 프로젝트 삭제 방안 (0) | 2026.02.27 |
| GCP Workload Identity Federation (WIF - 워크로드아이덴티티) 설정 (0) | 2026.02.26 |
| Keycloak 설치 및 구성 매뉴얼 (GCP WIF 테스트) (0) | 2026.02.25 |
| Vertex AI Gemini API PSC 테스트 가이드 (Python) (0) | 2026.02.19 |
| AI SRE Auto-Remediator - Google Gemini AI 기반 실시간 시스템 장애 탐지 및 자동 복구 에이전트 - PT본 (0) | 2026.01.11 |