
Building a Real-Time Personalized Quick Suggest System with WebSockets and FAISS

How I built a ~6ms autocomplete system that learns from user behavior in real-time using FAISS semantic search, WebSocket event streaming, and dynamic personalization.

Most autocomplete systems are static—they return the same suggestions regardless of who's typing. But what if your suggestions could learn from each user's behavior in real-time? What if hovering over a suggestion for 3 seconds, or clicking but not selecting, could influence what appears next?

I built exactly this for a chatbot interface: a quick-suggest system that serves personalized suggestions in ~6ms at the median, learns from clicks/hovers/selections via WebSocket, and continuously grows its suggestion pool from user queries. Here's the complete technical deep-dive.

The Architecture at a Glance

# System Architecture Overview
#
#  ┌─────────────────────────────────────────────────────────────┐
#  │                      Frontend (React)                        │
#  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
#  │  │ Input Field │  │ Suggestions │  │ Interaction Tracker │  │
#  │  │   (typing)  │  │   (render)  │  │ (hover/click/select)│  │
#  │  └──────┬──────┘  └──────▲──────┘  └──────────┬──────────┘  │
#  │         │                │                     │             │
#  │         └────────────────┼─────────────────────┘             │
#  │                          │                                   │
#  └──────────────────────────┼───────────────────────────────────┘
#                             │ WebSocket
#                             │ (bidirectional)
#  ┌──────────────────────────┼───────────────────────────────────┐
#  │                    FastAPI Server                            │
#  │  ┌───────────────────────▼──────────────────────────────┐   │
#  │  │              WebSocket Handler                        │   │
#  │  │  • Parse events (typing/click/hover/selection)       │   │
#  │  │  • Route to appropriate handlers                     │   │
#  │  └───────────┬───────────────────────────┬──────────────┘   │
#  │              │                           │                   │
#  │  ┌───────────▼───────────┐  ┌───────────▼───────────────┐   │
#  │  │   Suggestion Engine   │  │   Personalization Engine   │   │
#  │  │  • FAISS index search │  │  • Keyword extraction      │   │
#  │  │  • Embedding model    │  │  • Interest profile merge  │   │
#  │  │  • Threshold filter   │  │  • Re-ranking logic        │   │
#  │  └───────────┬───────────┘  └───────────┬───────────────┘   │
#  │              │                           │                   │
#  │  ┌───────────▼───────────────────────────▼───────────────┐   │
#  │  │                 Index Manager                          │   │
#  │  │  • Hot-reload index  • Add new suggestions            │   │
#  │  │  • Background sync   • S3 persistence                 │   │
#  │  └───────────────────────────┬───────────────────────────┘   │
#  └──────────────────────────────┼───────────────────────────────┘
#                                 │
#           ┌─────────────────────┼─────────────────────┐
#           │                     │                     │
#  ┌────────▼────────┐   ┌───────▼───────┐   ┌────────▼────────┐
#  │      Redis      │   │      S3       │   │   PostgreSQL    │
#  │  User profiles  │   │  FAISS index  │   │   Analytics     │
#  │  Session state  │   │  Suggestions  │   │   Aggregations  │
#  └─────────────────┘   └───────────────┘   └─────────────────┘

The system has four key components:

  1. FAISS Index: Semantic search over 6,000+ suggestions with sub-millisecond latency
  2. WebSocket Layer: Bidirectional streaming for real-time interaction capture
  3. Personalization Engine: Re-ranks suggestions based on user interest profile
  4. Index Manager: Hot-reloads new suggestions without downtime

Part 1: Semantic Search with FAISS

Traditional autocomplete uses prefix matching or fuzzy string search. But users don't always type the exact prefix—someone looking for "Databricks funding" might type "how much did databricks raise". We need semantic search.

Setting Up the Index

from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import json

# Load the embedding model (384 dimensions, fast inference)
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

# Load pre-computed suggestions (6,000+ queries)
with open("knowledge_base/autocomplete_suggestions.json", "r") as f:
    suggestions = json.load(f)['suggestions']

# Encode all suggestions into embeddings, L2-normalized so that
# inner product equals cosine similarity
embeddings = model.encode(suggestions, normalize_embeddings=True)
print(f"Embeddings shape: {embeddings.shape}")  # (6185, 384)

# Initialize FAISS index with Inner Product similarity
# (equals cosine similarity because the vectors are normalized)
index = faiss.IndexFlatIP(384)
index.add(np.array(embeddings).astype('float32'))

# Persist to disk for production use
faiss.write_index(index, "knowledge_base/autocomplete.idx")

Key decisions:

  • all-MiniLM-L6-v2: 384-dim embeddings, 22M params, ~5ms encode time. Perfect balance of quality and speed.
  • IndexFlatIP: Inner Product similarity (equivalent to cosine when normalized). Exact search, no approximation.
  • 6,185 suggestions: Pre-computed from common user queries, documentation, and product features.
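
The IndexFlatIP bullet relies on a standard identity: once vectors are scaled to unit length, a plain inner product *is* cosine similarity. A quick numpy check (illustrative only, not part of the system):

```python
import numpy as np

def cosine_via_inner_product(a: np.ndarray, b: np.ndarray) -> float:
    """Normalize to unit length, then a plain dot product is cosine similarity."""
    a_unit = a / np.linalg.norm(a)
    b_unit = b / np.linalg.norm(b)
    return float(np.dot(a_unit, b_unit))

a = np.array([3.0, 4.0])
b = np.array([4.0, 3.0])

# Direct cosine formula for comparison: dot(a, b) / (|a| * |b|) = 24 / 25
cosine = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
assert abs(cosine_via_inner_product(a, b) - cosine) < 1e-12
```

This is why the embeddings must be normalized before `index.add`; on raw vectors, IndexFlatIP scores are unbounded dot products and a fixed similarity threshold loses its cosine interpretation.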

Search Function

def get_suggestions(user_input: str, k: int = 5, threshold: float = 0.8):
    """
    Retrieve top-k semantically similar suggestions.

    Args:
        user_input: Partial query typed by user
        k: Maximum suggestions to return
        threshold: Minimum similarity score (0-1)

    Returns:
        List of suggestion strings above threshold
    """
    # Encode user input into the same embedding space (normalized,
    # matching the index, so scores are cosine similarities)
    query_embedding = model.encode([user_input], normalize_embeddings=True)

    # FAISS search returns (distances, indices)
    distances, indices = index.search(
        query_embedding.astype('float32'),
        k
    )

    # Filter by threshold and return suggestions
    results = [
        suggestions[idx]
        for i, idx in enumerate(indices[0])
        if distances[0][i] > threshold
    ]

    return results

# Example usage
>>> get_suggestions("Cerebras")
['what about cerebras',
 'Tell me about Cerebras',
 'tell me about cerebras',
 'what about cerebras then?',
 'show me cerebras now']

>>> get_suggestions("databricks funding")
['How much funding has Databricks raised?',
 'Tell me about Databricks latest round',
 'Databricks valuation']

The threshold parameter is crucial—it filters out low-confidence matches. At 0.8, we only return suggestions that are genuinely semantically similar, not just vaguely related.
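
To make the threshold's effect concrete, here is a minimal, hypothetical filtering step over (suggestion, score) pairs — the same logic the list comprehension in get_suggestions applies:

```python
def filter_by_threshold(scored, threshold: float = 0.8):
    """Keep only suggestions whose similarity clears the confidence bar."""
    return [text for text, score in scored if score > threshold]

# Hypothetical scores for illustration
candidates = [
    ("How much funding has Databricks raised?", 0.91),
    ("Databricks valuation", 0.84),
    ("Tell me about Snowflake", 0.62),  # related, but below threshold
]
print(filter_by_threshold(candidates))
# ['How much funding has Databricks raised?', 'Databricks valuation']
```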

Performance Characteristics

# FAISS Inner Product search performance on 6,185 suggestions:
#
# Index type: IndexFlatIP (brute force, exact search)
# Embedding dim: 384
# Hardware: M1 MacBook Pro
#
# Benchmark results:
# - Single query: 0.3ms average
# - Batch of 10: 0.8ms average
# - Batch of 100: 4.2ms average
#
# For larger indexes (100K+ suggestions), use IndexIVFFlat:
#
# nlist = 100  # Number of clusters
# quantizer = faiss.IndexFlatIP(384)
# index = faiss.IndexIVFFlat(quantizer, 384, nlist, faiss.METRIC_INNER_PRODUCT)
# index.train(embeddings)  # Required for IVF
# index.add(embeddings)
# index.nprobe = 10  # Search 10 nearest clusters
#
# This reduces search from O(n) to O(n/nlist * nprobe)

Why FAISS Over Elasticsearch?

Elasticsearch's vector search (dense_vector) adds ~20-50ms latency for semantic queries. FAISS operates purely in-memory with optimized SIMD instructions, achieving sub-millisecond search. For autocomplete where every millisecond matters, this is the difference between "instant" and "noticeable delay".

Part 2: WebSocket Event Streaming

HTTP request-response is too slow for autocomplete—every keystroke would incur connection overhead. WebSockets give us a persistent bidirectional channel: the client streams typing events, the server streams suggestions back.

Server-Side Connection Manager

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set
import asyncio
import json

app = FastAPI()

# Connection manager for handling multiple clients
class ConnectionManager:
    def __init__(self):
        # Map: user_id -> WebSocket connection
        self.active_connections: Dict[str, WebSocket] = {}
        # Map: user_id -> set of interests (keywords)
        self.user_interests: Dict[str, Set[str]] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        self.active_connections[user_id] = websocket
        self.user_interests.setdefault(user_id, set())

    def disconnect(self, user_id: str):
        self.active_connections.pop(user_id, None)

    async def send_suggestions(self, user_id: str, suggestions: list):
        if user_id in self.active_connections:
            await self.active_connections[user_id].send_json({
                "type": "suggestions",
                "data": suggestions
            })

    def update_interests(self, user_id: str, keywords: Set[str]):
        """Merge new keywords into user's interest profile"""
        self.user_interests[user_id].update(keywords)

manager = ConnectionManager()

WebSocket Endpoint

@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await manager.connect(websocket, user_id)

    try:
        while True:
            # Receive events from client
            data = await websocket.receive_json()
            event_type = data.get("type")

            if event_type == "typing":
                # User is typing - get suggestions
                query = data.get("query", "")
                if len(query) >= 2:  # Min 2 chars before suggesting
                    suggestions = get_personalized_suggestions(
                        user_id=user_id,
                        query=query,
                        k=5
                    )
                    await manager.send_suggestions(user_id, suggestions)

            elif event_type == "click":
                # User clicked a suggestion
                clicked = data.get("suggestion")
                keywords = extract_keywords(clicked)
                manager.update_interests(user_id, keywords)

                # Log for analytics
                await log_interaction(user_id, "click", clicked)

            elif event_type == "hover":
                # User hovered over suggestion (3+ seconds)
                hovered = data.get("suggestion")
                dwell_time = data.get("dwell_ms", 0)

                if dwell_time > 3000:  # Significant interest signal
                    keywords = extract_keywords(hovered)
                    # Lower weight than click
                    manager.update_interests(user_id, keywords)

            elif event_type == "selection":
                # User selected suggestion and submitted
                selected = data.get("suggestion")
                keywords = extract_keywords(selected)
                # Highest weight - explicit selection
                manager.update_interests(user_id, keywords)

                # Potentially add to suggestion pool if novel
                await maybe_add_suggestion(selected)

    except WebSocketDisconnect:
        manager.disconnect(user_id)

We capture four types of user interactions:

| Event       | Signal Strength | What It Tells Us                 |
|-------------|-----------------|----------------------------------|
| typing      | Trigger         | User wants suggestions (request) |
| hover (3s+) | Weak interest   | User is considering this topic   |
| click       | Medium interest | User engaged with suggestion     |
| selection   | Strong interest | User explicitly chose this query |
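
The server code above folds all of these signals into the same unweighted update_interests call. A weighted variant (hypothetical — the production profile is a flat set) would score keywords by signal strength:

```python
# Assumed weights mirroring the signal-strength table; not from the original system
EVENT_WEIGHTS = {"selection": 1.0, "click": 0.5, "hover": 0.2}

def update_weighted_interests(profile: dict, keywords: set, event_type: str) -> dict:
    """Accumulate a per-keyword interest score, weighted by event strength."""
    weight = EVENT_WEIGHTS.get(event_type, 0.0)
    for keyword in keywords:
        profile[keyword] = profile.get(keyword, 0.0) + weight
    return profile

profile = {}
update_weighted_interests(profile, {"databricks", "funding"}, "click")
update_weighted_interests(profile, {"databricks"}, "selection")
# profile == {"databricks": 1.5, "funding": 0.5}
```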

Client-Side Implementation

// Frontend WebSocket client (TypeScript/React)

class SuggestClient {
    private ws: WebSocket | null = null;
    private userId: string;
    private reconnectAttempts = 0;
    private maxReconnectAttempts = 5;

    constructor(userId: string) {
        this.userId = userId;
        this.connect();
    }

    private connect() {
        this.ws = new WebSocket(`wss://api.example.com/ws/${this.userId}`);

        this.ws.onopen = () => {
            console.log('WebSocket connected');
            this.reconnectAttempts = 0;
        };

        this.ws.onmessage = (event) => {
            const data = JSON.parse(event.data);
            if (data.type === 'suggestions') {
                this.onSuggestions(data.data);
            }
        };

        this.ws.onclose = () => {
            // Exponential backoff reconnection
            if (this.reconnectAttempts < this.maxReconnectAttempts) {
                const delay = Math.pow(2, this.reconnectAttempts) * 1000;
                setTimeout(() => this.connect(), delay);
                this.reconnectAttempts++;
            }
        };
    }

    // Called when user types in input
    sendTypingEvent(query: string) {
        this.send({ type: 'typing', query });
    }

    // Called when user clicks a suggestion
    sendClickEvent(suggestion: string) {
        this.send({ type: 'click', suggestion });
    }

    // Called when user hovers for 3+ seconds
    sendHoverEvent(suggestion: string, dwellMs: number) {
        this.send({ type: 'hover', suggestion, dwell_ms: dwellMs });
    }

    // Called when user selects and submits
    sendSelectionEvent(suggestion: string) {
        this.send({ type: 'selection', suggestion });
    }

    private send(data: object) {
        if (this.ws?.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify(data));
        }
    }

    // Override this to update UI
    onSuggestions(suggestions: string[]) {
        console.log('Received suggestions:', suggestions);
    }
}

The client handles reconnection with exponential backoff—critical for mobile users with flaky connections. Each event is fire-and-forget; we don't wait for acknowledgment.

Tracking Hover Dwell Time

// React hook for tracking hover with dwell time

import { useRef, useCallback } from 'react';

function useHoverTracking(client: SuggestClient) {
    const hoverTimerRef = useRef<Map<string, ReturnType<typeof setTimeout>>>(new Map());
    const hoverStartRef = useRef<Map<string, number>>(new Map());

    const onMouseEnter = useCallback((suggestion: string) => {
        // Record hover start time
        hoverStartRef.current.set(suggestion, Date.now());

        // Set timer to fire at 3 seconds
        const timer = setTimeout(() => {
            const startTime = hoverStartRef.current.get(suggestion);
            if (startTime) {
                const dwellMs = Date.now() - startTime;
                client.sendHoverEvent(suggestion, dwellMs);
            }
        }, 3000);

        hoverTimerRef.current.set(suggestion, timer);
    }, [client]);

    const onMouseLeave = useCallback((suggestion: string) => {
        // Cancel timer if user leaves before 3 seconds
        const timer = hoverTimerRef.current.get(suggestion);
        if (timer) {
            clearTimeout(timer);
            hoverTimerRef.current.delete(suggestion);
        }
        hoverStartRef.current.delete(suggestion);
    }, []);

    return { onMouseEnter, onMouseLeave };
}

// Usage in component
function SuggestionItem({ suggestion, client }) {
    const { onMouseEnter, onMouseLeave } = useHoverTracking(client);

    return (
        <li
            onMouseEnter={() => onMouseEnter(suggestion)}
            onMouseLeave={() => onMouseLeave(suggestion)}
            onClick={() => client.sendClickEvent(suggestion)}
        >
            {suggestion}
        </li>
    );
}

This hook tracks when users hover over suggestions. A 3-second hover indicates genuine interest—they're reading and considering. We capture this as a weak positive signal for personalization.

Part 3: Real-Time Personalization

The magic happens in the re-ranking step. We combine semantic relevance with user interest overlap to surface suggestions that match both what they're typing and what they care about.

import spacy
from typing import Set, List

# Load spaCy for keyword extraction
nlp = spacy.load("en_core_web_sm")

def extract_keywords(text: str) -> Set[str]:
    """Extract meaningful keywords from text using NER and noun chunks"""
    doc = nlp(text)

    keywords = set()

    # Named entities (companies, products, etc.)
    for ent in doc.ents:
        if ent.label_ in ['ORG', 'PRODUCT', 'GPE', 'PERSON']:
            keywords.add(ent.text.lower())

    # Noun chunks (key phrases)
    for chunk in doc.noun_chunks:
        # Filter out pronouns and very short chunks
        if chunk.root.pos_ != 'PRON' and len(chunk.text) > 2:
            keywords.add(chunk.text.lower())

    return keywords

def get_personalized_suggestions(
    user_id: str,
    query: str,
    k: int = 5
) -> List[str]:
    """
    Get suggestions personalized to user's interests.

    Strategy:
    1. Over-fetch 4k candidates from FAISS (semantic match)
    2. Re-rank based on user interest overlap
    3. Return top-k after re-ranking
    """
    # Step 1: Get candidate suggestions (4x what we need)
    query_embedding = model.encode([query], normalize_embeddings=True)
    distances, indices = index.search(
        query_embedding.astype('float32'),
        k * 4  # Over-fetch for re-ranking
    )

    candidates = [
        (suggestions[idx], distances[0][i])
        for i, idx in enumerate(indices[0])
        if distances[0][i] > 0.5  # Relaxed threshold for candidates
    ]

    # Step 2: Get user's interest profile
    user_interests = manager.user_interests.get(user_id, set())

    if not user_interests:
        # No personalization data yet - return by semantic score
        return [c[0] for c in candidates[:k]]

    # Step 3: Re-rank by combining semantic score + interest overlap
    def personalization_score(suggestion: str, base_score: float) -> float:
        suggestion_keywords = extract_keywords(suggestion)
        overlap = len(suggestion_keywords & user_interests)

        # 70% of the semantic score, plus an interest bonus of 0.1
        # per overlapping keyword, capped at 0.3
        interest_bonus = min(overlap * 0.1, 0.3)

        return base_score * 0.7 + interest_bonus

    ranked = sorted(
        candidates,
        key=lambda x: personalization_score(x[0], x[1]),
        reverse=True
    )

    return [r[0] for r in ranked[:k]]

How the Scoring Works

Consider a user who has previously clicked on suggestions about "Databricks", "funding rounds", and "Series D". Their interest profile contains these keywords.

When they type "data company", we might get these candidates:

| Suggestion                  | Semantic Score | Interest Overlap | Final Score             |
|-----------------------------|----------------|------------------|-------------------------|
| "Tell me about Snowflake"   | 0.82           | 0 keywords       | 0.82 × 0.7 = 0.57       |
| "Databricks latest funding" | 0.78           | 2 keywords       | 0.78 × 0.7 + 0.2 = 0.75 |
| "Data warehouse comparison" | 0.85           | 0 keywords       | 0.85 × 0.7 = 0.60       |

The Databricks suggestion wins despite a lower semantic score because it matches the user's demonstrated interests. This is interest-based re-ranking layered on top of semantic search.
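
The arithmetic in the table is just the personalization_score formula; isolated as a pure function, it's easy to sanity-check:

```python
def personalization_score(base_score: float, overlap: int) -> float:
    """70% of the semantic score plus an interest bonus capped at 0.3."""
    return base_score * 0.7 + min(overlap * 0.1, 0.3)

# The three candidates from the example above
assert abs(personalization_score(0.82, 0) - 0.574) < 1e-9  # Snowflake
assert abs(personalization_score(0.78, 2) - 0.746) < 1e-9  # Databricks (wins)
assert abs(personalization_score(0.85, 0) - 0.595) < 1e-9  # Data warehouse
```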

Part 4: Continuous Index Growth

The suggestion pool isn't static—it grows from user queries. When someone types a novel, high-quality query, we add it to the index for future users.

import threading
import time

class SuggestionIndexManager:
    """
    Manages the FAISS index with hot-reload capability.
    Allows adding new suggestions without downtime.
    """

    def __init__(self, index_path: str, suggestions_path: str):
        self.index_path = index_path
        self.suggestions_path = suggestions_path

        # Load initial index
        self.index = faiss.read_index(index_path)
        with open(suggestions_path) as f:
            self.suggestions = json.load(f)['suggestions']

        # Pending suggestions (not yet indexed)
        self.pending_suggestions: List[str] = []
        self.pending_lock = threading.Lock()

        # Start background index update thread
        self.update_thread = threading.Thread(
            target=self._background_update_loop,
            daemon=True
        )
        self.update_thread.start()

    def add_suggestion(self, suggestion: str):
        """Queue a new suggestion for indexing"""
        with self.pending_lock:
            if suggestion not in self.suggestions:
                self.pending_suggestions.append(suggestion)

    def _background_update_loop(self):
        """Periodically batch-update the index"""
        while True:
            time.sleep(300)  # Update every 5 minutes

            with self.pending_lock:
                if not self.pending_suggestions:
                    continue

                new_suggestions = self.pending_suggestions.copy()
                self.pending_suggestions.clear()

            # Encode new suggestions (normalized, to match the index)
            new_embeddings = model.encode(new_suggestions, normalize_embeddings=True)

            # Add to index
            self.index.add(new_embeddings.astype('float32'))
            self.suggestions.extend(new_suggestions)

            # Persist to disk
            faiss.write_index(self.index, self.index_path)
            with open(self.suggestions_path, 'w') as f:
                json.dump({'suggestions': self.suggestions}, f)

            # Upload to S3 for other instances
            s3_upload_file(BUCKET_NAME, FAISS_INDEX_S3, self.index_path)
            s3_upload_file(BUCKET_NAME, SUGGESTIONS_S3, self.suggestions_path)

            print(f"Index updated with {len(new_suggestions)} new suggestions")

# Usage
index_manager = SuggestionIndexManager(
    index_path="knowledge_base/autocomplete.idx",
    suggestions_path="knowledge_base/suggestions.json"
)

async def maybe_add_suggestion(suggestion: str):
    """Add user-generated query if it's novel and high-quality"""
    # Check novelty
    existing = get_suggestions(suggestion, k=1, threshold=0.95)
    if existing:
        return  # Too similar to an existing suggestion

    # Quality checks
    if len(suggestion) < 5 or len(suggestion) > 200:
        return
    if not suggestion[0].isupper():  # Crude quality heuristic: capitalized start
        return

    # Add to pending queue
    index_manager.add_suggestion(suggestion)

Key design decisions:

  • Novelty check: Only add if no existing suggestion is >95% similar
  • Quality filter: Basic length and capitalization checks
  • Batch updates: Aggregate for 5 minutes, then batch-encode and add
  • Hot reload: Readers see updates on their next search (FAISS reads are thread-safe)
  • S3 sync: Other instances pull the updated index on next restart

Part 5: Persistent User Profiles with Redis

User interests need to persist across sessions. We use Redis for fast reads and automatic TTL-based cleanup.

import json
import time
from typing import Set

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

class UserProfileStore:
    """
    Persistent user interest storage using Redis.
    Supports TTL for automatic cleanup of stale profiles.
    """

    PROFILE_TTL = 60 * 60 * 24 * 30  # 30 days

    @staticmethod
    def get_interests(user_id: str) -> Set[str]:
        """Retrieve user's interest keywords"""
        key = f"user:interests:{user_id}"
        data = redis_client.get(key)
        if data:
            return set(json.loads(data))
        return set()

    @staticmethod
    def update_interests(user_id: str, new_keywords: Set[str]):
        """Add new keywords to user's profile"""
        key = f"user:interests:{user_id}"

        # Get existing interests
        existing = UserProfileStore.get_interests(user_id)

        # Merge and limit to top 100 keywords
        combined = existing | new_keywords
        if len(combined) > 100:
            # Keep most recent (in practice, track with timestamps)
            combined = set(list(combined)[-100:])

        # Save with TTL refresh
        redis_client.setex(
            key,
            UserProfileStore.PROFILE_TTL,
            json.dumps(list(combined))
        )

    @staticmethod
    def log_interaction(user_id: str, event_type: str, data: str):
        """Log interaction for analytics (append to list)"""
        key = f"user:interactions:{user_id}"
        entry = json.dumps({
            "type": event_type,
            "data": data,
            "timestamp": time.time()
        })

        # Keep last 1000 interactions
        redis_client.lpush(key, entry)
        redis_client.ltrim(key, 0, 999)
        redis_client.expire(key, UserProfileStore.PROFILE_TTL)

Redis keys:

  • user:interests:{user_id} — Set of interest keywords (max 100)
  • user:interactions:{user_id} — List of recent interactions (max 1000)

The 30-day TTL ensures we don't store data for inactive users forever, and that outdated interests are automatically "forgotten".

Production Performance

# Performance metrics from production deployment
#
# Infrastructure:
# - 2x c5.xlarge instances (4 vCPU, 8GB RAM)
# - Redis ElastiCache (cache.t3.micro)
# - S3 for index storage
#
# Suggestion Index:
# - 6,185 suggestions
# - 384-dimensional embeddings
# - IndexFlatIP (exact search)
#
# Latency (P50 / P95 / P99):
# - FAISS search: 0.3ms / 0.8ms / 1.2ms
# - Personalization: 1.5ms / 3.2ms / 5.1ms
# - WebSocket round-trip: 4ms / 12ms / 25ms
# - Total (typing → suggestions): 6ms / 16ms / 31ms
#
# Throughput:
# - 2,500 suggestion requests/second per instance
# - 50,000 concurrent WebSocket connections (across cluster)
#
# User Engagement:
# - Suggestion click-through rate: 34%
# - Personalized vs generic: 47% vs 28% CTR
# - Time to first suggestion: 6ms (perceived instant)

The key metric: personalized suggestions have a 47% click-through rate vs 28% for generic. Users are significantly more likely to engage with suggestions tailored to their interests.

Advanced Techniques

1. Interest Decay

Not all interests are equal—recent ones matter more. Implement exponential decay:

from collections import defaultdict
from typing import Dict
import json
import time

def get_weighted_interests(user_id: str) -> Dict[str, float]:
    """Get interests with time-based decay weights"""
    interactions = redis_client.lrange(f"user:interactions:{user_id}", 0, -1)

    keyword_weights = defaultdict(float)
    now = time.time()

    for interaction in interactions:
        data = json.loads(interaction)
        age_hours = (now - data['timestamp']) / 3600

        # Half-life of 24 hours
        decay = 0.5 ** (age_hours / 24)

        # Weight by interaction type
        type_weight = {'selection': 1.0, 'click': 0.5, 'hover': 0.2}
        weight = decay * type_weight.get(data['type'], 0.1)

        for keyword in extract_keywords(data['data']):
            keyword_weights[keyword] += weight

    return dict(keyword_weights)

2. Negative Signals

Track suggestions that were shown but not clicked as weak negative signals:

# On the client, track which suggestions were visible
sendImpressionEvent(visibleSuggestions: string[])

# On the server, compare impressions vs clicks
impressions = get_recent_impressions(user_id)
clicks = get_recent_clicks(user_id)
ignored = impressions - clicks

# Slightly penalize keywords from ignored suggestions
for suggestion in ignored:
    keywords = extract_keywords(suggestion)
    decrease_interest_score(user_id, keywords, factor=0.1)
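
decrease_interest_score is left undefined in the sketch above; one plausible implementation (hypothetical, operating on an in-memory weighted profile dict rather than by user_id, with scores floored at zero):

```python
def decrease_interest_score(profile: dict, keywords: set, factor: float = 0.1) -> dict:
    """Reduce interest scores for keywords the user saw but ignored."""
    for keyword in keywords:
        if keyword in profile:
            profile[keyword] = max(profile[keyword] - factor, 0.0)
    return profile

profile = {"snowflake": 0.25, "databricks": 1.5}
decrease_interest_score(profile, {"snowflake", "warehouse"})
# snowflake's score drops; databricks and the unseen "warehouse" are untouched
```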

3. Cohort-Based Suggestions

For new users with no history, use collaborative filtering at the cohort level:

from itertools import zip_longest

def interleave(a: list, b: list) -> list:
    """Alternate items from two lists, preserving each list's order"""
    merged = []
    for pair in zip_longest(a, b):
        merged.extend(item for item in pair if item is not None)
    return merged

def get_cohort_popular_suggestions(cohort_id: str, k: int = 10):
    """Get most-clicked suggestions for user's cohort"""
    key = f"cohort:popular:{cohort_id}"

    # Sorted set: suggestion -> click count
    top_suggestions = redis_client.zrevrange(key, 0, k-1)

    return [s.decode() for s in top_suggestions]

# Combine with semantic search for new users
def get_suggestions_for_new_user(query: str, cohort_id: str):
    semantic = get_suggestions(query, k=3)
    popular = get_cohort_popular_suggestions(cohort_id, k=2)

    # Interleave: semantic, popular, semantic, popular, semantic
    return interleave(semantic, popular)

Lessons Learned

1. Debounce Typing Events

Don't send an event on every keystroke. Debounce by 100-150ms to reduce server load and avoid flickering suggestions.

2. Minimum Query Length

Suggestions for single characters are rarely useful and expensive to compute. We require at least 2 characters before triggering search.

3. Graceful Degradation

If the WebSocket disconnects, fall back to HTTP requests. If personalization fails, return generic semantic results. Always have a fallback.

4. Privacy Considerations

User interest profiles are powerful but sensitive. We anonymize after 30 days, don't store raw queries long-term, and allow users to clear their history.
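
The article's client is TypeScript, but the same idea can be sketched server-side in Python with asyncio task cancellation (illustrative only, not the production code): each new keystroke cancels the pending lookup and schedules a fresh one, so only the last keystroke in a burst triggers work.

```python
import asyncio
from typing import Optional

class Debouncer:
    """Collapse a burst of calls into one, firing only after delay_s of quiet."""

    def __init__(self, delay_s: float = 0.12):
        self.delay_s = delay_s
        self._pending: Optional[asyncio.Task] = None

    def call(self, coro_factory):
        # Cancel the previously scheduled call, if it hasn't fired yet
        if self._pending and not self._pending.done():
            self._pending.cancel()
        self._pending = asyncio.create_task(self._fire(coro_factory))

    async def _fire(self, coro_factory):
        try:
            await asyncio.sleep(self.delay_s)
            await coro_factory()
        except asyncio.CancelledError:
            pass  # superseded by a newer keystroke

async def demo():
    fired = []

    async def lookup():
        fired.append("suggestions fetched")

    debouncer = Debouncer(delay_s=0.05)
    for _ in range(5):           # five rapid "keystrokes"
        debouncer.call(lookup)
        await asyncio.sleep(0.01)
    await asyncio.sleep(0.2)     # a quiet period lets the last call fire
    return fired

# asyncio.run(demo()) -> one fetch for five keystrokes
```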
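
The personalization fallback can be captured as a generic wrapper (a sketch, not the production code): try the personalized path, and on any failure degrade to plain semantic results.

```python
import logging

def with_fallback(primary, fallback):
    """Return a callable that tries `primary` and degrades to `fallback` on error."""
    def wrapped(*args, **kwargs):
        try:
            return primary(*args, **kwargs)
        except Exception:
            logging.exception("primary path failed; degrading to fallback")
            return fallback(*args, **kwargs)
    return wrapped

# Hypothetical wiring for this system (names assumed):
# suggest = with_fallback(lambda q: get_personalized_suggestions(user_id, q),
#                         lambda q: get_suggestions(q))
```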

Conclusion

Building a real-time personalized suggestion system requires orchestrating multiple technologies:

  • FAISS for sub-millisecond semantic search
  • WebSockets for bidirectional real-time streaming
  • spaCy/NLP for keyword extraction
  • Redis for fast user profile storage
  • Background workers for continuous index updates

The result is an autocomplete experience that feels magical—suggestions appear instantly and seem to "read your mind" because they're tailored to your demonstrated interests.

The 47% vs 28% click-through rate tells the story: personalization isn't just a nice-to-have, it's a 1.7x improvement in user engagement.


This system powered the chatbot interface for a fintech product, handling millions of suggestion requests daily. The combination of semantic search and real-time personalization transformed a basic autocomplete into an intelligent assistant that anticipated user needs.