# Learning System - Integration

## Philosophy: Phase-Based Architecture

The existing codebase uses a **phase-based orchestrator** pattern:

```
route_fetcher → (dynamic fetcher phases) → store
```

**Key insight**: Fetcher phases are registered dynamically at runtime based on the selected fetcher. This means:

- We can add learning logic in `route_fetcher` without touching fetcher internals
- Probe results can be passed to fetch phase via `PhaseResults`
- Each phase has clear responsibility boundaries

**Integration principle**: Minimal changes to existing code, maximum leverage of existing patterns.

---

## Feature Flag Strategy

### Why Parallel Implementation?

```
LEARNING_MODE_ENABLED=false    LEARNING_MODE_ENABLED=true
         │                              │
         ▼                              ▼
┌───────────────┐          ┌───────────────────────┐
│ fetcher_router│          │ 1. Query heuristics   │
│ table lookup  │          │ 2. Probe if unsure    │
└───────────────┘          │ 3. Log + Learn        │
                           └───────────────────────┘
```

**Benefits**:

- Zero-risk deployment (toggle off if broken)
- A/B testing capability
- Gradual migration path
- Instant rollback

**Setting**: Global env var or per-link field (`learning_mode_enabled`)

---

## Modified `phase_route_fetcher.go`

**IMPORTANT**: Uses dynamic phase registration with orchestrator. The learning mode branch slots into the existing architecture seamlessly.

```go
// Imports omitted: fmt, os, time, plus the project's processor, link,
// selection, fetcher, and PocketBase store packages.
func RegisterRouteFetcherPhase(orch *processor.Orchestrator, execs *processor.ExecutorRegistry, app core.App) {
	fetcherStore := fetcherpb.New(app)
	fetchedFileStore := fetchedfilepb.New(app)
	attemptStore := attemptpb.New(app)     // NEW
	selector := selection.New(attemptStore) // NEW

	learningMode := os.Getenv("LEARNING_MODE_ENABLED") == "true"

	orch.RegisterPhase(processor.PhaseDefinition{
		Name:         "route_fetcher",
		Dependencies: []string{},
	})

	execs.Register("route_fetcher", processor.ExecutorFunc(func(record processor.Record) *processor.ExecutorResult {
		l := record.(*link.Link)
		domain := extractDomain(l.InitialURL)

		var selectedFetcher string
		var source string
		var probeResult *selection.ClassificationResult

		if learningMode {
			// Check pause first (exponential backoff)
			if l.PausedUntil != nil && time.Now().Before(*l.PausedUntil) {
				return &processor.ExecutorResult{Error: fmt.Errorf("paused until %s", l.PausedUntil.Format(time.RFC3339))}
			}

			// PRIORITY 1: Try learned heuristics.
			// A lookup error is treated like low confidence and falls through to the probe.
			fetcherName, confidence, err := selector.FindBestFetcher(l.InitialURL)
			if err == nil && confidence > 0.6 {
				selectedFetcher = fetcherName
				source = "learned"
			} else {
				// PRIORITY 2: Cheap probe (reusable)
				probeResult, err = selector.CheapHTTPClassification(l.InitialURL)
				if err != nil {
					return &processor.ExecutorResult{Error: err}
				}
				selectedFetcher = probeResult.Fetcher
				source = "probe"
			}
		} else {
			// LEGACY MODE: hard-coded lookup only
			fr, err := fetcherStore.GetByDomain(domain)
			if err != nil {
				return &processor.ExecutorResult{Error: err}
			}
			selectedFetcher = fr.Fetcher
			source = "hard_coded"
		}

		// Dynamic phase registration (matches actual architecture)
		fetcherPhases, err := fetcher.RegisterFetcherPhases(selectedFetcher, orch, execs, fetchedFileStore)
		if err != nil {
			return &processor.ExecutorResult{Error: err}
		}
		orch.SetOrder(append([]string{"route_fetcher"}, fetcherPhases...))

		result := map[string]interface{}{
			"fetcher": selectedFetcher,
			"source":  source,
			"domain":  domain,
			"phases":  fetcherPhases,
		}

		// Pass probe result for reuse (key optimization)
		if probeResult != nil && probeResult.ProbeBody != nil {
			result["probe_body"] = probeResult.ProbeBody
			result["probe_status"] = probeResult.ProbeStatus
		}

		return &processor.ExecutorResult{Data: result}
	}))
}
```

---

## Probe & Reuse Pattern

### The Problem

Without optimization, learning mode makes **2 HTTP requests**:

1. **Cheap probe** (3s GET) → analyze HTML → decide fetcher
2. **Full fetch** → execute with selected fetcher → save

If fetcher = `direct_access`, we made the **same request twice**.

### The Solution

Capture probe response body and reuse when applicable:

```go
import (
	"io"
	"net/http"
	"time"
)

type ClassificationResult struct {
	Fetcher      string
	Reason       string
	ProbeBody    []byte // Captured for reuse
	ProbeStatus  int
	ProbeHeaders http.Header
}

// probeClient bounds the whole probe (connect, headers, body) to 3 seconds.
var probeClient = &http.Client{Timeout: 3 * time.Second}

func CheapHTTPClassification(url string) (*ClassificationResult, error) {
	resp, err := probeClient.Get(url)
	if err != nil {
		return &ClassificationResult{Fetcher: "direct_access", Reason: "probe_failed"}, nil
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		// A truncated body can be neither classified nor reused.
		return &ClassificationResult{Fetcher: "direct_access", Reason: "probe_failed"}, nil
	}

	result := &ClassificationResult{
		ProbeBody:    body,
		ProbeStatus:  resp.StatusCode,
		ProbeHeaders: resp.Header,
	}

	validationResult := validator.Validate(string(body), resp.Header, resp.StatusCode)

	if validationResult.IsBlocked {
		result.Fetcher = "playwright_stealth"
		result.ProbeBody = nil // Don't reuse - blocked response useless
	} else if validationResult.HasSPA || validationResult.IsEmpty {
		result.Fetcher = "playwright"
		result.ProbeBody = nil // Don't reuse - needs JS execution
	} else if resp.StatusCode == 200 && len(body) > 500 {
		result.Fetcher = "direct_access"
		// ProbeBody STAYS POPULATED - reuse in fetch phase
	} else {
		// Assumption: unclassified responses (non-200 or very short body) fall
		// back to direct_access without reuse, mirroring the probe-failed path.
		// Without a default, Fetcher would be left empty.
		result.Fetcher = "direct_access"
		result.Reason = "unclassified"
		result.ProbeBody = nil
	}

	return result, nil
}
```

### Reuse Decision Matrix

| Classification | Fetcher | Reuse Probe? | Why |
|---------------|---------|--------------|-----|
| Static HTML | direct_access | **Yes** | Same result, skip request |
| Captcha detected | playwright_stealth | No | Blocked response useless |
| SPA detected | playwright | No | Needs JS execution |
| Probe failed | direct_access | No | No body captured |

### In `phase_fetch.go`

```go
func fetch(...) {
	results := l.GetPhaseResults()

	// Check if we already have valid content from probe.
	// Both assertions use the comma-ok form so a missing or re-typed value
	// (e.g. after a JSON round trip, where numbers decode as float64) falls
	// through to a full fetch instead of panicking.
	if probeBody, ok := results["probe_body"].([]byte); ok && len(probeBody) > 0 {
		if status, ok := results["probe_status"].(int); ok {
			// Skip fetch - use probe result directly!
			return saveToStore(probeBody, status)
		}
	}

	// Otherwise do full fetch
	resp, err := http.Get(l.InitialURL)
	// ...
}
```

**Result**: Static pages go from 2 HTTP requests → 1 HTTP request.

---

## Modified `phase_fetch.go`

Full implementation with validation, retry logic, and attempt logging:

