| from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, session, jsonify
|
| import os
|
| import sqlite3
|
| import hashlib
|
| from werkzeug.utils import secure_filename
|
| import datetime
|
| import shutil
|
| import re
|
| import mimetypes
|
| import json
|
|
|
def get_file_type(filename):
    """Classify a file as 'image', 'video', 'audio', 'text' or 'other'.

    The MIME type guessed from the file name is consulted first. When no
    type can be guessed, or the guessed type matches none of the known
    categories, a list of common text extensions is used as a fallback.

    Args:
        filename: File name or path; only its extension is inspected.

    Returns:
        One of the strings 'image', 'video', 'audio', 'text', 'other'.
    """
    text_like_mime_types = ('application/json', 'application/xml', 'application/javascript')
    text_extensions = ('.txt', '.html', '.css', '.js', '.py', '.md', '.json',
                       '.xml', '.csv', '.ini', '.cfg', '.conf')

    guessed, _encoding = mimetypes.guess_type(filename)
    if guessed:
        for category in ('image', 'video', 'audio'):
            if guessed.startswith(category + '/'):
                return category
        if guessed.startswith('text/') or guessed in text_like_mime_types:
            return 'text'

    # Extension fallback for names the MIME table could not classify.
    if filename.lower().endswith(text_extensions):
        return 'text'
    return 'other'
|
|
|
def hex_to_rgb(value):
    """Convert a hex colour code to a comma-separated RGB triple.

    Accepts the six-digit form (``#0d6efd``) and the three-digit shorthand
    (``#fff``), with or without the leading ``#``.

    Args:
        value: Hex colour string.

    Returns:
        A string such as ``"13, 110, 253"``.

    Raises:
        ValueError: If the string does not contain valid hex digits.
    """
    value = value.strip().lstrip('#')
    if len(value) == 3:
        # Expand shorthand: each digit is doubled ("abc" -> "aabbcc").
        value = ''.join(digit * 2 for digit in value)
    red, green, blue = (int(value[start:start + 2], 16) for start in (0, 2, 4))
    return f"{red}, {green}, {blue}"
|
|
|
|
|
def get_db_connection():
    """Open a new SQLite connection to DB_PATH.

    Rows are returned as ``sqlite3.Row`` objects, so columns can be read
    by name as well as by index. The caller is responsible for closing
    the connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
def hash_password(password):
    """Return the hex-encoded SHA-256 digest of ``password``.

    NOTE(review): this is an unsalted, fast hash, which is weak for
    password storage. Switching schemes requires migrating stored hashes
    and changing every caller that compares digests for equality.
    """
    hasher = hashlib.sha256()
    hasher.update(password.encode())
    return hasher.hexdigest()
|
|
|
def get_file_by_id(file_id):
    """Fetch the ``files`` row with the given id.

    Args:
        file_id: Primary key of the file; any falsy value (None, 0, '')
            short-circuits without touching the database.

    Returns:
        The matching row, or None when ``file_id`` is falsy or no row
        exists.
    """
    if not file_id:
        return None

    conn = get_db_connection()
    try:
        return conn.execute('SELECT * FROM files WHERE id = ?', (file_id,)).fetchone()
    finally:
        # Close the connection even when the query raises.
        conn.close()
|
|
|
def format_size(size_bytes):
    """Render a byte count as a human-readable string.

    Values below 1024 are shown as whole bytes; larger values are scaled
    to КБ, МБ or ГБ (powers of 1024) with one decimal place. ГБ is the
    largest unit used.
    """
    if size_bytes < 1024:
        return f"{size_bytes} Б"

    # (divisor, label) pairs; a unit applies while the value is below the
    # next power of 1024.
    for divisor, label in ((1024, 'КБ'), (1024 * 1024, 'МБ')):
        if size_bytes < divisor * 1024:
            return f"{size_bytes / divisor:.1f} {label}"

    return f"{size_bytes / (1024 * 1024 * 1024):.1f} ГБ"
|
|
|
# Flask application object.
app = Flask(__name__)

# Session-signing key. It can be supplied through the SECRET_KEY environment
# variable; the historical hard-coded value is kept only as a fallback so
# existing development setups keep working.
# NOTE(review): set SECRET_KEY in any real deployment - the fallback is public.
app.secret_key = os.environ.get('SECRET_KEY') or 'supersecretkey'

# Helpers exposed to Jinja templates.
app.jinja_env.filters['hex_to_rgb'] = hex_to_rgb
app.jinja_env.globals['get_file_by_id'] = get_file_by_id
app.jinja_env.globals['format_size'] = format_size

# Filesystem layout, all relative to the directory containing this file.
BASEDIR = os.path.abspath(os.path.dirname(__file__))
UPLOAD_FOLDER = os.path.join(BASEDIR, 'uploads')
BACKGROUNDS_FOLDER = os.path.join(BASEDIR, 'static', 'backgrounds')
DB_PATH = os.path.join(BASEDIR, 'database.db')

# exist_ok avoids the check-then-create race of a separate os.path.exists().
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(BACKGROUNDS_FOLDER, exist_ok=True)
|
|
|
|
|
def init_db():
    """Create the application tables in the SQLite database at DB_PATH.

    Every statement uses ``CREATE TABLE IF NOT EXISTS``, so the function
    can be called on every start-up. Tables created: ``users``, ``files``,
    ``shares``, ``contacts`` and ``messages``.
    """
    schema = (
        '''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            background_image TEXT DEFAULT 'default.jpg',
            color_scheme TEXT DEFAULT '#0d6efd',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS files (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            filename TEXT NOT NULL,
            original_filename TEXT NOT NULL,
            path TEXT NOT NULL,
            size INTEGER NOT NULL,
            user_id INTEGER NOT NULL,
            parent_folder TEXT DEFAULT '/',
            is_folder BOOLEAN DEFAULT 0,
            file_type TEXT DEFAULT 'other',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users (id)
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS shares (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            file_id INTEGER NOT NULL,
            user_id INTEGER NOT NULL,
            shared_with INTEGER,
            permission TEXT DEFAULT 'read',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (file_id) REFERENCES files (id),
            FOREIGN KEY (user_id) REFERENCES users (id),
            FOREIGN KEY (shared_with) REFERENCES users (id)
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS contacts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            contact_id INTEGER NOT NULL,
            status TEXT DEFAULT 'pending',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users (id),
            FOREIGN KEY (contact_id) REFERENCES users (id),
            UNIQUE(user_id, contact_id)
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sender_id INTEGER NOT NULL,
            receiver_id INTEGER NOT NULL,
            content TEXT NOT NULL,
            attachment_id INTEGER,
            is_read BOOLEAN DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (sender_id) REFERENCES users (id),
            FOREIGN KEY (receiver_id) REFERENCES users (id),
            FOREIGN KEY (attachment_id) REFERENCES files (id)
        )
        ''',
    )

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        for statement in schema:
            cursor.execute(statement)
        conn.commit()
    finally:
        # Release the database handle even if one of the statements fails.
        conn.close()
|
|
|
|
|
|
|
|
|
@app.route('/get_file_content/<int:file_id>')
def get_file_content(file_id):
    """Return the text content of a file the current user may read.

    Access is granted to the owner of the file and to users that have a
    ``shares`` row for it with permission 'read' or 'write'.

    Args:
        file_id: Primary key of the row in ``files``.

    Returns:
        The file content as a plain-text response; a redirect to the login
        page when nobody is signed in; 404 when the row does not exist; 403
        when access is denied; 500 with the error text when the file cannot
        be opened or decoded as UTF-8.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    user_id = session['user_id']

    conn = sqlite3.connect(DB_PATH)
    try:
        file_data = conn.execute(
            'SELECT filename, path, user_id FROM files WHERE id = ?', (file_id,)
        ).fetchone()
        if not file_data:
            return 'Файл не найден', 404

        if file_data[2] != user_id:
            # Not the owner: a share with read or write permission is required.
            share_data = conn.execute(
                'SELECT permission FROM shares WHERE file_id = ? AND shared_with = ?',
                (file_id, user_id)
            ).fetchone()
            if not share_data or share_data[0] not in ('read', 'write'):
                return 'Доступ запрещен', 403
    finally:
        conn.close()

    file_path = os.path.join(BASEDIR, file_data[1])

    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
    except (OSError, ValueError) as e:
        # ValueError covers UnicodeDecodeError for files that are not UTF-8.
        return str(e), 500

    # The content is user-controlled (and may belong to another user through
    # a share), so serve it as plain text; Flask's default text/html type
    # would let an uploaded file run script in the viewer's session.
    return content, 200, {'Content-Type': 'text/plain; charset=utf-8'}
