mirror of
https://git.0x0.st/mia/0x0.git
synced 2026-01-09 20:11:46 +03:00
Implement request filters
This moves preexisting blacklists to the database, and adds the
following filter types:
* IP address
* IP network
* MIME type
* User agent
In addition, IP address handling is now done with the ipaddress
module.
This commit is contained in:
197
fhost.py
197
fhost.py
@@ -19,23 +19,28 @@
|
||||
and limitations under the License.
|
||||
"""
|
||||
|
||||
from flask import Flask, abort, make_response, redirect, request, send_from_directory, url_for, Response, render_template
|
||||
from flask import Flask, abort, make_response, redirect, request, send_from_directory, url_for, Response, render_template, Request
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_migrate import Migrate
|
||||
from sqlalchemy import and_, or_
|
||||
from sqlalchemy.orm import declared_attr
|
||||
import sqlalchemy.types as types
|
||||
from jinja2.exceptions import *
|
||||
from jinja2 import ChoiceLoader, FileSystemLoader
|
||||
from hashlib import sha256
|
||||
from magic import Magic
|
||||
from mimetypes import guess_extension
|
||||
import click
|
||||
import enum
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import datetime
|
||||
import ipaddress
|
||||
import typing
|
||||
import requests
|
||||
import secrets
|
||||
import re
|
||||
from validators import url as url_valid
|
||||
from pathlib import Path
|
||||
|
||||
@@ -63,12 +68,6 @@ app.config.update(
|
||||
"text/plain" : ".txt",
|
||||
"text/x-diff" : ".diff",
|
||||
},
|
||||
FHOST_MIME_BLACKLIST = [
|
||||
"application/x-dosexec",
|
||||
"application/java-archive",
|
||||
"application/java-vm"
|
||||
],
|
||||
FHOST_UPLOAD_BLACKLIST = None,
|
||||
NSFW_DETECT = False,
|
||||
NSFW_THRESHOLD = 0.92,
|
||||
VSCAN_SOCKET = None,
|
||||
@@ -129,12 +128,34 @@ class URL(db.Model):
|
||||
|
||||
return u
|
||||
|
||||
class IPAddress(types.TypeDecorator):
|
||||
impl = types.LargeBinary
|
||||
cache_ok = True
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
match value:
|
||||
case ipaddress.IPv6Address():
|
||||
value = (value.ipv4_mapped or value).packed
|
||||
case ipaddress.IPv4Address():
|
||||
value = value.packed
|
||||
|
||||
return value
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
if value is not None:
|
||||
value = ipaddress.ip_address(value)
|
||||
if type(value) is ipaddress.IPv6Address:
|
||||
value = value.ipv4_mapped or value
|
||||
|
||||
return value
|
||||
|
||||
|
||||
class File(db.Model):
|
||||
id = db.Column(db.Integer, primary_key = True)
|
||||
sha256 = db.Column(db.String, unique = True)
|
||||
ext = db.Column(db.UnicodeText)
|
||||
mime = db.Column(db.UnicodeText)
|
||||
addr = db.Column(db.UnicodeText)
|
||||
addr = db.Column(IPAddress(16))
|
||||
ua = db.Column(db.UnicodeText)
|
||||
removed = db.Column(db.Boolean, default=False)
|
||||
nsfw_score = db.Column(db.Float)
|
||||
@@ -227,12 +248,13 @@ class File(db.Model):
|
||||
else:
|
||||
mime = file_.content_type
|
||||
|
||||
if mime in app.config["FHOST_MIME_BLACKLIST"] or guess in app.config["FHOST_MIME_BLACKLIST"]:
|
||||
abort(415)
|
||||
|
||||
if len(mime) > 128:
|
||||
abort(400)
|
||||
|
||||
for flt in MIMEFilter.query.all():
|
||||
if flt.check(guess):
|
||||
abort(403, flt.reason)
|
||||
|
||||
if mime.startswith("text/") and not "charset" in mime:
|
||||
mime += "; charset=utf-8"
|
||||
|
||||
@@ -308,6 +330,127 @@ class File(db.Model):
|
||||
return f, isnew
|
||||
|
||||
|
||||
class RequestFilter(db.Model):
|
||||
__tablename__ = "request_filter"
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
type = db.Column(db.String(20), index=True, nullable=False)
|
||||
comment = db.Column(db.UnicodeText)
|
||||
|
||||
__mapper_args__ = {
|
||||
"polymorphic_on": type,
|
||||
"with_polymorphic": "*",
|
||||
"polymorphic_identity": "empty"
|
||||
}
|
||||
|
||||
def __init__(self, comment: str = None):
|
||||
self.comment = comment
|
||||
|
||||
|
||||
class AddrFilter(RequestFilter):
|
||||
addr = db.Column(IPAddress(16), unique=True)
|
||||
|
||||
__mapper_args__ = {"polymorphic_identity": "addr"}
|
||||
|
||||
def __init__(self, addr: ipaddress._BaseAddress, comment: str = None):
|
||||
self.addr = addr
|
||||
super().__init__(comment=comment)
|
||||
|
||||
def check(self, addr: ipaddress._BaseAddress) -> bool:
|
||||
if type(addr) is ipaddress.IPv6Address:
|
||||
addr = addr.ipv4_mapped or addr
|
||||
return addr == self.addr
|
||||
|
||||
def check_request(self, r: Request) -> bool:
|
||||
return self.check(ipaddress.ip_address(r.remote_addr))
|
||||
|
||||
@property
|
||||
def reason(self) -> str:
|
||||
return f"Your IP Address ({self.addr.compressed}) is blocked from " \
|
||||
"uploading files."
|
||||
|
||||
|
||||
class IPNetwork(types.TypeDecorator):
|
||||
impl = types.Text
|
||||
cache_ok = True
|
||||
|
||||
def process_bind_param(self, value, dialect):
|
||||
if value is not None:
|
||||
value = value.compressed
|
||||
|
||||
return value
|
||||
|
||||
def process_result_value(self, value, dialect):
|
||||
if value is not None:
|
||||
value = ipaddress.ip_network(value)
|
||||
|
||||
return value
|
||||
|
||||
|
||||
class NetFilter(RequestFilter):
|
||||
net = db.Column(IPNetwork)
|
||||
|
||||
__mapper_args__ = {"polymorphic_identity": "net"}
|
||||
|
||||
def __init__(self, net: ipaddress._BaseNetwork, comment: str = None):
|
||||
self.net = net
|
||||
super().__init__(comment=comment)
|
||||
|
||||
def check(self, addr: ipaddress._BaseAddress) -> bool:
|
||||
if type(addr) is ipaddress.IPv6Address:
|
||||
addr = addr.ipv4_mapped or addr
|
||||
return addr in self.net
|
||||
|
||||
def check_request(self, r: Request) -> bool:
|
||||
return self.check(ipaddress.ip_address(r.remote_addr))
|
||||
|
||||
@property
|
||||
def reason(self) -> str:
|
||||
return f"Your network ({self.net.compressed}) is blocked from " \
|
||||
"uploading files."
|
||||
|
||||
|
||||
class HasRegex:
|
||||
@declared_attr
|
||||
def regex(cls):
|
||||
return cls.__table__.c.get("regex", db.Column(db.UnicodeText))
|
||||
|
||||
def check(self, s: str) -> bool:
|
||||
return re.match(self.regex, s) is not None
|
||||
|
||||
|
||||
class MIMEFilter(HasRegex, RequestFilter):
|
||||
__mapper_args__ = {"polymorphic_identity": "mime"}
|
||||
|
||||
def __init__(self, mime_regex: str, comment: str = None):
|
||||
self.regex = mime_regex
|
||||
super().__init__(comment=comment)
|
||||
|
||||
def check_request(self, r: Request) -> bool:
|
||||
if "file" in r.files:
|
||||
return self.check(r.files["file"].mimetype)
|
||||
|
||||
return False
|
||||
|
||||
@property
|
||||
def reason(self) -> str:
|
||||
return "File MIME type not allowed."
|
||||
|
||||
|
||||
class UAFilter(HasRegex, RequestFilter):
|
||||
__mapper_args__ = {"polymorphic_identity": "ua"}
|
||||
|
||||
def __init__(self, ua_regex: str, comment: str = None):
|
||||
self.regex = ua_regex
|
||||
super().__init__(comment=comment)
|
||||
|
||||
def check_request(self, r: Request) -> bool:
|
||||
return self.check(r.user_agent.string)
|
||||
|
||||
@property
|
||||
def reason(self) -> str:
|
||||
return "User agent not allowed."
|
||||
|
||||
|
||||
class UrlEncoder(object):
|
||||
def __init__(self,alphabet, min_length):
|
||||
self.alphabet = alphabet
|
||||
@@ -351,17 +494,6 @@ def shorten(url):
|
||||
|
||||
return u.geturl()
|
||||
|
||||
def in_upload_bl(addr):
|
||||
if app.config["FHOST_UPLOAD_BLACKLIST"]:
|
||||
with app.open_instance_resource(app.config["FHOST_UPLOAD_BLACKLIST"], "r") as bl:
|
||||
check = addr.lstrip("::ffff:")
|
||||
for l in bl.readlines():
|
||||
if not l.startswith("#"):
|
||||
if check == l.rstrip():
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
"""
|
||||
requested_expiration can be:
|
||||
- None, to use the longest allowed file lifespan
|
||||
@@ -371,10 +503,7 @@ requested_expiration can be:
|
||||
Any value greater that the longest allowed file lifespan will be rounded down to that
|
||||
value.
|
||||
"""
|
||||
def store_file(f, requested_expiration: typing.Optional[int], addr, ua, secret: bool):
|
||||
if in_upload_bl(addr):
|
||||
return "Your host is blocked from uploading files.\n", 451
|
||||
|
||||
def store_file(f, requested_expiration: typing.Optional[int], addr, ua, secret: bool):
|
||||
sf, isnew = File.store(f, requested_expiration, addr, ua, secret)
|
||||
|
||||
response = make_response(sf.geturl())
|
||||
@@ -491,8 +620,15 @@ def get(path, secret=None):
|
||||
@app.route("/", methods=["GET", "POST"])
|
||||
def fhost():
|
||||
if request.method == "POST":
|
||||
for flt in RequestFilter.query.all():
|
||||
if flt.check_request(request):
|
||||
abort(403, flt.reason)
|
||||
|
||||
sf = None
|
||||
secret = "secret" in request.form
|
||||
addr = ipaddress.ip_address(request.remote_addr)
|
||||
if type(addr) is ipaddress.IPv6Address:
|
||||
addr = addr.ipv4_mapped or addr
|
||||
|
||||
if "file" in request.files:
|
||||
try:
|
||||
@@ -500,7 +636,7 @@ def fhost():
|
||||
return store_file(
|
||||
request.files["file"],
|
||||
int(request.form["expires"]),
|
||||
request.remote_addr,
|
||||
addr,
|
||||
request.user_agent.string,
|
||||
secret
|
||||
)
|
||||
@@ -512,14 +648,14 @@ def fhost():
|
||||
return store_file(
|
||||
request.files["file"],
|
||||
None,
|
||||
request.remote_addr,
|
||||
addr,
|
||||
request.user_agent.string,
|
||||
secret
|
||||
)
|
||||
elif "url" in request.form:
|
||||
return store_url(
|
||||
request.form["url"],
|
||||
request.remote_addr,
|
||||
addr,
|
||||
request.user_agent.string,
|
||||
secret
|
||||
)
|
||||
@@ -538,6 +674,7 @@ Disallow: /
|
||||
|
||||
@app.errorhandler(400)
|
||||
@app.errorhandler(401)
|
||||
@app.errorhandler(403)
|
||||
@app.errorhandler(404)
|
||||
@app.errorhandler(411)
|
||||
@app.errorhandler(413)
|
||||
@@ -546,7 +683,7 @@ Disallow: /
|
||||
@app.errorhandler(451)
|
||||
def ehandler(e):
|
||||
try:
|
||||
return render_template(f"{e.code}.html", id=id, request=request), e.code
|
||||
return render_template(f"{e.code}.html", id=id, request=request, description=e.description), e.code
|
||||
except TemplateNotFound:
|
||||
return "Segmentation fault\n", e.code
|
||||
|
||||
|
||||
Reference in New Issue
Block a user