1 1 1 1 1 1 1 1 1 1 Rating 5.00 (1 Vote)
Логика и пример работы Зеннопостера с MySQL в многопотоке

Привет, друзья! Тема многопоточной работы Зеннопостером с базой данных MySQL актуальна как никогда - я уже работаю с MySQL больше года, но всегда достаточно много времени приходится выделять на продумывание процесса работы, чтобы Зеннопостер работал корректно именно так, как я этого хочу. Вот только в зависимости от того, что должен делать шаблон и сколько действий должен выполнять, будет использовать браузер или нет - всегда логика работы с MySQL примерно идентична, но все таки отличается. И остановиться, рассказать о всех моментах у меня банально нет времени - после покупки машины я потратил много денег и теперь стараюсь стабилизировать свое финансовое положение. При этом в данной статье я покажу пример, как можно работать Зеннопостером с MySQL, и многим разработчикам шаблонов такого подхода будет достаточно.

Определение задачи

В основном Зеннопостером мы работаем с каким-то конкретным сайтом используя большое количество аккаунтов, либо же имеем один прокси и работаем с большим количеством сайтов. Также как вариант может быть один прокси, один аккаунт, один сайт, но на сайте миллион видео роликов или комментариев или чего-то еще и нам хочется держать в базе данных актуальную информацию и обновлять её. Естественно в таком случае хочется задействовать многопоточный режим, чтобы получать актуальную информацию как можно быстрее. Еще может быть вариант, что мы хотим заходить на какой-то кран каждых 5 или 10 минут и собирать там криптовалюту и нам необходимо держать в базе данных время последнего сбора монет.

Все эти варианты разные, но принцип работы Зеннопостера в многопоточном режиме с базой данных будет примерно идентичным - будут отличаться только поля таблицы базы данных. Исходя из этого давайте я придумаю отдельную задачу, решение которой и реализую в данной публикации - а Вашим домашним заданием будет адаптировать это решение для своих нужд. Договорились? Тогда жду от Вас комментарий о том, получилось ли адаптировать решение или нет.

И так, есть у нас сайт, на котором люди оставляют отзывы о сайтах - ВебПроверка. На этом сайте есть что-то похожее на API - мы можем получить количество положительных и отрицательных отзывов о сайте, который запрашиваем. Допустим у нас есть 5 сайтов, за отзывами об которых мы хотим следить и хранить эту информацию в базе данных. Чтобы информация была актуальной мы будем обращаться к ВебПроверке каждую минуту. При чем, чтобы продемонстрировать многопоточный режим Зеннопостера договоримся что работать мы будем в 5 потоков (ведь при желании мы можем захотеть иметь информацию о 1000 сайтах к примеру, но на таком маленьком количестве сайтов будет лучше заметно корректно работает шаблон или нет).

Шаблон не должен использовать браузер, и конечно же мы не хотим каждый раз заходить и запускать его - он будет запускаться каждую минуту на 5 выполнений в 5 потоков. Таким образом мы сможем добиться решения поставленной задачи и иметь всегда актуальную информацию в нашей базе данных. Конечно же мы можем также запустить его на бесконечное количество выполнений в этих же 5 потоков, только в этом случае он будет много раз долбиться запросами к базе данных, получать ответ что еще время не пришло, закрываться и запускаться повторно. Чтобы как-то растянуть это время, можно поставить внутри шаблона например паузу на 1 минуту - шаблон без использования браузера много оперативной памяти не кушает, а значит мы можем себе позволить держать запущенными необходимое количество потоков. С другой стороны все таки разработчики Зеннопостера рекомендуют закрывать поток как только он не нужен и запускать его тогда, когда в нем есть необходимость - из-за чего иногда лучше использовать расписание.

Создаем табличку в MySQL

Так как задача у нас примитивная, то и база данных будет точно такая же - она будет состоять из одной таблички, назовем её например site. И хранить она будет данные о сайте, а именно количество положительных, отрицательных и нейтральных отзывов. Также в этой табличке мы будем хранить домен сайта и присвоим каждому сайту идентификатор, чтобы проще было его искать в случае необходимости. Еще введем поле timestamp, в котором будет храниться время последнего обращения к сайту и поле status, которое будет отвечать за то, чтобы с одним сайтом работал только один поток Зеннопостера. Поле timestamp будет обновляться в значени "Сейчас" каждый раз, как только мы изменим какое либо другое поле. Поле status мы будем изменять шаблоном Зеннопостера.