|
|
|
|
|
@app.route('/save_file_content/<int:file_id>', methods=['POST'])
def save_file_content(file_id):
    """Overwrite a file with the text sent in a JSON request body.

    The request body must be a JSON object with a string ``content`` field.
    Writing is allowed for the owner of the file and for users holding a
    ``shares`` row with permission 'write'.

    Args:
        file_id: Primary key of the row in ``files``.

    Returns:
        A JSON object ``{'success': bool, 'error': str}`` (``error`` only on
        failure) with status 200, 400, 401, 403, 404 or 500.
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'error': 'Необходима авторизация'}), 401

    user_id = session['user_id']

    # silent=True: a missing or malformed JSON body yields None here instead of
    # an HTML error page, so the client always gets the JSON contract below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not isinstance(data.get('content'), str):
        return jsonify({'success': False, 'error': 'Отсутствует содержимое файла'}), 400

    content = data['content']

    conn = sqlite3.connect(DB_PATH)
    try:
        file_data = conn.execute(
            'SELECT filename, path, user_id FROM files WHERE id = ?', (file_id,)
        ).fetchone()
        if not file_data:
            return jsonify({'success': False, 'error': 'Файл не найден'}), 404

        if file_data[2] != user_id:
            # Not the owner: an explicit write share is required.
            share_data = conn.execute(
                'SELECT permission FROM shares WHERE file_id = ? AND shared_with = ?',
                (file_id, user_id)
            ).fetchone()
            if not share_data or share_data[0] != 'write':
                return jsonify({'success': False, 'error': 'Доступ запрещен'}), 403
    finally:
        conn.close()

    file_path = os.path.join(BASEDIR, file_data[1])

    try:
        with open(file_path, 'w', encoding='utf-8') as file:
            file.write(content)
    except (OSError, ValueError) as e:
        # ValueError covers UnicodeEncodeError (e.g. lone surrogates).
        return jsonify({'success': False, 'error': str(e)}), 500

    return jsonify({'success': True})
|
# Create the database tables at import time (a no-op once they exist).
init_db()
|
|
|
|
|
def login_required(f):
    """Decorator that restricts a view to signed-in users.

    When the session has no ``user_id``, an error message is flashed and the
    request is redirected to the login page; otherwise the wrapped view is
    called with its original arguments.

    Args:
        f: The view function to protect.

    Returns:
        The wrapping function, carrying the name, docstring and other
        metadata of ``f``.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_id' not in session:
            flash('Пожалуйста, войдите в систему для доступа к этой странице', 'error')
            return redirect(url_for('login'))
        return f(*args, **kwargs)

    return decorated_function
|
|
|
|
|
@app.route('/')
def index():
    """Landing page.

    Visitors with a ``user_id`` in the session are redirected to the
    dashboard; everyone else gets ``index.html`` with the default background.
    """
    signed_in = 'user_id' in session
    if not signed_in:
        return render_template('index.html', now=datetime.datetime.now(), background='default.jpg')
    return redirect(url_for('dashboard'))
|
|
|
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    On POST the form fields ``username``, ``password`` and ``email`` are
    read. If the username or e-mail is already taken, an error is flashed
    and the form is shown again. Otherwise the user row is inserted, the
    user's upload directory is created and the browser is redirected to the
    login page.
    """
    if request.method != 'POST':
        return render_template('register.html', now=datetime.datetime.now(), background='default.jpg')

    username = request.form['username']
    password = request.form['password']
    email = request.form['email']
    background_image = 'default.jpg'

    conn = get_db_connection()
    try:
        user = conn.execute('SELECT * FROM users WHERE username = ? OR email = ?',
                            (username, email)).fetchone()
        if user:
            flash('Пользователь с таким именем или email уже существует', 'error')
            return redirect(url_for('register'))

        try:
            cursor = conn.execute(
                'INSERT INTO users (username, password, email, background_image) VALUES (?, ?, ?, ?)',
                (username, hash_password(password), email, background_image)
            )
            conn.commit()
        except sqlite3.IntegrityError:
            # A concurrent request registered the same name/e-mail between the
            # check above and the insert; report it like any other duplicate.
            flash('Пользователь с таким именем или email уже существует', 'error')
            return redirect(url_for('register'))

        # The id of the row just inserted - no second query needed.
        user_id = cursor.lastrowid
    finally:
        conn.close()

    os.makedirs(os.path.join(UPLOAD_FOLDER, str(user_id)), exist_ok=True)

    flash('Регистрация успешна! Теперь вы можете войти в систему', 'success')
    return redirect(url_for('login'))
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate a user (POST).

    On POST the submitted password is hashed with ``hash_password`` and
    compared with the stored hash. On success the user's id, name,
    background image and colour scheme are stored in the session and the
    browser is redirected to the dashboard; on failure an error is flashed
    and the login page is shown again.
    """
    import hmac  # local import keeps this block self-contained

    if request.method != 'POST':
        return render_template('login.html', now=datetime.datetime.now(), background='default.jpg')

    username = request.form['username']
    password = request.form['password']

    conn = get_db_connection()
    try:
        user = conn.execute('SELECT * FROM users WHERE username = ?', (username,)).fetchone()
    finally:
        conn.close()

    # compare_digest runs in constant time, so response timing does not
    # reveal how many leading characters of the hash matched.
    if user is not None and hmac.compare_digest(
            user['password'].encode(), hash_password(password).encode()):
        session['user_id'] = user['id']
        session['username'] = user['username']
        session['background'] = user['background_image']
        session['color_scheme'] = user['color_scheme']
        flash('Вы успешно вошли в систему!', 'success')
        return redirect(url_for('dashboard'))

    flash('Неверное имя пользователя или пароль', 'error')
    return redirect(url_for('login'))
