209 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			C
		
	
	
	
		
		
			
		
	
	
			209 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			C
		
	
	
	
|  | /*
 | ||
|  |  * Copyright(C) 2020. Huawei Technologies Co.,Ltd. All rights reserved. | ||
|  |  * | ||
|  |  * Licensed under the Apache License, Version 2.0 (the "License"); | ||
|  |  * you may not use this file except in compliance with the License. | ||
|  |  * You may obtain a copy of the License at | ||
|  |  * | ||
|  |  * http://www.apache.org/licenses/LICENSE-2.0
 | ||
|  |  * | ||
|  |  * Unless required by applicable law or agreed to in writing, software | ||
|  |  * distributed under the License is distributed on an "AS IS" BASIS, | ||
|  |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
|  |  * See the License for the specific language governing permissions and | ||
|  |  * limitations under the License. | ||
|  |  */ | ||
|  | #ifndef BLOCKING_QUEUE_H
 | ||
|  | #define BLOCKING_QUEUE_H
 | ||
|  | 
 | ||
|  | #include "ErrorCode.h"
 | ||
|  | #include <condition_variable>
 | ||
|  | #include <list>
 | ||
|  | #include <mutex>
 | ||
|  | #include <stdint.h>
 | ||
|  | 
 | ||
|  | static const int DEFAULT_MAX_QUEUE_SIZE = 256; | ||
|  | 
 | ||
