VTrain/base/BlockingQueue/BlockingQueue.h

209 lines
4.6 KiB
C++

/*
* Copyright(C) 2020. Huawei Technologies Co.,Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef BLOCKING_QUEUE_H
#define BLOCKING_QUEUE_H
#include "ErrorCode.h"
#include <condition_variable>
#include <list>
#include <mutex>
#include <stdint.h>
static const int DEFAULT_MAX_QUEUE_SIZE = 256;
template<typename T> class BlockingQueue {
public:
BlockingQueue(uint32_t maxSize = DEFAULT_MAX_QUEUE_SIZE) : max_size_(maxSize), is_stoped_(false) {}
~BlockingQueue() {}
APP_ERROR Pop(T &item)
{
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty() && !is_stoped_) {
empty_cond_.wait(lock);
}
if (is_stoped_) {
return APP_ERR_QUEUE_STOPED;
}
if (queue_.empty()) {
return APP_ERR_QUEUE_EMPTY;
} else {
item = queue_.front();
queue_.pop_front();
}
full_cond_.notify_one();
return APP_ERR_OK;
}
APP_ERROR Pop(T& item, unsigned int timeOutMs)
{
std::unique_lock<std::mutex> lock(mutex_);
auto realTime = std::chrono::milliseconds(timeOutMs);
while (queue_.empty() && !is_stoped_) {
empty_cond_.wait_for(lock, realTime);
}
if (is_stoped_) {
return APP_ERR_QUEUE_STOPED;
}
if (queue_.empty()) {
return APP_ERR_QUEUE_EMPTY;
} else {
item = queue_.front();
queue_.pop_front();
}
full_cond_.notify_one();
return APP_ERR_OK;
}
APP_ERROR Push(const T& item, bool isWait = false)
{
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.size() >= max_size_ && isWait && !is_stoped_) {
full_cond_.wait(lock);
}
if (is_stoped_) {
return APP_ERR_QUEUE_STOPED;
}
if (queue_.size() >= max_size_) {
return APP_ERROR_QUEUE_FULL;
}
queue_.push_back(item);
empty_cond_.notify_one();
return APP_ERR_OK;
}
APP_ERROR Push_Front(const T &item, bool isWait = false)
{
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.size() >= max_size_ && isWait && !is_stoped_) {
full_cond_.wait(lock);
}
if (is_stoped_) {
return APP_ERR_QUEUE_STOPED;
}
if (queue_.size() >= max_size_) {
return APP_ERROR_QUEUE_FULL;
}
queue_.push_front(item);
empty_cond_.notify_one();
return APP_ERR_OK;
}
void Stop()
{
{
std::unique_lock<std::mutex> lock(mutex_);
is_stoped_ = true;
}
full_cond_.notify_all();
empty_cond_.notify_all();
}
void Restart()
{
{
std::unique_lock<std::mutex> lock(mutex_);
is_stoped_ = false;
}
}
// if the queue is stoped ,need call this function to release the unprocessed items
std::list<T> GetRemainItems()
{
std::unique_lock<std::mutex> lock(mutex_);
if (!is_stoped_) {
return std::list<T>();
}
return queue_;
}
APP_ERROR GetBackItem(T &item)
{
if (is_stoped_) {
return APP_ERR_QUEUE_STOPED;
}
if (queue_.empty()) {
return APP_ERR_QUEUE_EMPTY;
}
item = queue_.back();
return APP_ERR_OK;
}
std::mutex *GetLock()
{
return &mutex_;
}
APP_ERROR IsFull()
{
std::unique_lock<std::mutex> lock(mutex_);
return queue_.size() >= max_size_;
}
int GetSize()
{
return queue_.size();
}
APP_ERROR IsEmpty()
{
return queue_.empty();
}
void Clear()
{
std::unique_lock<std::mutex> lock(mutex_);
queue_.clear();
}
private:
std::list<T> queue_;
std::mutex mutex_;
std::condition_variable empty_cond_;
std::condition_variable full_cond_;
uint32_t max_size_;
bool is_stoped_;
};
#endif // __INC_BLOCKING_QUEUE_H__