# -*- coding: utf-8 -*- """ 敏感词过滤 - 党政/暴力/血腥/黄色 敏感图片检测 - 暴力/血腥/色情/政治敏感 合规、风险可控 """ import os import re import base64 import json import asyncio from typing import Tuple # 敏感词库(按类别,可扩展) _SENSITIVE_PATTERNS = { "党政": [ r"习近平", r"共产党", r"党中央", r"政治局", r"六四", r"天安门", r"法轮功", r"台独", r"藏独", r"疆独", r"邪教", r"政治人物", r"政治事件", r"领导人", r"党政", r"时政", r"特朗普", r"拜登", r"普京", r"泽连斯基", r"trump", r"biden", r"putin", r"zelensky", r"xi\s*jinping", ], "暴力": [ r"杀人", r"砍人", r"捅人", r"枪击", r"爆炸", r"恐怖袭击", r"肢解", r"碎尸", r"虐杀", r"血洗", ], "血腥": [ r"断肢", r"残肢", r"内脏", r"脑浆", r"血淋淋", r"尸块", r"开膛", r"割喉", r"爆头", ], "黄色": [ r"裸体", r"裸照", r"裸聊", r"色情", r" porn", r"porn", r"做爱", r"性交", r"约炮", r"约炮", r"嫖娼", r"卖淫", r"av女", r"av男", r"av片", r"av资源", ], "擦边": [ r"擦边", r"大尺度", r"性感图", r"露点", r"半裸", ], "地图": [ r"地图", r"地形图", r"行政区划图", r"世界地图", r"中国地图", r"卫星地图", r"导航图", r"航海图", r"作战地图", r"军事地图", ], } _COMPILED: dict = {} _TEXT_RISK_CACHE: dict = {} # text -> (block, category, reason, ts) _TEXT_RISK_CACHE_TTL = 300 def _get_compiled(): global _COMPILED if not _COMPILED: for cat, patterns in _SENSITIVE_PATTERNS.items(): _COMPILED[cat] = [re.compile(p, re.I) for p in patterns] return _COMPILED def filter_sensitive(text: str) -> tuple[str, list[str]]: """ 检测文本中的敏感词。不直接替换(避免误伤),返回 (原文本, 命中的类别列表)。 若命中则调用方应使用兜底回复或转人工。 Returns: (text, hit_categories) - hit_categories 如 ["暴力", "黄色"],空则无敏感 """ if not text or not text.strip(): return text, [] compiled = _get_compiled() hit = [] for cat, pats in compiled.items(): for p in pats: if p.search(text): hit.append(cat) break return text, hit def should_block_reply(text: str) -> tuple[bool, str]: """ 判断 AI 回复是否应拦截。 Returns: (should_block, fallback_reply) - 若应拦截,返回兜底话术 """ _, hit = filter_sensitive(text) if not hit: return False, "" return True, "好的,您稍等,我帮您确认一下" def should_block_customer(text: str) -> bool: """判断客户消息是否应拒单(不处理图片、不回复)""" _, hit = filter_sensitive(text) return len(hit) > 0 def _risk_from_keyword(text: str) -> tuple[bool, str, str]: """关键词风控兜底:返回 (是否拦截, 类别, 原因)。""" _, hit = filter_sensitive(text) if not hit: return False, "none", "" cats = set(hit) if "地图" in cats: return True, "map", "命中地图关键词" if "党政" in cats: return True, "political", "命中政治关键词" if {"黄色", "擦边"} & cats: return True, "sexual", "命中涉黄关键词" if {"暴力", "血腥"} & cats: return True, "violent", "命中暴力关键词" return True, "other", "命中敏感关键词" def _extract_json(text: str) -> dict: t = (text or "").strip() if not t: return {} try: return json.loads(t) except Exception: pass m = re.search(r"\{[\s\S]*\}", t) if not m: return {} try: return json.loads(m.group(0)) except Exception: return {} async def _detect_customer_risk_with_ai(text: str) -> tuple[bool, str, str]: """ AI 文本风控:返回 (是否拦截, 类别, 原因)。 类别: map/political/sexual/violent/other/none """ raw = (text or "").strip() if not raw: return False, "none", "" api_key = os.getenv("OPENAI_API_KEY", "").strip() if not api_key: return False, "none", "" try: now = asyncio.get_running_loop().time() except Exception: now = 0.0 cached = _TEXT_RISK_CACHE.get(raw) if cached and (now - cached[3]) < _TEXT_RISK_CACHE_TTL: return cached[0], cached[1], cached[2] base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1").strip() model = os.getenv("TEXT_RISK_MODEL", "").strip() or os.getenv("OPENAI_MODEL", "gpt-4o-mini") timeout_s = float(os.getenv("AI_TEXT_RISK_TIMEOUT_SECONDS", "4")) prompt = ( "你是电商客服文本风控分类器。请判断用户文本是否属于禁止接单内容。\n" "禁止接单类别:\n" "1) map: 地图相关(地图、行政区划图、地形图、卫星地图、导航图等)\n" "2) political: 政治/党政/领导人/政治事件\n" "3) sexual: 色情/擦边/裸露/性暗示\n" "4) violent: 暴力/血腥\n" "若不属于以上类别,category=none 且 block=false。\n" "只输出 JSON,不要解释。\n" "格式:{\"block\":true/false,\"category\":\"map|political|sexual|violent|other|none\",\"reason\":\"简短原因\"}\n" f"用户文本:{raw}" ) try: from openai import AsyncOpenAI client = AsyncOpenAI(api_key=api_key, base_url=base_url, timeout=timeout_s) resp = await client.chat.completions.create( model=model, temperature=0, messages=[ {"role": "system", "content": "你是严格 JSON 输出的风控分类器。"}, {"role": "user", "content": prompt}, ], ) content = (resp.choices[0].message.content or "").strip() data = _extract_json(content) block = bool(data.get("block", False)) category = str(data.get("category", "none") or "none").strip().lower() reason = str(data.get("reason", "") or "").strip() if category not in {"map", "political", "sexual", "violent", "other", "none"}: category = "none" block = False if category == "none": block = False _TEXT_RISK_CACHE[raw] = (block, category, reason, now) return block, category, reason except Exception: return False, "none", "" async def should_block_customer_smart(text: str) -> tuple[bool, str, str]: """ 客户文本风控总入口:AI 优先,关键词兜底。 Returns: (block, category, reason) """ ai_enabled = os.getenv("AI_TEXT_RISK_ENABLED", "true").strip().lower() in {"1", "true", "yes", "on"} if ai_enabled: block, cat, reason = await _detect_customer_risk_with_ai(text) if block: return True, cat, reason or "AI 判定敏感" return _risk_from_keyword(text) _SENSITIVE_IMAGE_PROMPT = """请判断这张图片是否包含以下任何敏感内容: - 暴力(打斗、武器、伤害) - 血腥(伤口、血迹、残肢等) - 色情(裸露、性暗示) - 政治敏感(旗帜、标语、敏感人物等) 只回答「是」或「否」。若包含任一敏感内容则答「是」,否则答「否」。""" async def is_sensitive_image(local_path: str) -> Tuple[bool, str]: """ 使用视觉模型检测图片是否包含敏感内容。 需配置 OPENAI_API_KEY、OPENAI_BASE_URL、VISION_MODEL(或 SENSITIVE_IMAGE_MODEL)。 Returns: (is_sensitive, reason) - 若敏感则 (True, "拒单原因"),否则 (False, "") 未配置或异常时返回 (False, ""),不拦截(fail open) """ if not os.path.exists(local_path): return False, "" api_key = os.getenv("OPENAI_API_KEY") base_url = os.getenv("OPENAI_BASE_URL", "https://open.bigmodel.cn/api/paas/v4") model = os.getenv("SENSITIVE_IMAGE_MODEL") or os.getenv("VISION_MODEL", "glm-4v-flash") if not api_key: return False, "" try: with open(local_path, "rb") as f: b64 = base64.b64encode(f.read()).decode("utf-8") from openai import AsyncOpenAI client = AsyncOpenAI(base_url=base_url, api_key=api_key) resp = await client.chat.completions.create( model=model, messages=[{ "role": "user", "content": [ {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}}, {"type": "text", "text": _SENSITIVE_IMAGE_PROMPT}, ], }], ) try: from utils.api_cost_tracker import record record("gemini_vision", count=1) except Exception: pass text = (resp.choices[0].message.content or "").strip() is_sensitive = "是" in text return is_sensitive, "图片包含敏感内容,无法处理" if is_sensitive else "" except Exception as e: print(f"[ContentFilter] 敏感图片检测异常: {e},放行") return False, ""