Core Architecture
ProfitWheel is structured across six layers that together move data from ad platforms through intelligence systems to actionable insights.
Evidence
Six-layer intro sentence - pw-docs/src/content/docs/core-architecture.mdx:8 - exact intro text; six ## Layer N headings (lines 28-50) confirm the model. ✅
Six-layer model - pw-enterprise/CLAUDE.md enumerates Flask API, PostgreSQL, Redis+RQ, LangChain/LangGraph/CrewAI AI, Elasticsearch, and GCP/Azure cloud, matching the doc's layer stack. ✅
flowchart TD
FE["Client: React 19 SPA<br/>Cloudflare Pages"] --> BFF["Cloudflare Worker BFF<br/>CSP / headers / edge bindings"]
BFF --> API["Flask API monolith<br/>~52 blueprints"]
API --> PG[("PostgreSQL")]
API --> RD[("Redis")]
API --> ES[("Elasticsearch")]
API -- enqueue --> RQ["RQ workers<br/>fast / default / slow"]
CM["cron_master.py<br/>scheduled dispatcher"] --> RQ
CM -- scheduled --> PG
BQ[("BigQuery")]
CM -- scheduled --> BQ
RQ --> AI["AI layer<br/>LangChain / LangGraph / CrewAI · 12+ agents"]
AI --> LLM["LLM vendors<br/>OpenAI · Anthropic · Groq · Gemini · Cohere"]
RQ --> EXT["External integrations<br/>Google · Meta · TikTok · Pinterest · LinkedIn · Snapchat · GA4"]
AI --> PG
Evidence
Client: React 19 SPA on Cloudflare Pages - pw-enterprise-frontend/package.json:108,114, pw-enterprise-frontend/wrangler.toml:3 - react/react-dom ^19.1.0; pages_build_output_dir confirms Pages. ✅
BFF: CSP, headers, edge bindings - pw-enterprise-frontend/functions/_middleware.ts:9-16,42-50 - Pages middleware sets CSP/security headers and Env declares AI, 3x Vectorize, Hyperdrive bindings. ✅
Flask API: ~52 blueprints - pw-enterprise/app.py:234-284,445 - grep -c app.register_blueprint = 52 (51 in 234-284 + the SSE blueprint at :445). ✅
PostgreSQL store - pw-enterprise/db_connection.py:9-45 - psycopg2 ThreadedConnectionPool (dev+prod, 1-100 conns); env keys POSTGRES_HOST/POSTGRES_PORT in .env:140-141. ✅
Redis store + RQ queues - pw-enterprise/.env:22-25,41-42, pw-enterprise/api/intent_insights.py:141-143 - redis_host/redis_port and rq_env/rq_env_fast/rq_env_slow keys read at runtime. ✅
Elasticsearch store - pw-enterprise/library/utils.py:17,83-90, pw-enterprise/pyproject.toml:37 - Elasticsearch client via search_engine_ip; elasticsearch>=9.2.0 dependency. ✅
RQ workers: fast/default/slow - pw-enterprise/api/intent_insights.py:141-143,1452,3037, pw-enterprise/pyproject.toml:82 - Queue(rq_env) and Queue(rq_env_slow) instantiated; rq>=2.6.1 dependency. ✅
API enqueues to RQ - pw-enterprise/api/intent_insights.py:1452 - qc = Queue(rq_env, connection=redis_conn, timeout=600) inside API module. ✅
cron_master.py dispatcher - pw-enterprise/cron_master.py:23 - run_type = sys.argv[1] then dispatches; 58 run_type == branches counted. ✅
cron_master dispatches to cron jobs (which enqueue RQ) - pw-enterprise/cron_master.py:8-11,23-28 - sets up tracing then dispatches run_type to cron/ functions; cron jobs use Queue(...). ✅
cron writes to Postgres - pw-enterprise/cron/audience_size_cron.py:603 - comment Write to Postgres first (before BigQuery which can hang) in a cron_master-dispatched job. ✅
BigQuery store - pw-enterprise/api/common.py:4,11, pw-enterprise/pyproject.toml:100 - from google.cloud import bigquery / client = bigquery.Client(); google-cloud-bigquery>=3.39.0 dependency. ✅
cron writes to BigQuery - pw-enterprise/cron/audience_size_cron.py:646,667 - BigQuery dedup+insert into xenon-antonym-363411.profitboost_dataset.trend_interest_country. ✅
AI layer LangChain/LangGraph/CrewAI; 12+ agents - pw-enterprise/pyproject.toml:12-16,33, pw-enterprise/v3/langraph_main.py:91-105 - all three frameworks declared; agent_description_mapping() has 12 named agents (library/ai_services.py:417-450 lists 13-16 more). ✅
LLM vendors OpenAI/Anthropic/Groq/Gemini/Cohere - pw-enterprise/.env:3,4,88,96, pw-enterprise/v3/langraph_main.py:119,124, pw-enterprise/api/reports.py:2620-2622 - keys + ChatAnthropic/ChatGoogleGenerativeAI imports + cohere.Client. ✅
External integrations (7+) - pw-enterprise/library/ - tiktok_package.py:20-21 plus pinterest_package.py, linkedin_package.py, snapchat_package.py, google_ads_package.py, facebook_package.py, google_analytics_4_package.py all present. ✅
RQ job kicks off AI crew - pw-enterprise/v3/main.py:27-46 - py_crew_run builds PythonCrew(...).crew().kickoff(inputs=inputs), the RQ-to-AI path. ✅
Cron/RQ jobs hit external platforms - pw-enterprise/cron/tiktok.py, pw-enterprise/cron/facebook.py - platform cron jobs exist and are dispatched by cron_master.py onto RQ. ✅
AI results written to Postgres - pw-enterprise/v3/main.py:53-56 - after crew completes, execute_query("UPDATE custom_report SET knowledge_bank=... WHERE id=...", commit=True). ✅
Layer 1: Client (Frontend)
A React 19 single-page application delivered via Cloudflare Pages. A thin Cloudflare Worker BFF sits in front of the SPA to enforce security headers and Content Security Policy, and to expose edge bindings (AI, Vectorize, Hyperdrive) without shipping credentials to the browser. (Cloudflare R2 object storage is used server-side via boto3; it is not a Worker edge binding.)
Evidence
React 19 SPA - pw-enterprise-frontend/package.json:108,114 - react/react-dom ^19.1.0. ✅
Cloudflare Pages delivery - pw-enterprise-frontend/wrangler.toml:3, pw-enterprise-frontend/package.json:15 - pages_build_output_dir and deploy script wrangler pages deploy dist. ✅
BFF enforces CSP + security headers - pw-enterprise-frontend/functions/_middleware.ts:24-50 - buildCSP + getSecurityHeaders (X-Frame-Options, X-Content-Type-Options, etc.) on HTML responses. ✅
BFF edge bindings: AI, Vectorize ×3, Hyperdrive - pw-enterprise-frontend/functions/_middleware.ts:9-16 and worker-configuration.d.ts:7-13 declare AI, three VECTORIZE_INDEX_*, and a Hyperdrive binding in the Worker Env (configured via the Pages dashboard; only AI is in wrangler.toml). R2 is NOT a Worker binding; it is used server-side via boto3 (pw-enterprise/library/storage.py). ✅
Layer 2: API (Backend)
A Flask monolith (app.py) with approximately 52 blueprint modules handling the full surface area of the product: campaigns, recommendations, reporting, user management, and more. This layer services synchronous request/response interactions for all user-facing calls.
Evidence
Flask monolith app.py - pw-enterprise/app.py:9-10, pw-enterprise/pyproject.toml:44 - from flask import Flask (line 9; cited 7-10 includes dotenv); flask>=3.1.2 dependency. ✅
~52 blueprints - pw-enterprise/app.py:234-284,445 - grep -c app.register_blueprint = 52 (51 in 234-284 + the SSE blueprint at :445). ✅
MethodView pattern - pw-enterprise/api/amazon.py:22, pw-enterprise/api/authenticate.py:21 - from flask.views import MethodView across API modules. ✅
Layer 3: Async / Background
The operational backbone: cron_master.py acts as a scheduled dispatcher, feeding work into RQ workers across three priority queues (fast, default, slow) backed by Redis. Heavy operations (twin creation, large reports, embedding generation, and ad-platform syncs) are enqueued here, never run inline in the API request path.
Evidence
cron_master.py dispatcher - pw-enterprise/cron_master.py:23-28 - run_type = sys.argv[1] branches to matching cron/ functions; 58 branches. ✅
Three RQ queues backed by Redis - pw-enterprise/.env:22-25, pw-enterprise/api/content_insights.py:552 - rq_env/rq_env_fast/rq_env_slow (plus unused rq_env_medium); Queue(rq_env_slow, connection=redis_conn). ✅
Heavy ops run as cron/RQ jobs - pw-enterprise/cron_master.py:369-386,497, pw-enterprise/cron/run_reports.py - report_embeddings and ai_twin_update_cron branches dispatch embedding/twin jobs. ✅
Three RQ tiers backed by Redis - pw-enterprise/config/constants.py:33-34 loads rq_env + rq_env_slow; .env:22-25 also defines rq_env_fast (and rq_env_medium), confirming the queue tiers. ✅
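The dispatcher role described in this layer can be illustrated with a small sketch. The evidence only establishes that cron_master.py reads run_type from sys.argv[1] and branches on it; the dictionary registry, the dispatch helper, and the job bodies below are hypothetical stand-ins, not the project's code.

```python
from typing import Callable, Dict, List

# Hypothetical job functions; the real ones live under cron/ and enqueue RQ work.
def audience_run() -> str:
    return "audience jobs dispatched"

def report_embeddings() -> str:
    return "embedding jobs dispatched"

# Registry keyed by run_type. This is an assumption made for brevity:
# the source describes a chain of `run_type == ...` branches instead.
JOBS: Dict[str, Callable[[], str]] = {
    "audience_run": audience_run,
    "report_embeddings": report_embeddings,
}

def dispatch(argv: List[str]) -> str:
    """Treat argv[1] as the run_type and call the matching job."""
    if len(argv) < 2:
        raise SystemExit("usage: cron_master.py <run_type>")
    run_type = argv[1]
    job = JOBS.get(run_type)
    if job is None:
        raise SystemExit(f"unknown run_type: {run_type}")
    return job()
```

A scheduler entry would then invoke the script with a single argument, for example `python cron_master.py audience_run`, which corresponds to `dispatch(sys.argv)` in this sketch.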
Layer 4: AI / Intelligence
LangChain, LangGraph, and CrewAI orchestrate 12+ specialist agents (the LangGraph mapping defines 12; the copilot agent list returns 13-16) that handle tasks ranging from budget recommendations to ad creative analysis. The agents run across multiple LLM vendors (OpenAI, Anthropic, Groq, Gemini, Cohere) and are instrumented in Langfuse (with a default 50% trace sample rate).
Evidence
LangChain/LangGraph/CrewAI orchestration - pw-enterprise/pyproject.toml:12-16,33, pw-enterprise/v3/crew_f.py:10-14, pw-enterprise/v3/langraph_main.py:126-128 - deps + crewai imports + langgraph.graph StateGraph. ✅
12+ agents - pw-enterprise/v3/langraph_main.py:91-105 - agent_description_mapping() has 12 named agents; library/ai_services.py:417-450 lists 13-16 more. ✅
OpenAI vendor - pw-enterprise/.env:3, pw-enterprise/library/llm_completions.py:48-63 - openai_api_key key + "openai": ProviderConfig(api_key_env="openai_api_key"...). ✅
Anthropic vendor - pw-enterprise/.env:88, pw-enterprise/v3/langraph_main.py:2537-2540 - claude_key + ChatAnthropic(model_name="claude-sonnet-4-5", api_key=claude_key). ✅
Groq vendor - pw-enterprise/.env:96, pw-enterprise/library/llm_completions.py:33-47, pw-enterprise/pyproject.toml:15 - groq_api_key + "groq": ProviderConfig(...) + langchain-groq>=1.1.1. ✅
Gemini vendor - pw-enterprise/pyproject.toml:20, pw-enterprise/v3/langraph_main.py:329-330 - langchain-google-genai>=4.2.0 + ChatGoogleGenerativeAI(model="gemini-3-flash-preview"). ✅
Cohere vendor - pw-enterprise/.env:4, pw-enterprise/api/reports.py:2620-2622, pw-enterprise/pyproject.toml:32 - cohere_api_key + co = cohere.Client(cohere_api_key) + cohere>=5.20.0. ✅
Langfuse tracing - pw-enterprise/library/langsmith_metadata.py:16-34, pw-enterprise/cron_master.py:8-11, pw-enterprise/pyproject.toml:105-106 - setup_langfuse_tracing() auto-instruments LangChain; called at cron startup. ✅
Langfuse tracing (50% default sample) - pw-enterprise/library/langsmith_metadata.py:25-27 defaults LANGFUSE_SAMPLE_RATE to 0.5; instrumentation hooks all LangChain calls but ~50% are sampled by default. ✅
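To make the effect of a 50% sample rate concrete, here is a minimal sketch of rate-based trace sampling. Only the LANGFUSE_SAMPLE_RATE variable name and its 0.5 default come from the evidence above; the sample_rate and should_sample helpers are hypothetical, and the hash-based decision is an illustration of the idea rather than how Langfuse itself samples.

```python
import hashlib
import os

def sample_rate(default: float = 0.5) -> float:
    """Read LANGFUSE_SAMPLE_RATE, falling back to the documented 0.5 default."""
    raw = os.environ.get("LANGFUSE_SAMPLE_RATE")
    try:
        rate = float(raw) if raw is not None else default
    except ValueError:
        rate = default
    # Clamp so a misconfigured value cannot leave the [0, 1] range.
    return min(max(rate, 0.0), 1.0)

def should_sample(trace_id: str, rate: float) -> bool:
    """Deterministic sampling: hash the trace id into [0, 1) and compare."""
    digest = hashlib.sha256(trace_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:4], "big") / 2**32
    return bucket < rate
```

Hashing the trace id (instead of drawing a random number) keeps the decision stable for a given trace, so every span of one trace is either kept or dropped together.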
Layer 5: Data Stores
Four purpose-fit stores: PostgreSQL is the system of record for all structured business data; Redis serves as both the in-memory cache and the RQ job queue; Elasticsearch powers full-text search and time-series metric queries; BigQuery is the analytics warehouse for large-scale reporting and ML feature pipelines.
Evidence
PostgreSQL system of record - pw-enterprise/db_connection.py:26-45 - dev + prod ThreadedConnectionPool (1-100 conns each). ✅
Redis backs RQ queues (cache role under-evidenced) - pw-enterprise/.env:41-42, pw-enterprise/api/content_insights.py:552 - redis_host/redis_port + Queue(..., connection=redis_conn); cited lines prove queue but not cache. ⚠️ Needs a Redis cache GET/SET citation.
Elasticsearch search + time-series - pw-enterprise/library/utils.py:83-90, pw-enterprise/cron/es_refresh_cron.py, pw-enterprise/cron/behaviour_timeseries_cron.py - ES client + dedicated refresh and time-series crons exist. ✅
BigQuery analytics warehouse - pw-enterprise/api/common.py:4,11, pw-enterprise/cron/audience_size_cron.py:667 - bigquery.Client() + cron inserts trend/audience data; "ML feature pipelines" phrase not directly evidenced. ⚠️
Cloudflare R2 object storage - pw-enterprise/library/storage.py:1-7, pw-enterprise/.env:144-148 - R2 upload helpers + R2_ACCOUNT_ID/R2_ACCESS_KEY_ID/etc. env keys. ✅
Azure Blob (legacy) - pw-enterprise/library/storage.py:6,46 - docstring notes migration from Azure Blob to R2; Convert old Azure Blob Storage URL to R2 URL helper. ✅
GCP Secret Manager (primary) - pw-enterprise/library/gcp_secrets.py:1-13, pw-enterprise/app.py:16,30,51-52 - secretmanager module; app loads 18 secrets in parallel via ThreadPoolExecutor at startup. ✅
Azure Key Vault (legacy) - pw-enterprise/pyproject.toml:27-28, pw-enterprise/library/gcp_secrets.py:4-5 - azure-identity/azure-keyvault-secrets deps still present; docstring says GCP replaced Azure Key Vault. ✅
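The evidence notes that the app loads its secrets in parallel with a ThreadPoolExecutor at startup. A minimal sketch of that pattern follows, assuming a pluggable fetch callable; load_secrets_to_env, fake_fetch, and the DEMO_* names are hypothetical, and a real fetch would call the Secret Manager client instead of returning a placeholder string.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable

def load_secrets_to_env(
    names: Iterable[str],
    fetch: Callable[[str], str],
    max_workers: int = 8,
) -> Dict[str, str]:
    """Fetch each secret concurrently and export it as an environment variable."""
    names = list(names)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map preserves input order, so names and values stay aligned.
        values = list(pool.map(fetch, names))
    loaded = dict(zip(names, values))
    os.environ.update(loaded)
    return loaded

# Stand-in for a remote secret lookup (hypothetical, for illustration only).
def fake_fetch(name: str) -> str:
    return f"value-for-{name}"
```

Because each lookup is I/O-bound, running them in threads shortens startup roughly to the latency of the slowest fetch rather than the sum of all of them.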
Layer 6: External Integrations
Seven-plus ad platforms (Google, Meta, TikTok, Pinterest, LinkedIn, Snapchat, and GA4) feed raw performance data in and receive optimized signals out. Additional document and media AI services support creative intelligence use cases.
Evidence
7+ ad platforms - pw-enterprise/library/ - dv360_package.py:32-33, tiktok_package.py:20-21, youtube_package.py:14-16 plus google_ads/facebook/pinterest/linkedin/snapchat/google_analytics_4 packages: 8+ integrations, so "7+" is a safe minimum. ✅
Key flows
The layer model shows what the system is made of. These diagrams trace how work actually moves through it: the paths a senior engineer needs to reason about the platform.
1. Research & twin lifecycle
The core product loop. Twins are not generic chatbots: they are grounded in real audience signals pulled from ad platforms, distilled into behavior / intent / mentions reports plus persistent memory, and then used to run qualitative and quantitative research that informs marketing decisions.
flowchart LR
META["Ad-platform audiences<br/>(Meta & others)"] --> SIG["Audience signals<br/>behavior · intent · mentions"]
SIG --> RPT["Grounding reports<br/>+ twin memory"]
RPT --> TWIN["AI Twin persona<br/>ai_twin/ module"]
TWIN --> QUAL["Qualitative<br/>focus groups · interviews"]
TWIN --> QUANT["Quantitative<br/>surveys · respondents"]
QUAL --> INS["Insights"]
QUANT --> INS
INS --> MKT["Informs marketing<br/>& campaign decisions"]
Evidence
Ad-platform audiences are the source node - pw-enterprise/cron_master.py:199-236 audience_run imports/calls facebook_run, snapchat_run, google_analytics_run, tiktok_run, pinterest_run, linkedin_run, and library/facebook_package.py wraps Meta. ✅
Signals = behavior / intent / mentions - pw-enterprise/ai_twin/respondents/respondent_prompt_composer.py:8-10 docstring lists --behavior, --intent, --mentions report inputs. ✅
Signals distilled into reports + twin memory - pw-enterprise/ai_twin/respondents/respondent_prompt_composer.py:57-118 prompt mandates 2-3 memory fragments per respondent built from behavior/intent/mentions summaries. ✅
Twin persona lives in ai_twin/ - pw-enterprise/ai_twin/group/main.py:1-20 (focus-group entry) and ai_twin/respondents/agents.py:49 (SurveyAnalysisAgent) confirm both submodules. ✅
Qualitative focus groups + interviews - pw-enterprise/ai_twin/group/main.py:85 builds focus-group prompts and :487-499 builds interview-mode (Says/Thinks) prompts. ✅
Quantitative surveys with respondents - pw-enterprise/ai_twin/respondents/agents.py:49,62 (SurveyAnalysisAgent.fetch_survey_data) and response_aggregator.py:2580 (_get_real_respondent_count). ✅
Insights inform marketing decisions - pw-enterprise/ai_twin/respondents/agents.py:899-935 system prompt: "strategic survey insights advisor … actionable business insights … better business decisions". ✅
2. Async job pipeline
Heavy work never runs inline in the request path. cron_master.py dispatches scheduled cron/ jobs, and both cron and the API enqueue onto three Redis-backed RQ priority queues drained by workers. Apache Airflow provides higher-level orchestration on top.
flowchart TD
API["Flask API"] -- enqueue --> ENQ{"RQ enqueue"}
CM["cron_master.py<br/>dispatcher"] --> CRON["cron/ jobs (~50)"]
CRON --> ENQ
AF["Apache Airflow<br/>library/airflow_client.py"] -. orchestrates .-> CM
ENQ --> FAST["queue: fast"]
ENQ --> DEF["queue: default"]
ENQ --> SLOW["queue: slow"]
FAST --> W["RQ workers"]
DEF --> W
SLOW --> W
RD[("Redis<br/>queue backend")] -. backs .-> ENQ
W --> PG[("PostgreSQL")]
W --> ES[("Elasticsearch")]
W --> AIW["AI agents"]
Evidence
Flask API enqueues to RQ - pw-enterprise/api/content_insights.py:547-556 creates Queue(rq_env_slow,...) and qc.enqueue(...); api/survey.py:2064-2067 uses Queue(rq_env,...). ✅
cron_master.py is the CLI dispatcher - pw-enterprise/cron_master.py:2 reads sys.argv and branches on run_type (e.g. audience_run at :199) to import/call cron jobs. ✅
cron/ holds ~50 job files - pw-enterprise/cron/ has 55 entries, 54 excluding __init__.py; doc's "~50" is within rounding. ✅
Airflow orchestration via library/airflow_client.py - file docstring says "HTTP wrapper for Airflow REST API" and imports AIRFLOW_API_BASE_URL etc. from config/airflow_config.py. ✅
Three RQ tiers (fast/default/slow) - pw-enterprise/.env:22-25 defines rq_env, rq_env_fast, rq_env_medium, rq_env_slow; api/content_insights.py:31 consumes rq_env_slow. ✅
(note: an rq_env_medium also exists)
Redis backs RQ queues - pw-enterprise/api/planner.py:4558 Redis(host=redis_host,...) connects every Queue(); same pattern in api/content_insights.py:551. ✅
Workers write PG/ES and invoke AI agents - pw-enterprise/cron_master.py:253-257 syncs between PostgreSQL and Elasticsearch; library/ai_services.py:414-450 lists dispatched agents. ✅
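The three-tier queueing described in this flow can be sketched with in-memory stand-ins. In the real system the queues are RQ Queue objects bound to a Redis connection (the evidence cites Queue(rq_env_slow, connection=redis_conn)); the deques, the pick_tier thresholds, and the enqueue and drain helpers below are assumptions chosen only to show the routing idea.

```python
from collections import deque
from typing import Callable, Deque, Dict, Tuple

Job = Tuple[Callable[..., object], tuple]

# In-memory stand-ins for the three Redis-backed RQ queues.
QUEUES: Dict[str, Deque[Job]] = {"fast": deque(), "default": deque(), "slow": deque()}

def pick_tier(expected_seconds: float) -> str:
    """Choose a tier from an expected runtime (thresholds are illustrative)."""
    if expected_seconds < 5:
        return "fast"
    if expected_seconds < 120:
        return "default"
    return "slow"

def enqueue(func: Callable[..., object], *args, expected_seconds: float = 30.0) -> str:
    """Append the job to the chosen tier and report which tier was used."""
    tier = pick_tier(expected_seconds)
    QUEUES[tier].append((func, args))
    return tier

def drain(tier: str) -> list:
    """What a worker does in this sketch: pop jobs in FIFO order and run them."""
    results = []
    while QUEUES[tier]:
        func, args = QUEUES[tier].popleft()
        results.append(func(*args))
    return results
```

Separating tiers this way keeps a long-running job (for example a large report) from delaying short jobs, since each tier can be drained by its own workers.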
3. Request lifecycle & auth
Every user-facing call passes through the Cloudflare Worker BFF (which adds CSP, security headers, and edge bindings), then hits a Flask MethodView endpoint. Each endpoint authenticates the Authorization header before running business logic.
flowchart LR
U["Browser<br/>React 19 SPA"] --> CF["Cloudflare Worker BFF<br/>CSP · headers · edge bindings"]
CF --> EP["Flask MethodView<br/>endpoint"]
EP --> AUTH["authentication(auth_header)<br/>JWT / session"]
AUTH -- valid --> BL["Business logic"]
AUTH -- invalid --> R401["401 Unauthorized"]
BL --> PG[("PostgreSQL")]
BL --> RD[("Redis cache")]
BL --> RESP["JSON response"]
Evidence
Client is a React 19 SPA - pw-enterprise-frontend/package.json:108 pins "react": "^19.1.0". ✅
Cloudflare Worker BFF adds CSP/headers/bindings - pw-enterprise-frontend/functions/_middleware.ts:9-16,44-45 declares Env bindings (AI, Vectorize x3, Hyperdrive) and sets Content-Security-Policy + X-Frame-Options: DENY. ✅
Flask MethodView endpoints - pw-enterprise/api/misc.py:30 imports from flask.views import MethodView; :160 class fbAdaccountApi(MethodView). ✅
authentication(auth_header) validates JWT - pw-enterprise/api/jwt.py:111-121 defines it and delegates to decode_jwt_token; imported in app.py:133. ✅
JWT HS256 with configurable expiry - pw-enterprise/api/jwt.py:10-14 sets JWT_ALGORITHM = "HS256"; encode_jwt_token (:65-79) takes custom_days/is_trusted for expiry (default 168h). Note: module-level JWT_EXP_DELTA_HOURS = 1 is defined but unused. ✅
Invalid JWT path - pw-enterprise/api/jwt.py:103-108 returns {"status":"error"} for expired/invalid tokens, but decode_jwt_token/authentication return an error dict, not an HTTP 401; the 401 mapping happens in individual endpoints and was not verified here. ⚠️
Business logic uses PostgreSQL + Redis - pw-enterprise/db_connection.py:26-46 builds PG ThreadedConnectionPool(1,100); cache.py:10 import redis. ✅
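The auth step in this flow can be sketched with the standard library alone. The HS256 algorithm, the 168-hour default expiry, and the error-dict return shape are taken from the evidence above; everything else is an assumption made for illustration, including the SECRET value, the "Bearer " prefix, the claim names, the encode_token and endpoint helpers, and the 401 mapping (which the evidence marks as unverified).

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

SECRET = b"demo-secret"  # hypothetical; never hard-code a real signing key

def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))

def _sign(signing_input: str) -> str:
    mac = hmac.new(SECRET, signing_input.encode("ascii"), hashlib.sha256)
    return _b64url(mac.digest())

def encode_token(user_id: int, hours: int = 168, now: Optional[float] = None) -> str:
    """Build an HS256 token with an exp claim `hours` after issue time."""
    issued = time.time() if now is None else now
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    claims = {"user_id": user_id, "exp": issued + hours * 3600}
    payload = _b64url(json.dumps(claims).encode())
    return f"{header}.{payload}.{_sign(header + '.' + payload)}"

def authentication(auth_header: Optional[str], now: Optional[float] = None) -> dict:
    """Return the claims on success, or an error dict (not an HTTP response)."""
    if not auth_header or not auth_header.startswith("Bearer "):
        return {"status": "error", "message": "missing token"}
    try:
        header, payload, signature = auth_header[len("Bearer "):].split(".")
        if not hmac.compare_digest(signature, _sign(header + "." + payload)):
            return {"status": "error", "message": "invalid signature"}
        claims = json.loads(_b64url_decode(payload))
    except (ValueError, TypeError):
        return {"status": "error", "message": "malformed token"}
    current = time.time() if now is None else now
    if claims.get("exp", 0) < current:
        return {"status": "error", "message": "token expired"}
    return {"status": "success", "claims": claims}

def endpoint(auth_header: Optional[str]) -> tuple:
    """Map the error dict to a 401 before any business logic runs."""
    result = authentication(auth_header)
    if result["status"] != "success":
        return result, 401
    return {"status": "success", "user_id": result["claims"]["user_id"]}, 200
```

Keeping authentication as a function that returns a dict, and leaving the status-code mapping to the endpoint, mirrors the split the evidence describes between the JWT module and the individual views.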
4. AI orchestration
Agents are composed with LangChain / LangGraph / CrewAI and dispatched to whichever LLM vendor fits the task: some via the shared llm_completions.py layer (OpenAI/Groq), others via direct vendor clients. Calls are instrumented in Langfuse (50% default sample rate); retrieval is backed by FAISS vectors and Cloudflare Vectorize.
flowchart TD
REQ["Agent task<br/>(from RQ / API)"] --> ORCH["Orchestration<br/>LangChain · LangGraph · CrewAI"]
ORCH --> ROUTE{"LLM dispatch<br/>llm_completions.py + direct clients"}
ROUTE --> OAI["OpenAI"]
ROUTE --> ANT["Anthropic"]
ROUTE --> GRQ["Groq"]
ROUTE --> GEM["Gemini"]
ROUTE --> COH["Cohere"]
ROUTE --> CER["Cerebras · DeepSeek · Perplexity · OpenRouter"]
ORCH --> VEC["FAISS · Vectorize<br/>retrieval"]
ORCH -. traced .-> LF["Langfuse"]
OAI --> OUT["Result"]
ANT --> OUT
GRQ --> OUT
GEM --> OUT
COH --> OUT
CER --> OUT
Evidence
LangChain / LangGraph / CrewAI orchestration - pw-enterprise/requirements.txt:518 langchain==1.2.7, :563 langgraph==1.0.7, :154 crewai==1.6.1. ✅
LLM dispatch: shared layer for OpenAI/Groq, direct clients for the rest - pw-enterprise/library/llm_completions.py:32-64 PROVIDER_CONFIGS covers openai and groq; Anthropic/Gemini/Cohere/Cerebras/etc. are instantiated directly at their call sites. Diagram node relabeled accordingly. ✅
OpenAI vendor - pw-enterprise/.env:3 openai_api_key; loaded in app.py:304. ✅
Anthropic vendor - pw-enterprise/.env:136 ANTHROPIC_API_KEY; ai_twin/respondents/agents.py:337-338 instantiates ChatAnthropic(model_name="claude-sonnet-4-5") (using env claude_key). ✅
Groq vendor - pw-enterprise/.env:96 groq_api_key; library/ai_services.py:345-347 imports ChatGroq and reads groq_api_key. ✅
Gemini vendor - pw-enterprise/.env:131 GOOGLE_API_KEY; ai_twin/group/main.py:8 imports ChatGoogleGenerativeAI. ✅
Cohere vendor - pw-enterprise/.env:4 cohere_api_key; api/planner.py:12 import cohere. ✅
Cerebras vendor - pw-enterprise/.env:107 CEREBRAS-API-KEY; v3/langraph_main.py:396-398 imports Cerebras and calls cerebras_client.chat.completions.create. ✅
DeepSeek vendor - pw-enterprise/.env:110 deepseek_api_key; consumed in v3/langraph_main.py:32. ✅
Perplexity vendor - pw-enterprise/.env:111 perplexity_api_key; library/ai_services.py:310-316 calls https://api.perplexity.ai/chat/completions. ✅
OpenRouter vendor - pw-enterprise/.env:168 openrouter; api/assets/routers/conversations.py:4126-4131 sets openai_api_base="https://openrouter.ai/api/v1". ✅
FAISS + Cloudflare Vectorize retrieval - pw-enterprise/app.py:1219-1227 builds faiss.IndexFlatL2 FAISS store; add_vectors.py:17-23 VectorizeBase calls the Cloudflare AI embeddings API. ✅
Langfuse tracing (50% default sample) - pw-enterprise/library/langsmith_metadata.py:16-33 calls LangChainInstrumentor().instrument() and :25-27 sets a default LANGFUSE_SAMPLE_RATE of 0.5; intro now states the sampling. ✅
12+ agents (13-16 in the copilot list) - pw-enterprise/library/ai_services.py:417-431 returns 13 agents in app mode and :433-450 returns 16 otherwise (plus a separate SurveyAnalysisAgent); v3/langraph_main.py:91-105 defines 12. ✅
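The split between the shared completion layer and direct vendor clients can be sketched as below. The ProviderConfig name, its api_key_env field, and the openai/groq entries follow the evidence; the complete function, the DIRECT_CLIENTS table, and the string-returning stand-ins are hypothetical and make no network calls.

```python
from dataclasses import dataclass
from typing import Callable, Dict

@dataclass(frozen=True)
class ProviderConfig:
    api_key_env: str  # name of the environment variable holding the API key

# Shared layer: per the evidence, only openai and groq are configured here.
PROVIDER_CONFIGS: Dict[str, ProviderConfig] = {
    "openai": ProviderConfig(api_key_env="openai_api_key"),
    "groq": ProviderConfig(api_key_env="groq_api_key"),
}

# Direct clients: stand-ins for vendor SDK calls made at individual call sites.
DIRECT_CLIENTS: Dict[str, Callable[[str], str]] = {
    "anthropic": lambda prompt: f"[anthropic] {prompt}",
    "gemini": lambda prompt: f"[gemini] {prompt}",
}

def complete(vendor: str, prompt: str) -> str:
    """Route through the shared layer when configured, else a direct client."""
    if vendor in PROVIDER_CONFIGS:
        cfg = PROVIDER_CONFIGS[vendor]
        return f"[shared:{vendor} via {cfg.api_key_env}] {prompt}"
    if vendor in DIRECT_CLIENTS:
        return DIRECT_CLIENTS[vendor](prompt)
    raise ValueError(f"no client configured for vendor: {vendor}")
```

In this sketch, adding a vendor to the shared layer is a one-line configuration change, while direct clients keep vendor-specific options at the call site.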
5. Data & storage topology
Four query stores plus two object stores, fronted by secret managers. Each store is purpose-fit rather than overlapping.
flowchart TD
API["Flask API + RQ workers"]
SEC["Secrets<br/>GCP Secret Manager · Azure Key Vault"] -. injected .-> API
API --> PG[("PostgreSQL<br/>system of record")]
API --> RD[("Redis<br/>cache + RQ queues")]
API --> ES[("Elasticsearch<br/>search · time-series")]
API --> BQ[("BigQuery<br/>analytics warehouse")]
API --> R2["Cloudflare R2<br/>object storage"]
API --> AZ["Azure Blob<br/>storage (legacy)"]
Evidence
Flask API is the primary write source - pw-enterprise/app.py:234-445 registers 52 blueprints (e.g. register_blueprint(research_bp) at :234, sse at :445). ✅
GCP Secret Manager injects secrets - pw-enterprise/library/gcp_secrets.py:1-18 wraps google.cloud.secretmanager; app.py:30-52 loads _SECRETS in parallel via load_secret_to_env. ✅
Azure Key Vault is legacy - pw-enterprise/requirements.txt:62 still pins azure-keyvault-secrets==4.10.0, but library/secrets.py:109-115 set_secret_to_key_vault/get_secret_from_key_vault now delegate to GCP Secret Manager. ✅
PostgreSQL is the system of record - pw-enterprise/db_connection.py:9-46 uses psycopg2 ThreadedConnectionPool(1,100) for dev/prod databases. ✅
Redis = cache + RQ queue - pw-enterprise/.env:41-43 redis_host/port/password; cache.py:10 import redis (cache) while RQ queues use the same Redis. ✅
Elasticsearch active - pw-enterprise/library/utils.py:17 from elasticsearch import Elasticsearch; .env:26 elastic_key. ✅
BigQuery warehouse - pw-enterprise/api/common.py:4-11 imports from google.cloud import bigquery and instantiates bigquery.Client(). ✅
Cloudflare R2 object storage - pw-enterprise/library/storage.py:1-7,32-37 documents migration to R2 and get_r2_client() builds a boto3 S3 client at *.r2.cloudflarestorage.com. ✅
Azure Blob is legacy - pw-enterprise/library/storage.py:29 keeps AZURE_STORAGE_URL_PATTERN = "https://pwdevstorage.blob.core.windows.net/" for migration; .env:47-48 still has Azure storage connections. ✅
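The evidence mentions a helper that converts old Azure Blob Storage URLs to R2 URLs. A minimal sketch of such a rewrite is shown here; AZURE_STORAGE_URL_PATTERN is quoted from the evidence, while R2_PUBLIC_BASE_URL and the azure_url_to_r2 function are hypothetical placeholders and not the project's actual values or code.

```python
# Quoted from the evidence (library/storage.py).
AZURE_STORAGE_URL_PATTERN = "https://pwdevstorage.blob.core.windows.net/"
# Hypothetical placeholder; the real R2 public base URL is not given in the source.
R2_PUBLIC_BASE_URL = "https://assets.example.com/"

def azure_url_to_r2(url: str) -> str:
    """Rewrite a legacy Azure Blob URL to an R2 URL; pass other URLs through."""
    if url.startswith(AZURE_STORAGE_URL_PATTERN):
        return R2_PUBLIC_BASE_URL + url[len(AZURE_STORAGE_URL_PATTERN):]
    return url
```

A prefix rewrite like this lets rows that still store legacy URLs keep working during a migration without a bulk update of stored data.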
6. External integration map
Dozens of outbound integrations feed the platform, grouped into ad/social platforms, retail-commerce data, and search/web/enrichment providers. In-house wrappers in library/ and scheduled cron jobs normalize them into Elasticsearch and PostgreSQL.
flowchart LR
subgraph ADS["Ad / social platforms"]
G["Google Ads · DV360"]
M["Meta"]
T["TikTok · Pinterest"]
L["LinkedIn · Snapchat"]
Y["YouTube · GA4"]
end
subgraph RETAIL["Retail / commerce data"]
RF["Rainforest (Amazon)"]
BB["BigBox · BlueCart · RedCircle"]
end
subgraph WEB["Search / web / enrichment"]
TV["Tavily · Serper · ValueSERP"]
SC["ZenRows · scraping APIs"]
DM["Census · Precisely"]
end
ADS --> ING["Ingestion<br/>library/*_package.py · cron jobs"]
RETAIL --> ING
WEB --> ING
ING --> ES[("Elasticsearch")]
ING --> PG[("PostgreSQL")]
Evidence
Google Ads + DV360 wrappers - pw-enterprise/library/google_ads_package.py:42-46 and library/dv360_package.py:32-36 (dv360_client_id/secret, def dv360_auth). ✅
Meta/Facebook wrapper - pw-enterprise/library/facebook_package.py exists (e.g. :53 cohere_api_key); finder's comment-header snippet was fabricated but the file is the Meta wrapper. ✅
TikTok + Pinterest wrappers - pw-enterprise/library/tiktok_package.py and library/pinterest_package.py both exist. ✅
LinkedIn + Snapchat wrappers - pw-enterprise/library/linkedin_package.py and library/snapchat_package.py both exist. ✅
YouTube + GA4 wrappers - pw-enterprise/library/youtube_package.py:14-16 (youtube_client_secret, youtube.force-ssl scope) and library/google_analytics_4_package.py exists. ✅
Rainforest (Amazon) integration - pw-enterprise/api/product.py:89 rainforest_key, :311 calls https://api.rainforestapi.com/request, caches into rainforest_request table. ✅
BigBox / BlueCart / RedCircle - pw-enterprise/api/social_insights.py:79-81 loads all three keys; :536 calls bluecartapi.com, :603 calls redcircleapi.com; secrets loaded in app.py:37-39. ✅
Tavily configured but usage unverified - pw-enterprise/.env:137 has TAVILY_API_KEY, but no active SDK import/call was found in the codebase; evidence is env-key only. ⚠️
Serper integration - pw-enterprise/custom_answer_engine.py:12,17,29 imports GoogleSerperAPIWrapper and instantiates it with SERPER_API_KEY. ✅
ValueSERP integration - pw-enterprise/library/valueserp_helper.py:12,165 reads valueserp_api_key and calls https://api.valueserp.com/search. ✅
ZenRows scraping integration - pw-enterprise/config/constants.py:40 and v3/tools.py:56 both load zenrow_token = os.environ["zenrow_token"]. ✅
US Census enrichment - pw-enterprise/ai_twin/respondents/universe_stage_fetcher.py:246,310 defines _query_census hitting https://api.census.gov/data/...; .env:138 CENSUS_API_KEY. ✅
Precisely enrichment - pw-enterprise/api/household.py:200 def precisely_household(...); app.py:33-34 loads PRECISELY-SECRET/PRECISELY-KEY from GCP. ✅
Ingestion via library/*_package.py - pw-enterprise/library/ has 11 *_package.py files (dv360, facebook, google_ads, google_analytics, google_analytics_4, linkedin, pinterest, snapchat, spotify, tiktok, youtube). ✅
Cron jobs normalize into ES + PG - pw-enterprise/cron/sync_audience_between_pg_and_es.py exists and cron_master.py:253-257 runs sync_audience_between_pg_and_es_cron(). ✅
Client → BFF → Flask API → (Postgres / Redis) → RQ workers → AI layer → external platforms, with cron jobs feeding the data stores on a schedule.