神经网络的Python实时图像分类问题
caffe
deep-learning
python
5
0

我正在尝试使用caffe和python进行实时图像分类。我正在使用OpenCV在一个过程中从我的网络摄像头流式传输,并且在一个单独的过程中,使用caffe对从网络摄像头拉出的帧执行图像分类。然后,我将分类结果传回主线程,以对网络摄像头流进行字幕。

问题是,即使我拥有NVIDIA GPU并在GPU上执行caffe预测,主线程也会变慢。通常,在不做任何预测的情况下,我的网络摄像头流以30 fps的速度运行;但是,根据预测,我的网络摄像头流的最佳速度为15 fps。

我已经验证了caffe在执行预测时确实使用了GPU,并且我的GPU或GPU内存没有用尽。我还验证了我的CPU内核在程序执行过程中的任何时候都没有耗尽。我想知道我是在做错什么,还是没有办法将这两个过程真正分开。任何建议表示赞赏。这是我的代码供参考

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        #other initialization stuff

    def run(self):
        caffe.set_mode_gpu()
        caffe.set_device(0)
        #Load caffe net -- code omitted 
        while True:
            image = self.task_queue.get()
            #crop image -- code omitted
            text = net.predict(image)
            self.result_queue.put(text)

        return

import cv2
import caffe
import multiprocessing
import Queue 

tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks,results)
consumer.start()

#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
    rval, frame = vc.read()
else:
    rval = False
frame_copy[:] = frame
task_empty = True
while rval:
    if task_empty:
       tasks.put(frame_copy)
       task_empty = False
    if not results.empty():
       text = results.get()
       #Add text to frame
       cv2.putText(frame,text)
       task_empty = True

    #Showing the frame with all the applied modifications
    cv2.imshow("preview", frame)

    #Getting next frame from camera
    rval, frame = vc.read()
    frame_copy[:] = frame
    #Getting keyboard input 
    key = cv2.waitKey(1)
    #exit on ESC
    if key == 27:
        break

我很确定这是caffe预测,它会使所有内容变慢,因为当我注释掉该预测并在过程之间来回传递虚拟文本时,我又获得了30 fps。

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        #other initialization stuff

    def run(self):
        caffe.set_mode_gpu()
        caffe.set_device(0)
        #Load caffe net -- code omitted
        while True:
            image = self.task_queue.get()
            #crop image -- code omitted
            #text = net.predict(image)
            text = "dummy text"
            self.result_queue.put(text)

        return

import cv2
import caffe
import multiprocessing
import Queue 

tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks,results)
consumer.start()

#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
    rval, frame = vc.read()
else:
    rval = False
frame_copy[:] = frame
task_empty = True
while rval:
    if task_empty:
       tasks.put(frame_copy)
       task_empty = False
    if not results.empty():
       text = results.get()
       #Add text to frame
       cv2.putText(frame,text)
       task_empty = True

    #Showing the frame with all the applied modifications
    cv2.imshow("preview", frame)

    #Getting next frame from camera
    rval, frame = vc.read()
    frame_copy[:] = frame
    #Getting keyboard input 
    key = cv2.waitKey(1)
    #exit on ESC
    if key == 27:
        break
参考资料:
Stack Overflow
收藏
评论
共 2 个回答
高赞 时间 活跃

一些解释和一些思考:

  1. 我在使用Intel Core i5-6300HQ @2.3GHz cpu, 8 GB RAMNVIDIA GeForce GTX 960M gpu(2GB内存)的笔记本电脑上在下面运行我的代码,结果是:

    无论我是否在运行caffe的情况下运行代码(通过注释掉net_output = this->net_->Forward(net_input)以及void Consumer::entry()一些必要内容),我总是可以在大约30 fps的速度下运行主线程。

    在具有Intel Core i5-4440 cpu, 8 GB RAMNVIDIA GeForce GT 630 gpu(1GB内存)的PC上获得了类似的结果。

  2. 我在同一台笔记本电脑上在问题中运行了@ user3543300的代码,结果是:

    无论caffe是否正在运行(在gpu上),我都可以达到30 fps。

  3. 根据@ user3543300的反馈,使用上述2个版本的代码,运行caffe(在Nvidia GeForce 940MX GPU and Intel® Core™ i7-6500U CPU @ 2.50GHz × 4上运行caffe时,@ user3543300只能获得约15 fps)。笔记本电脑)。当caffe作为独立程序在gpu上运行时,网络摄像头的帧频也会降低。

因此,我仍然认为问题很可能出在硬件I / O限制上,例如DMA带宽(有关DMA的该线程可能暗示)或RAM带宽。希望@ user3543300可以检查或发现我尚未意识到的真正问题。

