Русский
Русский
English
Статистика
Реклама

Многопоточное скачивание файлов с ftp python-скриптом

Зачем это нужно?

Однажды передо мной встала задача копирования большого количества файлов с ftp-сервера. Нужно было делать бэкап. Казалось бы, что может быть проще! Но увы, ничего готового работающего так же быстро для моих условий найти не удалось.

Ситуация

Нужно было забирать периодически пару сотен файлов с ftp-сервера под Windows. Много мелочи и несколько очень крупных по размеру файлов. Суммарно примерно на 500 Гб. Сервер представляет собой vps, расположенный довольно далеко за рубежом. Днем машина высоко нагружена, рано ночью выполняются регламентные работы, итого на скачивание часов 5 максимум.

Ни одна из рассмотренных мной утилит не смогла справиться качественно и за отведенное время. Ну что ж, деваться некуда, нормальную систему резервного копирования ещё не купили, а значит ноги в руки вооружаемся редактором или IDE Python и вперёд! За приключениями!

Конфиг

Все параметры для скрипта вынесем в отдельный файл для удобства.

Шаблон конфига:

host = 'ip.ip.ip.ip'user = 'ftpusername'passwd = 'ftppassword'basepath = '/path/to/backup/folder'  # Папка, в которой будут созданы подпапки со скачанными файламиmax_threads = 20 # максимальное количество одновременных процессов загрузкиlog_path = '\path\to\logfile'statusfilepath = '\path\to\statusfile'

Конфиг сохраняем с расширением .py и импортируем в начале нашего скрипта. Импортировать можно непосредственно в пространство имён скрипта, но я сделал конструкцию слегка напоминающую костыль в основной части моего скрипта:

if __name__ == "__main__":    host = config.host    user = config.user    passwd = config.passwd    basepath = config.basepath  # Папка, в которой будут созданы подпапки со скачанными файлами    max_threads = config.max_threads    log_path = config.log_path    statusfilepath = config.statusfilepath    main()

В начале был список

Скачать файлы с ftp сама по себе задача не сложная, но путём недолгих экспериментов было выяснено, что скачивание файлов занимает время, а таймаут ftp-соединения приходит к нам гораздо быстрее. Следовательно, качать нужно каждый файл в новом соединении, иначе велик риск чего-то потом не досчитаться в скачанных файлах.

Для этого нам нужен список этих самых файлов. Ни о каком статичном списке файлов, конечно, речи не идет, значит нам его при каждом выполнении скрипта получать с сервера по-новой.

Удобства ради и чтобы не таскать параметры по всему коду - переопределим параметры стандартного класса ftp:

class MyFtp (ftplib.FTP):    """Класс переопределяет стандартный, чтобы задать все параметры соединение в одном месте"""    def __init__(self):        self.host = host        self.user = user        self.passwd = passwd        self.timeout = 1800        super(MyFtp, self).__init__()    def connect(self):        super(MyFtp, self).connect(self.host, timeout=self.timeout)    def login(self):        super(MyFtp, self).login(user=self.user, passwd=self.passwd)    def quit(self):        super(MyFtp,self).quit()

Параметры берутся из конфига. Конечно же в нужно не забыть импортировать библиотеку ftplib, чтобы этот кусок заработал.

Список файлов с сервера мы получим с помощью следующего класса:

class FileList:    """Класс для работы со списком загружаемых файлов"""    def __init__(self):        self.ftp = None        self.file_list = []    def connect_ftp(self):        import sys        self.ftp = MyFtp()        self.ftp.connect()        self.ftp.login()        self.ftp.__class__.encoding = sys.getfilesystemencoding()    def get_list(self, name):        """Метод для получения списка всех файлов с ftp-сервера."""        import os        for dirname in self.ftp.mlsd(str(name), facts=["type"]):            if dirname[1]["type"] == "file":                entry_file_list = {}                entry_file_list['remote_path'] = name  #путь до файла                entry_file_list['filename'] = dirname[0]  #имя файла                self.file_list.append(entry_file_list)            else:                path = os.path.join(name, dirname[0])                self.get_list(path)    def get_next_file(self):        return self.file_list.pop()    def len(self):        return len(self.file_list)

Помимо методов соединения с сервером, получения списка файлов и определения его длины здесь имеет метод, который возвращает нам следующий файл для скачивания, из списка он при этом, конечно, удаляется.

Логирование

Для ведения логов скачивания будет использовать стандартную библиотеку logging. Создадим класс, который будет заниматься логированием.