```go
func makeFetchExecutor(store *Store, attemptStore *AttemptStore, validator *Validator) ExecutorFunc {
	return func(record Record) *ExecutorResult {
		l := record.(*link.Link)
		fetcher, ok := l.PhaseResults["fetcher"].(string)
		if !ok {
			return &ExecutorResult{Error: fmt.Errorf("route_fetcher did not set a fetcher")}
		}

		// Extract pre-fetch heuristics
		preHeuristics := heuristic.ExtractFromURL(l.InitialURL)

		startTime := time.Now()

		// Execute fetch (or use probe body if available)
		html, headers, statusCode, err := doFetch(l.InitialURL, fetcher)
		duration := time.Since(startTime)

		// Validate response
		validationResult := validator.Validate(html, headers, statusCode)

		// Extract post-fetch heuristics
		postHeuristics := validator.ExtractPostFetchHeuristics(html, headers, statusCode)
		allHeuristics := append(preHeuristics, postHeuristics...)

		var success bool   // Outcome of the original attempt (logged against `fetcher`)
		var recovered bool // A retry, not the original attempt, produced usable HTML
		var errorType string
		var isBanned bool

		// CASE 1: Network/timeout error (RETRY ONCE)
		if err != nil {
			errorType = classifyError(err)

			if (errorType == "timeout" || errorType == "network_error") && l.AttemptCount == 0 {
				retryStart := time.Now()
				html2, headers2, statusCode2, err2 := doFetch(l.InitialURL, fetcher)
				retryErrorType := ""
				if err2 != nil {
					retryErrorType = classifyError(err2)
				}
				// Log the retry with its own error type and duration
				attemptStore.Log(l.ID, l.InitialURL, fetcher, err2 == nil, retryErrorType, statusCode2, headers2, time.Since(retryStart), false, allHeuristics)
				if err2 == nil {
					html = html2 // Note: retry output is not re-validated here
					recovered = true
				}
			}

			// CASE 2: Bot block detected (PAUSE with exponential backoff)
		} else if validationResult.IsBlocked {
			errorType = validationResult.BlockType
			isBanned = true
			l.RiskLevel++
			pauseDuration := CalculatePauseDuration(l.RiskLevel, errorType)
			pauseUntil := time.Now().Add(pauseDuration)
			l.PausedUntil = &pauseUntil

			// CASE 3: Empty content (wrong tool - try playwright)
		} else if validationResult.IsEmpty && !validationResult.HasSPA {
			errorType = "wrong_tool"

			if l.AttemptCount == 0 {
				retryStart := time.Now()
				html2, headers2, statusCode2, err2 := doFetch(l.InitialURL, "playwright")
				retryErrorType := ""
				if err2 != nil {
					retryErrorType = classifyError(err2)
				}
				attemptStore.Log(l.ID, l.InitialURL, "playwright", err2 == nil, retryErrorType, statusCode2, headers2, time.Since(retryStart), false, allHeuristics)
				if err2 == nil {
					html = html2
					recovered = true
				}
			}

			// CASE 4: Success
		} else {
			success = true
		}

		// Log the original attempt (always, for learning). success stays false
		// when only a retry recovered, so a playwright rescue is never recorded
		// as a win for the originally selected fetcher.
		l.AttemptCount++
		l.LastAttemptAt = &startTime
		attemptStore.Log(l.ID, l.InitialURL, fetcher, success, errorType, statusCode, headers, duration, isBanned, allHeuristics)

		if !success && !recovered {
			return &ExecutorResult{Error: fmt.Errorf("fetch failed: %s", errorType)}
		}

		fileID := store.Create(l.ID, []byte(html), "page.html")
		return &ExecutorResult{Data: map[string]interface{}{"file_ids": []string{fileID}}}
	}
}
```

---

## Implementation Order

### Phase 1: Foundation (HUMAN + CODE)

1. Create migrations (HUMAN - see `learning_migrations.md`)
2. Update `internal/link/model.go` with new fields
3. Update `internal/link/pocketbase/store.go`
4. Run migrations, verify schema

### Phase 2: Heuristic Extraction

1. Create `internal/fetcher/heuristic/types.go` (Heuristic struct)
2. Implement `internal/fetcher/heuristic/extractor.go` (ExtractFromURL)
3. Create `internal/fetcher/validator/` package (block/SPA detection)
4. Manual test: Extract heuristics from sample URLs

### Phase 3: Attempt Logging

1. Create `internal/fetcher/attempt/pocketbase/store.go` (CRUD)
2. Implement `internal/fetcher/attempt/logger.go` (LogAttempt with heuristics)
3. Modify `phase_fetch.go` to log attempts (read-only first)
4. Manual test: Verify attempts logged with heuristics

### Phase 4: Query Layer

1. Implement `internal/fetcher/attempt/pocketbase/query.go` (FindBestFetcher)
2. Create seed data migration (HUMAN)
3. Manual test: Query with known heuristics

### Phase 5: Selection Layer

1. Create `internal/fetcher/selection/selector.go` (waterfall logic)
2. Implement `internal/fetcher/selection/fallback.go` (CheapHTTPClassification + Probe & Reuse)
3. Manual test: Selection for various URLs

