Most autocomplete systems are static—they return the same suggestions regardless of who's typing. But what if your suggestions could learn from each user's behavior in real-time? What if hovering over a suggestion for 3 seconds, or clicking but not selecting, could influence what appears next?
I built exactly this for a chatbot interface: a quick-suggest system that serves personalized suggestions with a 6ms median latency, learns from clicks/hovers/selections via WebSocket, and continuously grows its suggestion pool from user queries. Here's the complete technical deep-dive.
The Architecture at a Glance
# System Architecture Overview
#
# ┌─────────────────────────────────────────────────────────────┐
# │ Frontend (React) │
# │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
# │ │ Input Field │ │ Suggestions │ │ Interaction Tracker │ │
# │ │ (typing) │ │ (render) │ │ (hover/click/select)│ │
# │ └──────┬──────┘ └──────▲──────┘ └──────────┬──────────┘ │
# │ │ │ │ │
# │ └────────────────┼─────────────────────┘ │
# │ │ │
# └──────────────────────────┼───────────────────────────────────┘
# │ WebSocket
# │ (bidirectional)
# ┌──────────────────────────┼───────────────────────────────────┐
# │ FastAPI Server │
# │ ┌───────────────────────▼──────────────────────────────┐ │
# │ │ WebSocket Handler │ │
# │ │ • Parse events (typing/click/hover/selection) │ │
# │ │ • Route to appropriate handlers │ │
# │ └───────────┬───────────────────────────┬──────────────┘ │
# │ │ │ │
# │ ┌───────────▼───────────┐ ┌───────────▼───────────────┐ │
# │ │ Suggestion Engine │ │ Personalization Engine │ │
# │ │ • FAISS index search │ │ • Keyword extraction │ │
# │ │ • Embedding model │ │ • Interest profile merge │ │
# │ │ • Threshold filter │ │ • Re-ranking logic │ │
# │ └───────────┬───────────┘ └───────────┬───────────────┘ │
# │ │ │ │
# │ ┌───────────▼───────────────────────────▼───────────────┐ │
# │ │ Index Manager │ │
# │ │ • Hot-reload index • Add new suggestions │ │
# │ │ • Background sync • S3 persistence │ │
# │ └───────────────────────────┬───────────────────────────┘ │
# └──────────────────────────────┼───────────────────────────────┘
# │
# ┌─────────────────────┼─────────────────────┐
# │ │ │
# ┌────────▼────────┐ ┌───────▼───────┐ ┌────────▼────────┐
# │ Redis │ │ S3 │ │ PostgreSQL │
# │ User profiles │ │ FAISS index │ │ Analytics │
# │ Session state │ │ Suggestions │ │ Aggregations │
# └─────────────────┘ └───────────────┘ └─────────────────┘

The system has four key components:
- FAISS Index: Semantic search over 6,000+ suggestions with sub-millisecond latency
- WebSocket Layer: Bidirectional streaming for real-time interaction capture
- Personalization Engine: Re-ranks suggestions based on user interest profile
- Index Manager: Hot-reloads new suggestions without downtime
Part 1: Semantic Search with FAISS
Traditional autocomplete uses prefix matching or fuzzy string search. But users don't always type the exact prefix—someone looking for "Databricks funding" might type "how much did databricks raise". We need semantic search.
Setting Up the Index
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import json
# Load the embedding model (384 dimensions, fast inference)
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
# Load pre-computed suggestions (6,000+ queries)
with open("knowledge_base/autocomplete_suggestions.json", "r") as f:
suggestions = json.load(f)['suggestions']
# Encode all suggestions into embeddings, normalized so that inner
# product equals cosine similarity
embeddings = model.encode(suggestions, normalize_embeddings=True)
print(f"Embeddings shape: {embeddings.shape}") # (6185, 384)
# Initialize FAISS index with Inner Product similarity
# (cosine similarity when vectors are normalized)
index = faiss.IndexFlatIP(384)
index.add(np.array(embeddings).astype('float32'))
# Persist to disk for production use
faiss.write_index(index, "knowledge_base/autocomplete.idx")

Key decisions:
- all-MiniLM-L6-v2: 384-dim embeddings, 22M params, ~5ms encode time. Perfect balance of quality and speed.
- IndexFlatIP: Inner Product similarity (equivalent to cosine similarity since we normalize the embeddings; see the quick check after this list). Exact search, no approximation.
- 6,185 suggestions: Pre-computed from common user queries, documentation, and product features.
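To sanity-check the cosine equivalence, you can compare two encoded strings directly (a quick snippet reusing the model and numpy imports above; the example strings are arbitrary):

# With normalize_embeddings=True, inner product == cosine similarity
a, b = model.encode(
    ["Databricks funding", "How much funding has Databricks raised?"],
    normalize_embeddings=True
)
print(float(a @ b))              # cosine similarity, e.g. ~0.8
print(float(np.linalg.norm(a)))  # 1.0 - vectors are unit length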
Search Function
def get_suggestions(user_input: str, k: int = 5, threshold: float = 0.8):
"""
Retrieve top-k semantically similar suggestions.
Args:
user_input: Partial query typed by user
k: Maximum suggestions to return
threshold: Minimum similarity score (0-1)
Returns:
List of suggestion strings above threshold
"""
    # Encode user input into the same (normalized) embedding space
    query_embedding = model.encode([user_input], normalize_embeddings=True)
# FAISS search returns (distances, indices)
distances, indices = index.search(
query_embedding.astype('float32'),
k
)
# Filter by threshold and return suggestions
results = [
suggestions[idx]
for i, idx in enumerate(indices[0])
if distances[0][i] > threshold
]
return results
# Example usage
>>> get_suggestions("Cerebras")
['what about cerebras',
'Tell me about Cerebras',
'tell me about cerebras',
'what about cerebras then?',
'show me cerebras now']
>>> get_suggestions("databricks funding")
['How much funding has Databricks raised?',
'Tell me about Databricks latest round',
'Databricks valuation']
The threshold parameter is crucial—it filters out low-confidence matches. At 0.8, we only return suggestions that are genuinely semantically similar, not just vaguely related.
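A practical way to pick the threshold is to sweep the raw scores and eyeball where genuine matches end (a small diagnostic sketch reusing the index, model, and suggestions above):

# Diagnostic: print raw scores so you can see where the cutoff should sit
def inspect_scores(user_input: str, k: int = 10):
    query_embedding = model.encode([user_input], normalize_embeddings=True)
    distances, indices = index.search(query_embedding.astype('float32'), k)
    for score, idx in zip(distances[0], indices[0]):
        marker = "KEEP" if score > 0.8 else "drop"
        print(f"{score:.3f} [{marker}] {suggestions[idx]}")

inspect_scores("databricks funding")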
Performance Characteristics
# FAISS Inner Product search performance on 6,185 suggestions:
#
# Index type: IndexFlatIP (brute force, exact search)
# Embedding dim: 384
# Hardware: M1 MacBook Pro
#
# Benchmark results:
# - Single query: 0.3ms average
# - Batch of 10: 0.8ms average
# - Batch of 100: 4.2ms average
#
# For larger indexes (100K+ suggestions), use IndexIVFFlat:
#
# nlist = 100 # Number of clusters
# quantizer = faiss.IndexFlatIP(384)
# index = faiss.IndexIVFFlat(quantizer, 384, nlist, faiss.METRIC_INNER_PRODUCT)
# index.train(embeddings) # Required for IVF
# index.add(embeddings)
# index.nprobe = 10 # Search 10 nearest clusters
#
# This reduces search from O(n) to O(n/nlist * nprobe)

Elasticsearch's vector search (dense_vector) adds ~20-50ms latency for semantic queries. FAISS operates purely in-memory with optimized SIMD instructions, achieving sub-millisecond search. For autocomplete where every millisecond matters, this is the difference between "instant" and "noticeable delay".
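The latency numbers above came from a simple timing loop; a minimal version you can run against your own index (assumes the index and model from Part 1):

import time

# Micro-benchmark: average single-query FAISS search latency
query = model.encode(["databricks funding"],
                     normalize_embeddings=True).astype('float32')

n_runs = 1000
start = time.perf_counter()
for _ in range(n_runs):
    index.search(query, 5)
elapsed_ms = (time.perf_counter() - start) / n_runs * 1000
print(f"Average search latency: {elapsed_ms:.3f} ms")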
Part 2: WebSocket Event Streaming
HTTP request-response is too slow for autocomplete—every keystroke would incur connection overhead. WebSockets give us a persistent bidirectional channel: the client streams typing events, the server streams suggestions back.
Server-Side Connection Manager
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set
import asyncio
import json
app = FastAPI()
# Connection manager for handling multiple clients
class ConnectionManager:
def __init__(self):
# Map: user_id -> WebSocket connection
self.active_connections: Dict[str, WebSocket] = {}
# Map: user_id -> set of interests (keywords)
self.user_interests: Dict[str, Set[str]] = {}
async def connect(self, websocket: WebSocket, user_id: str):
await websocket.accept()
self.active_connections[user_id] = websocket
self.user_interests.setdefault(user_id, set())
def disconnect(self, user_id: str):
self.active_connections.pop(user_id, None)
async def send_suggestions(self, user_id: str, suggestions: list):
if user_id in self.active_connections:
await self.active_connections[user_id].send_json({
"type": "suggestions",
"data": suggestions
})
def update_interests(self, user_id: str, keywords: Set[str]):
"""Merge new keywords into user's interest profile"""
self.user_interests[user_id].update(keywords)
manager = ConnectionManager()

WebSocket Endpoint
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
await manager.connect(websocket, user_id)
try:
while True:
# Receive events from client
data = await websocket.receive_json()
event_type = data.get("type")
if event_type == "typing":
# User is typing - get suggestions
query = data.get("query", "")
if len(query) >= 2: # Min 2 chars before suggesting
suggestions = get_personalized_suggestions(
user_id=user_id,
query=query,
k=5
)
await manager.send_suggestions(user_id, suggestions)
elif event_type == "click":
# User clicked a suggestion
clicked = data.get("suggestion")
keywords = extract_keywords(clicked)
manager.update_interests(user_id, keywords)
# Log for analytics
await log_interaction(user_id, "click", clicked)
elif event_type == "hover":
# User hovered over suggestion (3+ seconds)
hovered = data.get("suggestion")
dwell_time = data.get("dwell_ms", 0)
if dwell_time > 3000: # Significant interest signal
keywords = extract_keywords(hovered)
                    # Weaker signal than a click (weights are applied at
                    # scoring time; see interest decay in Advanced Techniques)
                    manager.update_interests(user_id, keywords)
elif event_type == "selection":
# User selected suggestion and submitted
selected = data.get("suggestion")
keywords = extract_keywords(selected)
# Highest weight - explicit selection
manager.update_interests(user_id, keywords)
# Potentially add to suggestion pool if novel
await maybe_add_suggestion(selected)
except WebSocketDisconnect:
        manager.disconnect(user_id)

We capture four types of user interactions:
| Event | Signal Strength | What It Tells Us |
|---|---|---|
| typing | Trigger | User wants suggestions (request) |
| hover (3s+) | Weak interest | User is considering this topic |
| click | Medium interest | User engaged with suggestion |
| selection | Strong interest | User explicitly chose this query |
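These relative strengths become concrete numbers in the interest-decay logic later on; as a sketch, the mapping might look like:

# Relative weights per interaction type. The values match the decay
# logic in Advanced Techniques; the exact numbers are a tuning choice.
EVENT_WEIGHTS = {
    "selection": 1.0,  # explicit choice, strongest signal
    "click": 0.5,      # engaged, but didn't submit
    "hover": 0.2,      # considered for 3+ seconds
}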
Client-Side Implementation
// Frontend WebSocket client (TypeScript/React)
class SuggestClient {
private ws: WebSocket | null = null;
private userId: string;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
constructor(userId: string) {
this.userId = userId;
this.connect();
}
private connect() {
this.ws = new WebSocket(`wss://api.example.com/ws/${this.userId}`);
this.ws.onopen = () => {
console.log('WebSocket connected');
this.reconnectAttempts = 0;
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'suggestions') {
this.onSuggestions(data.data);
}
};
this.ws.onclose = () => {
// Exponential backoff reconnection
if (this.reconnectAttempts < this.maxReconnectAttempts) {
const delay = Math.pow(2, this.reconnectAttempts) * 1000;
setTimeout(() => this.connect(), delay);
this.reconnectAttempts++;
}
};
}
// Called when user types in input
sendTypingEvent(query: string) {
this.send({ type: 'typing', query });
}
// Called when user clicks a suggestion
sendClickEvent(suggestion: string) {
this.send({ type: 'click', suggestion });
}
// Called when user hovers for 3+ seconds
sendHoverEvent(suggestion: string, dwellMs: number) {
this.send({ type: 'hover', suggestion, dwell_ms: dwellMs });
}
// Called when user selects and submits
sendSelectionEvent(suggestion: string) {
this.send({ type: 'selection', suggestion });
}
private send(data: object) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
}
}
// Override this to update UI
onSuggestions(suggestions: string[]) {
console.log('Received suggestions:', suggestions);
}
}

The client handles reconnection with exponential backoff—critical for mobile users with flaky connections. Each event is fire-and-forget; we don't wait for acknowledgment.
Tracking Hover Dwell Time
// React hook for tracking hover with dwell time
import { useRef, useCallback } from 'react';
function useHoverTracking(client: SuggestClient) {
  // Sketch: handler names and timer wiring are illustrative. Attach
  // onHoverStart/onHoverEnd to each suggestion item; the hover event
  // fires only after 3s of sustained mouse-over.
  const hoverTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
  const onHoverStart = useCallback((suggestion: string) => {
    const start = Date.now();
    hoverTimerRef.current = setTimeout(
      () => client.sendHoverEvent(suggestion, Date.now() - start), 3000);
  }, [client]);
  const onHoverEnd = useCallback(() => {
    if (hoverTimerRef.current) clearTimeout(hoverTimerRef.current);
  }, []);
  return { onHoverStart, onHoverEnd };
}

This hook tracks when users hover over suggestions. A 3-second hover indicates genuine interest—they're reading and considering. We capture this as a weak positive signal for personalization.
Part 3: Real-Time Personalization
The magic happens in the re-ranking step. We combine semantic relevance with user interest overlap to surface suggestions that match both what they're typing and what they care about.
import spacy
from collections import Counter
from typing import Set, List
# Load spaCy for keyword extraction
nlp = spacy.load("en_core_web_sm")
def extract_keywords(text: str) -> Set[str]:
"""Extract meaningful keywords from text using NER and noun chunks"""
doc = nlp(text)
keywords = set()
# Named entities (companies, products, etc.)
for ent in doc.ents:
if ent.label_ in ['ORG', 'PRODUCT', 'GPE', 'PERSON']:
keywords.add(ent.text.lower())
# Noun chunks (key phrases)
for chunk in doc.noun_chunks:
# Filter out pronouns and very short chunks
if chunk.root.pos_ != 'PRON' and len(chunk.text) > 2:
keywords.add(chunk.text.lower())
return keywords
def get_personalized_suggestions(
user_id: str,
query: str,
k: int = 5
) -> List[str]:
"""
Get suggestions personalized to user's interests.
Strategy:
    1. Get top-4k candidates from FAISS (semantic match)
2. Re-rank based on user interest overlap
3. Return top-k after re-ranking
"""
    # Step 1: Get candidate suggestions (4x what we need, for re-ranking headroom)
    query_embedding = model.encode([query], normalize_embeddings=True)
distances, indices = index.search(
query_embedding.astype('float32'),
k * 4 # Over-fetch for re-ranking
)
candidates = [
(suggestions[idx], distances[0][i])
for i, idx in enumerate(indices[0])
if distances[0][i] > 0.5 # Relaxed threshold for candidates
]
# Step 2: Get user's interest profile
user_interests = manager.user_interests.get(user_id, set())
if not user_interests:
# No personalization data yet - return by semantic score
return [c[0] for c in candidates[:k]]
# Step 3: Re-rank by combining semantic score + interest overlap
def personalization_score(suggestion: str, base_score: float) -> float:
suggestion_keywords = extract_keywords(suggestion)
overlap = len(suggestion_keywords & user_interests)
        # Blend: semantic score weighted at 0.7, plus an additive
        # interest bonus of up to 0.3 for keyword overlap
interest_bonus = min(overlap * 0.1, 0.3)
return base_score * 0.7 + interest_bonus
ranked = sorted(
candidates,
key=lambda x: personalization_score(x[0], x[1]),
reverse=True
)
    return [r[0] for r in ranked[:k]]

How the Scoring Works
Consider a user who has previously clicked on suggestions about "Databricks", "funding rounds", and "Series D". Their interest profile contains these keywords.
When they type "data company", we might get these candidates:
| Suggestion | Semantic Score | Interest Overlap | Final Score |
|---|---|---|---|
| "Tell me about Snowflake" | 0.82 | 0 keywords | 0.82 × 0.7 = 0.57 |
| "Databricks latest funding" | 0.78 | 2 keywords | 0.78 × 0.7 + 0.2 = 0.75 |
| "Data warehouse comparison" | 0.85 | 0 keywords | 0.85 × 0.7 = 0.60 |
The Databricks suggestion wins despite a lower semantic score because it matches the user's demonstrated interests. This is content-based personalization layered on top of semantic search.
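You can reproduce the table with a few lines of arithmetic (keyword overlaps are hard-coded here to match the table):

# Worked example: the re-ranking arithmetic from the table above
def final_score(base: float, overlap: int) -> float:
    # 70% semantic score plus an interest bonus capped at 0.3
    return base * 0.7 + min(overlap * 0.1, 0.3)

print(final_score(0.82, 0))  # Snowflake           -> 0.574
print(final_score(0.78, 2))  # Databricks funding  -> 0.746 (winner)
print(final_score(0.85, 0))  # Data warehouse      -> 0.595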
Part 4: Continuous Index Growth
The suggestion pool isn't static—it grows from user queries. When someone types a novel, high-quality query, we add it to the index for future users.
import threading
import time
from datetime import datetime, timedelta
class SuggestionIndexManager:
"""
Manages the FAISS index with hot-reload capability.
Allows adding new suggestions without downtime.
"""
def __init__(self, index_path: str, suggestions_path: str):
self.index_path = index_path
self.suggestions_path = suggestions_path
# Load initial index
self.index = faiss.read_index(index_path)
with open(suggestions_path) as f:
self.suggestions = json.load(f)['suggestions']
# Pending suggestions (not yet indexed)
self.pending_suggestions: List[str] = []
self.pending_lock = threading.Lock()
# Start background index update thread
self.update_thread = threading.Thread(
target=self._background_update_loop,
daemon=True
)
self.update_thread.start()
def add_suggestion(self, suggestion: str):
"""Queue a new suggestion for indexing"""
with self.pending_lock:
if suggestion not in self.suggestions:
self.pending_suggestions.append(suggestion)
def _background_update_loop(self):
"""Periodically batch-update the index"""
while True:
time.sleep(300) # Update every 5 minutes
with self.pending_lock:
if not self.pending_suggestions:
continue
new_suggestions = self.pending_suggestions.copy()
self.pending_suggestions.clear()
            # Encode new suggestions (normalized, matching the main index)
            new_embeddings = model.encode(new_suggestions, normalize_embeddings=True)
            # Add to index (in production, guard this with a lock or swap in
            # a rebuilt index; FAISS doesn't guarantee safe concurrent add + search)
            self.index.add(new_embeddings.astype('float32'))
self.suggestions.extend(new_suggestions)
# Persist to disk
faiss.write_index(self.index, self.index_path)
with open(self.suggestions_path, 'w') as f:
json.dump({'suggestions': self.suggestions}, f)
# Upload to S3 for other instances
s3_upload_file(BUCKET_NAME, FAISS_INDEX_S3, self.index_path)
s3_upload_file(BUCKET_NAME, SUGGESTIONS_S3, self.suggestions_path)
print(f"Index updated with {len(new_suggestions)} new suggestions")
# Usage
index_manager = SuggestionIndexManager(
index_path="knowledge_base/autocomplete.idx",
suggestions_path="knowledge_base/suggestions.json"
)
async def maybe_add_suggestion(suggestion: str):
"""Add user-generated query if it's novel and high-quality"""
# Check novelty
existing = get_suggestions(suggestion, k=1, threshold=0.95)
if existing:
return # Too similar to existing suggestion
# Quality checks
if len(suggestion) < 5 or len(suggestion) > 200:
return
if not suggestion[0].isupper(): # Basic grammar check
return
# Add to pending queue
    index_manager.add_suggestion(suggestion)

Key design decisions:
- Novelty check: Only add if no existing suggestion is >95% similar
- Quality filter: Basic length and grammar checks
- Batch updates: Aggregate for 5 minutes, then batch-encode and add
- Hot reload: readers see new entries without a restart (concurrent add and search should be guarded with a lock or an index swap in production)
- S3 sync: other instances pull the updated index on their next restart (a startup-load sketch follows)
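For completeness, a sketch of the startup path on a fresh instance. `s3_download_file` is a hypothetical counterpart to the `s3_upload_file` helper above, and `BUCKET_NAME` and the S3 key constants are assumed from the surrounding codebase:

def load_index_from_s3() -> SuggestionIndexManager:
    # Pull the latest artifacts before constructing the manager,
    # so new instances start with the freshest suggestion pool
    s3_download_file(BUCKET_NAME, FAISS_INDEX_S3, "knowledge_base/autocomplete.idx")
    s3_download_file(BUCKET_NAME, SUGGESTIONS_S3, "knowledge_base/suggestions.json")
    return SuggestionIndexManager(
        index_path="knowledge_base/autocomplete.idx",
        suggestions_path="knowledge_base/suggestions.json"
    )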
Part 5: Persistent User Profiles with Redis
User interests need to persist across sessions. We use Redis for fast reads and automatic TTL-based cleanup.
import redis
import json
import time
from typing import Set
redis_client = redis.Redis(host='localhost', port=6379, db=0)
class UserProfileStore:
"""
Persistent user interest storage using Redis.
Supports TTL for automatic cleanup of stale profiles.
"""
PROFILE_TTL = 60 * 60 * 24 * 30 # 30 days
@staticmethod
def get_interests(user_id: str) -> Set[str]:
"""Retrieve user's interest keywords"""
key = f"user:interests:{user_id}"
data = redis_client.get(key)
if data:
return set(json.loads(data))
return set()
@staticmethod
def update_interests(user_id: str, new_keywords: Set[str]):
"""Add new keywords to user's profile"""
key = f"user:interests:{user_id}"
# Get existing interests
existing = UserProfileStore.get_interests(user_id)
# Merge and limit to top 100 keywords
combined = existing | new_keywords
if len(combined) > 100:
# Keep most recent (in practice, track with timestamps)
combined = set(list(combined)[-100:])
# Save with TTL refresh
redis_client.setex(
key,
UserProfileStore.PROFILE_TTL,
json.dumps(list(combined))
)
@staticmethod
def log_interaction(user_id: str, event_type: str, data: str):
"""Log interaction for analytics (append to list)"""
key = f"user:interactions:{user_id}"
entry = json.dumps({
"type": event_type,
"data": data,
"timestamp": time.time()
})
# Keep last 1000 interactions
redis_client.lpush(key, entry)
redis_client.ltrim(key, 0, 999)
        redis_client.expire(key, UserProfileStore.PROFILE_TTL)

Redis keys:
- user:interests:{user_id} — Set of interest keywords (max 100)
- user:interactions:{user_id} — List of recent interactions (max 1000)
The 30-day TTL ensures we don't store data forever for inactive users, and lets us automatically "forget" outdated interests.
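Usage is a simple round-trip (the user ID and keywords here are illustrative):

# Round-trip: merge new keywords, then read the profile back
UserProfileStore.update_interests("user_42", {"databricks", "series d"})
UserProfileStore.update_interests("user_42", {"funding rounds"})
print(UserProfileStore.get_interests("user_42"))
# {'databricks', 'series d', 'funding rounds'} — TTL refreshed to 30 days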
Production Performance
# Performance metrics from production deployment
#
# Infrastructure:
# - 2x c5.xlarge instances (4 vCPU, 8GB RAM)
# - Redis ElastiCache (cache.t3.micro)
# - S3 for index storage
#
# Suggestion Index:
# - 6,185 suggestions
# - 384-dimensional embeddings
# - IndexFlatIP (exact search)
#
# Latency (P50 / P95 / P99):
# - FAISS search: 0.3ms / 0.8ms / 1.2ms
# - Personalization: 1.5ms / 3.2ms / 5.1ms
# - WebSocket round-trip: 4ms / 12ms / 25ms
# - Total (typing → suggestions): 6ms / 16ms / 31ms
#
# Throughput:
# - 2,500 suggestion requests/second per instance
# - 50,000 concurrent WebSocket connections (across cluster)
#
# User Engagement:
# - Suggestion click-through rate: 34%
# - Personalized vs generic: 47% vs 28% CTR
# - Time to first suggestion: 6ms (perceived instant)

The key metric: personalized suggestions have a 47% click-through rate vs 28% for generic. Users are significantly more likely to engage with suggestions tailored to their interests.
Advanced Techniques
1. Interest Decay
Not all interests are equal—recent ones matter more. Implement exponential decay:
from collections import defaultdict

def get_weighted_interests(user_id: str) -> Dict[str, float]:
"""Get interests with time-based decay weights"""
interactions = redis_client.lrange(f"user:interactions:{user_id}", 0, -1)
keyword_weights = defaultdict(float)
now = time.time()
for interaction in interactions:
data = json.loads(interaction)
age_hours = (now - data['timestamp']) / 3600
# Half-life of 24 hours
decay = 0.5 ** (age_hours / 24)
# Weight by interaction type
type_weight = {'selection': 1.0, 'click': 0.5, 'hover': 0.2}
weight = decay * type_weight.get(data['type'], 0.1)
for keyword in extract_keywords(data['data']):
keyword_weights[keyword] += weight
    return dict(keyword_weights)
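To use these weights in ranking, the binary overlap count in `personalization_score` can be swapped for a weighted sum. A sketch, not the production scorer:

def weighted_personalization_score(suggestion: str, base_score: float,
                                   weights: Dict[str, float]) -> float:
    # Sum decayed keyword weights instead of counting raw overlap
    bonus = sum(weights.get(kw, 0.0) for kw in extract_keywords(suggestion))
    return base_score * 0.7 + min(bonus * 0.1, 0.3)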
2. Negative Signals

Track suggestions that were shown but not clicked as weak negative signals:
# On the client, track which suggestions were visible
sendImpressionEvent(visibleSuggestions: string[])
# On the server, compare impressions vs clicks (both as sets of suggestion strings)
impressions = get_recent_impressions(user_id)
clicks = get_recent_clicks(user_id)
ignored = impressions - clicks
# Slightly penalize keywords from ignored suggestions
for suggestion in ignored:
keywords = extract_keywords(suggestion)
    decrease_interest_score(user_id, keywords, factor=0.1)
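`decrease_interest_score` is left undefined above; one way to back it, assuming interest weights move into a Redis sorted set rather than the plain set used in Part 5:

def decrease_interest_score(user_id: str, keywords: Set[str], factor: float):
    # Assumes per-keyword scores live in a sorted set (keyword -> score)
    key = f"user:interest_scores:{user_id}"
    for kw in keywords:
        redis_client.zincrby(key, -factor, kw)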
3. Cohort-Based Suggestions

For new users with no history, use collaborative filtering at the cohort level:
def get_cohort_popular_suggestions(cohort_id: str, k: int = 10):
"""Get most-clicked suggestions for user's cohort"""
key = f"cohort:popular:{cohort_id}"
# Sorted set: suggestion -> click count
top_suggestions = redis_client.zrevrange(key, 0, k-1)
return [s.decode() for s in top_suggestions]
# Combine with semantic search for new users
def get_suggestions_for_new_user(query: str, cohort_id: str):
semantic = get_suggestions(query, k=3)
popular = get_cohort_popular_suggestions(cohort_id, k=2)
# Interleave: semantic, popular, semantic, popular, semantic
    return interleave(semantic, popular)
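`interleave` is assumed above; a minimal implementation:

from itertools import chain, zip_longest

def interleave(*lists):
    # Alternate items from each list, skipping exhausted ones
    sentinel = object()
    merged = chain.from_iterable(zip_longest(*lists, fillvalue=sentinel))
    return [x for x in merged if x is not sentinel]

# interleave(['s1', 's2', 's3'], ['p1', 'p2']) -> ['s1', 'p1', 's2', 'p2', 's3']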
Lessons Learned

1. Debounce Typing Events
Don't send an event on every keystroke. Debounce by 100-150ms to reduce server load and avoid flickering suggestions.
2. Minimum Query Length
Suggestions for single characters are rarely useful and expensive to compute. We require at least 2 characters before triggering search.
3. Graceful Degradation
If the WebSocket disconnects, fall back to HTTP requests. If personalization fails, return generic semantic results. Always have a fallback.
4. Privacy Considerations
User interest profiles are powerful but sensitive. We anonymize after 30 days, don't store raw queries long-term, and allow users to clear their history.
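Clearing a user's history then amounts to deleting their keys (using the key names from Part 5):

def clear_user_history(user_id: str):
    # Remove both the interest profile and the raw interaction log
    redis_client.delete(f"user:interests:{user_id}")
    redis_client.delete(f"user:interactions:{user_id}")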
Conclusion
Building a real-time personalized suggestion system requires orchestrating multiple technologies:
- FAISS for sub-millisecond semantic search
- WebSockets for bidirectional real-time streaming
- spaCy/NLP for keyword extraction
- Redis for fast user profile storage
- Background workers for continuous index updates
The result is an autocomplete experience that feels magical—suggestions appear instantly and seem to "read your mind" because they're tailored to your demonstrated interests.
The 47% vs 28% click-through rate tells the story: personalization isn't just a nice-to-have, it's a 1.7x improvement in user engagement.
This system powered the chatbot interface for a fintech product, handling millions of suggestion requests daily. The combination of semantic search and real-time personalization transformed a basic autocomplete into an intelligent assistant that anticipated user needs.