進程池(process pool)是一種管理進程的技術,通過預先創建一定數量的進程來減少頻繁創建和銷毀進程的開銷。這種方法特別適用于需要并發執行大量任務的場景,尤其是在處理cpu密集型任務時。
上圖展示了進程池模型,其中父進程(master進程)通過創建多個子進程(worker或slaver進程)并通過管道連接,向各個子進程派發任務。
進程池的主要作用包括:
- 提高性能:通過預先創建進程,減少了頻繁創建和銷毀進程的開銷,提升整體執行效率。
- 減少系統資源消耗:避免了頻繁創建和銷毀進程導致的資源浪費,進程池通過復用已有進程,節省了資源。
- 提升任務響應速度:由于進程池中的進程是預先創建的,可以快速分配空閑進程處理任務。
- 更好的資源管理:進程池可以限制系統中的最大并發進程數,保護系統資源,避免過載。
- 并行處理:對于CPU密集型任務,進程池通過并行化處理多個任務,顯著提升處理效率。
進程池適用于大規模并發任務的處理,如Web爬蟲、數據處理和大規模計算等場景。
代碼模擬進程池管道信息
首先,我們通過控制創建子進程的數量來實現進程池。可以通過main函數的參數控制子進程的創建個數。
enum{ OK = 0, UsageError, PipeError, ForkError, }; void Usage(string proc){ cout << "Usage: " << proc << " <number_of_processes>" << endl; cout << "Example: " << proc << " 4" << endl; exit(UsageError); } // main函數的第一個參數是數組元素個數,第二個元素是創建子進程個數 int main(int argc, char *argv[]){ if(argc != 2){ Usage(argv[0]); return UsageError; } int num = atoi(argv[1]); // 接下來創建管道和子進程 }
我們封裝一個類來管理進程池:
using work_t = function<void>; class ProcessPool{ public: ProcessPool(int n, work_t w) : num(n), work(w) {} ~ProcessPool() {} int InitProcesspool(); void DisPatchTasks(); void CleanProcessPool(); private: vector<channel> channels; // 管道 int num; // 進程的個數 work_t work; // 工作類型(void()) };
Channel類用于管理管道:
class Channel{ public: Channel(int wfd, pid_t who) : _wfd(wfd), _who(who) { _name = "Channel-" + to_string(wfd) + '-' + to_string(who); } string Name() { return _name; } void Send(int cmd) { write(_wfd, &cmd, sizeof(cmd)); } void Close() { close(_wfd); } pid_t Id() { return _who; } ~Channel() {} private: int _wfd; // master的寫端 string _name; // 對應worker的名字 pid_t _who; // 記錄哪個子進程的管道 };
任務類模擬三種任務:
void Download() { cout << "Downloading..." << endl; } void Log() { cout << "Logging..." << endl; } void Sql() { cout << "Executing SQL..." << endl; } using task_t = function<void>; class TaskManager{ public: TaskManager(){ srand(time(nullptr)); InsertTask(Download); InsertTask(Log); InsertTask(Sql); } ~TaskManager() {} void InsertTask(task_t t) { tasks[number++] = t; } int SelectTask() { return rand() % number; } void Excute(int number) { if(tasks.find(number) == tasks.end()) return; tasks[number](); } private: unordered_map<int, task_t> tasks; }; static int number = 0; TaskManager tmp; void Usage(string proc){ cout << "Usage: " << proc << " <number_of_processes>" << endl; cout << "Example: " << proc << " 4" << endl; exit(UsageError); }
接下來實現InitProcesspool()、DisPatchTasks()和CleanProcessPool()函數:
int InitProcesspool(){ for (int i = 0; i < num; i++) { int pipefd[2]; if (pipe(pipefd) < 0) { perror("pipe"); return PipeError; } pid_t pid = fork(); if (pid < 0) { perror("fork"); return ForkError; } else if (pid == 0) { close(pipefd[1]); dup2(pipefd[0], 0); close(pipefd[0]); work(); exit(OK); } else { close(pipefd[0]); channels.push_back(Channel(pipefd[1], pid)); } } return OK; } void DisPatchTasks(){ int who = 0; int num = 20; while (num--) { int task = tmp.SelectTask(); Channel &curr = channels[who]; who++; who %= channels.size(); cout << "Dispatch task " << task << " to " << curr.Name() << endl; curr.Send(task); } } void CleanProcessPool() { for (auto &c : channels) { c.Close(); } for (auto &e : channels) { pid_t rid = waitpid(e.Id(), nullptr, 0); if (rid > 0) { cout << "子進程 " << rid << " 已回收" << endl; } } }
#include "ProcessPool.hpp" #include "Task.hpp" int main(int argc, char *argv[]){ if(argc != 2) { Usage(argv[0]); return UsageError; } int num = atoi(argv[1]); ProcessPool *pp = new ProcessPool(num, Worker); pp->InitProcesspool(); pp->DisPatchTasks(); pp->CleanProcessPool(); delete pp; return 0; }
相關頭文件和Makefile:
// Channel.hpp #ifndef __CHANNEL_HPP__ #define __CHANNEL_HPP__ #include <iostream> #include <string> #include <unistd.h> using namespace std; // ... (Channel類定義) #endif // ProcessPool.hpp #include <string> #include <unistd.h> #include <cstdlib> #include <vector> #include <sys> #include <functional> #include <sys> #include "Channel.hpp" #include "Task.hpp" // ... (ProcessPool類定義) // Task.hpp #pragma once #include <iostream> #include <unordered_map> #include <functional> #include <ctime> using namespace std; // ... (TaskManager類定義) // Makefile BIN=processpool cc=g++ FLAGS=-c -Wall -std=c++11 LDFLAGS=-o SRC=$(shell ls *.cc) OBJ=$(SRC:.cc=.o) $(BIN):$(OBJ) $(cc) $(LDFLAGS) $@ $^ %.o:%.cc $(cc) $(FLAGS) $< clean: rm -f $(OBJ) $(BIN)
總結:
本文詳細介紹了進程池的概念及其在實際應用中的作用。通過代碼模擬,我們展示了如何初始化進程池、分發任務、執行任務邏輯以及清理進程池。文章還涵蓋了相關的封裝類和文件結構,如main.cc、Channel.hpp、ProcessPool.hpp、Task.hpp和Makefile,這些內容為理解和實現進程池提供了全面的指導。
進程池是一種有效的資源管理技術,能夠提高多任務處理的效率和系統性能。通過合理的設計和實現,進程池可以在復雜的系統中發揮重要作用,減少資源浪費并提升任務執行的穩定性。希望本文的內容能為讀者在實際項目中應用進程池提供有價值的參考。