### Phase 6: Integration

1. Modify `phase_route_fetcher.go` (learning mode branch)
2. Add pause check and probe result passing
3. Manual test: Legacy mode works, learning mode kicks in

### Phase 7: Retry and Pause

1. Add retry logic to `phase_fetch.go` (transient errors + wrong tool)
2. Add pause logic to `phase_fetch.go` (block detection)
3. Manual test: Pause on block, retry on timeout/empty

### Phase 8: Monitoring

1. Create analytics queries (success rates, ban rates by domain)
2. Monitor learning velocity
3. Adjust confidence threshold (0.6) if needed

---

## Manual Testing Strategy

### Test 1: Legacy Mode (Feature Flag OFF)

```
LEARNING_MODE_ENABLED=false
→ Submit link
→ Verify source: "hard_coded"
→ Verify fetcher_router table lookup works
```

### Test 2: Learned Pattern (Feature Flag ON)

```
Insert 5 successful wikipedia.org + playwright attempts
→ Submit new wikipedia.org link
→ Verify source: "learned", fetcher: "playwright"
→ Verify confidence > 0.6
```

### Test 3: Suffix Pattern (Seed Data)

```
Use seed data for .pdf suffix
→ Submit unknown-domain.com/document.pdf
→ Verify direct_access selected (seed data confidence)
```

### Test 4: Block Detection

```
Mock captcha response (Cloudflare keywords)
→ Submit link
→ Verify is_banned: true, paused_until set
→ Resubmit immediately → verify pause error
→ Wait for pause → verify retry allowed
```

### Test 5: Probe Reuse

```
Submit static HTML page (no captcha, no SPA)
→ Verify only 1 HTTP request made (check logs)
→ Verify file saved correctly
→ Verify probe_body passed via PhaseResults
```

### Test 6: Time Decay

```
Insert 5 old successful attempts (60 days ago) → playwright
Insert 2 recent failed attempts (today) → playwright
→ Submit link
→ Verify recent failures outweigh old successes (confidence < 0.6)
→ Verify fallback to probe classification
```

---

## New Model Fields

### `internal/link/model.go`

```go
type Link struct {
	// ... existing fields ...

	// Learning system fields
	PausedUntil   *time.Time // Exponential backoff timestamp
	RiskLevel     int        // Ban risk counter (0-5+)
	AttemptCount  int        // Total fetch attempts
	LastAttemptAt *time.Time // Most recent fetch time
}
```

### Why These Fields?

| Field | Purpose | Used By |
|-------|---------|---------|
| `PausedUntil` | Exponential backoff after a detected block | `phase_route_fetcher.go` (skip paused links) |
| `RiskLevel` | Ban-risk counter, incremented on each detected block | `CalculatePauseDuration()` |
| `AttemptCount` | Total attempts (for retry limiting) | `phase_fetch.go` (retry once only) |
| `LastAttemptAt` | When last attempted | Debugging, rate limiting |

---

## Package Structure

```
internal/fetcher/
  validator/
    validator.go              # ValidationResult struct, Validate()
    block_detector.go         # DetectCaptcha(), Detect403()
    spa_detector.go           # DetectSPA(), DetectReact()
    heuristic_extractor.go    # ExtractPostFetchHeuristics()
  heuristic/
    extractor.go              # ExtractFromURL() - domain, suffix, path
    types.go                  # Heuristic struct
  attempt/
    store.go                  # AttemptStore interface
    logger.go                 # LogAttempt() with heuristic linking
    pocketbase/
      store.go                # PocketBase CRUD
      query.go                # FindBestFetcher() - conditional probability
  selection/
    selector.go               # FindBestFetcher() - waterfall logic
    confidence.go             # CalculateConfidence() with time decay
    fallback.go               # CheapHTTPClassification() + Probe & Reuse
```

---

## Critical Integration Points

| File | Change | Impact |
|------|--------|--------|
| `phase_route_fetcher.go` | Add learning mode branch | **High** - core routing logic |
| `phase_fetch.go` | Add validation, retry, logging | **High** - all fetches affected |
| `link/model.go` | Add 4 fields | **Low** - backward compatible |
| `link/pocketbase/store.go` | Handle new fields | **Low** - additive only |

**Principle**: Changes are additive. Legacy mode continues to work unchanged.