生产者消费者模式是并发编程的核心模式之一核心是想要提高程序的运行效率。这里记录一下自己的思考使用通俗的语言和以日志记录为例解读生产者消费者模式并实现生产者消费者模式。将生产者消费者模式的核心内容划分为三个问题阻塞问题、内存积压问题、cpu空转问题。这里是第二章内存积压问题。速度不一致问题异步解耦合解决了生产者和消费者相互阻塞的问题能有效提高程序的运行效率。此时消费者和生产者的分离理想情况是两者速度一致生产者生产一份数据消费者就进行消费程序处于一个动态平衡状态。但是实际运行中生产者和消费者的速度是不一致的当生产者生产速度大于消费者消费速度时会导致数据积压当数据过多时还会导致内存溢出。日志场景在日志记录的案例中生产者和消费者的速度是明显不一致的因为日志生产在cpu和内存中而日志消费在io硬盘中cpu和内存的速度远大于io硬盘的速度。代码实现以第一章中的异步解耦合日志系统为例。void async_log() { std::cout异步解耦日志系统,不再相互阻塞std::endl; std::ofstream log_file; std::string log_pathlog2.txt; // 缓存 std::dequestd::string log_buffer; // 停止标志 std::atomicbool stop_flag(false); // 加锁避免数据竞争 std::mutex log_mutex; // cpu内存处理数据 auto log_data_func[log_buffer,stop_flag,log_mutex](){ int count1; std::chrono::high_resolution_clock::time_point start std::chrono::high_resolution_clock::now(); while(true) { // std::chrono::high_resolution_clock::time_point t1 std::chrono::high_resolution_clock::now(); //创建数据 std::string large_data(2048, x); // 4KB 数据 // std::this_thread::sleep_for(std::chrono::milliseconds(100)); //时间记录 auto time std::chrono::system_clock::now(); auto time_t std::chrono::system_clock::to_time_t(time); std::string content_timestd::ctime(time_t); // std::cout生成日志content_timestd::endl; std::string content[content_time] large_data; //写入缓存 std::unique_lockstd::mutex lock(log_mutex); log_buffer.push_back(content); lock.unlock(); // std::chrono::high_resolution_clock::time_point t2 std::chrono::high_resolution_clock::now(); // std::cout处理业务耗时std::chrono::duration_caststd::chrono::microseconds(t2-t1).count()微秒std::endlstd::endl; count; if(count10) { std::chrono::high_resolution_clock::time_point t_10 std::chrono::high_resolution_clock::now(); std::cout生成10条日志耗时std::chrono::duration_caststd::chrono::microseconds(t_10-start).count()msstd::endl; } if (stop_flag) { break; } }; std::cout生成日志结束std::endl; std::cout总共生成数据量count条std::endl; }; std::thread thread_log_data(log_data_func); // 日志写入磁盘 auto log_disk_func[log_buffer,log_file,log_path,stop_flag,log_mutex](){ //计时开始 std::chrono::high_resolution_clock::time_point start std::chrono::high_resolution_clock::now(); int count1; while(true) { //判断退出循环 if (count10) { break; } if(!log_buffer.empty()) { std::chrono::high_resolution_clock::time_point t1 std::chrono::high_resolution_clock::now(); //打开文件 log_file.open(log_path,std::ios::app); //写日志 std::unique_lockstd::mutex lock(log_mutex); auto datalog_buffer.front(); log_buffer.pop_front(); lock.unlock(); log_filedatastd::endl; //关闭文件 log_file.close(); std::chrono::high_resolution_clock::time_point t2 std::chrono::high_resolution_clock::now(); // std::cout写日志耗时std::chrono::duration_caststd::chrono::microseconds(t2-t1).count()微秒std::endl; count; } } //通知生产进程结束 stop_flag true; //计时结束 std::chrono::high_resolution_clock::time_point end std::chrono::high_resolution_clock::now(); std::coutio耗时std::chrono::duration_caststd::chrono::microseconds(end-start).count()msstd::endl; }; std::thread thread_log_disk(log_disk_func); thread_log_data.join(); thread_log_disk.join(); } int main() { // 同步阻塞问题 std::cout----------------std::endl; async_log(); return 0; }运行结果---------------- 异步解耦日志系统,不再相互阻塞 生成10条日志耗时46ms io耗时3864ms 生成日志结束 总共生成数据量600条结果分析可以看到生成10条日志耗时46ms而写日志耗时3864ms说明io硬盘的写入速度远小于cpu和内存的速度。当10条日志写入磁盘完成后内存中生成的日志数据量已经达到了600条说明内存中已经积压了大量的日志数据。解决内存积压内存积压的核心是生产者的速度大于消费者的速度这个受算法的影响一般不能直接修改但是次一级的原因是分配的共享内存没有进行限制。所以我们可以对共享内存进行限制当共享内存达到一定大小的时候生产者需要等待消费者消费完共享内存中的数据。代码实现voidasync_log_buffer_size_question(){std::cout异步解耦日志系统,io硬盘不再阻塞缓存积压std::endl;std::ofstream log_file;std::string log_pathlog2.txt;// 缓存std::dequestd::stringlog_buffer;// 资源上锁std::mutex log_mutex;// 停止标志std::atomicboolstop_flag(false);// 生成数据autolog_data_func[log_buffer,log_mutex,stop_flag](){intcount1;std::chrono::high_resolution_clock::time_point startstd::chrono::high_resolution_clock::now();while(true){// std::chrono::high_resolution_clock::time_point t1 std::chrono::high_resolution_clock::now();//创建数据std::stringlarge_data(2048,x);// 4KB 数据//时间记录autotimestd::chrono::system_clock::now();autotime_tstd::chrono::system_clock::to_time_t(time);std::string content_timestd::ctime(time_t);// std::cout生成日志content_timestd::endl;std::string content[content_time] large_data;//写入缓存std::unique_lockstd::mutexlock(log_mutex);log_buffer.push_back(content);lock.unlock();// std::chrono::high_resolution_clock::time_point t2 std::chrono::high_resolution_clock::now();// std::cout处理业务耗时std::chrono::duration_caststd::chrono::microseconds(t2-t1).count()微秒std::endlstd::endl;count;if(stop_flag){break;}if(count10){std::chrono::high_resolution_clock::time_point t_10std::chrono::high_resolution_clock::now();std::cout生成10条日志耗时std::chrono::duration_caststd::chrono::microseconds(t_10-start).count()msstd::endl;}};};std::threadthread_log_data(log_data_func);// 日志写入磁盘autolog_disk_func[log_buffer,log_file,log_path,log_mutex,stop_flag](){//计时开始std::chrono::high_resolution_clock::time_point startstd::chrono::high_resolution_clock::now();intcount1;while(true){//判断退出循环if(count10){break;}if(!log_buffer.empty()){// std::chrono::high_resolution_clock::time_point t1 std::chrono::high_resolution_clock::now();//打开文件log_file.open(log_path,std::ios::app);//写日志std::unique_lockstd::mutexlock(log_mutex);autodatalog_buffer.front();log_buffer.pop_front();lock.unlock();log_filedatastd::endl;//关闭文件log_file.close();// std::this_thread::sleep_for(std::chrono::milliseconds(100));std::chrono::high_resolution_clock::time_point t2std::chrono::high_resolution_clock::now();// std::cout写日志耗时std::chrono::duration_caststd::chrono::microseconds(t2-t1).count()微秒std::endl;count;}}//关闭生产者stop_flagtrue;//计时结束std::chrono::high_resolution_clock::time_point endstd::chrono::high_resolution_clock::now();std::coutio耗时std::chrono::duration_caststd::chrono::microseconds(end-start).count()msstd::endl;//缓存积压std::cout缓存积压log_buffer.size()std::endl;};std::threadthread_log_disk(log_disk_func);thread_log_data.join();thread_log_disk.join();}voidasync_log_buffer_size_solution(){std::cout异步解耦日志系统,io硬盘不再阻塞缓存积压std::endl;std::cout限制缓存,缓存尺寸10std::endl;std::ofstream log_file;std::string log_pathlog2.txt;// 缓存尺寸intbuffer_size10;// 缓存std::dequestd::stringlog_buffer;// 资源上锁std::mutex log_mutex;// 停止标志std::atomicboolstop_flag(false);// cpu内存处理数据autolog_data_func[log_buffer,log_mutex,buffer_size,stop_flag](){intcount1;std::chrono::high_resolution_clock::time_point startstd::chrono::high_resolution_clock::now();while(true){if(log_buffer.size()buffer_size){// std::chrono::high_resolution_clock::time_point t1 std::chrono::high_resolution_clock::now();//创建数据std::stringlarge_data(2048,x);// 4KB 数据//时间记录autotimestd::chrono::system_clock::now();autotime_tstd::chrono::system_clock::to_time_t(time);std::string content_timestd::ctime(time_t);// std::cout生成日志content_timestd::endl;std::string content[content_time] large_data;//写入缓存std::unique_lockstd::mutexlock(log_mutex);log_buffer.push_back(content);lock.unlock();// std::chrono::high_resolution_clock::time_point t2 std::chrono::high_resolution_clock::now();// std::cout处理业务耗时std::chrono::duration_caststd::chrono::microseconds(t2-t1).count()微秒std::endlstd::endl;count;}if(stop_flag){break;}if(count10){std::chrono::high_resolution_clock::time_point t_10std::chrono::high_resolution_clock::now();std::cout生成10条日志耗时std::chrono::duration_caststd::chrono::microseconds(t_10-start).count()msstd::endl;}};};std::threadthread_log_data(log_data_func);// 日志写入磁盘autolog_disk_func[log_buffer,log_file,log_path,log_mutex,stop_flag](){//计时开始std::chrono::high_resolution_clock::time_point startstd::chrono::high_resolution_clock::now();intcount1;while(true){//判断退出循环if(count10){break;}if(!log_buffer.empty()){std::chrono::high_resolution_clock::time_point t1std::chrono::high_resolution_clock::now();//打开文件log_file.open(log_path,std::ios::app);//写日志std::unique_lockstd::mutexlock(log_mutex);autodatalog_buffer.front();log_buffer.pop_front();lock.unlock();log_filedatastd::endl;//关闭文件log_file.close();// std::this_thread::sleep_for(std::chrono::milliseconds(100));std::chrono::high_resolution_clock::time_point t2std::chrono::high_resolution_clock::now();// std::cout写日志耗时std::chrono::duration_caststd::chrono::microseconds(t2-t1).count()微秒std::endl;count;}}//通知生产者stop_flagtrue;//计时结束std::chrono::high_resolution_clock::time_point endstd::chrono::high_resolution_clock::now();std::coutio耗时std::chrono::duration_caststd::chrono::microseconds(end-start).count()msstd::endl;//缓存积压std::cout缓存积压log_buffer.size()std::endl;};std::threadthread_log_disk(log_disk_func);thread_log_data.join();thread_log_disk.join();}intmain(){// 缓存积压问题std::cout----------------std::endl;async_log_buffer_size_question();std::cout----------------std::endl;async_log_buffer_size_solution();return0;}结果---------------- 异步解耦日志系统,异步解耦合缓存积压 生成10条日志耗时587ms io耗时3489ms 缓存积压514 ---------------- 异步解耦日志系统,异步解耦合缓存积压解决 限制缓存,缓存尺寸10 生成10条日志耗时46ms io耗时3122ms 缓存积压10结果分析限制共享内存和没限制内存两者记录相同数量的日志消耗时间基本一致没有限制内存会生成514条数据在缓冲区每条数据2kB,内存占用1MB左右如果生成时间拉长会造成更大的内存占用直到内存溢出。限制内存最多生成10条数据在缓冲区每条数据2kB,内存占用20kB左右不会出现内存占用堆积的问题。结论在异步解耦日志系统中通过限制共享内存的大小成功防止内存大量占用。
c++生产者消费者者模式学习笔记-2内存积压
生产者消费者模式是并发编程的核心模式之一核心是想要提高程序的运行效率。这里记录一下自己的思考使用通俗的语言和以日志记录为例解读生产者消费者模式并实现生产者消费者模式。将生产者消费者模式的核心内容划分为三个问题阻塞问题、内存积压问题、cpu空转问题。这里是第二章内存积压问题。速度不一致问题异步解耦合解决了生产者和消费者相互阻塞的问题能有效提高程序的运行效率。此时消费者和生产者的分离理想情况是两者速度一致生产者生产一份数据消费者就进行消费程序处于一个动态平衡状态。但是实际运行中生产者和消费者的速度是不一致的当生产者生产速度大于消费者消费速度时会导致数据积压当数据过多时还会导致内存溢出。日志场景在日志记录的案例中生产者和消费者的速度是明显不一致的因为日志生产在cpu和内存中而日志消费在io硬盘中cpu和内存的速度远大于io硬盘的速度。代码实现以第一章中的异步解耦合日志系统为例。void async_log() { std::cout异步解耦日志系统,不再相互阻塞std::endl; std::ofstream log_file; std::string log_pathlog2.txt; // 缓存 std::dequestd::string log_buffer; // 停止标志 std::atomicbool stop_flag(false); // 加锁避免数据竞争 std::mutex log_mutex; // cpu内存处理数据 auto log_data_func[log_buffer,stop_flag,log_mutex](){ int count1; std::chrono::high_resolution_clock::time_point start std::chrono::high_resolution_clock::now(); while(true) { // std::chrono::high_resolution_clock::time_point t1 std::chrono::high_resolution_clock::now(); //创建数据 std::string large_data(2048, x); // 4KB 数据 // std::this_thread::sleep_for(std::chrono::milliseconds(100)); //时间记录 auto time std::chrono::system_clock::now(); auto time_t std::chrono::system_clock::to_time_t(time); std::string content_timestd::ctime(time_t); // std::cout生成日志content_timestd::endl; std::string content[content_time] large_data; //写入缓存 std::unique_lockstd::mutex lock(log_mutex); log_buffer.push_back(content); lock.unlock(); // std::chrono::high_resolution_clock::time_point t2 std::chrono::high_resolution_clock::now(); // std::cout处理业务耗时std::chrono::duration_caststd::chrono::microseconds(t2-t1).count()微秒std::endlstd::endl; count; if(count10) { std::chrono::high_resolution_clock::time_point t_10 std::chrono::high_resolution_clock::now(); std::cout生成10条日志耗时std::chrono::duration_caststd::chrono::microseconds(t_10-start).count()msstd::endl; } if (stop_flag) { break; } }; std::cout生成日志结束std::endl; std::cout总共生成数据量count条std::endl; }; std::thread thread_log_data(log_data_func); // 日志写入磁盘 auto log_disk_func[log_buffer,log_file,log_path,stop_flag,log_mutex](){ //计时开始 std::chrono::high_resolution_clock::time_point start std::chrono::high_resolution_clock::now(); int count1; while(true) { //判断退出循环 if (count10) { break; } if(!log_buffer.empty()) { std::chrono::high_resolution_clock::time_point t1 std::chrono::high_resolution_clock::now(); //打开文件 log_file.open(log_path,std::ios::app); //写日志 std::unique_lockstd::mutex lock(log_mutex); auto datalog_buffer.front(); log_buffer.pop_front(); lock.unlock(); log_filedatastd::endl; //关闭文件 log_file.close(); std::chrono::high_resolution_clock::time_point t2 std::chrono::high_resolution_clock::now(); // std::cout写日志耗时std::chrono::duration_caststd::chrono::microseconds(t2-t1).count()微秒std::endl; count; } } //通知生产进程结束 stop_flag true; //计时结束 std::chrono::high_resolution_clock::time_point end std::chrono::high_resolution_clock::now(); std::coutio耗时std::chrono::duration_caststd::chrono::microseconds(end-start).count()msstd::endl; }; std::thread thread_log_disk(log_disk_func); thread_log_data.join(); thread_log_disk.join(); } int main() { // 同步阻塞问题 std::cout----------------std::endl; async_log(); return 0; }运行结果---------------- 异步解耦日志系统,不再相互阻塞 生成10条日志耗时46ms io耗时3864ms 生成日志结束 总共生成数据量600条结果分析可以看到生成10条日志耗时46ms而写日志耗时3864ms说明io硬盘的写入速度远小于cpu和内存的速度。当10条日志写入磁盘完成后内存中生成的日志数据量已经达到了600条说明内存中已经积压了大量的日志数据。解决内存积压内存积压的核心是生产者的速度大于消费者的速度这个受算法的影响一般不能直接修改但是次一级的原因是分配的共享内存没有进行限制。所以我们可以对共享内存进行限制当共享内存达到一定大小的时候生产者需要等待消费者消费完共享内存中的数据。代码实现voidasync_log_buffer_size_question(){std::cout异步解耦日志系统,io硬盘不再阻塞缓存积压std::endl;std::ofstream log_file;std::string log_pathlog2.txt;// 缓存std::dequestd::stringlog_buffer;// 资源上锁std::mutex log_mutex;// 停止标志std::atomicboolstop_flag(false);// 生成数据autolog_data_func[log_buffer,log_mutex,stop_flag](){intcount1;std::chrono::high_resolution_clock::time_point startstd::chrono::high_resolution_clock::now();while(true){// std::chrono::high_resolution_clock::time_point t1 std::chrono::high_resolution_clock::now();//创建数据std::stringlarge_data(2048,x);// 4KB 数据//时间记录autotimestd::chrono::system_clock::now();autotime_tstd::chrono::system_clock::to_time_t(time);std::string content_timestd::ctime(time_t);// std::cout生成日志content_timestd::endl;std::string content[content_time] large_data;//写入缓存std::unique_lockstd::mutexlock(log_mutex);log_buffer.push_back(content);lock.unlock();// std::chrono::high_resolution_clock::time_point t2 std::chrono::high_resolution_clock::now();// std::cout处理业务耗时std::chrono::duration_caststd::chrono::microseconds(t2-t1).count()微秒std::endlstd::endl;count;if(stop_flag){break;}if(count10){std::chrono::high_resolution_clock::time_point t_10std::chrono::high_resolution_clock::now();std::cout生成10条日志耗时std::chrono::duration_caststd::chrono::microseconds(t_10-start).count()msstd::endl;}};};std::threadthread_log_data(log_data_func);// 日志写入磁盘autolog_disk_func[log_buffer,log_file,log_path,log_mutex,stop_flag](){//计时开始std::chrono::high_resolution_clock::time_point startstd::chrono::high_resolution_clock::now();intcount1;while(true){//判断退出循环if(count10){break;}if(!log_buffer.empty()){// std::chrono::high_resolution_clock::time_point t1 std::chrono::high_resolution_clock::now();//打开文件log_file.open(log_path,std::ios::app);//写日志std::unique_lockstd::mutexlock(log_mutex);autodatalog_buffer.front();log_buffer.pop_front();lock.unlock();log_filedatastd::endl;//关闭文件log_file.close();// std::this_thread::sleep_for(std::chrono::milliseconds(100));std::chrono::high_resolution_clock::time_point t2std::chrono::high_resolution_clock::now();// std::cout写日志耗时std::chrono::duration_caststd::chrono::microseconds(t2-t1).count()微秒std::endl;count;}}//关闭生产者stop_flagtrue;//计时结束std::chrono::high_resolution_clock::time_point endstd::chrono::high_resolution_clock::now();std::coutio耗时std::chrono::duration_caststd::chrono::microseconds(end-start).count()msstd::endl;//缓存积压std::cout缓存积压log_buffer.size()std::endl;};std::threadthread_log_disk(log_disk_func);thread_log_data.join();thread_log_disk.join();}voidasync_log_buffer_size_solution(){std::cout异步解耦日志系统,io硬盘不再阻塞缓存积压std::endl;std::cout限制缓存,缓存尺寸10std::endl;std::ofstream log_file;std::string log_pathlog2.txt;// 缓存尺寸intbuffer_size10;// 缓存std::dequestd::stringlog_buffer;// 资源上锁std::mutex log_mutex;// 停止标志std::atomicboolstop_flag(false);// cpu内存处理数据autolog_data_func[log_buffer,log_mutex,buffer_size,stop_flag](){intcount1;std::chrono::high_resolution_clock::time_point startstd::chrono::high_resolution_clock::now();while(true){if(log_buffer.size()buffer_size){// std::chrono::high_resolution_clock::time_point t1 std::chrono::high_resolution_clock::now();//创建数据std::stringlarge_data(2048,x);// 4KB 数据//时间记录autotimestd::chrono::system_clock::now();autotime_tstd::chrono::system_clock::to_time_t(time);std::string content_timestd::ctime(time_t);// std::cout生成日志content_timestd::endl;std::string content[content_time] large_data;//写入缓存std::unique_lockstd::mutexlock(log_mutex);log_buffer.push_back(content);lock.unlock();// std::chrono::high_resolution_clock::time_point t2 std::chrono::high_resolution_clock::now();// std::cout处理业务耗时std::chrono::duration_caststd::chrono::microseconds(t2-t1).count()微秒std::endlstd::endl;count;}if(stop_flag){break;}if(count10){std::chrono::high_resolution_clock::time_point t_10std::chrono::high_resolution_clock::now();std::cout生成10条日志耗时std::chrono::duration_caststd::chrono::microseconds(t_10-start).count()msstd::endl;}};};std::threadthread_log_data(log_data_func);// 日志写入磁盘autolog_disk_func[log_buffer,log_file,log_path,log_mutex,stop_flag](){//计时开始std::chrono::high_resolution_clock::time_point startstd::chrono::high_resolution_clock::now();intcount1;while(true){//判断退出循环if(count10){break;}if(!log_buffer.empty()){std::chrono::high_resolution_clock::time_point t1std::chrono::high_resolution_clock::now();//打开文件log_file.open(log_path,std::ios::app);//写日志std::unique_lockstd::mutexlock(log_mutex);autodatalog_buffer.front();log_buffer.pop_front();lock.unlock();log_filedatastd::endl;//关闭文件log_file.close();// std::this_thread::sleep_for(std::chrono::milliseconds(100));std::chrono::high_resolution_clock::time_point t2std::chrono::high_resolution_clock::now();// std::cout写日志耗时std::chrono::duration_caststd::chrono::microseconds(t2-t1).count()微秒std::endl;count;}}//通知生产者stop_flagtrue;//计时结束std::chrono::high_resolution_clock::time_point endstd::chrono::high_resolution_clock::now();std::coutio耗时std::chrono::duration_caststd::chrono::microseconds(end-start).count()msstd::endl;//缓存积压std::cout缓存积压log_buffer.size()std::endl;};std::threadthread_log_disk(log_disk_func);thread_log_data.join();thread_log_disk.join();}intmain(){// 缓存积压问题std::cout----------------std::endl;async_log_buffer_size_question();std::cout----------------std::endl;async_log_buffer_size_solution();return0;}结果---------------- 异步解耦日志系统,异步解耦合缓存积压 生成10条日志耗时587ms io耗时3489ms 缓存积压514 ---------------- 异步解耦日志系统,异步解耦合缓存积压解决 限制缓存,缓存尺寸10 生成10条日志耗时46ms io耗时3122ms 缓存积压10结果分析限制共享内存和没限制内存两者记录相同数量的日志消耗时间基本一致没有限制内存会生成514条数据在缓冲区每条数据2kB,内存占用1MB左右如果生成时间拉长会造成更大的内存占用直到内存溢出。限制内存最多生成10条数据在缓冲区每条数据2kB,内存占用20kB左右不会出现内存占用堆积的问题。结论在异步解耦日志系统中通过限制共享内存的大小成功防止内存大量占用。