209 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			C++
		
	
	
	
			
		
		
	
	
			209 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			C++
		
	
	
	
| /*
 | |
|  * Copyright(C) 2020. Huawei Technologies Co.,Ltd. All rights reserved.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  * http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  */
 | |
| #ifndef BLOCKING_QUEUE_H
 | |
| #define BLOCKING_QUEUE_H
 | |
| 
 | |
| #include "ErrorCode.h"
 | |
| #include <condition_variable>
 | |
| #include <list>
 | |
| #include <mutex>
 | |
| #include <stdint.h>
 | |
| 
 | |
| static const int DEFAULT_MAX_QUEUE_SIZE = 256;
 | |
| 
 | |
| template<typename T> class BlockingQueue {
 | |
| public:
 | |
|     BlockingQueue(uint32_t maxSize = DEFAULT_MAX_QUEUE_SIZE) : max_size_(maxSize), is_stoped_(false) {}
 | |
| 
 | |
|     ~BlockingQueue() {}
 | |
| 
 | |
|     APP_ERROR Pop(T &item)
 | |
|     {
 | |
|         std::unique_lock<std::mutex> lock(mutex_);
 | |
| 
 | |
|         while (queue_.empty() && !is_stoped_) {
 | |
|             empty_cond_.wait(lock);
 | |
|         }
 | |
| 
 | |
|         if (is_stoped_) {
 | |
|             return APP_ERR_QUEUE_STOPED;
 | |
|         }
 | |
| 
 | |
|         if (queue_.empty()) {
 | |
|             return APP_ERR_QUEUE_EMPTY;
 | |
|         } else {
 | |
|             item = queue_.front();
 | |
|             queue_.pop_front();
 | |
|         }
 | |
| 
 | |
|         full_cond_.notify_one();
 | |
| 
 | |
|         return APP_ERR_OK;
 | |
|     }
 | |
| 
 | |
|     APP_ERROR Pop(T& item, unsigned int timeOutMs)
 | |
|     {
 | |
|         std::unique_lock<std::mutex> lock(mutex_);
 | |
|         auto realTime = std::chrono::milliseconds(timeOutMs);
 | |
| 
 | |
|         while (queue_.empty() && !is_stoped_) {
 | |
|             empty_cond_.wait_for(lock, realTime);
 | |
|         }
 | |
|         
 | |
|         if (is_stoped_) {
 | |
|             return APP_ERR_QUEUE_STOPED;
 | |
|         }
 | |
| 
 | |
|         if (queue_.empty()) {
 | |
|             return APP_ERR_QUEUE_EMPTY;
 | |
|         } else {
 | |
|             item = queue_.front();
 | |
|             queue_.pop_front();
 | |
|         }
 | |
| 
 | |
|         full_cond_.notify_one();
 | |
| 
 | |
|         return APP_ERR_OK;
 | |
|     }
 | |
| 
 | |
|     APP_ERROR Push(const T& item, bool isWait = false)
 | |
|     {
 | |
|         std::unique_lock<std::mutex> lock(mutex_);
 | |
| 
 | |
|         while (queue_.size() >= max_size_ && isWait && !is_stoped_) {
 | |
|             full_cond_.wait(lock);
 | |
|         }
 | |
| 
 | |
|         if (is_stoped_) {
 | |
|             return APP_ERR_QUEUE_STOPED;
 | |
|         }
 | |
| 
 | |
|         if (queue_.size() >= max_size_) {
 | |
|             return APP_ERROR_QUEUE_FULL;
 | |
|         }
 | |
|         queue_.push_back(item);
 | |
| 
 | |
|         empty_cond_.notify_one();
 | |
| 
 | |
|         return APP_ERR_OK;
 | |
|     }
 | |
| 
 | |
|     APP_ERROR Push_Front(const T &item, bool isWait = false)
 | |
|     {
 | |
|         std::unique_lock<std::mutex> lock(mutex_);
 | |
| 
 | |
|         while (queue_.size() >= max_size_ && isWait && !is_stoped_) {
 | |
|             full_cond_.wait(lock);
 | |
|         }
 | |
| 
 | |
|         if (is_stoped_) {
 | |
|             return APP_ERR_QUEUE_STOPED;
 | |
|         }
 | |
| 
 | |
|         if (queue_.size() >= max_size_) {
 | |
|             return APP_ERROR_QUEUE_FULL;
 | |
|         }
 | |
| 
 | |
|         queue_.push_front(item);
 | |
| 
 | |
|         empty_cond_.notify_one();
 | |
| 
 | |
|         return APP_ERR_OK;
 | |
|     }
 | |
| 
 | |
|     void Stop()
 | |
|     {
 | |
|         {
 | |
|             std::unique_lock<std::mutex> lock(mutex_);
 | |
|             is_stoped_ = true;
 | |
|         }
 | |
| 
 | |
|         full_cond_.notify_all();
 | |
|         empty_cond_.notify_all();
 | |
|     }
 | |
| 
 | |
|     void Restart()
 | |
|     {
 | |
|         {
 | |
|             std::unique_lock<std::mutex> lock(mutex_);
 | |
|             is_stoped_ = false;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     // if the queue is stoped ,need call this function to release the unprocessed items
 | |
|     std::list<T> GetRemainItems()
 | |
|     {
 | |
|         std::unique_lock<std::mutex> lock(mutex_);
 | |
| 
 | |
|         if (!is_stoped_) {
 | |
|             return std::list<T>();
 | |
|         }
 | |
| 
 | |
|         return queue_;
 | |
|     }
 | |
| 
 | |
|     APP_ERROR GetBackItem(T &item)
 | |
|     {
 | |
|         if (is_stoped_) {
 | |
|             return APP_ERR_QUEUE_STOPED;
 | |
|         }
 | |
| 
 | |
|         if (queue_.empty()) {
 | |
|             return APP_ERR_QUEUE_EMPTY;
 | |
|         }
 | |
| 
 | |
|         item = queue_.back();
 | |
|         return APP_ERR_OK;
 | |
|     }
 | |
| 
 | |
|     std::mutex *GetLock()
 | |
|     {
 | |
|         return &mutex_;
 | |
|     }
 | |
| 
 | |
|     APP_ERROR IsFull()
 | |
|     {
 | |
|         std::unique_lock<std::mutex> lock(mutex_);
 | |
|         return queue_.size() >= max_size_;
 | |
|     }
 | |
| 
 | |
|     int GetSize()
 | |
|     {
 | |
|         return queue_.size();
 | |
|     }
 | |
| 
 | |
|     APP_ERROR IsEmpty()
 | |
|     {
 | |
|         return queue_.empty();
 | |
|     }
 | |
| 
 | |
|     void Clear()
 | |
|     {
 | |
|         std::unique_lock<std::mutex> lock(mutex_);
 | |
|         queue_.clear();
 | |
|     }
 | |
| 
 | |
| private:
 | |
|     std::list<T> queue_;
 | |
|     std::mutex mutex_;
 | |
|     std::condition_variable empty_cond_;
 | |
|     std::condition_variable full_cond_;
 | |
|     uint32_t max_size_;
 | |
| 
 | |
|     bool is_stoped_;
 | |
| };
 | |
| #endif // __INC_BLOCKING_QUEUE_H__
 |