class MyLogger:    """Класс для логирования событий"""    def __init__(self):        self.logger = None    def start_file_logging(self, logger_name, log_path):        """Обычное логирование в файл"""        import logging        self.logger = logging.getLogger(logger_name)        self.logger.setLevel(logging.INFO)        try:            fh = logging.FileHandler(log_path)        except FileNotFoundError:            log_path = "downloader.log"            fh = logging.FileHandler(log_path)        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')        fh.setFormatter(formatter)        self.logger.addHandler(fh)    def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5):        """Логирование в файл с ротацией логов"""        import logging        from logging.handlers import RotatingFileHandler        self.logger = logging.getLogger(logger_name)        self.logger.setLevel(logging.INFO)        try:            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)        except FileNotFoundError:            log_path = "downloader.log"            fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')        fh.setFormatter(formatter)        self.logger.addHandler(fh)    def add(self, msg):        self.logger.info(str(msg))    def add_error(self, msg):        self.logger.error(str(msg))

Скрипт будет поддерживать просто логирование в файл и ротацию файловых логов, ибо логи имеют свойство расти непомерно и это надо бы держать под контролем.

Скачивание файла

Каждый файл будет скачиваться в отдельном потоке. Класс, скачивающий один конкретный файл с сервера выглядит следующим образом:

class BaseFileDownload(threading.Thread):    """ Объект для копируемого файла """    count = 0    def __init__(self, rpath, filename, log):        threading.Thread.__init__(self)        self.remote_path = rpath        self.filename = filename        self.ftp = None        self.command = None        self.currentpath = None        self.log = log        self.__class__.count += 1 # для подсчета одновременно запущенных закачек    def __del__(self):        self.__class__.count -= 1    def connect(self):        """Метод для соединения с ftp"""        import sys        self.ftp = MyFtp()        self.ftp.connect()        self.ftp.login()        self.ftp.__class__.encoding = sys.getfilesystemencoding()    def run(self):        """Запуск потока скачивания"""        import os        self.connect()        self.command = str(bytes('RETR ', encoding='latin-1'), encoding='utf-8')        self.currentpath = os.path.join(basepath, self.remote_path[3:])        self.ftp.cwd(self.remote_path)        if not os.path.exists(self.currentpath):            os.makedirs(self.currentpath, exist_ok=True)        self.host_file = os.path.join(self.currentpath, self.filename)        try:            with open(self.host_file, 'wb') as local_file:                self.log.add("Start downloading " + self.filename)                self.ftp.retrbinary(self.command + self.filename, local_file.write)                self.log.add("Downloading " + self.filename + " complete")        except ftplib.error_perm:            self.log.add_error('Perm error')        self.ftp.quit()

Для подсчета количества одновременно скачиваемых файлов мы будет использовать свойство класса count. В нём у нас будет количество существующих экземпляров класса: в конструкторе счетчик наращивается, в деструкторе, соответственно, уменьшается.

Метод для запуска скачивания должен обязательно называться run - это требование библиотеки threading (не забываем её импортировать!), которую мы будем использовать для параллельного запуска нескольких процессов скачивания.

При сохранении списка файлов скрипт сохраняет также путь до этого файла, этот путь мы воссоздаем при скачивании с помощью os.makedirs.

Статус-файл

По завершению скачивания скрипт будет записывать в файл уведомление об этом. Это уведомление можно мониторить zabbix, чтобы понимать когда бэкап не отработал или, как сделал я - написать простого бота, чтобы периодически проверять статус.

Класс для работы с этим файлов выглядит так:

class StatusFile:    """По окончанию задачи скрипт пишет в файл уведомление о корректном выполнении."""    def __init__(self):        self.msg = ''    def setstatus(self, msg):        global statusfilepath        with open(statusfilepath, 'w') as status_file:            status_file.write(msg)

Многопоточность

Ну и, наконец, сама основная функция скрипта, которая осуществляет работу с потоками скачивания:

def main():    import os    import datetime    import time    log = MyLogger()    log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log")) # запускаем логирование    now = datetime.datetime.today().strftime("%Y%m%d")    global basepath    basepath = os.path.join(basepath, now)  # модифицируем путь, добавляя текущую дату    list_file = FileList()    list_file.connect_ftp()    list_file.get_list("..")    for i in range(list_file.len()):        flag = True        while flag:   # цикл внутри которого поддерживается нужное количество одновременно запущенных загрузок            if BaseFileDownload.count < max_threads:                curfile = list_file.get_next_file()                threadid = BaseFileDownload(curfile["remote_path"], curfile["filename"], log)                threadid.start()                flag = False            else:                time.sleep(20)    log.add("Downloading files complete")    statusfile = StatusFile()    statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful")

Здесь мы запускаем логирование, получаем список файлов ( он хранится в памяти).

В вечном цикле while мы проверяем количество одновременно запущенных скачиваний и, при необходимости, запускаем дополнительные потоки.

Исходный код целиком можно найти здесь.

Источник: habr.com
К списку статей
Опубликовано: 18.01.2021 00:17:12
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

Python

Ftp

Multithreading

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru