# -*- coding: utf-8 -*-
"""
Sensitive-word filtering - politics / violence / gore / pornography.
Sensitive-image detection - violence / gore / pornography / politically sensitive content.
Keeps replies compliant and the risk controllable.
"""
import os
|
||
import re
|
||
import base64
|
||
from typing import Tuple
|
||
|
||
# Sensitive-word lexicon, grouped by category (extensible).
# Each entry is a regular-expression source string; the entries are compiled
# case-insensitively by _get_compiled() and matched with ``search``, so every
# pattern matches anywhere inside the text (substring semantics).
# Keep each list free of duplicates and of entries that a shorter entry in the
# same list already covers -- they only add work without changing the result.
_SENSITIVE_PATTERNS = {
    # Politics / government
    "党政": [
        r"习近平", r"共产党", r"党中央", r"政治局", r"六四", r"天安门",
        r"法轮功", r"台独", r"藏独", r"疆独", r"邪教",
        r"政治人物", r"政治事件", r"领导人", r"党政", r"时政",
        r"特朗普", r"拜登", r"普京", r"泽连斯基",
        r"trump", r"biden", r"putin", r"zelensky", r"xi\s*jinping",
    ],
    # Violence
    "暴力": [
        r"杀人", r"砍人", r"捅人", r"枪击", r"爆炸", r"恐怖袭击",
        r"肢解", r"碎尸", r"虐杀", r"血洗",
    ],
    # Gore
    "血腥": [
        r"断肢", r"残肢", r"内脏", r"脑浆", r"血淋淋", r"尸块",
        r"开膛", r"割喉", r"爆头",
    ],
    # Pornography
    "黄色": [
        # r"porn" already matches any text that r" porn" would match.
        r"裸体", r"裸照", r"裸聊", r"色情", r"porn",
        r"做爱", r"性交", r"约炮", r"嫖娼", r"卖淫",
        r"av女", r"av男", r"av片", r"av资源",
    ],
    # Borderline / suggestive content
    "擦边": [
        r"擦边", r"大尺度", r"性感图", r"露点", r"半裸",
    ],
    # Maps
    "地图": [
        r"地图", r"地形图", r"行政区划图", r"世界地图", r"中国地图",
        r"卫星地图", r"导航图", r"航海图", r"作战地图", r"军事地图",
    ],
}
|
||
|
||
# Cache of compiled patterns: category name -> list of compiled regexes.
# Stays empty until _get_compiled() is called for the first time.
_COMPILED: dict = {}


def _get_compiled():
    """Return the category -> compiled-regex table, building it on first use.

    Every pattern in ``_SENSITIVE_PATTERNS`` is compiled case-insensitively
    (``re.I``). The table lives in the module-level ``_COMPILED`` dict, so
    compilation only happens while that dict is still empty.
    """
    if _COMPILED:
        return _COMPILED
    _COMPILED.update(
        (category, [re.compile(source, re.I) for source in sources])
        for category, sources in _SENSITIVE_PATTERNS.items()
    )
    return _COMPILED
|
||
|
||
|
||
def filter_sensitive(text: str) -> tuple[str, list[str]]:
    """
    Detect sensitive words in ``text`` without modifying it.

    Nothing is replaced (to avoid damaging harmless text); when a category is
    hit, the caller is expected to use a fallback reply or hand over to a human.

    Returns:
        (text, hit_categories) - ``text`` is returned unchanged and
        ``hit_categories`` lists every category with at least one matching
        pattern, e.g. ["暴力", "黄色"]. The list is empty when nothing matched
        or when ``text`` is empty / whitespace-only.
    """
    if not (text and text.strip()):
        return text, []
    # A category is reported once, no matter how many of its patterns match.
    matched_categories = [
        category
        for category, regexes in _get_compiled().items()
        if any(regex.search(text) for regex in regexes)
    ]
    return text, matched_categories
|
||
|
||
|
||
def should_block_reply(text: str) -> tuple[bool, str]:
    """
    Decide whether an AI reply must be intercepted.

    Returns:
        (should_block, fallback_reply) - ``(True, <fallback wording>)`` when
        the reply hits any sensitive category, otherwise ``(False, "")``.
    """
    hit_categories = filter_sensitive(text)[1]
    if hit_categories:
        return True, "好的,您稍等,我帮您确认一下"
    return False, ""
