# Unified Fetcher Learning System - Implementation Plan

## Overview

Write a unified system that learns optimal fetcher strategies from experience while protecting against IP bans.

**Core Innovation**: Replace pre-computed patterns with conditional probability queries over heuristic-tagged attempts. This enables flexible "P(success | domain=X AND suffix=Y)" queries without pattern explosion.

## Architecture

### Parallel Implementation Strategy

The new learning system runs **in parallel** with the existing hard-coded router, with a **feature flag** to switch between:

- **Legacy mode**: Uses existing `fetcher_router` table (no learning)
- **Learning mode**: Uses new heuristic-based system (ignores `fetcher_router`)

This allows:

- Zero risk deployment (toggle off if issues)
- A/B testing between approaches
- Gradual migration
- Rollback capability

### Learning Mode Lookup Priority

1. **Heuristic-based learning** - query `attempt_heuristics` with time decay
2. **Cheap HTTP classification** - 3s GET + content analysis
3. **Execute & Learn** - validate, log attempt with heuristics, handle failures

**Note**: Hard-coded router is NOT part of the new flow (legacy mode only)

### Key Enhancements

- **Parallel deployment**: Feature flag to switch between legacy and learning modes
- **Explicit ban tracking**: `is_banned` boolean for IP risk management
- **Time decay**: Weight recent attempts with `0.5^(days/30)` exponential decay
- **Heuristic importance**: Natural frequency weighting + explicit scoring for dynamic heuristics
- **Smart retry**: Retry on timeout/network errors, LONGER pause after captcha (10min base)
- **Exponential backoff**: Different schedules for captcha vs other errors
- **Dynamic heuristics**: Add new heuristic types without code changes

## Database Schema

### New Collection: `fetcher_attempts`

Raw audit log (90-day retention)

**Fields**:

- `id` (text, PK)
- `link` (relation → links, cascade delete)
- `url` (text, required)
- `fetcher` (select: simple_http, playwright, playwright_stealth)
- `success` (bool)
- `is_banned` (bool) - explicit IP ban detection
- `error_type` (select: blocked_captcha, blocked_403, empty_content, timeout, wrong_tool)
- `http_status` (number)
- `response_headers` (json) - Server, Content-Type, CF-Ray
- `duration_ms` (number)
- `attempted_at` (autodate)

**Indexes**:

- `idx_attempts_link` on `link`
- `idx_attempts_url` on `url`
- `idx_attempts_time` on `attempted_at DESC`

### New Collection: `attempt_heuristics`

Many-to-many junction table for conditional probability

**Fields**:

- `id` (text, PK)
- `attempt` (relation → fetcher_attempts, cascade delete)
- `heuristic_type` (text) - OPEN-ENDED: domain, suffix, contains_cdn, status_200, has_captcha, custom_xyz, etc.
- `heuristic_value` (text) - "wikipedia.org", ".jpg", "true"
- `importance_score` (number, 0-1) - calculated via predictive power analytics
- `created_at` (autodate)

**Design**: `heuristic_type` is TEXT (not select) to allow adding new heuristic types dynamically without schema migration. The attempt relation guarantees robustness - you can query any heuristic type even if it was just invented.

**Indexes**:

- `idx_heur_type_value` on `(heuristic_type, heuristic_value)`
- `idx_heur_attempt` on `attempt`

**Design**: No pre-computed patterns. Dynamic GROUP BY queries for conditional probability.

### Extended Collection: `links`

Add pause/retry fields

**New fields**:

- `paused_until` (date, optional) - exponential backoff timestamp
- `risk_level` (number, 0-5) - ban risk counter
- `attempt_count` (number, min 0) - total fetch attempts
- `last_attempt_at` (date) - most recent fetch
- `learning_mode_enabled` (bool, default: false) - feature flag per-link (optional: can be global config instead)

### Seed Data

Bootstrap with synthetic priors:

```
Attempt 1: seed:.pdf → simple_http (success)
  Heuristics: (suffix, .pdf, importance: 0.95)
Attempt 2: seed:.mp4 → simple_http (success)
  Heuristics: (suffix, .mp4, importance: 0.95)
Attempt 3: seed:/cdn/ → simple_http (success)
  Heuristics: (contains_cdn, true, importance: 0.85)
```

## Code Structure

### New Packages

```
internal/fetcher/
  validator/
    validator.go            # ValidationResult struct, Validate() interface
    block_detector.go       # DetectCaptcha(), Detect403()
    spa_detector.go         # DetectSPA(), DetectReact()
    heuristic_extractor.go  # ExtractPostFetchHeuristics()
  heuristic/
    extractor.go            # ExtractFromURL() - domain, suffix, path
    types.go                # Heuristic struct, HeuristicType enum
  attempt/
    store.go                # AttemptStore interface
    logger.go               # LogAttempt() with heuristic linking
    pocketbase/
      store.go              # PocketBase CRUD
      query.go              # FindBestFetcher() - conditional probability
  selection/
    selector.go             # FindBestFetcher() - waterfall logic
    confidence.go           # CalculateConfidence() with time decay
    fallback.go             # CheapHTTPClassification()
```

### Modified Files

- `internal/link/model.go` - Add PausedUntil, RiskLevel, AttemptCount, LastAttemptAt
- `internal/link/pocketbase/store.go` - Handle new fields in Get/Save
- `internal/link/pocketbase/phase_route_fetcher.go` - Add feature flag: legacy hard-coded router, or learning waterfall (learned heuristics → cheap HTTP classification)
- `internal/fetcher/direct_access/phase_fetch.go` - Add validation, retry logic, attempt logging
- `internal/fetcher/direct_access/fetcher.go` - Return headers + status code

### Migration Files

- `migrations/{timestamp}_created_fetcher_attempts.go`
- `migrations/{timestamp}_created_attempt_heuristics.go`
- `migrations/{timestamp}_updated_links.go`
- `migrations/{timestamp}_seed_heuristic_priors.go`

## Core Algorithms

### Conditional Probability Query (Fully Dynamic)

Find best fetcher given **any combination** of heuristics with time decay:

```go
func (s *Store) FindBestFetcher(heuristics []Heuristic, minSampleSize int) (*FetcherScore, error) {
    if len(heuristics) == 0 {
        return nil, nil // Nothing to match on
    }

    // 1. Build OR filter matching ANY heuristic (fully dynamic - no hardcoded types)
    filter := ""
    params := make(map[string]any)
    for i, h := range heuristics {
        if i > 0 {
            filter += " || "
        }
        filter += fmt.Sprintf("(heuristic_type = {:type%d} && heuristic_value = {:value%d})", i, i)
        params[fmt.Sprintf("type%d", i)] = h.Type // Could be "video_platform", "deep_path", anything
        params[fmt.Sprintf("value%d", i)] = h.Value
    }

    // 2. Find matching heuristic records (discovers ANY heuristic type in DB)
    heuristicRecords, err := s.app.FindRecordsByFilter("attempt_heuristics", filter, "", 0, 0, params)
    if err != nil {
        return nil, err // Surface query failures instead of treating them as "no data"
    }
    if len(heuristicRecords) == 0 {
        return nil, nil // No learned data
    }

    // 3. Extract attempt IDs (deduplicated: one attempt can match several heuristics)
    attemptIDs := make(map[string]bool)
    for _, hr := range heuristicRecords {
        attemptIDs[hr.GetString("attempt")] = true
    }

    // 4. Fetch all attempts, apply time decay
    scores := make(map[string]*FetcherScore)
    now := time.Now()
    for attemptID := range attemptIDs {
        attempt, err := s.app.FindRecordById("fetcher_attempts", attemptID)
        if err != nil {
            continue // Attempt row is gone (e.g. past retention); skip it
        }
        fetcher := attempt.GetString("fetcher")
        success := attempt.GetBool("success")
        attemptedAt := attempt.GetDateTime("attempted_at").Time()

        // Time decay: weight halves every 30 days
        daysSince := now.Sub(attemptedAt).Hours() / 24
        weight := math.Pow(0.5, daysSince/30)

        if scores[fetcher] == nil {
            scores[fetcher] = &FetcherScore{Fetcher: fetcher}
        }
        scores[fetcher].SampleSize++
        if success {
            scores[fetcher].WeightedSuccesses += weight
        }
    }

    // 5. Calculate confidence (weighted_success_rate × min(1, sample_size/10))
    var best *FetcherScore
    for _, score := range scores {
        if score.SampleSize < minSampleSize {
            continue // Skip if not enough data
        }
        // Note: the denominator is the raw count, so decayed successes also lower the rate
        score.SuccessRate = score.WeightedSuccesses / float64(score.SampleSize)

        // Confidence penalty for low sample size
        sampleFactor := math.Min(1.0, float64(score.SampleSize)/10.0)
        score.Confidence = score.SuccessRate * sampleFactor

        if best == nil || score.Confidence > best.Confidence {
            best = score
        }
    }
    return best, nil
}
```

**Key insight**: This query works with ANY heuristic type that exists in the database. No enumeration needed.
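
To make the scoring concrete, here is a minimal, self-contained sketch of the two formulas used above: the `0.5^(days/30)` decay weight and `confidence = success_rate × min(1, sample_size/10)`. The `Attempt` struct, the `sample` helper, and the function names are illustrative assumptions, not part of the planned packages; the arithmetic mirrors the plan's code, including dividing by the raw sample count.

```go
package main

import (
	"fmt"
	"math"
)

// halfLifeDays matches the plan's decay formula: weight = 0.5^(days/30).
const halfLifeDays = 30.0

// Attempt is a hypothetical stand-in for a fetcher_attempts row.
type Attempt struct {
	AgeDays float64
	Success bool
}

// DecayWeight returns the weight of an attempt that is ageDays old.
func DecayWeight(ageDays float64) float64 {
	return math.Pow(0.5, ageDays/halfLifeDays)
}

// Confidence applies success_rate × min(1, sample_size/10), where
// success_rate = decayed successes / raw sample count (as in the plan).
func Confidence(attempts []Attempt) float64 {
	if len(attempts) == 0 {
		return 0
	}
	var weighted float64
	for _, a := range attempts {
		if a.Success {
			weighted += DecayWeight(a.AgeDays)
		}
	}
	n := float64(len(attempts))
	return (weighted / n) * math.Min(1.0, n/10.0)
}

// sample builds n identical attempts (illustration helper only).
func sample(n int, ageDays float64, success bool) []Attempt {
	out := make([]Attempt, n)
	for i := range out {
		out[i] = Attempt{AgeDays: ageDays, Success: success}
	}
	return out
}

func main() {
	fmt.Printf("weight at 0 days:  %.3f\n", DecayWeight(0))  // 1.000
	fmt.Printf("weight at 30 days: %.3f\n", DecayWeight(30)) // 0.500
	fmt.Printf("weight at 90 days: %.3f\n", DecayWeight(90)) // 0.125

	fmt.Printf("5 fresh successes:   %.3f\n", Confidence(sample(5, 0, true)))   // 0.500
	fmt.Printf("10 fresh successes:  %.3f\n", Confidence(sample(10, 0, true)))  // 1.000
	fmt.Printf("10 successes at 30d: %.3f\n", Confidence(sample(10, 30, true))) // 0.500
}
```

Two of these cases land below a 0.6 threshold. Five fresh successes score 0.5 because of the sample-size penalty. Ten successes that are all 30 days old also score 0.5, because the decayed successes are divided by the raw count. If that second effect is not intended, dividing by the summed weights instead would give a pure weighted rate.
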
### Fetcher Selection (Single Dynamic Query)

**No hardcoded fallback cascade** - natural specificity emerges from data:

```go
const (
    minSampleSize       = 5   // Minimum attempts before a fetcher is considered
    confidenceThreshold = 0.6 // Single gate for trusting learned data
)

func FindBestFetcher(url string) (fetcher string, confidence float64, err error) {
    // Extract ALL heuristics dynamically (no pre-declaration needed)
    heuristics := ExtractFromURL(url)

    // Single query with ALL heuristics the URL has
    result := queryWithHeuristics(heuristics, minSampleSize)
    if result != nil && result.Confidence > confidenceThreshold {
        return result.Fetcher, result.Confidence, nil
    }

    // No learned data → return empty result, caller will use cheap HTTP
    return "", 0.0, nil
}
```

**How natural specificity works:**

- URL matches 5 heuristics + 20 past attempts = high sample size, high confidence (very specific match)
- URL matches 2 heuristics + 3 past attempts = low sample size, low confidence (not enough data)
- Confidence formula already penalizes low samples: `confidence = success_rate × min(1, sample_size/10)`
- Single threshold (0.6) is the ONLY gate - no need to enumerate heuristic types or fallback levels

### Ban Risk Calculation

```go
// FindByFilter and FindRecordById stand for thin store helpers.
func GetDomainBanRate(domain string, days int) float64 {
    // Find all attempts for domain in last N days
    heuristics := FindByFilter("heuristic_type = 'domain' && heuristic_value = {:domain}", domain)
    cutoff := time.Now().AddDate(0, 0, -days)

    bannedCount, totalCount := 0, 0
    for _, h := range heuristics {
        attempt := FindRecordById("fetcher_attempts", h.Attempt)
        if attempt.AttemptedAt.After(cutoff) {
            totalCount++
            if attempt.IsBanned {
                bannedCount++
            }
        }
    }

    if totalCount == 0 {
        return 0 // No recent attempts: avoid dividing by zero
    }
    return float64(bannedCount) / float64(totalCount)
}
```

### Pause Duration (Error-Specific)

```go
func CalculatePauseDuration(riskLevel int, errorType string) time.Duration {
    var baseDelay time.Duration

    // Longer pause for captcha/403 blocks (higher IP ban risk)
    if errorType == "blocked_captcha" || errorType == "blocked_403" {
        baseDelay = 10 * time.Minute
    } else {
        baseDelay = 5 * time.Minute
    }

    // Double the delay for each risk level
    return time.Duration(float64(baseDelay) * math.Pow(2, float64(riskLevel)))
}
```

**Sequences** (risk level 0 → 5):

- Captcha/403: 10min → 20min → 40min → 80min → 160min → 320min
- Other errors: 5min → 10min → 20min → 40min → 80min → 160min

## Integration Points

### Modified `phase_route_fetcher.go` (Parallel Implementation)

```go
func RegisterRouteFetcherPhase(orch *Orchestrator, execs *ExecutorRegistry, app core.App) {
    fetcherStore := fetcherpb.New(app)
    attemptStore := attemptpb.New(app)
    selector := selection.New(attemptStore)

    // Global config (or read from env/settings)
    learningModeEnabled := os.Getenv("LEARNING_MODE_ENABLED") == "true"

    execs.Register("route_fetcher", func(record Record) *ExecutorResult {
        l := record.(*link.Link)

        // Check if paused
        if l.PausedUntil != nil && time.Now().Before(*l.PausedUntil) {
            return &ExecutorResult{Error: fmt.Errorf("paused until %v", l.PausedUntil)}
        }

        domain := extractDomain(l.InitialURL)

        // FEATURE FLAG: Choose between legacy and learning mode
        if !learningModeEnabled {
            // LEGACY MODE: Use hard-coded router only
            if route := fetcherStore.GetByDomain(domain); route != nil {
                return &ExecutorResult{Data: map[string]interface{}{
                    "fetcher": route.Fetcher,
                    "source":  "hard_coded",
                }}
            }
            return &ExecutorResult{Error: fmt.Errorf("no route for domain: %s", domain)}
        }

        // LEARNING MODE: Use heuristic-based learning
        // PRIORITY 1: Heuristic-based learning
        fetcher, confidence, err := selector.FindBestFetcher(l.InitialURL)
        if err == nil && confidence > 0.6 {
            return &ExecutorResult{Data: map[string]interface{}{
                "fetcher":    fetcher,
                "source":     "learned_heuristic",
                "confidence": confidence,
            }}
        }

        // PRIORITY 2: Cheap HTTP classification (second return value is unused here)
        fetcher, _ = selector.CheapHTTPClassification(l.InitialURL)
        return &ExecutorResult{Data: map[string]interface{}{
            "fetcher": fetcher,
            "source":  "heuristic_classification",
        }}
    })
}
```

### Modified `phase_fetch.go`

```go
func makeFetchExecutor(store *Store, attemptStore *AttemptStore, validator *Validator) ExecutorFunc {
    return func(record Record) *ExecutorResult {
        l := record.(*link.Link)
        fetcher := l.PhaseResults["fetcher"].(string)

        // Extract pre-fetch heuristics
        preHeuristics := heuristic.ExtractFromURL(l.InitialURL)
        startTime := time.Now()

        // Execute fetch
        html, headers, statusCode, err := doFetch(l.InitialURL, fetcher)
        duration := time.Since(startTime)

        // Validate response
        validationResult := validator.Validate(html, headers, statusCode)

        // Extract post-fetch heuristics
        postHeuristics := validator.ExtractPostFetchHeuristics(html, headers, statusCode)
        allHeuristics := append(preHeuristics, postHeuristics...)

        var success bool
        var errorType string
        var isBanned bool

        // CASE 1: Network/timeout error (RETRY)
        if err != nil {
            success = false
            errorType = classifyError(err) // "timeout", "network_error"

            // RETRY ONCE on transient errors
            if (errorType == "timeout" || errorType == "network_error") && l.AttemptCount == 0 {
                // Log the failed first try; the final Log call records the retry's outcome
                attemptStore.Log(l.ID, l.InitialURL, fetcher, false, errorType, statusCode, headers, duration, false, allHeuristics)

                retryStart := time.Now()
                html, headers, statusCode, err = doFetch(l.InitialURL, fetcher)
                duration = time.Since(retryStart)
                if err == nil {
                    success = true
                    errorType = ""
                } else {
                    errorType = classifyError(err)
                }
            }

        // CASE 2: Bot block detected (PAUSE with LONGER backoff)
        } else if validationResult.IsBlocked {
            success = false
            errorType = validationResult.BlockType
            isBanned = true

            // PAUSE LINK (longer backoff for captcha/403).
            // Compute the pause before raising the risk level so the first block waits the base delay.
            pauseDuration := CalculatePauseDuration(l.RiskLevel, errorType)
            pauseUntil := time.Now().Add(pauseDuration)
            l.PausedUntil = &pauseUntil
            if l.RiskLevel < 5 { // risk_level is capped at 5 in the schema
                l.RiskLevel++
            }

        // CASE 3: Empty content (wrong tool)
        } else if validationResult.IsEmpty && !validationResult.HasSPA {
            success = false
            errorType = "wrong_tool"

            // RETRY ONCE with playwright
            if l.AttemptCount == 0 {
                // Log the failed first try under its own fetcher so the wrong tool is not credited
                attemptStore.Log(l.ID, l.InitialURL, fetcher, false, errorType, statusCode, headers, duration, false, allHeuristics)

                fetcher = "playwright"
                retryStart := time.Now()
                html, headers, statusCode, err = doFetch(l.InitialURL, fetcher)
                duration = time.Since(retryStart)
                if err == nil {
                    success = true
                    errorType = ""
                } else {
                    errorType = classifyError(err)
                }
            }

        // CASE 4: Success
        } else {
            success = true
        }

        // Log attempt
        l.AttemptCount++
        l.LastAttemptAt = &startTime
        attemptStore.Log(l.ID, l.InitialURL, fetcher, success, errorType, statusCode, headers, duration, isBanned, allHeuristics)

        if !success {
            return &ExecutorResult{Error: fmt.Errorf("fetch failed: %s", errorType)}
        }

        // Save file
        fileID := store.Create(l.ID, []byte(html), "page.html")
        return &ExecutorResult{Data: map[string]interface{}{"file_ids": []string{fileID}}}
    }
}
```

## Validation Layer

### Block Detection

```go
func DetectCaptcha(html string) bool {
    keywords := []string{
        "Cloudflare", "hCaptcha", "reCAPTCHA", "g-recaptcha",
        "Just a moment", "cf-browser-verification", "grecaptcha",
    }
    for _, kw := range keywords {
        if strings.Contains(html, kw) {
            return true
        }
    }
    return false
}

func Detect403(statusCode int) bool {
    return statusCode == 403 || statusCode == 429
}
```

### SPA Detection

```go
func DetectSPA(html string) (bool, string) {
    frameworks := map[string][]string{
        "react": {`