tabs.device.DiskStatusServer 源代码

import logging
import socket
from flask import Flask, jsonify
import shutil
import os
import requests
from PyQt6 import QtCore
from typing import List
from dataclasses import dataclass
import json

logger = logging.getLogger(__name__)


def ping_server(server: str, port: int, timeout=3):
    """Check whether a TCP connection to ``server:port`` can be opened.

    Args:
        server: Host name or IPv4 address to connect to.
        port: TCP port to connect to.
        timeout: Seconds to wait for the connection before giving up.

    Returns:
        True if the connection was established, False if the attempt failed
        with an ``OSError`` (refused, unreachable, timed out, unresolvable).
    """
    # The context manager closes the socket on every path, including a
    # failed connect, so no file descriptor is leaked.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Per-socket timeout: the process-wide default used by every other
        # socket in the application is left untouched.
        s.settimeout(timeout)
        try:
            s.connect((server, port))
        except OSError:
            return False
    return True


# Flask application that serves the disk-status endpoint.
app = Flask(__name__)

# Module-level state read by the /size request handler.
# Application configuration; assigned by DiskStatusServer.__init__.
config_ = None
# Recording root path; filled from config_["record_path_root"] on the first
# /size request.
path_ = None
# Name of the group flagged "current"; filled on the first /size request.
location_ = None


@app.route("/size")
def disk_size():
    """Serve the disk usage of the local recording path as JSON.

    On the first request the recording path and the name of the group flagged
    "current" are read from the module-level ``config_`` and cached in
    ``path_`` / ``location_``.

    Returns:
        A JSON response built from ``DiskStatus.get(path_, location_)``.
        If ``config_`` has not been set yet, a default ``DiskStatus``
        (``error=True``) is returned with HTTP status 503 instead of ``None``,
        which Flask would reject as an invalid view return value.
    """
    global config_, path_, location_
    if path_ is None:
        if config_ is None:
            logger.error("config not initialized")
            # Same JSON shape as a normal reply, so clients can still parse it.
            return jsonify(DiskStatus()), 503
        path_ = config_["record_path_root"]
        # No break: if several groups are flagged current, the last one wins.
        for group in config_["groups"]:
            if group["current"]:
                location_ = group["name"]
    data = DiskStatus.get(path_, location_)
    return jsonify(data)
# https://realpython.com/python-pyqt-qthread/#using-qthread-vs-pythons-threading
class Worker(QtCore.QObject):
    """Worker object that runs the Flask status server.

    ``run`` blocks for as long as the server is serving, so the object is
    meant to live on its own ``QThread``.
    """

    # Emitted once ``app.run`` has returned or failed.
    finished = QtCore.pyqtSignal()

    def run(self):
        """Serve ``app`` on all interfaces, port 5000, then emit ``finished``.

        Any exception raised by the server (for example the port already
        being in use) is logged instead of escaping the slot, and
        ``finished`` is emitted in every case so that whatever is connected
        to it can still quit and clean up the thread.
        """
        try:
            app.run(host="0.0.0.0", port=5000)
        except Exception:
            logger.exception("status server failed")
        finally:
            self.finished.emit()
@dataclass
class DiskStatus:
    """Disk usage record for one location.

    ``total``, ``used`` and ``free`` are preformatted strings in GiB with two
    decimals and a ``G`` suffix. ``error`` is True when no usage could be read.
    """

    location: str = ""
    total: str = ""
    used: str = ""
    free: str = ""
    error: bool = True

    @classmethod
    def get(cls, path: str, location: str):
        """Build a record describing the disk that holds ``path``.

        Args:
            path: Filesystem path whose disk usage is queried.
            location: Label stored in the record's ``location`` field.

        Returns:
            A ``DiskStatus``. When ``path`` does not exist the sizes are
            reported as ``"0.00G"`` and ``error`` is True.
        """
        path_found = os.path.exists(path)
        if path_found:
            sizes = tuple(shutil.disk_usage(path=path))
        else:
            sizes = (0, 0, 0)
        # Bytes -> GiB, rendered as e.g. "12.34G".
        total_text, used_text, free_text = [
            f"{size/2**30:.2f}G" for size in sizes
        ]
        return cls(
            location=location,
            total=total_text,
            used=used_text,
            free=free_text,
            error=not path_found,
        )