|
|
|
@app.route('/logout')
def logout():
    """Sign the user out.

    Clears every value stored in the session, flashes an informational
    message and redirects to the landing page.
    """
    session.clear()
    flash('Вы вышли из системы', 'info')
    landing_url = url_for('index')
    return redirect(landing_url)
|
|
|
@app.route('/dashboard')
@login_required
def dashboard():
    """List the current user's files in one folder.

    The folder is taken from the ``folder`` query parameter (default '/')
    and normalised to end with '/'. The template receives the rows found in
    that folder (folders first, then by name), the row describing the folder
    itself (None for the root) and a breadcrumb trail of ``(path, label)``
    pairs starting at the root.
    """
    user_id = session['user_id']
    current_folder = request.args.get('folder', '/')

    # Folder paths are stored with a trailing slash.
    if current_folder != '/' and not current_folder.endswith('/'):
        current_folder += '/'

    conn = get_db_connection()
    try:
        files = conn.execute(
            'SELECT * FROM files WHERE user_id = ? AND parent_folder = ? ORDER BY is_folder DESC, filename',
            (user_id, current_folder)
        ).fetchall()

        current_folder_info = None
        if current_folder != '/':
            current_folder_info = conn.execute(
                'SELECT * FROM files WHERE user_id = ? AND filename = ? AND is_folder = 1',
                (user_id, current_folder)
            ).fetchone()
    finally:
        conn.close()

    # Root first, then one entry per path component with its cumulative path.
    breadcrumbs = [('/', 'Корень')]
    path = '/'
    for part in current_folder.strip('/').split('/'):
        if part:
            path += part + '/'
            breadcrumbs.append((path, part))

    return render_template('dashboard.html',
                           files=files,
                           current_folder=current_folder,
                           breadcrumbs=breadcrumbs,
                           current_folder_info=current_folder_info,
                           now=datetime.datetime.now(),
                           background=session.get('background', 'default.jpg'))
|
|
|
@app.route('/upload', methods=['POST'])
@login_required
def upload_file():
    """Store the uploaded files in the current user's folder.

    Reads the ``files[]`` upload field and the ``current_folder`` form field
    (default '/'). Each file is saved under
    ``UPLOAD_FOLDER/<user_id>/<current_folder>`` and registered in the
    ``files`` table. Files that fail are reported on stdout and skipped; the
    flashed message states how many uploads succeeded.
    """
    user_id = session['user_id']
    current_folder = request.form.get('current_folder', '/')

    # Same normalisation as dashboard/create_folder: folders end with '/'.
    if current_folder != '/' and not current_folder.endswith('/'):
        current_folder += '/'

    if 'files[]' not in request.files:
        flash('Файл не выбран', 'error')
        return redirect(url_for('dashboard', folder=current_folder))

    files = request.files.getlist('files[]')

    if not files or all(file.filename == '' for file in files):
        flash('Файлы не выбраны', 'error')
        return redirect(url_for('dashboard', folder=current_folder))

    # current_folder comes from the client: collapse any '..' components and
    # refuse destinations outside this user's upload directory.
    user_folder = os.path.join(UPLOAD_FOLDER, str(user_id))
    save_path = os.path.normpath(os.path.join(user_folder, current_folder.strip('/')))
    if save_path != user_folder and not save_path.startswith(user_folder + os.sep):
        flash('Недопустимая папка', 'error')
        return redirect(url_for('dashboard'))

    success_count = 0
    # Opened outside the per-file try so the final close never sees an
    # unbound connection.
    conn = get_db_connection()
    try:
        for file in files:
            if file.filename == '':
                continue

            try:
                filename = secure_filename(file.filename)
                if not filename:
                    # secure_filename drops non-ASCII characters and may leave
                    # nothing to save under.
                    raise ValueError('empty file name after sanitising')

                os.makedirs(save_path, exist_ok=True)
                file_path = os.path.join(save_path, filename)
                file.save(file_path)

                file_size = os.path.getsize(file_path)
                file_type = get_file_type(filename)

                conn.execute(
                    'INSERT INTO files (filename, original_filename, path, size, user_id, parent_folder, file_type) VALUES (?, ?, ?, ?, ?, ?, ?)',
                    (filename, file.filename, file_path, file_size, user_id, current_folder, file_type)
                )
                conn.commit()
                success_count += 1
            except Exception as e:
                # Best effort: one failing file must not abort the others.
                conn.rollback()
                print(f'Ошибка при загрузке файла {file.filename}: {str(e)}')
    finally:
        conn.close()

    if success_count > 0:
        flash(f'Успешно загружено файлов: {success_count}', 'success')
    else:
        flash('Не удалось загрузить ни одного файла', 'error')
    return redirect(url_for('dashboard', folder=current_folder))
|
|
|
|
|
@app.route('/create_folder', methods=['POST'])
@login_required
def create_folder():
    """Register a new folder for the current user.

    Reads ``folder_name`` and ``current_folder`` (default '/') from the
    form. The folder is recorded in the ``files`` table with
    ``is_folder = 1``; its ``filename`` is the full virtual path with a
    trailing slash. No directory is created on disk here.
    """
    user_id = session['user_id']
    folder_name = request.form['folder_name'].strip()
    current_folder = request.form.get('current_folder', '/')

    if current_folder != '/' and not current_folder.endswith('/'):
        current_folder += '/'

    # The name becomes one path component, so separators and the special
    # entries '.' and '..' are rejected.
    if (not folder_name or folder_name in ('.', '..')
            or '/' in folder_name or '\\' in folder_name):
        flash('Недопустимое имя папки', 'error')
        return redirect(url_for('dashboard', folder=current_folder))

    # The resulting path is stored in the database, so it must stay inside
    # this user's upload directory.
    user_folder = os.path.join(UPLOAD_FOLDER, str(user_id))
    new_folder_path = os.path.normpath(
        os.path.join(user_folder, current_folder.strip('/'), folder_name))
    if not new_folder_path.startswith(user_folder + os.sep):
        flash('Недопустимое имя папки', 'error')
        return redirect(url_for('dashboard'))

    full_path = current_folder + folder_name + '/'

    conn = get_db_connection()
    try:
        existing = conn.execute('SELECT * FROM files WHERE filename = ? AND user_id = ?',
                                (full_path, user_id)).fetchone()
        if existing:
            flash('Папка уже существует', 'error')
            return redirect(url_for('dashboard', folder=current_folder))

        conn.execute(
            'INSERT INTO files (filename, original_filename, path, size, user_id, parent_folder, is_folder) VALUES (?, ?, ?, ?, ?, ?, ?)',
            (full_path, folder_name, new_folder_path, 0, user_id, current_folder, 1)
        )
        conn.commit()
    finally:
        # Also covers the "already exists" return above.
        conn.close()

    flash(f'Папка {folder_name} успешно создана', 'success')
    return redirect(url_for('dashboard', folder=current_folder))
|
|
|
@app.route('/download/<int:file_id>')
@login_required
def download_file(file_id):
    """Send a file to the current user as an attachment.

    The file is looked up, in order, as: owned by the user, shared with the
    user, attached to a message the user received, attached to a message the
    user sent. The first match wins. Folders and unknown ids produce an
    error flash and a redirect to the dashboard.
    """
    user_id = session['user_id']

    # Access checks, tried from first to last; each takes (file_id, user_id).
    lookups = (
        'SELECT * FROM files WHERE id = ? AND user_id = ?',
        '''
        SELECT f.* FROM files f
        JOIN shares s ON f.id = s.file_id
        WHERE f.id = ? AND s.shared_with = ?
        ''',
        '''
        SELECT f.*
        FROM files f
        JOIN messages m ON f.id = m.attachment_id
        WHERE f.id = ? AND m.receiver_id = ?
        ''',
        '''
        SELECT f.*
        FROM files f
        JOIN messages m ON f.id = m.attachment_id
        WHERE f.id = ? AND m.sender_id = ?
        ''',
    )

    conn = get_db_connection()
    file = None
    for query in lookups:
        file = conn.execute(query, (file_id, user_id)).fetchone()
        if file:
            break
    conn.close()

    if not file or file['is_folder']:
        flash('Файл не найден или у вас нет прав для его скачивания', 'error')
        return redirect(url_for('dashboard'))

    stored_path = file['path']
    return send_from_directory(os.path.dirname(stored_path),
                               os.path.basename(stored_path),
                               as_attachment=True,
                               download_name=file['original_filename'])
|
|
|
@app.route('/delete/<int:file_id>', methods=['POST'])
@login_required
def delete_file(file_id):
    """Delete a file or folder owned by the current user.

    For a folder, the directory tree is removed from disk and every row
    below it (at any depth) is deleted together with its shares. For a file,
    the file is removed from disk. In both cases the row itself and its
    shares are deleted, and the browser is redirected to the folder that
    contained the item.
    """
    user_id = session['user_id']

    conn = get_db_connection()
    try:
        file = conn.execute('SELECT * FROM files WHERE id = ? AND user_id = ?',
                            (file_id, user_id)).fetchone()
        if not file:
            flash('Файл не найден или у вас нет прав для его удаления', 'error')
            return redirect(url_for('dashboard'))

        current_folder = file['parent_folder']

        # Only touch the disk for paths strictly inside this user's upload
        # directory; a stored path is never trusted with rmtree otherwise.
        user_root = os.path.join(UPLOAD_FOLDER, str(user_id))
        target = os.path.normpath(os.path.join(BASEDIR, file['path']))
        removable = target.startswith(user_root + os.sep) and os.path.exists(target)

        if file['is_folder']:
            if removable:
                shutil.rmtree(target)

            # Folder names are stored with a trailing slash, so every
            # descendant's parent_folder starts with that prefix. substr()
            # gives a case-sensitive prefix test with no LIKE wildcards.
            folder_path = file['filename']
            prefix = folder_path if folder_path.endswith('/') else folder_path + '/'
            params = (user_id, folder_path, prefix, prefix)
            conn.execute(
                'DELETE FROM shares WHERE file_id IN ('
                'SELECT id FROM files WHERE user_id = ? '
                'AND (parent_folder = ? OR substr(parent_folder, 1, length(?)) = ?))',
                params
            )
            conn.execute(
                'DELETE FROM files WHERE user_id = ? '
                'AND (parent_folder = ? OR substr(parent_folder, 1, length(?)) = ?)',
                params
            )
        elif removable:
            os.remove(target)

        conn.execute('DELETE FROM files WHERE id = ?', (file_id,))
        conn.execute('DELETE FROM shares WHERE file_id = ?', (file_id,))
        conn.commit()
    finally:
        conn.close()

    flash('Файл успешно удален', 'success')
    return redirect(url_for('dashboard', folder=current_folder))
|
|
|
@app.route('/share/<int:file_id>', methods=['GET', 'POST'])
@login_required
def share_file(file_id):
    """Show or change who a file is shared with.

    Only non-folder files owned by the current user can be shared. GET
    renders the list of existing shares. POST reads ``username`` and
    ``permission`` from the form and creates the share, or updates the
    permission when that user already has one.
    """
    user_id = session['user_id']

    conn = get_db_connection()
    try:
        file = conn.execute(
            'SELECT * FROM files WHERE id = ? AND user_id = ? AND is_folder = 0',
            (file_id, user_id)
        ).fetchone()
        if not file:
            flash('Файл не найден или у вас нет прав для его совместного использования', 'error')
            return redirect(url_for('dashboard'))

        if request.method == 'POST':
            shared_username = request.form['username']
            permission = request.form['permission']

            # 'read' and 'write' are the only values the access checks in
            # this file understand; anything else would be a dead share.
            if permission not in ('read', 'write'):
                flash('Недопустимый тип доступа', 'error')
                return redirect(url_for('share_file', file_id=file_id))

            shared_user = conn.execute('SELECT id FROM users WHERE username = ?',
                                       (shared_username,)).fetchone()
            if not shared_user:
                flash(f'Пользователь {shared_username} не найден', 'error')
                return redirect(url_for('share_file', file_id=file_id))

            if shared_user['id'] == user_id:
                flash('Вы не можете поделиться файлом с самим собой', 'error')
                return redirect(url_for('share_file', file_id=file_id))

            existing_share = conn.execute(
                'SELECT * FROM shares WHERE file_id = ? AND user_id = ? AND shared_with = ?',
                (file_id, user_id, shared_user['id'])
            ).fetchone()

            if existing_share:
                conn.execute(
                    'UPDATE shares SET permission = ? WHERE id = ?',
                    (permission, existing_share['id'])
                )
                flash(f'Права доступа для пользователя {shared_username} обновлены', 'success')
            else:
                conn.execute(
                    'INSERT INTO shares (file_id, user_id, shared_with, permission) VALUES (?, ?, ?, ?)',
                    (file_id, user_id, shared_user['id'], permission)
                )
                flash(f'Файл успешно предоставлен пользователю {shared_username}', 'success')

            conn.commit()
            return redirect(url_for('dashboard'))

        shares = conn.execute(
            '''SELECT s.*, u.username FROM shares s
               JOIN users u ON s.shared_with = u.id
               WHERE s.file_id = ? AND s.user_id = ?''',
            (file_id, user_id)
        ).fetchall()
    finally:
        # One close for every exit path above.
        conn.close()

    return render_template('share.html', file=file, shares=shares,
                           now=datetime.datetime.now(),
                           background=session.get('background', 'default.jpg'))