如果问题确实是我上面所想的,那么明智的想法是减少CNN网络引入的内存I / O开销。实际上,为了解决硬件资源有限的嵌入式系统上的类似问题,已经对此问题进行了一些研究,例如Qautization 结构稀疏的深度神经网络SqueezeNetDeep-Compression 。因此,希望它也可以通过应用这些技能来帮助提高问题中网络摄像头的帧率。


原始答案:

试试这个c ++解决方案。它使用线程来执行任务中的I / O开销 ,我使用bvlc_alexnet.caffemodeldeploy.prototxt进行了测试以进行图像分类,并且在caffe运行时(在GPU上)没有看到主线程(网络摄像头流)的明显变慢):

#include <stdio.h>
#include <iostream>
#include <string>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include "caffe/caffe.hpp"
#include "caffe/util/blocking_queue.hpp"
#include "caffe/data_transformer.hpp"
#include "opencv2/opencv.hpp"

using namespace cv;

//Queue pair for sharing image/results between webcam and caffe threads
template<typename T>
class QueuePair {
  public:
    explicit QueuePair(int size);
    ~QueuePair();

    caffe::BlockingQueue<T*> free_;
    caffe::BlockingQueue<T*> full_;

  DISABLE_COPY_AND_ASSIGN(QueuePair);
};
template<typename T>
QueuePair<T>::QueuePair(int size) {
  // Initialize the free queue
  for (int i = 0; i < size; ++i) {
    free_.push(new T);
  }
}
template<typename T>
QueuePair<T>::~QueuePair(){
  T *data;
  while (free_.try_pop(&data)){
    delete data;
  }
  while (full_.try_pop(&data)){
    delete data;
  }
}
template class QueuePair<Mat>;
template class QueuePair<std::string>;

//Do image classification(caffe predict) using a subthread
class Consumer{
  public:
    Consumer(boost::shared_ptr<QueuePair<Mat>> task
           , boost::shared_ptr<QueuePair<std::string>> result);
    ~Consumer();
    void Run();
    void Stop();
    void entry(boost::shared_ptr<QueuePair<Mat>> task
             , boost::shared_ptr<QueuePair<std::string>> result);

  private:
    bool must_stop();

    boost::shared_ptr<QueuePair<Mat> > task_q_;
    boost::shared_ptr<QueuePair<std::string> > result_q_;

    //caffe::Blob<float> *net_input_blob_;
    boost::shared_ptr<caffe::DataTransformer<float> > data_transformer_;
    boost::shared_ptr<caffe::Net<float> > net_;
    std::vector<std::string> synset_words_;
    boost::shared_ptr<boost::thread> thread_;
};
Consumer::Consumer(boost::shared_ptr<QueuePair<Mat>> task
                 , boost::shared_ptr<QueuePair<std::string>> result) :
 task_q_(task), result_q_(result), thread_(){

  //for data preprocess
  caffe::TransformationParameter trans_para;
  //set mean
  trans_para.set_mean_file("/path/to/imagenet_mean.binaryproto");
  //set crop size, here is cropping 227x227 from 256x256
  trans_para.set_crop_size(227);
  //instantiate a DataTransformer using trans_para for image preprocess
  data_transformer_.reset(new caffe::DataTransformer<float>(trans_para
                        , caffe::TEST));

  //initialize a caffe net
  net_.reset(new caffe::Net<float>(std::string("/path/to/deploy.prototxt")
           , caffe::TEST));
  //net parameter
  net_->CopyTrainedLayersFrom(std::string("/path/to/bvlc_alexnet.caffemodel"));

  std::fstream synset_word("path/to/caffe/data/ilsvrc12/synset_words.txt");
  std::string line;
  if (!synset_word.good()){
    std::cerr << "synset words open failed!" << std::endl;
  }
  while (std::getline(synset_word, line)){
    synset_words_.push_back(line.substr(line.find_first_of(' '), line.length()));
  }
  //a container for net input, holds data converted from cv::Mat
  //net_input_blob_ = new caffe::Blob<float>(1, 3, 227, 227);
}
Consumer::~Consumer(){
  Stop();
  //delete net_input_blob_;
}
void Consumer::entry(boost::shared_ptr<QueuePair<Mat>> task
    , boost::shared_ptr<QueuePair<std::string>> result){

  caffe::Caffe::set_mode(caffe::Caffe::GPU);
  caffe::Caffe::SetDevice(0);

  cv::Mat *frame;
  cv::Mat resized_image(256, 256, CV_8UC3);
  cv::Size re_size(resized_image.cols, resized_image.rows);

  //for caffe input and output
  const std::vector<caffe::Blob<float> *> net_input = this->net_->input_blobs();
  std::vector<caffe::Blob<float> *> net_output;

  //net_input.push_back(net_input_blob_);
  std::string *res;

  int pre_num = 1;
  while (!must_stop()){
    std::stringstream result_strm;
    frame = task->full_.pop();
    cv::resize(*frame, resized_image, re_size, 0, 0, CV_INTER_LINEAR);
    this->data_transformer_->Transform(resized_image, *net_input[0]);
    net_output = this->net_->Forward();
    task->free_.push(frame);

    res = result->free_.pop();
    //Process results here
    for (int i = 0; i < pre_num; ++i){
      result_strm << synset_words_[net_output[0]->cpu_data()[i]] << " " 
                  << net_output[0]->cpu_data()[i + pre_num] << "\n";
    }
    *res = result_strm.str();
    result->full_.push(res);
  }
}