Для создания таблички в базе данных MySQL можно использовать программу Navicat. Она предоставлена в графическом интерфейсе - просто подключаемся к нашей базе данных и нажимаем добавить новую табличку. После чего указываем поля id, url, timestamp, status, positive, negative, messages.

Также можно открыть вкладку "Запросы" и поочередно выполнить SQL запросы:

  1. DROP TABLE IF EXISTS `site`;
  2. CREATE TABLE `site` (
      `id` int(6) NOT NULL,
      `url` varchar(255) NOT NULL,
      `timestamp` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      `status` int(3) NOT NULL DEFAULT '0',
      `positive` int(6) DEFAULT '0',
      `negative` int(6) DEFAULT '0',
      `messages` int(6) DEFAULT '0',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf32;
  3. INSERT INTO `site` VALUES ('1', 'zennolab.com', '2017-12-03 17:07:10', '0', '0', '0', '0');
    INSERT INTO `site` VALUES ('2', 'youtube.com', '2017-12-03 17:07:10', '0', '0', '0', '0');
    INSERT INTO `site` VALUES ('3', '999dice.com', '2017-12-03 17:07:12', '0', '0', '0', '0');
    INSERT INTO `site` VALUES ('4', 'kwork.ru', '2017-12-03 17:07:13', '0', '0', '0', '0');
    INSERT INTO `site` VALUES ('5', 'coinhive.com', '2017-12-03 17:07:15', '0', '0', '0', '0');

Оформление таблички в Navicat будет иметь следующий вид:

Создали табличку MySQL - вид в Navicat

А когда добавим туда данные - получим примерно такую табличку с данными:

Добавили данные в MySQL - вид в Navicat

Естественно, что эти SQL запросы можно выполнить и в Зеннопостере как с использованием стандартных действий, так и используя "Свой C# код". Раньше я всегда использовал стандартные кубики, так как знаний, как работать с своим кодом было недостаточно. Когда же разобрался маленько - начал использовать эту возможность - оказалось, что так куда удобнее, так как прямо в одном кубике можно и сформировать запросы и отправаить их и обработать результат. Ниже я предоставлю фрагменты кода с описанием действий - их можно будет просто скопировать и вставить в один кубик "Свой C# код", не смотря на то, что здесь он будет разбит на логические блоки.

  1. bool message = true;
    string query = string.Empty;
    int rezult = 0;
    string[] db_setting = { "database:name", "server:localhost", "port:82", "user id:login", "password:password"};
    string connect_db = string.Format(
    		@"database={0};server={1};port={2};user id={3};password={4}",
    		db_setting[0].Split(':')[1],
    		db_setting[1].Split(':')[1],
    		db_setting[2].Split(':')[1],
    		db_setting[3].Split(':')[1],
    		db_setting[4].Split(':')[1]
    	);

    Логическую переменную message я создал для того, чтобы можно быстро изменить ей значение на false чтобы отключить уведомления, которые указал в коде для отладки и отслеживания процесса работы

    В строковую переменную query я буду помещать тело SQL запроса, чтобы удалить табличку, создать табличку, добавить данные или обновить их.

    При выполнении запросов в Зеннопостере есть возможность отправлять запросы, которые не требуют возврата значения - например запросы, которые добавляют данные возвращают количество добавленных строк, запросы которые обновляют данные - возвращают количество измененных строк. Так вот переменная rezult будет хранить в себе это значение как результат выполненного запроса.

    В строковый массив db_setting я добавлю данные для доступа к базе данных, а именно имя базы данны, сервер базы данных, порт соединения к базе данных, логин пользователя базы данных и пароль пользователя базы данных. Конечно эти данные Вы должны заменить на свои чтобы подключиться к своей базе данных MySQL.

    Переменная connect_db будет хранить строку подключения к базе данных и будет использоваться везде где придется отправить запрос к базе данных. Формируется она с входящих данных массива db_setting

  2. query="DROP TABLE IF EXISTS `site`";
    rezult = ZennoPoster.Db.ExecuteNonQuery(query, null, ZennoLab.InterfacesLibrary.Enums.Db.DbProvider.MySqlClient, connect_db);
    project.SendInfoToLog(string.Format(@"{0}",rezult), message);

    Сначала в переменную query добавляется SQL запрос, который удаляет табличку site если находит её в базе данных. Это делается для того, чтобы не появилась ошибка при попытке создания этой таблички в следующих запросах. После этого выполняется уже сам запрос к базе данных, результат выполнения которого сохранится в переменной rezult. И уже в конце выводится уведомление в лог с содержимим переменной rezult.

  3. query=@"CREATE TABLE `site` (
      `id` int(6) NOT NULL,
      `url` varchar(255) NOT NULL,
      `timestamp` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      `status` int(3) NOT NULL DEFAULT '0',
      `positive` int(6) DEFAULT '0',
      `negative` int(6) DEFAULT '0',
      `messages` int(6) DEFAULT '0',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf32";
    rezult = ZennoPoster.Db.ExecuteNonQuery(query, null, ZennoLab.InterfacesLibrary.Enums.Db.DbProvider.MySqlClient, connect_db);
    project.SendInfoToLog(string.Format(@"{0}",rezult), message);

    Как видим, этот код от предыдущего отличается только самим SQL запросом. Сейчас уже мы будем отправлять запрос, который будет в нашей базе данных создавать табличку site с полями id, url, timestamp, status, positive, negative, messages.

  4. string[] values = {
    	"'1', 'zennolab.com', '2017-12-03 17:07:10', '0', '0', '0', '0'",
    	"'2', 'youtube.com', '2017-12-03 17:07:10', '0', '0', '0', '0'",
    	"'3', '999dice.com', '2017-12-03 17:07:12', '0', '0', '0', '0'",
    	"'4', 'kwork.ru', '2017-12-03 17:07:13', '0', '0', '0', '0'",
    	"'5', 'coinhive.com', '2017-12-03 17:07:15', '0', '0', '0', '0'"	
    };

    Чтобы проще было работать с данными, которые необходимо добавлять в базу данных я вынес их в отдельный строковый массив. Каждый элемент массива - это данные в виде id, url, timestamp, status, positive, negative, messages, которые необходимо подготовить заранее. В запросе для добавления данных в базу данных мы будем использовать всегда только один элемент массива - а значит в любой момент этот массив можно заменить просто на переменную, которая будет содержать значения в этой же последовательности.

  5. foreach(string data in values) {
    	query=string.Format(@"INSERT INTO `site` VALUES ({0})", data);
    	rezult = ZennoPoster.Db.ExecuteNonQuery(query, null, ZennoLab.InterfacesLibrary.Enums.Db.DbProvider.MySqlClient, connect_db);
    	project.SendInfoToLog(string.Format(@"{0}",rezult), message);
    }

    Так как значения которые мы будем добавлять в базу данных хранятся у нас в массиве, то самым простым способом добавить их - это пробежаться по всем элементам массива и для каждого элемента выполнить запрос на добавление в базу данных. Конечно, в базу данных данные можно было бы добавить и по другому - сформировать строку в виде (елемент массива) для каждого элемента массива, после чего соединить их через запятую и уже потом добавить в SQL запрос. Это отлично работает и экономит количество запросов к базе данных, но, если этот массив с данными будет например на 100 000 строк - тогда этот запрос может не выполняться (такое количество данных можно разбить например по 100 строк, и добавлять их одним запросом - тогда все данные получится добавить всего 1000 запросов).

В результате выполнения кубика "Свой C# код" в Зеннопостере мы получим созданную табличку в базе данных MySQL с заполненными данными. Естественно, что этот кубик необходимо выполнять только один раз, а не постоянно в многопотоке - так как он будет удалять табличку и создавать её заново. Но, пример с данного фрагмента кода можно использовать всегда, когда у нас есть необходимость добавить какие-либо данные в базу данных либо создать какую-либо табличку для работы с аккаунтами например.

Пример многопоточного парсинга в MySQL

И так, выше мы подготовили уже для себя табличку с данными в нашей базе данных MySQL и можно приступать к написанию шаблона, который будет работать с базой данных в многопоточном режиме. Так вот одной из проблем, при работе в многопоточном режиме является тот факт, что каждый поток берет одну и ту же строку с базы данных. И сложно сразу понять, как настроить это смещение.

В своих шаблонах я использую несколько "хитростей" которые позволяют мне работать в многопоточном режиме.

Одна из них рассчитана на то, что мы можем запустить например 5 потоков одновременно, и необходимо чтобы потоки выстроились в очередь. Добиться этого возможно путем добавления рандомной паузы перед выполнением запроса, которой достаточно для того, чтобы обращения к базе данных разных потоков были не одновременными.

Следующая "хитрость" рассчитана на поочередную работу с строками таблички базы данных. Суть заключается в том, что в табличке базы данных находится поле timestamp которая содержит значение времени последнего изменения данных в этой строке таблицы. Таким образом, чтобы всегда смещать самые старые записи вверх я в запросах SQL использую сортировку ORDER BY по данному полю.

Очередная хитрость заключается в том, что сразу же после того, как я получил данные с базы данных я изменяю поле статус для полученной строки например с 0 на 1 или с 1 на 2 или с 2 на 0. В момент изменения данных обновляется поле timestamp и тогда очередные потоки отсортировав табличку уже не смогут сразу получить эту же строку. При этом я делаю проверку на использование строки другим потоком - когда я обновляю статус с 0 на 1, то указываю, что при этом статус точно не должен быть равным 1 (его не подхватил еще другой поток). Когда данные в базе обновлены - значит условие выполнено - мы получаем в результате количество измененных строк в базе - и можем продолжить работу. Если же условие не выполнено, то количество измененных строк будет равным 0 - в этом случае мы понимаем, что другой поток уже работает с этой строкой и необходимо либо завершить работу либо попробовать получить строку с базы данных повторно. Вот только зацикливать повторным взятием строки все таки не рекомендуется чтобы не уйти в бесконечный цикл (что чревато зависанием Зеннопостера), на крайний случай можно сделать определенное количество попыток повторно получить данные с базы при этом между итерациями цикла добавить паузу хотя бы в 1 секунду.

Наверно сложно понять написанный выше текст - давайте рассмотрим сам код шаблона, который решает поставленную в начале статьи задачу. Возле каждого фрагмента я постараюсь описать зачем он нужен и как это работает - может тогда будет проще разобраться. Данные фрагменты кода также необходимо будет вставлять в один кубик "Свой C# код" смотря на то, что изложил я его здесь на несколько - так проще его прокомментировать.

  1. string[] db_setting = { "database:name", "server:localhost", "port:82", "user id:login", "password:password"};
    string connect_db = string.Format(@"database={0};server={1};port={2};user id={3};password={4}",db_setting[0].Split(':')[1],db_setting[1].Split(':')[1],db_setting[2].Split(':')[1],db_setting[3].Split(':')[1],db_setting[4].Split(':')[1]);
    string url_site = string.Empty;
    string id_site = string.Empty;
    string query = string.Empty;
    int count_update = 0;
            

    Сначала мы определяем переменные, которые будут использоваться в нашем коде.

    • db_setting - содержит массив данных для подключения к MySQL - заменяем на свои данные
    • connect_db - формирует с db_setting строку подключения к базе данных
    • url_site - будет содержать домен который мы получим с базы данных чтобы получить информацию о нем на сайте ВебПроверка
    • id_site - идентификатор нашего сайта будет использоваться в запросах для изменения статуса сайта в базе данных и получения домена
    • query - будет содержать сформированный запрос который будем отправлять к базе данных для получения или изменения данных
    • count_update - будет содержать результат запроса обновления статуса сайта в нашей базе данных
  2. Random rand = new Random(DateTime.Now.Millisecond);
    System.Threading.Thread.Sleep(rand.Next(rand.Next(10, 100), rand.Next(3000, 5000)));
            

    Для того, чтобы когда мы запустим шаблон в многопоточном режиме он сразу не бросился всеми потоками к одной строке необходимо сгенерировать случайную паузу. Делается это кодом, который предложен выше. После его выполнения каждый отдельный поток Зеннопостера будет задерживаться на это значение перед отправкой первого запроса к базе данных.

    Не исключаю, что есть какие-то другие способы - но моя цель показать как я пишу шаблоны для Зеннопостера и как выстраиваю логику их корректной работы. Если же Вы считаете что я заблуждаюсь и есть вариант проще и удобнее - добро пожаловать в комментарии.

  3. query = @"SELECT site.id FROM site WHERE site.`status` = 0 AND (NOW()-site.`timestamp`)/60 > 0 ORDER BY site.`timestamp` ASC LIMIT 1";
    id_site = ZennoPoster.Db.ExecuteQuery(query, null, ZennoLab.InterfacesLibrary.Enums.Db.DbProvider.MySqlClient, connect_db, " | ");
            

    В переменную query мы добавляем SQL запрос. Данный запрос берет с базы данных site.id с таблички site при условии, что для этого site.id статус будет иметь значение 0 а также от времени "Сейчас" отнимается время которое находится в поле site.`timestamp` - это получится разница времени в секундах, делим на 60 - получаем разницу в минутах. Так вот, если этот сайт использовался больше 0 минут назад (можно поставить например 1 или 2 или 60 в зависимости от переодичности парсинга новых данных), тогда мы получим site.id, при этом если результатов несколько, то мы отсортируем их по полю последнего использования site.`timestamp` от старых к новым и возьмем только одну строку (одно значение).

    Отправив этот запрос в переменную id_site мы получим либо ничего (NULL) либо идентификатор сайта, с которым мы должны продолжать работу.

  4. if(String.IsNullOrEmpty(id_site)) return null;
            

    Если в результате выполнения SQL запроса, который мы отправили выше, мы получим пустое значение или NULL, то это значит что либо нет сайтов, которые мы парсили больше заданного промежутка времени либо все сайты имеют статус отличающийся от нуля - а значит занятые другими потоками Зеннопостера. Значит, мы должны проверить это и завершить работу выходом по красной ветке. В таком случае, если например пойдет 10 красных срабатываний подряд мы сможем остановить шаблон и запустить его в работу снова через несколько минут расписанием.

  5. query = string.Format(@"UPDATE site SET site.`status`=""1"" WHERE site.id = {0} AND site.`status` <> 1 LIMIT 1",id_site);
    count_update = ZennoPoster.Db.ExecuteNonQuery(query, null, ZennoLab.InterfacesLibrary.Enums.Db.DbProvider.MySqlClient, connect_db);
            

    Если мы не вышли по красной ветке после проверки на NULL это значит что мы получили id_site. Но мы не можем быть уверенными, что другой поток уже не использует эту строку из базы данных. Для этого мы изменяем статус сайта с 0 на 1 (указав в условии, чтобы изменялась строка только для id_site и только когда статус не равен 1). После чего в переменной count_update у нас будет количество измененных строк. Если все успешно - значение 1, если не успешно - значение 0.

  6. if(count_update<1) return null;
            

    Так вот, если в переменной count_update у нас находится 0, это значит что статус id_site не изменился, что в свою очередь значит что данный id_site уже занят другим потоком Зеннопостера. В этом случае нам необходимо завершить работу шаблона выходом по красной ветке. Именно эту проверку и выход по красной ветке делает данный код.

  7. Если мы действием выше не вышли по красной ветке, это значит, что статус изменился, а значит мы можем приступить к нашим целевым действиям - например изменить статус с 1 на 2 (чтобы иметь возможность отслеживать состояния работы шаблона с каждым конкретным сайтом) и после этого уже отправить запрос на получения данных с ВебПроверки. С полученных данных мы сформируем запрос в базе данных, и обновим данные в базе для данного сайта.

    Сформировали запрос для изменения статуса на 2

    query = string.Format(@"UPDATE site SET site.`status`=""2"" WHERE site.id = {0} LIMIT 1",id_site);

    Отправили запрос - изменили статус сайта на 2

    count_update = ZennoPoster.Db.ExecuteNonQuery(query, null, ZennoLab.InterfacesLibrary.Enums.Db.DbProvider.MySqlClient, connect_db);

    Сформировали запрос на получение домена сайта

    query = string.Format(@"SELECT site.url FROM site WHERE site.id = ""{0}"" LIMIT 1",id_site);

    Отправили запрос к базе данных и получили домен сайта

    url_site = ZennoPoster.Db.ExecuteQuery(query, null, ZennoLab.InterfacesLibrary.Enums.Db.DbProvider.MySqlClient, connect_db, " | ");

    Отправили целевой гет запрос на ВебПроверку и получили данные о сайте

    string get = ZennoPoster.HttpGet(string.Format("https://webproverka.com/get-info.php?domain={0}",url_site), string.Empty, "windows-1251", ZennoLab.InterfacesLibrary.Enums.Http.ResponceType.BodyOnly, 30000, string.Empty, "PHP", false, 0, new [] {""});

    Вывели данные о сайте в лог (без этого можно обойтись)

    project.SendInfoToLog("Домен: " + get.Split('\n')[0].Split('=')[1], true);
    project.SendInfoToLog("Статус: " + get.Split('\n')[1].Split('=')[1], true);
    project.SendInfoToLog("Позитив: " + get.Split('\n')[2].Split('=')[1], true);
    project.SendInfoToLog("Негатив: " + get.Split('\n')[3].Split('=')[1], true);
    project.SendInfoToLog("Сообщения: " + get.Split('\n')[4].Split('=')[1], true);

    Сформировали запрос для обновления информации о сайте в базе данных

    query = string.Format(@"UPDATE site SET site.`status`=""0"", site.positive=""{0}"", site.negative=""{1}"", site.messages=""{2}"" WHERE site.id = ""{3}"" LIMIT 1",
    get.Split('\n')[2].Split('=')[1], 
    get.Split('\n')[3].Split('=')[1],
    get.Split('\n')[4].Split('=')[1],
    id_site);

    Отправили запрос к базе данных - освободили сайт установив статус 0

    count_update = ZennoPoster.Db.ExecuteNonQuery(query, null, ZennoLab.InterfacesLibrary.Enums.Db.DbProvider.MySqlClient, connect_db);

Если мы собрали код из статьи в один шаблон, то у нас должно получиться что-то вроде этого шаблона из двух кубиков "Свой C# код"

Пример шаблона который получился в результате

Вот собственно и вся логика шаблона для Зеннопостера, который будет работать с базой данных в многопоточном режиме. Добавляем его в свой Зеннопостер и наблюдаем в логе примерно такую картину:

Многопоточный парсер на Зеннопостере с использованием базы данных MySQL

Переписывать данный шаблон можно под любые разные задачи - начиная с парсинга чего-либо и заканчивая ботами для сбора криптовалюты на кранах. Надеюсь, что данная информация по работе Зеннопостером и базой данных MySQL была для Вас полезной - если это так, отправляйте ссылку на статью в социальные сети и оставляйте комментарии.

Добавить комментарий


Защитный код
Обновить

Комментарии   

Антон
0 # Антон 05.12.2017 02:04
Спасибо, Юра, очень помог!
мне одного этого кода хватило)
Код:Random rand = new Random(DateTime.Now.Millisecond);
System.Threading.Thread.Sleep(rand.Next(rand.Next(10, 100), rand.Next(3000, 5000)));

Подскажи, а как в timestamp сделать, и обновлять в unixtime?
Ответить | Сообщить модератору
Юрий Йосифович
0 # Юрий Йосифович 05.12.2017 12:00
Привет, Антон!
Рад, что смог помочь решить проблему.

Но, с unixtime я не работал так, как это правильно работать.
Я просто при необходимости хранил в базе данных числовое значение.
А Зеннопостером просто получал текущее время кодом:
Код:
Int64 retval=0;
var st= new DateTime(1970,1,1);
TimeSpan t = (DateTime.Now.ToUniversalTime()-st);
retval= (Int64)(t.TotalMilliseconds+0.5);
string x_time = retval.ToString();

После чего просто обновлял его в базе данных обычным запросом UPDATE table SET field="value"
Ответить | Сообщить модератору