|
|
|
@app.route('/shared_with_me')
@login_required
def shared_with_me():
    """List the files other users have shared with the current user.

    Each entry passed to the template is a dict of the file's columns plus
    the share permission and owner name, the latter two also available under
    the keys ``access_type`` and ``owner``.
    """
    connection = get_db_connection()
    rows = connection.execute(
        '''SELECT f.*, s.permission, u.username as owner_username
           FROM files f
           JOIN shares s ON f.id = s.file_id
           JOIN users u ON f.user_id = u.id
           WHERE s.shared_with = ?
           ORDER BY f.is_folder DESC, f.filename''',
        (session['user_id'],)
    ).fetchall()

    files_with_access = [
        {**dict(row), 'access_type': row['permission'], 'owner': row['owner_username']}
        for row in rows
    ]

    connection.close()

    return render_template('shared_with_me.html',
                           files=files_with_access,
                           now=datetime.datetime.now(),
                           background=session.get('background', 'default.jpg'))
|
|
|
@app.route('/settings')
@login_required
def settings():
    """Render the settings page with the user's stored colour scheme."""
    connection = get_db_connection()
    row = connection.execute('SELECT color_scheme FROM users WHERE id = ?',
                             (session['user_id'],)).fetchone()
    connection.close()

    context = {
        'now': datetime.datetime.now(),
        'background': session.get('background', 'default.jpg'),
        'color_scheme': row['color_scheme'],
    }
    return render_template('settings.html', **context)
|
|
|
@app.route('/change_color_scheme', methods=['POST'])
@login_required
def change_color_scheme():
    """Store a new colour scheme for the current user.

    Reads ``color_scheme`` from the form. The value must be a ``#rrggbb``
    hex colour; anything else is rejected with an error flash. A valid
    value is saved in the ``users`` table and mirrored into the session.
    """
    user_id = session['user_id']
    color_scheme = request.form['color_scheme'].strip()

    # The stored value is later parsed by the hex_to_rgb template filter,
    # which raises on anything that is not six hex digits.
    if not re.fullmatch(r'#[0-9a-fA-F]{6}', color_scheme):
        flash('Недопустимый формат цвета', 'error')
        return redirect(url_for('settings'))

    conn = get_db_connection()
    try:
        conn.execute('UPDATE users SET color_scheme = ? WHERE id = ?', (color_scheme, user_id))
        conn.commit()
    finally:
        conn.close()

    session['color_scheme'] = color_scheme
    flash('Цветовая схема успешно обновлена', 'success')
    return redirect(url_for('settings'))
|
|
|
@app.route('/change_background', methods=['POST'])
@login_required
def change_background():
    """Upload a new background image for the current user.

    Reads the ``background_file`` upload. Only jpg, jpeg, png and gif
    extensions are accepted. The image is saved in BACKGROUNDS_FOLDER under
    a name prefixed with ``user_<id>_``; that name is stored in the ``users``
    table and in the session.
    """
    user_id = session['user_id']

    file = request.files.get('background_file')
    if file is None or not file.filename:
        flash('Файл не выбран', 'error')
        return redirect(url_for('settings'))

    allowed_extensions = {'jpg', 'jpeg', 'png', 'gif'}
    _stem, dot, extension = file.filename.rpartition('.')
    extension = extension.lower()
    if not dot or extension not in allowed_extensions:
        flash('Недопустимый формат файла. Разрешены только изображения (jpg, jpeg, png, gif)', 'error')
        return redirect(url_for('settings'))

    # secure_filename drops non-ASCII characters, which can swallow the dot
    # before the extension ("фон.jpg" -> "jpg"); fall back to a fixed stem
    # so the saved file always keeps its validated extension.
    safe_name = secure_filename(file.filename)
    if not safe_name.lower().endswith('.' + extension):
        safe_name = f"background.{extension}"

    filename = f"user_{user_id}_{safe_name}"
    file_path = os.path.join(BACKGROUNDS_FOLDER, filename)
    file.save(file_path)

    conn = get_db_connection()
    try:
        conn.execute('UPDATE users SET background_image = ? WHERE id = ?', (filename, user_id))
        conn.commit()
    finally:
        conn.close()

    session['background'] = filename

    flash('Фоновое изображение успешно обновлено', 'success')
    return redirect(url_for('settings'))
|
|
|
|
|
@app.route('/contacts')
@login_required
def contacts():
    """Render the contacts page for the current user.

    The template receives three lists: accepted contacts (each with the
    number of unread messages from that contact), pending requests sent to
    the user, and pending requests the user has sent.
    """
    user_id = session['user_id']

    accepted_sql = '''
        SELECT c.id, c.contact_id, c.status, u.username, u.email,
               (SELECT COUNT(*) FROM messages
                WHERE sender_id = c.contact_id AND receiver_id = ? AND is_read = 0) as unread_count
        FROM contacts c
        JOIN users u ON c.contact_id = u.id
        WHERE c.user_id = ? AND c.status = 'accepted'
        ORDER BY u.username
    '''
    incoming_sql = '''
        SELECT c.id, c.user_id, u.username, u.email
        FROM contacts c
        JOIN users u ON c.user_id = u.id
        WHERE c.contact_id = ? AND c.status = 'pending'
        ORDER BY c.created_at DESC
    '''
    outgoing_sql = '''
        SELECT c.id, c.contact_id, u.username, u.email
        FROM contacts c
        JOIN users u ON c.contact_id = u.id
        WHERE c.user_id = ? AND c.status = 'pending'
        ORDER BY c.created_at DESC
    '''

    connection = get_db_connection()
    accepted = connection.execute(accepted_sql, (user_id, user_id)).fetchall()
    incoming = connection.execute(incoming_sql, (user_id,)).fetchall()
    outgoing = connection.execute(outgoing_sql, (user_id,)).fetchall()
    connection.close()

    return render_template('contacts.html',
                           contacts=accepted,
                           incoming_requests=incoming,
                           outgoing_requests=outgoing,
                           now=datetime.datetime.now(),
                           background=session.get('background', 'default.jpg'))
