mirror of
https://git.0x0.st/mia/0x0.git
synced 2026-01-09 20:11:46 +03:00
Add support for ClamAV
This commit is contained in:
64
fhost.py
64
fhost.py
@@ -22,7 +22,7 @@
|
||||
from flask import Flask, abort, make_response, redirect, request, send_from_directory, url_for, Response, render_template
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_migrate import Migrate
|
||||
from sqlalchemy import and_
|
||||
from sqlalchemy import and_, or_
|
||||
from jinja2.exceptions import *
|
||||
from jinja2 import ChoiceLoader, FileSystemLoader
|
||||
from hashlib import sha256
|
||||
@@ -32,6 +32,7 @@ import click
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import datetime
|
||||
import typing
|
||||
import requests
|
||||
import secrets
|
||||
@@ -70,6 +71,13 @@ app.config.update(
|
||||
FHOST_UPLOAD_BLACKLIST = None,
|
||||
NSFW_DETECT = False,
|
||||
NSFW_THRESHOLD = 0.608,
|
||||
VSCAN_SOCKET = None,
|
||||
VSCAN_QUARANTINE_PATH = "quarantine",
|
||||
VSCAN_IGNORE = [
|
||||
"Eicar-Test-Signature",
|
||||
"PUA.Win.Packer.XmMusicFile",
|
||||
],
|
||||
VSCAN_INTERVAL = datetime.timedelta(days=7),
|
||||
URL_ALPHABET = "DEQhd2uFteibPwq0SWBInTpA_jcZL5GKz3YCR14Ulk87Jors9vNHgfaOmMXy6Vx-",
|
||||
)
|
||||
|
||||
@@ -131,6 +139,7 @@ class File(db.Model):
|
||||
expiration = db.Column(db.BigInteger)
|
||||
mgmt_token = db.Column(db.String)
|
||||
secret = db.Column(db.String)
|
||||
last_vscan = db.Column(db.DateTime)
|
||||
|
||||
def __init__(self, sha256, ext, mime, addr, expiration, mgmt_token):
|
||||
self.sha256 = sha256
|
||||
@@ -591,3 +600,56 @@ def get_max_lifespan(filesize: int) -> int:
|
||||
max_exp = app.config.get("FHOST_MAX_EXPIRATION", 365 * 24 * 60 * 60 * 1000)
|
||||
max_size = app.config.get("MAX_CONTENT_LENGTH", 256 * 1024 * 1024)
|
||||
return min_exp + int((-max_exp + min_exp) * (filesize / max_size - 1) ** 3)
|
||||
|
||||
def do_vscan(f):
|
||||
if f["path"].is_file():
|
||||
with open(f["path"], "rb") as scanf:
|
||||
try:
|
||||
f["result"] = list(app.config["VSCAN_SOCKET"].instream(scanf).values())[0]
|
||||
except:
|
||||
f["result"] = ("SCAN FAILED", None)
|
||||
else:
|
||||
f["result"] = ("FILE NOT FOUND", None)
|
||||
|
||||
return f
|
||||
|
||||
@app.cli.command("vscan")
|
||||
def vscan():
|
||||
if not app.config["VSCAN_SOCKET"]:
|
||||
print("""Error: Virus scanning enabled but no connection method specified.
|
||||
Please set VSCAN_SOCKET.""")
|
||||
sys.exit(1)
|
||||
|
||||
qp = Path(app.config["VSCAN_QUARANTINE_PATH"])
|
||||
qp.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
from multiprocessing import Pool
|
||||
with Pool() as p:
|
||||
if isinstance(app.config["VSCAN_INTERVAL"], datetime.timedelta):
|
||||
scandate = datetime.datetime.now() - app.config["VSCAN_INTERVAL"]
|
||||
res = File.query.filter(or_(File.last_vscan < scandate,
|
||||
File.last_vscan == None),
|
||||
File.removed == False)
|
||||
else:
|
||||
res = File.query.filter(File.last_vscan == None, File.removed == False)
|
||||
|
||||
work = [{"path" : f.getpath(), "name" : f.getname(), "id" : f.id} for f in res]
|
||||
|
||||
results = []
|
||||
for i, r in enumerate(p.imap_unordered(do_vscan, work)):
|
||||
if r["result"][0] != "OK":
|
||||
print(f"{r['name']}: {r['result'][0]} {r['result'][1] or ''}")
|
||||
|
||||
found = False
|
||||
if r["result"][0] == "FOUND":
|
||||
if not r["result"][1] in app.config["VSCAN_IGNORE"]:
|
||||
r["path"].rename(qp / r["name"])
|
||||
found = True
|
||||
|
||||
results.append({
|
||||
"id" : r["id"],
|
||||
"last_vscan" : None if r["result"][0] == "SCAN FAILED" else datetime.datetime.now(),
|
||||
"removed" : found})
|
||||
|
||||
db.session.bulk_update_mappings(File, results)
|
||||
db.session.commit()
|
||||
|
||||
Reference in New Issue
Block a user