From 9ad2d42b9ff49b5f7b35a2fb54cc74ed440fb626 Mon Sep 17 00:00:00 2001 From: Jason Cameron Date: Fri, 13 Jun 2025 13:14:26 -0400 Subject: [PATCH] feat: add robots2policy CLI utility to convert robots.txt to Anubis challenge policies --- cmd/robots2policy/main.go | 347 ++++++++++++++++++++++++++++++++ cmd/robots2policy/main_test.go | 356 +++++++++++++++++++++++++++++++++ docs/docs/CHANGELOG.md | 1 + 3 files changed, 704 insertions(+) create mode 100644 cmd/robots2policy/main.go create mode 100644 cmd/robots2policy/main_test.go diff --git a/cmd/robots2policy/main.go b/cmd/robots2policy/main.go new file mode 100644 index 00000000..88e9e0cf --- /dev/null +++ b/cmd/robots2policy/main.go @@ -0,0 +1,347 @@ +package main + +import ( + "bufio" + "encoding/json" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "regexp" + "strings" + + "gopkg.in/yaml.v3" +) + +var ( + inputFile = flag.String("input", "", "path to robots.txt file (use - for stdin)") + outputFile = flag.String("output", "", "output file path (use - for stdout, defaults to stdout)") + outputFormat = flag.String("format", "yaml", "output format: yaml or json") + baseAction = flag.String("action", "CHALLENGE", "default action for disallowed paths: ALLOW, DENY, CHALLENGE, WEIGH") + crawlDelay = flag.Int("crawl-delay-weight", 0, "if > 0, add weight adjustment for crawl-delay (difficulty adjustment)") + policyName = flag.String("name", "robots-txt-policy", "name for the generated policy") + userAgentDeny = flag.String("deny-user-agents", "DENY", "action for specifically blocked user agents: DENY, CHALLENGE") + helpFlag = flag.Bool("help", false, "show help") +) + +type RobotsRule struct { + UserAgent string + Disallows []string + Allows []string + CrawlDelay int + IsBlacklist bool // true if this is a specifically denied user agent +} + +type Weight struct { + Adjust int `yaml:"adjust" json:"adjust"` +} + +type Challenge struct { + Difficulty int `yaml:"difficulty,omitempty" json:"difficulty,omitempty"` + 
Algorithm string `yaml:"algorithm,omitempty" json:"algorithm,omitempty"` + ReportAs int `yaml:"report_as,omitempty" json:"report_as,omitempty"` +} + +type AnubisRule struct { + Name string `yaml:"name" json:"name"` + Action string `yaml:"action" json:"action"` + Expression map[string]interface{} `yaml:"expression,omitempty" json:"expression,omitempty"` + Challenge *Challenge `yaml:"challenge,omitempty" json:"challenge,omitempty"` + Weight *Weight `yaml:"weight,omitempty" json:"weight,omitempty"` +} + +func main() { + flag.Parse() + + if *helpFlag || len(os.Args) == 1 { + showHelp() + return + } + + if *inputFile == "" { + log.Fatal("input file is required (use -input flag or -help for usage)") + } + + // Read robots.txt + var input io.Reader + if *inputFile == "-" { + input = os.Stdin + } else if strings.HasPrefix(*inputFile, "http://") || strings.HasPrefix(*inputFile, "https://") { + resp, err := http.Get(*inputFile) + if err != nil { + log.Fatalf("failed to fetch robots.txt from URL: %v", err) + } + defer resp.Body.Close() + input = resp.Body + } else { + file, err := os.Open(*inputFile) + if err != nil { + log.Fatalf("failed to open input file: %v", err) + } + defer file.Close() + input = file + } + + // Parse robots.txt + rules, err := parseRobotsTxt(input) + if err != nil { + log.Fatalf("failed to parse robots.txt: %v", err) + } + + // Convert to Anubis rules + anubisRules := convertToAnubisRules(rules) + + // Generate output + var output []byte + switch strings.ToLower(*outputFormat) { + case "yaml": + output, err = yaml.Marshal(anubisRules) + case "json": + output, err = json.MarshalIndent(anubisRules, "", " ") + default: + log.Fatalf("unsupported output format: %s (use yaml or json)", *outputFormat) + } + + if err != nil { + log.Fatalf("failed to marshal output: %v", err) + } + + // Write output + if *outputFile == "" || *outputFile == "-" { + fmt.Print(string(output)) + } else { + err = os.WriteFile(*outputFile, output, 0644) + if err != nil { + 
log.Fatalf("failed to write output file: %v", err) + } + fmt.Printf("Generated Anubis policy written to %s\n", *outputFile) + } +} + +func showHelp() { + fmt.Printf(`robots2policy - Convert robots.txt to Anubis challenge rules + +Usage: + robots2policy -input [options] + +Examples: + # Convert local robots.txt file + robots2policy -input robots.txt -output policy.yaml + + # Convert from URL + robots2policy -input https://sourceware.org/robots.txt -format json + + # Read from stdin, write to stdout + curl https://example.com/robots.txt | robots2policy -input - -format yaml + +Options: +`) + flag.PrintDefaults() + fmt.Printf(` +Actions: + ALLOW - Allow the request without challenge + DENY - Block the request completely + CHALLENGE - Issue proof-of-work challenge (default) + WEIGH - Adjust challenge difficulty weight + +The tool converts robots.txt rules as follows: + - Disallow rules -> CEL expressions matching request paths + - User-agent rules -> CEL expressions matching user agent headers + - Crawl-delay -> Optional weight adjustments for challenge difficulty + - Blacklisted user agents -> Separate deny/challenge rules + +Generated CEL expressions use: + - path.startsWith("/pattern") for exact path prefixes + - path.matches("regex") for wildcard patterns (* and ?) 
+ - userAgent.contains("pattern") for user agent matching +`) +} + +func parseRobotsTxt(input io.Reader) ([]RobotsRule, error) { + scanner := bufio.NewScanner(input) + var rules []RobotsRule + var currentRule *RobotsRule + + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + + // Skip empty lines and comments + if line == "" || strings.HasPrefix(line, "#") { + continue + } + + // Split on first colon + parts := strings.SplitN(line, ":", 2) + if len(parts) != 2 { + continue + } + + directive := strings.TrimSpace(strings.ToLower(parts[0])) + value := strings.TrimSpace(parts[1]) + + switch directive { + case "user-agent": + // Start a new rule section + if currentRule != nil { + rules = append(rules, *currentRule) + } + currentRule = &RobotsRule{ + UserAgent: value, + Disallows: make([]string, 0), + Allows: make([]string, 0), + } + + case "disallow": + if currentRule != nil && value != "" { + currentRule.Disallows = append(currentRule.Disallows, value) + } + + case "allow": + if currentRule != nil && value != "" { + currentRule.Allows = append(currentRule.Allows, value) + } + + case "crawl-delay": + if currentRule != nil { + if delay, err := parseIntSafe(value); err == nil { + currentRule.CrawlDelay = delay + } + } + } + } + + // Don't forget the last rule + if currentRule != nil { + rules = append(rules, *currentRule) + } + + // Mark blacklisted user agents (those with "Disallow: /") + for i := range rules { + for _, disallow := range rules[i].Disallows { + if disallow == "/" { + rules[i].IsBlacklist = true + break + } + } + } + + return rules, scanner.Err() +} + +func parseIntSafe(s string) (int, error) { + var result int + _, err := fmt.Sscanf(s, "%d", &result) + return result, err +} + +func convertToAnubisRules(robotsRules []RobotsRule) []AnubisRule { + var anubisRules []AnubisRule + ruleCounter := 0 + + for _, robotsRule := range robotsRules { + userAgent := robotsRule.UserAgent + + // Handle blacklisted user agents (complete deny/challenge) + if 
robotsRule.IsBlacklist { + ruleCounter++ + rule := AnubisRule{ + Name: fmt.Sprintf("%s-blacklist-%d", *policyName, ruleCounter), + Action: *userAgentDeny, + } + + if userAgent == "*" { + // This would block everything - convert to a weight adjustment instead + rule.Name = fmt.Sprintf("%s-global-restriction-%d", *policyName, ruleCounter) + rule.Action = "WEIGH" + rule.Weight = &Weight{Adjust: 20} // Increase difficulty significantly + rule.Expression = map[string]interface{}{ + "single": "true", // Always applies + } + } else { + rule.Expression = map[string]interface{}{ + "single": fmt.Sprintf("userAgent.contains(%q)", userAgent), + } + } + anubisRules = append(anubisRules, rule) + continue + } + + // Handle specific disallow rules + for _, disallow := range robotsRule.Disallows { + if disallow == "/" { + continue // Already handled as blacklist above + } + + ruleCounter++ + rule := AnubisRule{ + Name: fmt.Sprintf("%s-disallow-%d", *policyName, ruleCounter), + Action: *baseAction, + } + + // Build CEL expression + var conditions []string + + // Add user agent condition if not wildcard + if userAgent != "*" { + conditions = append(conditions, fmt.Sprintf("userAgent.contains(%q)", userAgent)) + } + + // Add path condition + pathCondition := buildPathCondition(disallow) + conditions = append(conditions, pathCondition) + + if len(conditions) == 1 { + rule.Expression = map[string]interface{}{ + "single": conditions[0], + } + } else { + rule.Expression = map[string]interface{}{ + "all": conditions, + } + } + + anubisRules = append(anubisRules, rule) + } + + // Handle crawl delay as weight adjustment + if robotsRule.CrawlDelay > 0 && *crawlDelay > 0 { + ruleCounter++ + rule := AnubisRule{ + Name: fmt.Sprintf("%s-crawl-delay-%d", *policyName, ruleCounter), + Action: "WEIGH", + Weight: &Weight{Adjust: *crawlDelay}, + } + + if userAgent == "*" { + rule.Expression = map[string]interface{}{ + "single": "true", // Always applies + } + } else { + rule.Expression = 
// buildPathCondition converts a robots.txt path pattern into a CEL expression
// over the request path.
//
//   - A pattern without special characters becomes a prefix test:
//     path.startsWith("/admin").
//   - '*' matches any sequence of characters and is emitted as ".*" inside an
//     anchored regular expression: path.matches("^/search.*").
//   - A trailing '$' marks the end of the URL (RFC 9309) and is emitted as the
//     regex end anchor instead of a literal dollar sign, so "/*.php$" becomes
//     path.matches("^/.*\\.php$"). A '$' anywhere else stays literal.
//   - '?' is treated as a single-character wildcard. RFC 9309 does not define
//     it as special; the mapping is kept for compatibility with existing
//     output.
//
// All other regex metacharacters in the pattern are escaped.
func buildPathCondition(robotsPath string) string {
	anchored := strings.HasSuffix(robotsPath, "$")
	pattern := strings.TrimSuffix(robotsPath, "$")

	// Simple prefix match for most cases.
	if !anchored && !strings.ContainsAny(pattern, "*?") {
		return fmt.Sprintf("path.startsWith(%q)", pattern)
	}

	// Escape everything first, then re-enable the robots.txt wildcards.
	regex := regexp.QuoteMeta(pattern)
	regex = strings.ReplaceAll(regex, `\*`, `.*`) // * becomes .*
	regex = strings.ReplaceAll(regex, `\?`, `.`)  // ? becomes .
	regex = "^" + regex
	if anchored {
		regex += "$"
	}
	return fmt.Sprintf("path.matches(%q)", regex)
}
"Amazon's e-commerce robots.txt"}, + {"Microsoft", "https://www.microsoft.com/robots.txt", "Microsoft's corporate robots.txt"}, +} + +func TestRealRobotsTxtConversion(t *testing.T) { + if os.Getenv("DONT_USE_NETWORK") != "" { + t.Skip("test requires network egress") + } + + for _, test := range testRobotsURLs { + t.Run(test.name, func(t *testing.T) { + // Fetch robots.txt + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Get(test.url) + if err != nil { + t.Skipf("Failed to fetch %s: %v", test.url, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Skipf("Non-200 status for %s: %d", test.url, resp.StatusCode) + } + + // Parse robots.txt + rules, err := parseRobotsTxt(resp.Body) + if err != nil { + t.Fatalf("Failed to parse robots.txt from %s: %v", test.url, err) + } + + // Convert to Anubis rules + oldPolicyName := *policyName + *policyName = test.name + "-policy" + defer func() { *policyName = oldPolicyName }() + + anubisRules := convertToAnubisRules(rules) + + // Validate we got some rules + if len(anubisRules) == 0 { + t.Errorf("No rules generated for %s", test.name) + return + } + + // Test YAML output + yamlOutput, err := yaml.Marshal(anubisRules) + if err != nil { + t.Fatalf("Failed to marshal YAML for %s: %v", test.name, err) + } + + if len(yamlOutput) == 0 { + t.Errorf("Empty YAML output for %s", test.name) + } + + // Test JSON output + jsonOutput, err := json.MarshalIndent(anubisRules, "", " ") + if err != nil { + t.Fatalf("Failed to marshal JSON for %s: %v", test.name, err) + } + + if len(jsonOutput) == 0 { + t.Errorf("Empty JSON output for %s", test.name) + } + + // Validate rule structure + for i, rule := range anubisRules { + if rule.Name == "" { + t.Errorf("Rule %d has empty name for %s", i, test.name) + } + + if rule.Action == "" { + t.Errorf("Rule %d has empty action for %s", i, test.name) + } + + validActions := map[string]bool{ + "ALLOW": true, "DENY": true, "CHALLENGE": true, "WEIGH": true, + } 
+ if !validActions[rule.Action] { + t.Errorf("Rule %d has invalid action '%s' for %s", i, rule.Action, test.name) + } + + // Check that CEL expressions exist + if rule.Expression == nil { + t.Errorf("Rule %d has no expression for %s", i, test.name) + } + } + + t.Logf("Successfully converted %s: %d robots.txt rules -> %d Anubis rules", + test.name, len(rules), len(anubisRules)) + }) + } +} + +func TestCELExpressionGeneration(t *testing.T) { + tests := []struct { + name string + robotsPath string + expected string + }{ + { + name: "simple path", + robotsPath: "/admin", + expected: `path.startsWith("/admin")`, + }, + { + name: "path with trailing slash", + robotsPath: "/admin/", + expected: `path.startsWith("/admin/")`, + }, + { + name: "wildcard path", + robotsPath: "/search*", + expected: `path.matches("^/search.*")`, + }, + { + name: "complex wildcard", + robotsPath: "/*/wiki/*?action=*", + expected: `path.matches("^/.*/wiki/.*.action=.*")`, + }, + { + name: "question mark wildcard", + robotsPath: "/file?.txt", + expected: `path.matches("^/file.\\.txt")`, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + result := buildPathCondition(test.robotsPath) + if result != test.expected { + t.Errorf("Expected %q, got %q", test.expected, result) + } + }) + } +} + +func TestRobotsTxtParsing(t *testing.T) { + robotsTxt := `# Example robots.txt +User-agent: * +Disallow: /admin/ +Disallow: /private +Allow: /public + +User-agent: Googlebot +Disallow: /search +Crawl-delay: 10 + +User-agent: BadBot +Disallow: / + +# Rogue bots +User-agent: SpamBot +Disallow: /` + + reader := strings.NewReader(robotsTxt) + rules, err := parseRobotsTxt(reader) + if err != nil { + t.Fatalf("Failed to parse test robots.txt: %v", err) + } + + expectedRules := 4 // *, Googlebot, BadBot, SpamBot + if len(rules) != expectedRules { + t.Errorf("Expected %d rules, got %d", expectedRules, len(rules)) + } + + // Check universal rules + universalRule := rules[0] + if 
universalRule.UserAgent != "*" { + t.Errorf("Expected universal user agent '*', got %q", universalRule.UserAgent) + } + + if len(universalRule.Disallows) != 2 { + t.Errorf("Expected 2 disallows for universal rule, got %d", len(universalRule.Disallows)) + } + + if len(universalRule.Allows) != 1 { + t.Errorf("Expected 1 allow for universal rule, got %d", len(universalRule.Allows)) + } + + // Check Googlebot rules + googlebotRule := rules[1] + if googlebotRule.UserAgent != "Googlebot" { + t.Errorf("Expected Googlebot user agent, got %q", googlebotRule.UserAgent) + } + + if googlebotRule.CrawlDelay != 10 { + t.Errorf("Expected crawl delay 10, got %d", googlebotRule.CrawlDelay) + } + + // Check blacklisted bots + badBotRule := rules[2] + if !badBotRule.IsBlacklist { + t.Errorf("BadBot should be marked as blacklisted") + } + + spamBotRule := rules[3] + if !spamBotRule.IsBlacklist { + t.Errorf("SpamBot should be marked as blacklisted") + } +} + +func TestAnubisRuleGeneration(t *testing.T) { + // Test with simple robots.txt + robotsRules := []RobotsRule{ + { + UserAgent: "*", + Disallows: []string{"/admin", "/private"}, + Allows: []string{"/public"}, + }, + { + UserAgent: "BadBot", + Disallows: []string{"/"}, + IsBlacklist: true, + }, + } + + oldPolicyName := *policyName + *policyName = "test-policy" + defer func() { *policyName = oldPolicyName }() + + oldBaseAction := *baseAction + *baseAction = "CHALLENGE" + defer func() { *baseAction = oldBaseAction }() + + oldUserAgentDeny := *userAgentDeny + *userAgentDeny = "DENY" + defer func() { *userAgentDeny = oldUserAgentDeny }() + + anubisRules := convertToAnubisRules(robotsRules) + + // Should have 3 rules: 2 disallows + 1 blacklist + expectedRules := 3 + if len(anubisRules) != expectedRules { + t.Errorf("Expected %d Anubis rules, got %d", expectedRules, len(anubisRules)) + } + + // Check first disallow rule + firstRule := anubisRules[0] + if firstRule.Action != "CHALLENGE" { + t.Errorf("Expected CHALLENGE action, got %s", 
firstRule.Action) + } + + if !strings.Contains(firstRule.Name, "disallow") { + t.Errorf("Expected disallow in rule name, got %s", firstRule.Name) + } + + // Check blacklist rule + var blacklistRule *AnubisRule + for _, rule := range anubisRules { + if strings.Contains(rule.Name, "blacklist") { + blacklistRule = &rule + break + } + } + + if blacklistRule == nil { + t.Errorf("No blacklist rule found") + } else { + if blacklistRule.Action != "DENY" { + t.Errorf("Expected DENY action for blacklist, got %s", blacklistRule.Action) + } + } +} + +func TestEmptyRobotsTxt(t *testing.T) { + reader := strings.NewReader("") + rules, err := parseRobotsTxt(reader) + if err != nil { + t.Fatalf("Failed to parse empty robots.txt: %v", err) + } + + if len(rules) != 0 { + t.Errorf("Expected 0 rules for empty robots.txt, got %d", len(rules)) + } + + anubisRules := convertToAnubisRules(rules) + if len(anubisRules) != 0 { + t.Errorf("Expected 0 Anubis rules for empty robots.txt, got %d", len(anubisRules)) + } +} + +func TestCommentsOnlyRobotsTxt(t *testing.T) { + robotsTxt := `# This is a comment +# Another comment +# User-agent: * (commented out) +# Disallow: / (commented out)` + + reader := strings.NewReader(robotsTxt) + rules, err := parseRobotsTxt(reader) + if err != nil { + t.Fatalf("Failed to parse comments-only robots.txt: %v", err) + } + + if len(rules) != 0 { + t.Errorf("Expected 0 rules for comments-only robots.txt, got %d", len(rules)) + } +} + +func TestMalformedRobotsTxt(t *testing.T) { + robotsTxt := `User-agent: * +Disallow /admin (missing colon) +Allow: /public +Random line without colon +User-agent +Disallow: /test` + + reader := strings.NewReader(robotsTxt) + rules, err := parseRobotsTxt(reader) + if err != nil { + t.Fatalf("Failed to parse malformed robots.txt: %v", err) + } + + // Should still parse the valid parts + if len(rules) == 0 { + t.Errorf("Expected some rules despite malformed input, got 0") + } + + // Should have at least the Allow rule + foundAllow := 
false + for _, rule := range rules { + if len(rule.Allows) > 0 { + foundAllow = true + break + } + } + + if !foundAllow { + t.Errorf("Expected to find Allow rule in malformed robots.txt") + } +} diff --git a/docs/docs/CHANGELOG.md b/docs/docs/CHANGELOG.md index 6be92077..1ace2ae0 100644 --- a/docs/docs/CHANGELOG.md +++ b/docs/docs/CHANGELOG.md @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Bump AI-robots.txt to version 1.34 - Make progress bar styling more compatible (UXP, etc) - Add `--strip-base-prefix` flag/envvar to strip the base prefix from request paths when forwarding to target servers +- Add `robots2policy` CLI utility to convert robots.txt files to Anubis challenge policies using CEL expressions ([#409](https://github.com/TecharoHQ/anubis/issues/409)) ## v1.19.1: Jenomis cen Lexentale - Echo 1