|
|
|
@app.route('/add_contact', methods=['POST'])
@login_required
def add_contact():
    """Send a contact request, or accept a matching incoming one.

    Reads ``username_or_email`` from the form and looks the user up by
    either field. If that user already has a contact row pointing at the
    current user, both directions are marked 'accepted'; otherwise a
    'pending' row is created.
    """
    user_id = session['user_id']
    username_or_email = request.form['username_or_email']

    conn = get_db_connection()
    try:
        user = conn.execute('SELECT id FROM users WHERE username = ? OR email = ?',
                            (username_or_email, username_or_email)).fetchone()
        if not user:
            flash('Пользователь не найден', 'error')
            return redirect(url_for('contacts'))

        contact_id = user['id']

        if contact_id == user_id:
            flash('Вы не можете добавить себя в контакты', 'error')
            return redirect(url_for('contacts'))

        existing = conn.execute('SELECT * FROM contacts WHERE user_id = ? AND contact_id = ?',
                                (user_id, contact_id)).fetchone()
        if existing:
            flash('Этот пользователь уже в вашем списке контактов или запросов', 'error')
            return redirect(url_for('contacts'))

        incoming = conn.execute('SELECT * FROM contacts WHERE user_id = ? AND contact_id = ?',
                                (contact_id, user_id)).fetchone()

        # SQL string literals use single quotes: double-quoted text is an
        # identifier in SQL and works as a string only through a deprecated
        # SQLite compatibility quirk.
        if incoming:
            conn.execute("UPDATE contacts SET status = 'accepted' WHERE user_id = ? AND contact_id = ?",
                         (contact_id, user_id))
            conn.execute("INSERT INTO contacts (user_id, contact_id, status) VALUES (?, ?, 'accepted')",
                         (user_id, contact_id))
            flash('Контакт успешно добавлен', 'success')
        else:
            conn.execute("INSERT INTO contacts (user_id, contact_id, status) VALUES (?, ?, 'pending')",
                         (user_id, contact_id))
            flash('Запрос на добавление в контакты отправлен', 'success')

        conn.commit()
    finally:
        conn.close()

    return redirect(url_for('contacts'))
|
|
|
@app.route('/accept_contact/<int:contact_id>', methods=['POST'])
@login_required
def accept_contact(contact_id):
    """Accept a contact request addressed to the current user.

    Args:
        contact_id: Primary key of the row in ``contacts`` (not a user id).

    The request row is marked 'accepted' and the reverse row (current user
    -> requester) is created, or updated if it already exists, with the same
    status.
    """
    user_id = session['user_id']

    conn = get_db_connection()
    try:
        contact_request = conn.execute('SELECT * FROM contacts WHERE id = ? AND contact_id = ?',
                                       (contact_id, user_id)).fetchone()
        if not contact_request:
            flash('Запрос не найден', 'error')
            return redirect(url_for('contacts'))

        # Single-quoted SQL literals: double quotes denote identifiers and
        # only work as strings through a deprecated SQLite quirk.
        conn.execute("UPDATE contacts SET status = 'accepted' WHERE id = ?", (contact_id,))

        requester_id = contact_request['user_id']
        existing = conn.execute('SELECT * FROM contacts WHERE user_id = ? AND contact_id = ?',
                                (user_id, requester_id)).fetchone()

        if not existing:
            conn.execute("INSERT INTO contacts (user_id, contact_id, status) VALUES (?, ?, 'accepted')",
                         (user_id, requester_id))
        else:
            conn.execute("UPDATE contacts SET status = 'accepted' WHERE user_id = ? AND contact_id = ?",
                         (user_id, requester_id))

        conn.commit()
    finally:
        conn.close()

    flash('Контакт успешно добавлен', 'success')
    return redirect(url_for('contacts'))
|
|
|
@app.route('/reject_contact/<int:contact_id>', methods=['POST'])
@login_required
def reject_contact(contact_id):
    """Reject a pending contact request addressed to the current user.

    Args:
        contact_id: Primary key of the row in ``contacts`` (not a user id).

    The row is deleted only when it targets the current user and is still
    'pending'; rows that are already accepted are left untouched, so this
    route cannot remove one half of an established contact pair.
    """
    user_id = session['user_id']

    conn = get_db_connection()
    try:
        conn.execute(
            "DELETE FROM contacts WHERE id = ? AND contact_id = ? AND status = 'pending'",
            (contact_id, user_id)
        )
        conn.commit()
    finally:
        conn.close()

    flash('Запрос отклонен', 'info')
    return redirect(url_for('contacts'))
|
|
|
@app.route('/remove_contact/<int:contact_id>', methods=['POST'])
@login_required
def remove_contact(contact_id):
    """Remove a contact in both directions.

    Args:
        contact_id: Primary key of the current user's row in ``contacts``.

    Both the user's row and the other user's reverse row are deleted.
    """
    user_id = session['user_id']

    connection = get_db_connection()
    row = connection.execute('SELECT contact_id FROM contacts WHERE id = ? AND user_id = ?',
                             (contact_id, user_id)).fetchone()

    if row is None:
        flash('Контакт не найден', 'error')
        connection.close()
        return redirect(url_for('contacts'))

    other_user_id = row['contact_id']

    # Delete the pair: (me -> them) first, then (them -> me).
    for owner, target in ((user_id, other_user_id), (other_user_id, user_id)):
        connection.execute('DELETE FROM contacts WHERE user_id = ? AND contact_id = ?',
                           (owner, target))

    connection.commit()
    connection.close()

    flash('Контакт удален', 'info')
    return redirect(url_for('contacts'))