|
||
|
||
|
||
def should_block_customer(text: str) -> bool:
    """Return True when a customer message hits any sensitive category.

    A True result means the order should be refused (no image processing,
    no reply).
    """
    hit_categories = filter_sensitive(text)[1]
    return bool(hit_categories)
|
||
|
||
|
||
# Instruction sent together with the image to the vision model. It lists the
# sensitive categories (violence, gore, pornography, political sensitivity)
# and asks for a bare yes ("是") / no ("否") answer.
_SENSITIVE_IMAGE_PROMPT = "\n".join((
    "请判断这张图片是否包含以下任何敏感内容:",
    "- 暴力(打斗、武器、伤害)",
    "- 血腥(伤口、血迹、残肢等)",
    "- 色情(裸露、性暗示)",
    "- 政治敏感(旗帜、标语、敏感人物等)",
    "",
    "只回答「是」或「否」。若包含任一敏感内容则答「是」,否则答「否」。",
))
|
||
|
||
|
||
def _parse_sensitive_verdict(answer: str) -> bool:
    """Interpret the vision model's yes/no answer; True means "sensitive".

    A plain ``"是" in answer`` test misreads negative answers such as "不是"
    or "否,这不是敏感图片" as positive, because they contain the character
    "是". This parser looks at how the answer starts and neutralises the
    phrases in which "是" does not mean "yes". Anything ambiguous or empty is
    treated as not sensitive.
    """
    # Drop whitespace, quotes and punctuation the model may wrap its answer in.
    cleaned = answer.strip().strip("「」『』\"'“”‘’。.,,!!::").strip().lower()
    if not cleaned:
        return False
    # "是否" ("whether") carries no verdict; "不是" ("is not") is a negation.
    cleaned = cleaned.replace("是否", "").replace("不是", "否")
    if cleaned.startswith(("否", "不", "没", "无", "no")):
        return False
    if cleaned.startswith(("是", "yes")):
        return True
    # Free-form answer: only an unambiguous affirmative counts.
    return "是" in cleaned and "否" not in cleaned


async def is_sensitive_image(local_path: str) -> Tuple[bool, str]:
    """
    Ask a vision model whether the image at ``local_path`` is sensitive.

    Configuration is read from environment variables:
      - OPENAI_API_KEY: required; without it the check is skipped.
      - OPENAI_BASE_URL: optional, defaults to the bigmodel.cn v4 endpoint.
      - SENSITIVE_IMAGE_MODEL, then VISION_MODEL: optional model name,
        defaults to "glm-4v-flash".

    Returns:
        (is_sensitive, reason) - ``(True, <refusal reason>)`` when the model
        judges the image sensitive, otherwise ``(False, "")``.
        The check fails open: a missing file, a missing API key or any
        exception yields ``(False, "")`` so the image is not blocked.
    """
    # isfile rather than exists: a directory can never be read as an image.
    if not os.path.isfile(local_path):
        return False, ""
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        return False, ""
    base_url = os.getenv("OPENAI_BASE_URL", "https://open.bigmodel.cn/api/paas/v4")
    model = os.getenv("SENSITIVE_IMAGE_MODEL") or os.getenv("VISION_MODEL", "glm-4v-flash")
    try:
        with open(local_path, "rb") as f:
            b64 = base64.b64encode(f.read()).decode("utf-8")
        # Imported lazily so the module still loads when openai is not installed.
        from openai import AsyncOpenAI

        # async with closes the client's HTTP connections once the call is done.
        async with AsyncOpenAI(base_url=base_url, api_key=api_key) as client:
            resp = await client.chat.completions.create(
                model=model,
                messages=[{
                    "role": "user",
                    "content": [
                        # NOTE(review): the MIME type is hard-coded as JPEG
                        # whatever the file's real format is.
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
                        {"type": "text", "text": _SENSITIVE_IMAGE_PROMPT},
                    ],
                }],
            )
        # Cost tracking is best-effort: a missing or failing tracker must not
        # affect the moderation result.
        try:
            from utils.api_cost_tracker import record
            # NOTE(review): the key says "gemini_vision" although the default
            # model is glm-4v-flash -- confirm the intended tracker key.
            record("gemini_vision", count=1)
        except Exception:
            pass
        text = (resp.choices[0].message.content or "").strip()
        if _parse_sensitive_verdict(text):
            return True, "图片包含敏感内容,无法处理"
        return False, ""
    except Exception as e:
        # Fail open: report the problem and let the image through.
        print(f"[ContentFilter] 敏感图片检测异常: {e},放行")
        return False, ""
|