#!/usr/bin/python3 import sqlite3 from flask import Flask, make_response, request, render_template, g, send_from_directory from flask_paginate import Pagination, get_page_parameter import confuse import re DATABASE = "../data/diffs.db" CONFIG_FILE = "../data/config.yaml" config = confuse.Configuration("headline", __name__) config.set_file(CONFIG_FILE) app = Flask(__name__) def get_db(): db = getattr(g, "_database", None) if db is None: db = g._database = sqlite3.connect(DATABASE) db.row_factory = sqlite3.Row return db @app.teardown_appcontext def close_connection(exception): db = getattr(g, "_database", None) if db is not None: db.close() def websearch_to_fts_query(search: str): """ Converts web searches into fts queries: 'this is "a test"' -> '"this" OR "is" OR "a test"' """ return " OR ".join( [ '"' + m.group(0) + '"' for m in re.finditer(r'(?<=")[^"]+(?=")|[^\s"]+', search) ] ) @app.route("/") def index(): db = get_db().cursor() search = request.args.get("search", type=str, default="") query = websearch_to_fts_query(search) if search else None # View options expand_diffs = request.args.get("expand_diffs") is not None db.execute( f"SELECT count(*) FROM diffs{'_fts(?)' if query else ''}", (query,) if query else (), ) diff_count = db.fetchall()[0][0] # flask-paginate page = request.args.get(get_page_parameter(), type=int, default=1) pagination = Pagination( page=page, total=diff_count, record_name="diffs", css_framework="bootstrap5" ) page_skip = pagination.skip per_page = pagination.per_page if query: db.execute( "SELECT * FROM diffs JOIN (SELECT rowid FROM diffs_fts(?)) filter ON filter.rowid = diffs.diff_id ORDER BY diff_id DESC LIMIT ? OFFSET ?", (query, per_page, page_skip), ) else: db.execute( "SELECT * FROM diffs ORDER BY diff_id DESC LIMIT ? OFFSET ?", (per_page, page_skip), ) diffs = db.fetchall() html = render_template( "index.html", diffs=diffs, page=page, pagination=pagination, diff_count=diff_count, search=search, expand_diffs=expand_diffs, ) res = make_response(html) res.cache_control.max_age = 60 res.cache_control.public = True return res @app.route("/article/") def article_detail(article_id: str): db = get_db().cursor() db.execute("SELECT * FROM diffs WHERE article_id = ?", (article_id,)) result = db.fetchall() if len(result) == 0: return make_response(render_template("not_found.html"), 404) article_url = result[0]["article_url"] return render_template( "article_detail.html", article_id=article_id, article_url=article_url, diffs=result, ) @app.route("/about") def about(): return render_template("about.html") @app.route("/feeds") def feed_list(): feeds = [] for conf in config["feeds"]: feed = { "rss_source": str(conf["rss_source"]), "unique_tag": str(conf["unique_tag"]), "feed_name": str(conf["name"]), } feeds.append(feed) return render_template("feeds.html", feeds=feeds) @app.route("/robots.txt") def static_from_root(): return send_from_directory(app.static_folder or "static", request.path[1:]) if __name__ == "__main__": app.run(host="0.0.0.0")