|
|
|
|
|
@app.route('/messages/<int:contact_id>')
@login_required
def messages(contact_id):
    """Show the conversation with one contact.

    Args:
        contact_id: User id of the contact. The value 0 means "no contact
            chosen": the user is redirected to the first accepted contact,
            or shown an empty page when there are none.

    Messages from the contact are marked as read once they have been
    fetched.
    """
    user_id = session['user_id']

    def render(contact, thread, contact_list):
        # Shared rendering for the empty page and the conversation page.
        return render_template('messages.html',
                               contact=contact,
                               messages=thread,
                               contacts=contact_list,
                               user_id=user_id,
                               now=datetime.datetime.now(),
                               background=session.get('background', 'default.jpg'))

    connection = get_db_connection()

    # Sidebar: accepted contacts with their unread counters.
    contact_list = connection.execute('''
        SELECT c.id, c.contact_id, u.username,
               (SELECT COUNT(*) FROM messages
                WHERE sender_id = c.contact_id AND receiver_id = ? AND is_read = 0) as unread_count
        FROM contacts c
        JOIN users u ON c.contact_id = u.id
        WHERE c.user_id = ? AND c.status = 'accepted'
        ORDER BY u.username
    ''', (user_id, user_id)).fetchall()

    if contact_id == 0:
        connection.close()
        if contact_list:
            return redirect(url_for('messages', contact_id=contact_list[0]['contact_id']))
        return render(None, [], contact_list)

    contact = connection.execute('''
        SELECT c.id, c.contact_id, u.username, u.email
        FROM contacts c
        JOIN users u ON c.contact_id = u.id
        WHERE c.user_id = ? AND c.contact_id = ? AND c.status = 'accepted'
    ''', (user_id, contact_id)).fetchone()

    if not contact:
        flash('Контакт не найден или не подтвержден', 'error')
        connection.close()
        return redirect(url_for('contacts'))

    thread = connection.execute('''
        SELECT * FROM messages
        WHERE (sender_id = ? AND receiver_id = ?) OR (sender_id = ? AND receiver_id = ?)
        ORDER BY created_at
    ''', (user_id, contact_id, contact_id, user_id)).fetchall()

    connection.execute('UPDATE messages SET is_read = 1 WHERE sender_id = ? AND receiver_id = ?',
                       (contact_id, user_id))
    connection.commit()
    connection.close()

    return render(contact, thread, contact_list)
|
|
|
@app.route('/send_message/<int:contact_id>', methods=['POST'])
@login_required
def send_message(contact_id):
    """Send a message, optionally with one attachment, to an accepted contact.

    Args:
        contact_id: User id of the receiver.

    Reads ``content`` from the form and an optional ``attachment`` upload.
    The attachment is saved under ``UPLOAD_FOLDER/<sender id>/attachments``
    with a unique on-disk name and registered in ``files``; the message row
    references it through ``attachment_id``. Both rows are committed
    together.
    """
    import uuid  # local import keeps this block self-contained

    user_id = session['user_id']
    content = request.form['content']

    if not content or content.strip() == '':
        flash('Сообщение не может быть пустым', 'error')
        return redirect(url_for('messages', contact_id=contact_id))

    conn = get_db_connection()
    try:
        contact = conn.execute('''
            SELECT * FROM contacts
            WHERE user_id = ? AND contact_id = ? AND status = 'accepted'
        ''', (user_id, contact_id)).fetchone()

        if not contact:
            flash('Контакт не найден или не подтвержден', 'error')
            return redirect(url_for('contacts'))

        attachment_id = None
        upload = request.files.get('attachment')
        if upload is not None and upload.filename:
            # secure_filename can return '' (e.g. a fully non-ASCII name).
            filename = secure_filename(upload.filename) or 'attachment'

            attachments_folder = os.path.join(UPLOAD_FOLDER, str(user_id), 'attachments')
            os.makedirs(attachments_folder, exist_ok=True)

            # A random prefix keeps attachments with the same name from
            # overwriting the file an earlier message still points to.
            stored_name = f"{uuid.uuid4().hex}_{filename}"
            file_path = os.path.join(attachments_folder, stored_name)
            upload.save(file_path)

            file_size = os.path.getsize(file_path)
            file_type = get_file_type(filename)

            cursor = conn.execute(
                'INSERT INTO files (filename, original_filename, path, size, user_id, parent_folder, file_type) VALUES (?, ?, ?, ?, ?, ?, ?)',
                (filename, upload.filename, file_path, file_size, user_id, '/attachments/', file_type)
            )
            attachment_id = cursor.lastrowid

        conn.execute('INSERT INTO messages (sender_id, receiver_id, content, attachment_id) VALUES (?, ?, ?, ?)',
                     (user_id, contact_id, content, attachment_id))
        # One commit: the file row and the message row succeed or fail together.
        conn.commit()
    finally:
        conn.close()

    return redirect(url_for('messages', contact_id=contact_id))
|
|
|
@app.route('/get_unread_count')
@login_required
def get_unread_count():
    """Return, as JSON, how many unread messages are addressed to the current user.

    Returns:
        A JSON object of the form ``{"unread_count": <int>}``.
    """
    current_user = session['user_id']

    db = get_db_connection()
    cursor = db.execute('''
        SELECT COUNT(*) as count FROM messages
        WHERE receiver_id = ? AND is_read = 0
    ''', (current_user,))
    row = cursor.fetchone()
    total_unread = row['count']
    db.close()

    payload = {'unread_count': total_unread}
    return jsonify(payload)
|
|
|
@app.route('/get_new_messages/<int:contact_id>')
@login_required
def get_new_messages(contact_id):
    """Return messages exchanged with a contact that are newer than ``last_id``.

    Query parameters:
        last_id: Highest message id the client already has (default 0).

    Side effect: every unread message sent by ``contact_id`` to the current
    user is marked as read.

    Args:
        contact_id: ID of the other participant in the conversation.

    Returns:
        JSON ``{"messages": [...], "user_id": <int>}``. Each message is the
        row as a dict; when it has an attachment whose ``files`` row exists,
        an ``attachment`` dict (with ``size_formatted``) is added. If the
        contact is missing or not accepted, returns
        ``{"error": ..., "messages": []}``.
    """
    user_id = session['user_id']
    last_id = request.args.get('last_id', 0, type=int)

    conn = get_db_connection()
    try:
        contact = conn.execute('''
            SELECT * FROM contacts
            WHERE user_id = ? AND contact_id = ? AND status = 'accepted'
        ''', (user_id, contact_id)).fetchone()

        if not contact:
            return jsonify({'error': 'Контакт не найден или не подтвержден', 'messages': []})

        # `id` breaks ties between messages with an identical created_at so
        # the order is deterministic and the last element carries the highest
        # id among equal timestamps (clients poll with `last_id`).
        messages = conn.execute('''
            SELECT * FROM messages
            WHERE ((sender_id = ? AND receiver_id = ?) OR (sender_id = ? AND receiver_id = ?)) AND id > ?
            ORDER BY created_at, id
        ''', (user_id, contact_id, contact_id, user_id, last_id)).fetchall()

        # Only touch rows that are still unread; without the is_read filter
        # every poll rewrote the whole conversation history.
        conn.execute(
            'UPDATE messages SET is_read = 1 WHERE sender_id = ? AND receiver_id = ? AND is_read = 0',
            (contact_id, user_id)
        )
        conn.commit()

        message_list = []
        for message in messages:
            message_dict = dict(message)

            if message['attachment_id']:
                attachment = conn.execute(
                    'SELECT * FROM files WHERE id = ?', (message['attachment_id'],)
                ).fetchone()
                if attachment:
                    attachment_dict = dict(attachment)
                    attachment_dict['size_formatted'] = format_size(attachment['size'])
                    message_dict['attachment'] = attachment_dict

            message_list.append(message_dict)
    finally:
        # Release the connection on every path, including errors.
        conn.close()

    return jsonify({
        'messages': message_list,
        'user_id': user_id
    })
