Sorry for the alarming title, but admins, for real: go set up Anubis.

For context, Anubis is essentially a gatekeeper/rate limiter for small services. From them:

(Anubis) is designed to help protect the small internet from the endless storm of requests that flood in from AI companies. Anubis is as lightweight as possible to ensure that everyone can afford to protect the communities closest to them.

It puts forward a challenge that must be solved to gain access, and it judges how trustworthy a connection is. The vast majority of real users will never notice it, or will notice only a small delay the first time they access your site. Even smaller scrapers may get by relatively easily.

Big scrapers, though (the AI crawlers and trainers), get hit with computational problems that burn their compute before they are let in. (Trust me, I worked for a company that did “scrape the internet”; compute is expensive and a constant worry for them, so win-win for us!)

Anubis ended up taking maybe 10 minutes to set up. For Lemmy hosts, you literally just point your UI proxy at Anubis and point Anubis at Lemmy UI. It's very easy and slots right in with minimal setup.

These graphs cover the period since I turned it on less than an hour ago. I have a small instance with only a few people, and my CPU usage and requests per minute dropped immediately. Thousands of requests have already been challenged; I had no idea I was being scraped this much! You can see them backing off in the charts.

(FYI, this only affects web requests, so it does nothing to the API or federation. Those are proxied elsewhere, so it really does target only web scrapers.)
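
Roughly, the split looks like the sketch below. This is only a Python illustration of the decision a reverse proxy makes, not real proxy config: the upstream names, the path prefixes, and the Accept-header check are assumptions about a typical Lemmy setup, so check your own proxy rules.

```python
# Hypothetical upstream labels; substitute whatever your proxy calls them.
ANUBIS = "anubis"        # Anubis sits in front of Lemmy UI
LEMMY_BACKEND = "lemmy"  # API and federation go here directly

# Assumed backend path prefixes; adjust to match your own config.
BACKEND_PREFIXES = ("/api", "/pictrs", "/feeds", "/nodeinfo", "/.well-known")


def pick_upstream(method: str, path: str, accept: str = "") -> str:
    """Decide which upstream a request should be proxied to."""
    if path.startswith(BACKEND_PREFIXES):
        return LEMMY_BACKEND
    # Federation traffic asks for ActivityPub JSON rather than HTML.
    if "application/activity+json" in accept or "application/ld+json" in accept:
        return LEMMY_BACKEND
    # Non-read requests are treated as backend traffic in this sketch.
    if method.upper() not in ("GET", "HEAD"):
        return LEMMY_BACKEND
    # Everything else is a browser-style page load, so it gets challenged.
    return ANUBIS


if __name__ == "__main__":
    print(pick_upstream("GET", "/post/123", "text/html"))
    print(pick_upstream("GET", "/api/v3/site"))
```

Only the last branch ever reaches Anubis, which is why other instances and API clients never see a challenge.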

  • mesa@piefed.social
    from flask import Flask, request, abort
    from datetime import datetime, timedelta
    import sqlite3
    import logging
    import os
    
    app = Flask(__name__)
    
    DB_FILE = "honeypot.db"
    #LOG_FILE = "/var/log/honeypot.log"
    LOG_FILE = "honeypot.log"
    
    TRAP_THRESHOLD = 3             # clicks before flagging
    FLAG_DURATION_HOURS = 24       # how long the flag lasts
    
    
    # --- Setup logging for Fail2Ban integration ---
    #os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
    logging.basicConfig(
        filename=LOG_FILE,
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )
    
    
    # --- Database setup ---
    def init_db():
        with sqlite3.connect(DB_FILE) as conn:
            c = conn.cursor()
            c.execute("""
                CREATE TABLE IF NOT EXISTS hits (
                    ip TEXT,
                    ts DATETIME
                )
            """)
            c.execute("""
                CREATE TABLE IF NOT EXISTS flagged (
                    ip TEXT PRIMARY KEY,
                    flagged_on DATETIME,
                    expires DATETIME
                )
            """)
            conn.commit()
    
    
    # --- Helper functions ---
    def record_hit(ip):
        now = datetime.utcnow()
        with sqlite3.connect(DB_FILE) as conn:
            c = conn.cursor()
            c.execute("INSERT INTO hits (ip, ts) VALUES (?, ?)", (ip, now))
            conn.commit()
    
    
    def get_hit_count(ip):
        with sqlite3.connect(DB_FILE) as conn:
            c = conn.cursor()
            c.execute("SELECT COUNT(*) FROM hits WHERE ip = ?", (ip,))
            return c.fetchone()[0]
    
    
    def flag_ip(ip):
        now = datetime.utcnow()
        expires = now + timedelta(hours=FLAG_DURATION_HOURS)
        with sqlite3.connect(DB_FILE) as conn:
            c = conn.cursor()
            c.execute("REPLACE INTO flagged (ip, flagged_on, expires) VALUES (?, ?, ?)",
                      (ip, now, expires))
            conn.commit()
        logging.warning(f"HONEYPOT flagged {ip} for {FLAG_DURATION_HOURS}h")  # Fail2Ban picks this up
    
    
    def is_flagged(ip):
        now = datetime.utcnow()
        with sqlite3.connect(DB_FILE) as conn:
            c = conn.cursor()
            c.execute("SELECT expires FROM flagged WHERE ip = ?", (ip,))
            row = c.fetchone()
            if not row:
                return False
            expires = datetime.fromisoformat(row[0])
            if now < expires:
                return True
            # Expired flag, remove it
            c.execute("DELETE FROM flagged WHERE ip = ?", (ip,))
            conn.commit()
            return False
    
    
    # --- Middleware ---
    @app.before_request
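    def block_flagged():
        # Note: behind a reverse proxy, remote_addr is the proxy's address, so
        # every visitor would share one IP. In that case wrap app.wsgi_app in
        # werkzeug.middleware.proxy_fix.ProxyFix so the real client IP is used.
        ip = request.remote_addr
        if is_flagged(ip):
            abort(403, description="Access denied (you have been flagged).")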
    
    
    # --- Routes ---
    @app.route('/')
    def home():
        return '''
            <h1>Welcome</h1>
            <p><a href="/do_not_click">Don’t click this unless you are a bot</a></p>
        '''
    
    
    @app.route('/robots.txt')
    def robots_txt():
        return "User-agent: *\nDisallow: /do_not_click\n", 200, {'Content-Type': 'text/plain'}
    
    
    @app.route('/do_not_click')
    def honeypot():
        ip = request.remote_addr
    
        if is_flagged(ip):
            abort(403, description="Access denied (you’ve been flagged).")
    
        record_hit(ip)
        hit_count = get_hit_count(ip)
        logging.info(f"HONEYPOT triggered by {ip} (count={hit_count})")
    
        if hit_count >= TRAP_THRESHOLD:
            flag_ip(ip)
            return "You’ve been flagged for suspicious behavior.", 403
    
        return f"Suspicious activity detected ({hit_count}/{TRAP_THRESHOLD})."
    
    
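    # Create the tables at import time so the app also works under a WSGI
    # server, not only when this file is run directly.
    init_db()

    if __name__ == "__main__":
        # Keep the debugger off on anything reachable from the internet.
        app.run(debug=False)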
    

    Here I condensed this down to its parts. Hopefully this works well for you.
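
The `HONEYPOT flagged ...` warning is the line a Fail2Ban filter would need to match. As a way to sanity-check a pattern before putting it in a filter, here is a small sketch that pulls the IP out of such a line with Python's `re`. The regex and the `extract_flagged_ip` helper are assumptions based on the log format in the code above; they are not taken from a real Fail2Ban config.

```python
import re
from typing import Optional

# Matches lines written with the format "%(asctime)s [%(levelname)s] %(message)s"
# when flag_ip() logs "HONEYPOT flagged <ip> for <N>h".
FLAGGED_RE = re.compile(r"\[WARNING\] HONEYPOT flagged (?P<host>\S+) for \d+h$")


def extract_flagged_ip(line: str) -> Optional[str]:
    """Return the flagged IP from a log line, or None if it is not a flag line."""
    match = FLAGGED_RE.search(line.strip())
    return match.group("host") if match else None


if __name__ == "__main__":
    # Made-up sample line using a documentation-range IP address.
    sample = "2024-01-01 12:00:00,123 [WARNING] HONEYPOT flagged 203.0.113.7 for 24h"
    print(extract_flagged_ip(sample))
```

The INFO-level `HONEYPOT triggered by ...` lines deliberately do not match, so only IPs that crossed `TRAP_THRESHOLD` would be picked up.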