|  | template<typename T> class BlockingQueue { | ||
|  | public: | ||
|  |     BlockingQueue(uint32_t maxSize = DEFAULT_MAX_QUEUE_SIZE) : max_size_(maxSize), is_stoped_(false) {} | ||
|  | 
 | ||
|  |     ~BlockingQueue() {} | ||
|  | 
 | ||
|  |     APP_ERROR Pop(T &item) | ||
|  |     { | ||
|  |         std::unique_lock<std::mutex> lock(mutex_); | ||
|  | 
 | ||
|  |         while (queue_.empty() && !is_stoped_) { | ||
|  |             empty_cond_.wait(lock); | ||
|  |         } | ||
|  | 
 | ||
|  |         if (is_stoped_) { | ||
|  |             return APP_ERR_QUEUE_STOPED; | ||
|  |         } | ||
|  | 
 | ||
|  |         if (queue_.empty()) { | ||
|  |             return APP_ERR_QUEUE_EMPTY; | ||
|  |         } else { | ||
|  |             item = queue_.front(); | ||
|  |             queue_.pop_front(); | ||
|  |         } | ||
|  | 
 | ||
|  |         full_cond_.notify_one(); | ||
|  | 
 | ||
|  |         return APP_ERR_OK; | ||
|  |     } | ||
|  | 
 | ||
|  |     APP_ERROR Pop(T& item, unsigned int timeOutMs) | ||
|  |     { | ||
|  |         std::unique_lock<std::mutex> lock(mutex_); | ||
|  |         auto realTime = std::chrono::milliseconds(timeOutMs); | ||
|  | 
 | ||
|  |         while (queue_.empty() && !is_stoped_) { | ||
|  |             empty_cond_.wait_for(lock, realTime); | ||
|  |         } | ||
|  |          | ||
|  |         if (is_stoped_) { | ||
|  |             return APP_ERR_QUEUE_STOPED; | ||
|  |         } | ||
|  | 
 | ||
|  |         if (queue_.empty()) { | ||
|  |             return APP_ERR_QUEUE_EMPTY; | ||
|  |         } else { | ||
|  |             item = queue_.front(); | ||
|  |             queue_.pop_front(); | ||
|  |         } | ||
|  | 
 | ||
|  |         full_cond_.notify_one(); | ||
|  | 
 | ||
|  |         return APP_ERR_OK; | ||
|  |     } | ||
|  | 
 | ||
|  |     APP_ERROR Push(const T& item, bool isWait = false) | ||
|  |     { | ||
|  |         std::unique_lock<std::mutex> lock(mutex_); | ||
|  | 
 | ||
|  |         while (queue_.size() >= max_size_ && isWait && !is_stoped_) { | ||
|  |             full_cond_.wait(lock); | ||
|  |         } | ||
|  | 
 | ||
|  |         if (is_stoped_) { | ||
|  |             return APP_ERR_QUEUE_STOPED; | ||
|  |         } | ||
|  | 
 | ||
|  |         if (queue_.size() >= max_size_) { | ||
|  |             return APP_ERROR_QUEUE_FULL; | ||
|  |         } | ||
|  |         queue_.push_back(item); | ||
|  | 
 | ||
|  |         empty_cond_.notify_one(); | ||
|  | 
 | ||
|  |         return APP_ERR_OK; | ||
|  |     } | ||
|  | 
 | ||
|  |     APP_ERROR Push_Front(const T &item, bool isWait = false) | ||
|  |     { | ||
|  |         std::unique_lock<std::mutex> lock(mutex_); | ||
|  | 
 | ||
|  |         while (queue_.size() >= max_size_ && isWait && !is_stoped_) { | ||
|  |             full_cond_.wait(lock); | ||
|  |         } | ||
|  | 
 | ||
|  |         if (is_stoped_) { | ||
|  |             return APP_ERR_QUEUE_STOPED; | ||
|  |         } | ||
|  | 
 | ||
|  |         if (queue_.size() >= max_size_) { | ||
|  |             return APP_ERROR_QUEUE_FULL; | ||
|  |         } | ||
|  | 
 | ||
|  |         queue_.push_front(item); | ||
|  | 
 | ||
|  |         empty_cond_.notify_one(); | ||
|  | 
 | ||
|  |         return APP_ERR_OK; | ||
|  |     } | ||
|  | 
 | ||
|  |     void Stop() | ||
|  |     { | ||
|  |         { | ||
|  |             std::unique_lock<std::mutex> lock(mutex_); | ||
|  |             is_stoped_ = true; | ||
|  |         } | ||
|  | 
 | ||
|  |         full_cond_.notify_all(); | ||
|  |         empty_cond_.notify_all(); | ||
|  |     } | ||
|  | 
 | ||
|  |     void Restart() | ||
|  |     { | ||
|  |         { | ||
|  |             std::unique_lock<std::mutex> lock(mutex_); | ||
|  |             is_stoped_ = false; | ||
|  |         } | ||
|  |     } | ||
|  | 
 | ||
|  |     // if the queue is stoped ,need call this function to release the unprocessed items
 | ||
|  |     std::list<T> GetRemainItems() | ||
|  |     { | ||
|  |         std::unique_lock<std::mutex> lock(mutex_); | ||
|  | 
 | ||
|  |         if (!is_stoped_) { | ||
|  |             return std::list<T>(); | ||
|  |         } | ||
|  | 
 | ||
|  |         return queue_; | ||
|  |     } | ||
|  | 
 | ||
|  |     APP_ERROR GetBackItem(T &item) | ||
|  |     { | ||
|  |         if (is_stoped_) { | ||
|  |             return APP_ERR_QUEUE_STOPED; | ||
|  |         } | ||
|  | 
 | ||
|  |         if (queue_.empty()) { | ||
|  |             return APP_ERR_QUEUE_EMPTY; | ||
|  |         } | ||
|  | 
 | ||
|  |         item = queue_.back(); | ||
|  |         return APP_ERR_OK; | ||
|  |     } | ||
|  | 
 | ||
|  |     std::mutex *GetLock() | ||
|  |     { | ||
|  |         return &mutex_; | ||
|  |     } | ||
|  | 
 | ||
|  |     APP_ERROR IsFull() | ||
|  |     { | ||
|  |         std::unique_lock<std::mutex> lock(mutex_); | ||
|  |         return queue_.size() >= max_size_; | ||
|  |     } | ||
|  | 
 | ||
|  |     int GetSize() | ||
|  |     { | ||
|  |         return queue_.size(); | ||
|  |     } | ||
|  | 
 | ||
|  |     APP_ERROR IsEmpty() | ||
|  |     { | ||
|  |         return queue_.empty(); | ||
|  |     } | ||
|  | 
 | ||
|  |     void Clear() | ||
|  |     { | ||
|  |         std::unique_lock<std::mutex> lock(mutex_); | ||
|  |         queue_.clear(); | ||
|  |     } | ||
|  | 
 | ||
|  | private: | ||
|  |     std::list<T> queue_; | ||
|  |     std::mutex mutex_; | ||
|  |     std::condition_variable empty_cond_; | ||
|  |     std::condition_variable full_cond_; | ||
|  |     uint32_t max_size_; | ||
|  | 
 | ||
|  |     bool is_stoped_; | ||
|  | }; | ||
|  | #endif // __INC_BLOCKING_QUEUE_H__
 |