|
|
|
@app.route('/search_users')
@login_required
def search_users():
    """Search other users by username or e-mail substring.

    Query parameters:
        q: Search text; queries shorter than 3 characters return ``[]``.

    Returns:
        JSON list (at most 10 entries) of
        ``{"id", "username", "email", "status"}``, where ``status`` is the
        status of the current user's contact row for that user, or ``null``
        when there is no such row.
    """
    user_id = session['user_id']
    query = request.args.get('q', '')

    if len(query) < 3:
        return jsonify([])

    # Escape LIKE metacharacters so the input is matched literally. Without
    # this, a query such as '%%%' passes the length guard yet matches every
    # user. The backslash is escaped first so it cannot un-escape the others.
    escaped = query.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
    pattern = f'%{escaped}%'

    conn = get_db_connection()
    try:
        users = conn.execute('''
            SELECT id, username, email FROM users
            WHERE (username LIKE ? ESCAPE '\\' OR email LIKE ? ESCAPE '\\') AND id != ?
            LIMIT 10
        ''', (pattern, pattern, user_id)).fetchall()

        result = []
        for user in users:
            contact = conn.execute('''
                SELECT status FROM contacts
                WHERE user_id = ? AND contact_id = ?
            ''', (user_id, user['id'])).fetchone()

            result.append({
                'id': user['id'],
                'username': user['username'],
                'email': user['email'],
                'status': contact['status'] if contact else None
            })
    finally:
        # Release the connection even if a query raises.
        conn.close()

    return jsonify(result)
|
|
|
@app.route('/view_shared/<int:file_id>')
@login_required
def view_shared_file(file_id):
    """Open an item that another user has shared with the current user.

    A shared folder is rendered as a listing of its shared children; a shared
    file is sent as a download under its original filename. When no share row
    for this user and file exists, the user is redirected with an error.

    Args:
        file_id: ID of the ``files`` row being accessed.
    """
    viewer_id = session['user_id']
    db = get_db_connection()

    shared_item = db.execute(
        'SELECT s.*, f.* FROM shares s JOIN files f ON s.file_id = f.id WHERE s.file_id = ? AND s.shared_with = ?',
        (file_id, viewer_id)
    ).fetchone()

    # No share row for this viewer: nothing to show.
    if not shared_item:
        db.close()
        flash('Файл не найден или у вас нет прав для его просмотра', 'error')
        return redirect(url_for('shared_with_me'))

    # Plain file: stream it from disk as a download.
    if not shared_item['is_folder']:
        db.close()
        directory, stored_name = os.path.split(shared_item['path'])
        return send_from_directory(
            directory,
            stored_name,
            as_attachment=True,
            download_name=shared_item['original_filename'],
        )

    # Folder: list the children that are themselves shared with the viewer.
    listing_query = '''SELECT f.*, s.permission
        FROM files f
        JOIN shares s ON f.id = s.file_id
        WHERE f.user_id = ? AND f.parent_folder = ? AND s.shared_with = ?
        ORDER BY f.is_folder DESC, f.filename'''
    listing_params = (shared_item['user_id'], shared_item['filename'], viewer_id)
    children = db.execute(listing_query, listing_params).fetchall()
    db.close()

    return render_template(
        'view_shared_folder.html',
        folder=shared_item,
        files=children,
        now=datetime.datetime.now(),
        background=session.get('background', 'default.jpg'),
    )
|
|
|
@app.route('/delete_share/<int:share_id>', methods=['POST'])
@login_required
def delete_share(share_id):
    """Revoke a share that was created by the current user.

    Args:
        share_id: ID of the ``shares`` row to remove.

    Returns:
        A redirect to the sharing page of the affected file on success, or to
        the dashboard when the share does not exist or belongs to someone else.
    """
    owner_id = session['user_id']
    db = get_db_connection()

    # Matching on user_id as well ensures only the share's creator can revoke it.
    record = db.execute(
        'SELECT * FROM shares WHERE id = ? AND user_id = ?',
        (share_id, owner_id),
    ).fetchone()

    if record:
        # Remember the file before the row disappears; it drives the redirect.
        target_file_id = record['file_id']
        db.execute('DELETE FROM shares WHERE id = ?', (share_id,))
        db.commit()
        db.close()
        flash('Общий доступ успешно отменен', 'success')
        return redirect(url_for('share_file', file_id=target_file_id))

    db.close()
    flash('Запись не найдена или у вас нет прав для ее удаления', 'error')
    return redirect(url_for('dashboard'))
|
|
|
@app.route('/search')
@login_required
def search():
    """Search the user's own files and files shared with them by name.

    Query parameters:
        query: Substring matched against ``filename`` and
            ``original_filename``. An empty query redirects to the dashboard.

    Returns:
        The rendered ``search_results.html`` page with ``files`` (owned) and
        ``shared_files`` (shared with the user, including ``permission`` and
        ``owner_username``).
    """
    user_id = session['user_id']
    query = request.args.get('query', '')

    if not query:
        return redirect(url_for('dashboard'))

    # Escape LIKE metacharacters so '%' and '_' in the search text match
    # literally; otherwise searching for 'my_file' also matches 'my-file'.
    # The backslash is escaped first so it cannot un-escape the others.
    escaped = query.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
    pattern = f'%{escaped}%'

    conn = get_db_connection()
    try:
        files = conn.execute(
            '''SELECT * FROM files
            WHERE user_id = ? AND (filename LIKE ? ESCAPE '\\' OR original_filename LIKE ? ESCAPE '\\')
            ORDER BY is_folder DESC, filename''',
            (user_id, pattern, pattern)
        ).fetchall()

        shared_files = conn.execute(
            '''SELECT f.*, s.permission, u.username as owner_username
            FROM files f
            JOIN shares s ON f.id = s.file_id
            JOIN users u ON f.user_id = u.id
            WHERE s.shared_with = ? AND (f.filename LIKE ? ESCAPE '\\' OR f.original_filename LIKE ? ESCAPE '\\')
            ORDER BY f.is_folder DESC, f.filename''',
            (user_id, pattern, pattern)
        ).fetchall()
    finally:
        # Release the connection even if a query raises.
        conn.close()

    return render_template('search_results.html',
                           files=files,
                           shared_files=shared_files,
                           query=query,
                           now=datetime.datetime.now(),
                           background=session.get('background', 'default.jpg'))
|
|
|
|
|
if __name__ == '__main__':
    # The Werkzeug interactive debugger lets anyone who can reach the server
    # execute arbitrary Python. The app binds to every interface (0.0.0.0),
    # so debug mode must be an explicit opt-in: set FLASK_DEBUG=1 for local
    # development only.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', debug=debug_enabled)