Существует класс задач, которые можно ускорить
за счет многопоточности. В основном эти задачи связаны с интернетом: закачка файлов, рассылка почты, сканирование сетей,
брутфорс паролей к различным сервисам и т.д. При
последовательном выполнении каких-либо операций в интернете программе приходится ждать: транспортировка пакета между
компами занимает какое-то время, и серверу требуется время на обработку. Таким образом,
канал связи простаивает. Это время можно использовать для выполнения других задач.
Тогда общая производительность возрастет. Именно на этом принципе основана работа
download manager'ов. Мы рассмотрим реализацию многопоточного приложения в Windows на C++ и шаблон
программирования, на основе которого можно создавать многопоточные приложения для
выполнения задач, перечисленных выше. Для реализации одновременной обработки нескольких соединений в Windows существуют
два способа. Первый заключается в использовании асинхронных функций. Он обходится
без создания потоков: функции WSASend, WSARecv и др. возвращают управление сразу
после вызова, запустив обработку в фоновом режиме, а после ее завершения система
вызывает функцию, называемую IO Completion Routine, которая, обработав результат,
вызывает следующее действие. Такие программы неудобно ни писать, ни читать, так как
сложно проследить последовательность выполнения действий по коду программы. Это
несовременный способ, и он не рекомендуется для Win32-приложений.
Потоки
Другой способ состоит в написании многопоточного приложения. 386-й процессор - это первый процессор, поддерживающий приоритетную
многозадачность, т. е. умеющий выполнять несколько задач без
программного переключения между ними, т. е. мы можем считать, что задачи выполняются
одновременно. На самом деле процессор переключается между задачами, выделяя
каждой из них свой квант времени, но это делается аппаратными средствами
абсолютно прозрачно для приложений. Не буду копировать сюда MSDN. Перечислю только основные функции, которые нам
понадобятся. Программа запускает потоки с помощью функции
CreateThread(). Одним из параметров этой функции является указатель на функцию, которая будет
выполняться во вновь созданном потоке. Она имеет один параметр типа
LPVOID, который можно использовать как угодно. Обычно это указатель на какую-нибудь структуру.
Если эта функция возвратит управление, то поток завершится, а DWORD-зачение,
возвращенное функцией, станет кодом выхода этого потока. Его можно получить с
помощью функуии GetExitCodeThread(). GetExitCodeThread() возвращает
константу STILL_ACTIVE, если поток еще не завершился. Внутри функции потока нужно реализовать выполнение задачи, в основном, так же, как
и в однопоточной программе, за исключение нескольких случаев, например, когда
несколько потоков работают с одними и теми же переменными.
Синхронизация
Представьте себе такую ситуацию: один поток записывает текстовую строку в массив, а
другой читает. Любая обработка строк - это цикл по каждому символу, и на любом шаге
этого цикла может произойти переключение задач. Может случиться так, что будет заменена
половина строки, а затем произойдет чтение. Понятно, что тогда прочитается мусор. Чтобы этого не случилось, нельзя допускать, чтобы несколько потоков одновременно
работали с общими данными. Для этого в Windows существуют функции синхронизации. С глобальными данными должен быть связан объект, называемый Mutex. Создается он
функцией CreateMutex(), а уничтожается CloseHandle(). На время
работы с данными поток должен завладеть мутексом. Если другой поток уже владеет им, то придется подождать.
Это делается функцией WaitForSingleObject(). Она получает хендл мутекса и таймаут. Если
не можем продолжать работу, не дождавшись возможности завладеть
мутексом, то вместо таймаута передаем константу
INFINITE. Поработав с глобальными данными, дадим и другим потокам поработать. Для этого вызовем
ReleaseMutex(). Вот и все функции системы, о которых нам надо знать. Теперь перейдем к разработке
нашего приложения.
Шаблон приложения
Мы не будем заниматься какой-либо конкретной задачей, а рассмотрим общую структуру
приложения, выполняющего некую долговыполнимую задачу несколькими потоками.
Она будет предусматривать возможность изменять количество рабочих потоков и прерывать
работу в любой момент. Найдем то, что объединяет все задачи, к которым можно применить данный подход.
- Задача разделена на подзадачи, которые можно выполнять независимо друг от друга.
- Одновременное выполнение нескольких подзадач быстрее, чем последовательное,
но чрезмерное количество потоков плохо. Существует некоторое оптимальное
количество потоков, которое каждый для себя определяет экспериментально. - В зависимости от некоторых условий могут существовать ресурсы, которые могут быть
использованы несколькими подзадачами последовательно (например, соединение с
SMTP-сервером при отправке нескольких писем с одного адреса).
Они должны быть связаны с потоком и не выделяться каждый раз, когда они нужны.
Напишем абстрактные классы, соответствующие этим требованиям. Класс элементарной задачи (закачка одной секции файла, отправка одного письма,
сканирование одного порта на одном компе, проверка одного пароля) назовем CMTTask
(MT - отслова multithreaded)
class CMTTask
{
public:
virtual int Run(CMTThreadResources*& lpThreadRes, int nThreadNum, CMTNotifTarget* pnt);
virtual int SafeInterrupt();
};
Всего два метода. Единственное, что нужно сделать с любой задачей, - это выполнить (Run), а в
современных программах есть еще возможность прерывать работу. Функция SafeInterrupt
может либо устанавливать внутренний флаг, который проверяется внутри функции
Run(), либо убивать поток, в котором выполняется Run(), либо вообще ничего не делать. В последнем
случае работа будет приостанавливаться только после выполнения текущей задачи.
Первый параметр функции Run() - это указатель на ресурсы потока. Изначально этот указатель
может быть NULL. Функция Run() должна уметь выделять необходимые ресурсы.
Остальные два параметра нужны только для пользовательского интерфейса. Они указывают, где
будет отображаться состояние задачи.
Класс ресурсов:
class CMTThreadResources
{
public:
virtual BOOL IsKindOf(int nClass);
virtual BOOL FreeResources();
};
У каждого типа ресурсов будут свои методы, поэтому нужно делать type cast, а перед этим
неплохо бы проверить, тот ли это класс, который нужен. Этого можно не делать, если Вы не
такой маньяк, чтобы совмещать в одной проге разнородные задачи. На всякий случай
напишем функцию IsKindOf(). Метод FreeResources() нужен, поскольку CMTTask::Run() не освобождает ресурсы, и функция
потока должна сама уметь это делать. Метод для этого должен быть в абстрактном классе.
Можно вместо этого написать виртуальный деструктор, но он не может возвращать данные,
что затруднит отладку.
Задачи мы будем хранить в очереди.
class CMTTaskQueue
{
public:
virtual CMTTask* GetNextTask();
};
Здесь под очередью мы будем понимать просто источник задач. Это не
подразумевает, что данные всех задач будут физически храниться. Например, если задан диапазон
IP-адресов для сканирования, то храниться будет только текущий и конечный
адрес. В случае с закачкой файлов прога должна
хранить инфу о секциях и делить не закачанные
данные на секции. Функция GetNextTask() конструирует объект задачи и возвращает указатель на него
или возвращает NULL, если больше задач нет. Если выполнение задачи прерывается, то ее нужно как-то сохранить обратно в очередь.
Включить необходимые методы в абстрактные классы не представляется возможным,
так как эти механизмы в разных программах существенно различаются.
Перейдем к классу, который управляет потоками. Это основной класс в проге на основе
нашего шаблона. Это уже не абстрактный класс, и производить от него классы не нужно.
Этот класс запускает и прерывает потоки и заботится о снхронизации. Потоки обращаются
о общим данным только посредством функции CMTThreadArray::GetNextTask().
class CMTThreadArray
{
friend DWORD WINAPI MTThreadProc(LPVOID pvThread); // Функция потока
public:
CMTThreadArray(CMTTaskQueue* pTaskQueue, CMTNotifTarget* pNotifTarget);
~CMTThreadArray();
void SetNumThreads(int n, BOOL fInterruptTask, BOOL fWait); // Запускает и
// Останавливает потоки
BOOL IsExecuting(); // Проверяет, выполняется ли еще что-нибудь
private:
CMTTask* GetNextTask(int nId); // Вызывается из функции потока
// nId - номер потока
CMTTaskQueue* m_pTaskQueue; // Очередь задач
CMTNotifTarget* m_pNotifTarget; // То место, куда выводится инфа о состоянии
HANDLE* m_pThreadHandles; // Массив хендлов потоков
CMTTask** m_ppTasks; // Массив задач
int m_nThreads; // Количество хендлов в массиве
int m_nThreadsA; // Количество работающих потоков
HANDLE m_hMutex;
};
class CMTThread // Это просто структура для передачи данных в функцию потока.
{
public:
CMTThread(CMTThreadArray* pAr, int nId);
CMTThreadArray* m_pAr;
int m_nId;
};
CMTThread::CMTThread(CMTThreadArray* pAr, int nId) // Для удобства создания объекта и
{ // заполнения его данными одной строчкой
m_pAr = pAr; // полезен конструктор
m_nId = nId;
}
DWORD WINAPI MTThreadProc(LPVOID pvThread)
{
CMTThread* pThread = (CMTThread*) pvThread;
CMTTask* pTask;
CMTThreadResources* pRsrc = NULL;
// Пока есть задания для этого потока, выполняем их, затем
// освобождаем ресурсы и выходим
while (pTask = pThread->m_pAr->GetNextTask(pThread->m_nId))
{
pTask->Run(pRsrc, pThread->m_nId, pThread->m_pAr->m_pNotifTarget);
delete pTask;
}
if (pRsrc) {pRsrc->FreeResources(); delete pRsrc;}
delete pThread;
return 0;
}
Изменение количества рабочих потоков работает так. Если требуется увеличить количество
потоков, то просто запускаем потоки и сохраняем их хендлы в массив. Если же нужно
завершить часть потоков или все потоки, то это надо сделать правильно, завершив текущую
задачу и освободив ресурсы. Если установлен флаг fInterruptTask, то вызовем
SafeInterrupt() задач, выполняемых в потоках, которые нужно прервать. Далее эти потоки должны освободить
ресурсы и завершиться. CMTThreadArray::GetNextTask() проверяет, должен ли завершиться
данный поток (номер потока передается в функцию), и возвращает
NULL в этом случае.
void CMTThreadArray::SetNumThreads(int n, BOOL fInterruptTask, BOOL fWait)
{
int i;
DWORD dwTmpThreadId;
WaitForSingleObject(m_hMutex, INFINITE);
if (n > m_nThreads) // Изменить размер массива потоков
{
m_pThreadHandles = (HANDLE*)realloc(m_pThreadHandles, n*sizeof(HANDLE));
m_ppTasks = (CMTTask**)realloc(m_ppTasks, n*sizeof(CMTTask*));
for (int i = m_nThreads; i < n; i++) {m_pThreadHandles[i] = NULL; m_ppTasks[i] = NULL;}
m_nThreads = n;
}
int nThreadsToRun = n - m_nThreadsA;
if (nThreadsToRun > 0) // Увеличить количество потоков
{
m_pNotifTarget->OnThreadCountChange(n);
for (i = m_nThreadsA; i < n; i++)
{
if (m_pThreadHandles[i]) // Если поток с данным номером уже работает (еще не
{ // завершился после предыдущего
SetNumThreads()),
DWORD dwEC; // то ничего запускать не нужно
if (GetExitCodeThread(m_pThreadHandles[i], &dwEC))
{
if (dwEC == STILL_ACTIVE) continue;
}
}
m_pThreadHandles[i] = CreateThread(NULL, 0, MTThreadProc, new CMTThread(this, i), 0, &dwTmpThreadId);
}
}
if (fInterruptTask && (nThreadsToRun < 0)) // Прервать потоки потоков
{
for (i = n; i < m_nThreadsA; i++)
{
if (m_ppTasks[i]) m_ppTasks[i]->SafeInterrupt();
}
}
m_nThreadsA = n;
ReleaseMutex(m_hMutex);
if (fWait && (nThreadsToRun < 0))
WaitForMultipleObjects(-nThreadsToRun, m_pThreadHandles+n, TRUE, INFINITE);
if (nThreadsToRun < 0) m_pNotifTarget->OnThreadCountChange(n);
}
BOOL CMTThreadArray::IsExecuting()
{
// Ждать завершения всех потоков
// Timeout = 0; не ждет, а возвращает сразу. По возвращаемому значению определяем,
// работают ли потоки
return WaitForMultipleObjects(m_nThreads, m_pThreadHandles, TRUE, 0) == WAIT_TIMEOUT;
}
CMTTask* CMTThreadArray::GetNextTask(int nId)
{
CMTTask* p;
WaitForSingleObject(m_hMutex, INFINITE);
if (nId < m_nThreadsA)
p = m_pTaskQueue->GetNextTask(); else
p = NULL;
m_ppTasks[nId] = p;
ReleaseMutex(m_hMutex);
return p;
}
В проге есть еще классы, работающие с GUI, но их мы не будем рассматривать. Смотрите исходник.
В качестве примера я избрал самую простую задачу - сканирование портов.
Прога сканирует диапазон IP-адресов, проверяя заданные порты. Может быть использована,
например, для поиска протрояненного компа в сети определенного провайдера.
Здесь CScanTask::Run() проверяет один порт на одном IP, CScanTask::SafeInterrupt() ничего не делает,
CScanTaskQueue реализует счетчик адресов и счетчик портов. Вот, собственно, и все.