Skip to Content
BackendSecurityService Sanitization Pipeline (PII Redact to Jailbreak Check)

SecurityService Sanitization Pipeline (PII Redact → Jailbreak Check)

What? (Concept Overview)

SecurityService is the outbound safety gate that runs on every user message before it touches an LLM. It performs two composable checks: (1) PII redaction (Presidio-based with regex fallback, plus a custom UK_NINO PatternRecognizer); (2) jailbreak detection (Lakera Guard API first, then heuristic keyword blacklist). The service constructor takes no args — it eagerly reads from Settings so the safety posture is environment-driven.

Project Context

This file (app/services/security_service.py) is the only place that touches Presidio or Lakera. Every agent routes raw user input through security_service.sanitize_input(text) before any DB or prompt assembly; every LLM call is preceded by security_service.check_jailbreak(text) returning (is_safe, reason). Fail-open on missing Lakera API key (dev convenience); fail-closed on hard rule violations.

How? (Quick Reference Blocks)

3.1 Service Construction with Presidio + Custom Patterns

# app/services/security_service.py — SecurityService.__init__ from presidio_analyzer import AnalyzerEngine, PatternRecognizer, Pattern from presidio_anonymizer import AnonymizerEngine from presidio_analyzer.nlp_engine import NlpEngineProvider class SecurityService: def __init__(self): configuration = { "nlp_engine_name": "spacy", "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}], } provider = NlpEngineProvider(nlp_configuration=configuration) nlp_engine = provider.create_engine() self.enabled = settings.security_enabled self.redact_pii = settings.pii_redaction_enabled if self.redact_pii: self.analyzer = AnalyzerEngine( nlp_engine=nlp_engine, supported_languages=["en"] ) # Custom UK National Insurance regex: 2 letters + 6 digits + 1 [A-D] nino_regex = r"(?i)\b[A-Z]{2}\s?[0-9]{2}\s?[0-9]{2}\s?[0-9]{2}\s?[A-D]\b" nino_pattern = Pattern(name="uk_nino_pattern", regex=nino_regex, score=0.85) nino_recognizer = PatternRecognizer( supported_entity="UK_NINO", patterns=[nino_pattern] ) self.analyzer.registry.add_recognizer(nino_recognizer) self.anonymizer = AnonymizerEngine() else: self.analyzer = None self.anonymizer = None # Auth setup self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") self.secret_key = settings.secret_key self.algorithm = settings.jwt_algorithm self.expire_minutes = settings.access_token_expire_minutes self.lakera_guard_api_key = settings.lakera_guard_api_key # Regex PII fallback (when Presidio not available) self.pii_patterns = { "EMAIL": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", "PHONE_UK": r"(?:(?:\+44\s?|0)(?:7\d{3}|\d{4})\s?\d{6})", "CREDIT_CARD": r"\b(?:\d{4}[-\s]?){3}\d{4}\b", } self.injection_keywords = [ "ignore previous instructions", "act as a pirate", "system prompt", "developer mode", "you are now", "launder money", "money laundering", "forge a check", "bypass 2fa", "base64", "encoded string", "system override", "disable_content_filter", "unrestrained ai", "forget you are", "simulated", "hypothetically", ]

3.2 Presidio-Driven Redaction

# app/services/security_service.py — _redact_with_presidio def _redact_with_presidio(self, text: str) -> str: if not self.analyzer or not self.anonymizer: return text try: results = self.analyzer.analyze( text=text, entities=["PHONE_NUMBER", "EMAIL_ADDRESS", "CREDIT_CARD", "IBAN_CODE", "US_BANK_NUMBER", "PERSON"], language="en", ) from presidio_anonymizer.entities import OperatorConfig anonymized_result = self.anonymizer.anonymize( text=text, analyzer_results=results, operators={ "DEFAULT": OperatorConfig("replace", {"new_value": "[CONFIDENTIAL_DATA]"}), "EMAIL_ADDRESS": OperatorConfig("replace", {"new_value": "[EMAIL]"}), "PHONE_NUMBER": OperatorConfig("replace", {"new_value": "[PHONE]"}), "PERSON": OperatorConfig("replace", {"new_value": "[NAME]"}), }, ) return anonymized_result.text except Exception as e: logger.error(f"Presidio Redaction Error: {e}") return text # fail-open: returning original text def sanitize_input(self, text: str) -> str: if not self.enabled or not self.redact_pii: return text if self.analyzer: return self._redact_with_presidio(text) # Regex fallback when Presidio isn't loaded sanitized = text for label, pattern in self.pii_patterns.items(): sanitized = re.sub(pattern, f"[{label}_REDACTED]", sanitized) return sanitized

3.3 Lakera Guard + Heuristic Jailbreak Check

# app/services/security_service.py — check_jailbreak def _check_with_lakera(self, text: str) -> Tuple[bool, str]: if not self.lakera_guard_api_key: return True, "" # dev: fail-open without key try: response = requests.post( "https://api.lakera.ai/v2/guard", headers={"Authorization": f"Bearer {self.lakera_guard_api_key}"}, json={"messages": [{"role": "user", "content": text}]}, ) if response.status_code != 200: logger.warning(f"Lakera API Warning: {response.status_code} - {response.text}") return True, "" result = response.json() # 'flagged: True' means prompt-injection detected if result.get("flagged") is True: logger.info(f"Lakera Blocked Input: {result}") return False, "Lakera Guard: Prompt Injection Detected" except Exception as e: logger.error(f"Lakera Guard API Error: {e}") return True, "" return True, "" def check_jailbreak(self, text: str) -> Tuple[bool, str]: if not self.enabled: return True, "" sanitized_text_for_check = self.sanitize_input(text) # 1. Lakera AI-based detection first is_safe, reason = self._check_with_lakera(sanitized_text_for_check) if not is_safe: return False, reason # 2. Heuristic keyword fallback text_lower = text.lower() # check original text (jailbreaks hide in PII-redacted gaps) for keyword in self.injection_keywords: if keyword in text_lower: return False, f"Blocked keyword detected: '{keyword}'" if len(text) > 10000: return False, "Input exceeds safety length limits" return True, ""

Why? (Parameter Breakdown

  • Custom UK_NINO PatternRecognizer — Presidio’s default entity set is US-centric. UK National Insurance Numbers (NINOs) need a regex. Adding a recogniser with score=0.85 flags matches but doesn’t auto-redact — caller must include "UK_NINO" in the entities list. The current implementation uses analyze(text=text) without listing UK_NINO, so NINOs leak through; tighter config would add them to the entity list.
  • OperatorConfig("replace", {"new_value": "[NAME]"}) — Per-entity replacement strings. Useful because: (1) rerecorded PII tokens are obvious to LLMs (they don’t hallucinate contextually), (2) downstream code can pattern-search [EMAIL] / [PHONE] to inject known-safe placeholders. Without per-entity mapping, every entity becomes an opaque [CONFIDENTIAL_DATA].
  • Lakera first, then keywords — AI classifiers catch heuristic-blind attacks (multi-language, semantic-only); keyword regex catches cost-free obvious ones. Ordering matters: if Lakera blocks first, we don’t leak detection timing. If Lakera is down, the heuristic still runs (no DoS amplification).
  • Heuristic scan on text (not sanitized_text_for_check) — Important: the sanitized text may have gaps where PII used to be ([CONFIDENTIAL_DATA]), but jailbreaks wouldn’t hide there. Scanning both is defensive against patterns where the original text contains a keyword inside PII that’s then scrubbed — better catch-and-fail than leak.
  • if len(text) > 10000: return False, "..." — Length cap protects against adversarial flooding (10KB+ prompt = high perplexity injection surface). 10K is arbitrary; tune based on your model’s context window.
  • Fail-open on analyze exception — Presidio errors are logged but the original text is returned. The alternative (fail-closed) would block every request when an NLP model has a hiccup. For BFSI, fail-closed is safer; for chat UX, fail-open is friendlier. The current code chooses friendlier — adjust per security posture.
  • Reach for settings.security_enabled — Master kill-switch in code; disable for local dev where PII redaction’s latency hurts. Without this toggle, every dev session pays the redaction cost.

Common Pitfalls

  1. Calling sanitize_input once before the LLM and forgetting to call it again in logs. The Postgres conversation log records the original text, leaking PII to the DB. Sanitise at the START of the pipeline AND replace at any logging site.
  2. Forgetting to add custom entity types when extending Presidio. New UK recognisers (UK_SORT_CODE, UK_POSTCODE) must be registered and listed in analyze(entities=[...]) to take effect.

Real-World Interview Prep

Q1: How do you handle the “PII desync” problem — redaction succeeds but logging happens BEFORE redaction?

A: Three fixes. (1) Sanitise at the input boundary — every request handler calls security.sanitize_input(message) first, then passes to all downstream code. (2) Sanitise before logging — log logger.info("...", extra={"message": sanitized}) instead of raw text. (3) Audit trails — every call site that produces logs that might contain raw PII gets a lint check (grep -nE 'logger\.(info|error)\(\".*\"' app/). Production-readiness: log-scrub middleware that post-processes every log record.

Q2: How do you test the check_jailbreak function with known-bad + known-good inputs?

A: Three layers. (1) Unit: feed "ignore previous instructions" → assert (False, "Blocked keyword: '...'"). (2) Edge: feed "I really enjoy ignoring procrastination in my workflow" → assert keyword-substring match is fine or refined (e.g. requires word boundaries). (3) Lakera mocks via responses library: mock requests.post("https://api.lakera.ai/...") returning {"flagged": True} → assert blocked. Without Lakera mock, integration tests require a real API key — expensive, skip with mocks.

Q3: Why is the keyword list hard-coded vs in Settings?

A: Two trade-offs. (a) Hard-coded: version-controlled, immutable, no config drift between environments. (b) Config-driven: SRE / compliance officer updates without PR; emergency jailbreak mitigation during incidents (e.g., new prompt-injection discovered in production). For the FCA posture, hard-coded makes sense because compliance reviews the list explicitly. For high-velocity product teams, config-driven is better. A middle-ground: hard-coded defaults in code, env-var override for incident response.

Top-to-Bottom Code Walkthrough (app/services/security_service.py)

SecurityService is the outbound safety gate. Every line of user text should pass through it before reaching the LLM, and every line of agent output before reaching the user or the database.

Imports

  • from passlib.context import CryptContext — bcrypt hash/verify.
  • from jose import jwt, JWTError — JWT sign/verify.
  • from presidio_analyzer import AnalyzerEngine — Microsoft’s PII detector.
  • from presidio_anonymizer import AnonymizerEngine — replaces detected entities.
  • import httpx — async HTTP for Lakera Guard API.

__init__

  • self.bcrypt = CryptContext(schemes=["bcrypt"], deprecated="auto") — passlib’s deprecated="auto" rehashes automatically when you upgrade to Argon2.
  • self.analyzer = AnalyzerEngine() — loads Presidio NLP models. Heavier than bcrypt; instantiated once.
  • self.anonymizer = AnonymizerEngine().
  • self.lakera_url = "https://api.lakera.ai/v1/prompt_injection" — Lakera Guard endpoint.

get_password_hash(plain) and verify_password(plain, hashed)

  • self.bcrypt.hash(plain) — returns a salted hash like "$2b$12$...".
  • self.bcrypt.verify(plain, hashed) — constant-time comparison.
  • Why min_length=32 on secret_key (Settings): a 32-char minimum means an attacker can’t brute-force the HMAC signing key. But the user-password itself is not the secret — bcrypt salts it.

create_access_token(payload) -> str

  • return jwt.encode(payload, settings.secret_key, algorithm=settings.jwt_algorithm).
  • The iat and exp claims are added automatically.

decode_token(token) -> dict

  • payload = jwt.decode(token, settings.secret_key, algorithms=[settings.jwt_algorithm]).
  • Raises JWTError (which becomes HTTPException(401) in the route layer).

sanitize_input(text) -> dict

This is the inbound pipeline. It runs in order, each step guarding against the next failure mode:

  1. Unicode bidi scrub: replaces LRE/RLE/PDF characters that could re-order display.
  2. Percent-decode to catch escape-encoded attacks.
  3. PII detection via Presidio: entities = self.analyzer.analyze(text=text, language="en"). Returns a list of detected entities (person, email, IBAN, etc.).
  4. Anonymisation: anonymised = self.anonymizer.anonymize(text=text, analyzer_results=entities). Returns text with <PERSON>, <EMAIL>, <IBAN> placeholders.
  5. Jailbreak check (Lakera or fallback): see below.
  6. Returns {"safe_text": anonymised.text, "redacted_entities": entities, "is_injection": bool}.

check_prompt_injection(text) -> tuple[bool, float]

Hybrid check:

  1. Fast regex pass: scan for "ignore previous", "ignore all", "DAN mode", "system prompt" — common jailbreak signatures.
  2. Optional Lakera call (if settings.lakera_guard_api_key):
    async with httpx.AsyncClient() as client: r = await client.post( "https://api.lakera.ai/v1/prompt_injection", headers={"Authorization": f"Bearer {settings.lakera_guard_api_key}"}, json={"input": text}, timeout=2.0, ) return r.json()["injection"] > 0.7, r.json()["score"]
  3. Fallback regex score if Lakera is unavailable — never block the request on the absence of a third-party API.

redact_pii(text) -> str

Convenience entry — runs only anonymize, skipping jailbreak detection. Used at logger-emit time, where PII must be stripped before persisting logs.

Common Pitfalls

Calling presidio at every log line is slow — its NLP models are heavy. Cache the AnalyzerEngine instance once at startup and reuse.

Lexicon-only jailbreak detection misses novel attacks. Always pair regex + the optional Lakera.

Logging the raw PII text before redact_pii — yes, the redactor is downstream of _prepare_log_message. The logger itself must NEVER log raw PII.

Real-World Interview Prep

Q1: What’s the trade-off between Presidio (local) and Lakera (cloud)?

A: Presidio is free and runs offline (data sovereignty is critical for FCA), but slower and easier to bypass. Lakera catches novel attacks in real-time but sends data to a third-party API. The hybrid is the right answer for regulated environments.

Q2: Why deprecated="auto" on the bcrypt context?

A: When you migrate to Argon2 (or any newer scheme), the same verify() call accepts both old (bcrypt) and new (Argon2) hashes. On the first verify() of an old hash against the new scheme, passlib transparently re-encodes and re-saves — zero-downtime migration.

Q3: How do you handle jailbreak detection when the LLM is the caller (not a human)?

A: Even LLM-generated prompts deserve jailbreak detection — a malicious tool result could contain a re-prompting attack. Apply check_prompt_injection to all text crossing into a prompt-template, regardless of source.

Last updated on