void Consumer::Run(){
  if (!thread_){
    try{
      thread_.reset(new boost::thread(&Consumer::entry, this, task_q_, result_q_));
    }
    catch (std::exception& e) {
      std::cerr << "Thread exception: " << e.what() << std::endl;
    }
  }
  else
    std::cout << "Consumer thread may have been running!" << std::endl;
};
void Consumer::Stop(){
  if (thread_ && thread_->joinable()){
    thread_->interrupt();
    try {
      thread_->join();
    }
    catch (boost::thread_interrupted&) {
    }
    catch (std::exception& e) {
      std::cerr << "Thread exception: " << e.what() << std::endl;
    }
  }
}
bool Consumer::must_stop(){
  return thread_ && thread_->interruption_requested();
}


int main(void)
{
  int max_queue_size = 1000;
  boost::shared_ptr<QueuePair<Mat>> tasks(new QueuePair<Mat>(max_queue_size));
  boost::shared_ptr<QueuePair<std::string>> results(new QueuePair<std::string>(max_queue_size));

  char str[100], info_str[100] = " results: ";
  VideoCapture vc(0);
  if (!vc.isOpened())
    return -1;

  Consumer consumer(tasks, results);
  consumer.Run();

  Mat frame, *frame_copy;
  namedWindow("preview");
  double t, fps;

  while (true){
    t = (double)getTickCount();
    vc.read(frame);

    if (waitKey(1) >= 0){
      consuer.Stop();
      break;
    }

    if (tasks->free_.try_peek(&frame_copy)){
      frame_copy = tasks->free_.pop();
      *frame_copy = frame.clone();
      tasks->full_.push(frame_copy);
    }
    std::string *res;
    std::string frame_info("");
    if (results->full_.try_peek(&res)){
      res = results->full_.pop();
      frame_info = frame_info + info_str;
      frame_info = frame_info + *res;
      results->free_.push(res);
    }    

    t = ((double)getTickCount() - t) / getTickFrequency();
    fps = 1.0 / t;

    sprintf(str, " fps: %.2f", fps);
    frame_info = frame_info + str;

    putText(frame, frame_info, Point(5, 20)
         , FONT_HERSHEY_SIMPLEX, 0.5, Scalar(0, 255, 0));
    imshow("preview", frame);
  }
}

src / caffe / util / blocking_queue.cpp中 ,在下面进行一些更改并重建caffe:

...//Other stuff
template class BlockingQueue<Batch<float>*>;
template class BlockingQueue<Batch<double>*>;
template class BlockingQueue<Datum*>;
template class BlockingQueue<shared_ptr<DataReader::QueuePair> >;
template class BlockingQueue<P2PSync<float>*>;
template class BlockingQueue<P2PSync<double>*>;
//add these 2 lines below
template class BlockingQueue<cv::Mat*>;
template class BlockingQueue<std::string*>;
收藏
评论

似乎caffe的python包装器阻止了Global Interpreter Lock(GIL) 。因此,调用任何caffe python命令都会阻塞所有 python线程。

一种解决方法(风险自担)是为特定的caffe功能禁用GIL。例如,如果您希望能够不带锁地forward运行,则可以编辑$CAFFE_ROOT/python/caffe/_caffe.cpp 。添加此功能:

void Net_Forward(Net<Dtype>& net, int start, int end) {
  Py_BEGIN_ALLOW_THREADS;   // <-- disable GIL
  net.ForwardFromTo(start, end);
  Py_END_ALLOW_THREADS;     // <-- restore GIL
}

并将.def("_forward", &Net<Dtype>::ForwardFromTo)替换为:

.def("_forward", &Net_Forward)

更改后不要忘记make pycaffe

请参阅了解更多详情。

收藏
评论
新手导航
  • 社区规范
  • 提出问题
  • 进行投票
  • 个人资料
  • 优化问题
  • 回答问题

关于我们

常见问题

内容许可

联系我们

@2020 AskGo
京ICP备20001863号