import pydevd
class StatusReporterWorker(QtCore.QObject):
    """Poll every group's ``/size`` endpoint with ``requests`` and emit the results."""

    # Carries the list of DiskStatus objects produced by each refresh.
    result_signal = QtCore.pyqtSignal(list)

    def __init__(self, config) -> None:
        """Store ``config``; its ``"groups"`` entries are polled by ``get``."""
        super().__init__()
        # NOTE(review): debugger hook that looks like a development leftover;
        # confirm whether it should ship.
        pydevd.settrace(suspend=False)
        self.config = config

    def get(self):
        """Fetch the disk status of every configured group.

        Returns:
            A list with one ``DiskStatus`` per group, in config order. A group
            whose request fails or whose reply cannot be turned into a
            ``DiskStatus`` contributes a default ``DiskStatus()``
            (``error=True``).
        """
        # NOTE(review): runs on every poll; see the note in __init__.
        pydevd.settrace(suspend=False)
        status_all = []
        # Empty proxy entries: presumably to keep these requests from going
        # through a configured proxy; confirm.
        proxies = {
            "http": "",
            "https": "",
        }
        for group in self.config["groups"]:
            address = group["address"]
            if group["current"]:
                # The current group is served by this machine.
                address = "127.0.0.1"
            try:
                r = requests.get(
                    f"http://{address}:5000/size", timeout=3, proxies=proxies
                )
            except Exception:
                # Best effort: one unreachable peer must not stop the poll.
                # Unlike a bare except, SystemExit and KeyboardInterrupt
                # still propagate.
                logger.info(f"{group['name']}通讯失败")
                status_all.append(DiskStatus())
                continue
            try:
                # ValueError: body is not JSON. TypeError: JSON is not an
                # object, or has keys DiskStatus does not accept.
                status = DiskStatus(**json.loads(r.text))
            except (ValueError, TypeError):
                logger.exception("json parse error")
                status = DiskStatus()
            status_all.append(status)
        return status_all

    def refresh(self):
        """Fetch all statuses once and emit them on ``result_signal``."""
        data = self.get()
        self.result_signal.emit(data)

    def start_routine(self):
        """Start a timer that calls ``refresh`` every 3000 ms."""
        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.refresh)
        self.timer.start(3000)
class StatusReporter(QtCore.QObject):
    """Own the thread on which a ``StatusReporterWorker`` polls disk status."""

    # Re-emits every list published by the worker's result_signal.
    result_signal = QtCore.pyqtSignal(list)

    def __init__(self, config) -> None:
        """Create the worker and its thread and wire their signals.

        The thread is not started here; call ``start``.
        """
        super().__init__()
        self.config = config

        polling_thread = QtCore.QThread()
        poller = StatusReporterWorker(config)
        poller.moveToThread(polling_thread)

        # Forward worker results through this object's own signal.
        poller.result_signal.connect(self.result_signal)
        # Polling begins as soon as the thread starts running.
        polling_thread.started.connect(poller.start_routine)

        self.thread = polling_thread
        self.worker = poller

    def start(self):
        """Start the polling thread."""
        self.thread.start()
class DiskStatusServer(QtCore.QObject):
    """Run the Flask disk-status server on its own ``QThread``."""

    # Declared here; not emitted anywhere in this class.
    thread_completed_signal = QtCore.pyqtSignal()

    def __init__(self, config):
        """Publish ``config`` to the request handler and prepare the thread.

        Args:
            config: Mapping with a ``"groups"`` list; each group has
                ``"current"`` and ``"name"`` entries. It is stored in the
                module-level ``config_`` read by the ``/size`` handler.
        """
        super().__init__()
        global config_
        config_ = config

        # Default first, so the attribute exists even when no group is
        # flagged current.
        self.current_group = None
        # No break: if several groups are flagged current, the last one wins.
        for group in config["groups"]:
            if group["current"]:
                self.current_group = group["name"]

        self.thread = QtCore.QThread()
        self.worker = Worker()
        self.worker.moveToThread(self.thread)
        self.thread.started.connect(self.worker.run)
        # When the server stops: log it, end the thread's event loop, and
        # schedule the worker and thread objects for deletion.
        self.worker.finished.connect(self.log)
        self.worker.finished.connect(self.thread.quit)
        self.worker.finished.connect(self.worker.deleteLater)
        self.thread.finished.connect(self.thread.deleteLater)

    def start(self):
        """Start the server thread."""
        self.thread.start()

    def log(self):
        """Log that the status server has exited."""
        logger.warning("status server exited.")