Описание проблемы из предыдущего поста:
Также интересно было бы попробовать, можно ли данный примитив привести к виду wait-free. В приведенной ниже реализации производители особо не мешают друг другу, но потребители перманентно "толкаются", и могут вхолостую использовать ресурсы процессора, что видимо, не есть гуд.Решение, приведенное в нем, действительно обладает недостатком: в нем каждый поток пытается захватить &m_processingNode, чтобы впоследствии эксклюзивно вынимать m_pFront - первую ноду. Хотелось бы добиться того, чтобы каждый поток мог гарантированно получить ноду за конечное количество шагов.
При работе над этой проблемой, у меня возникла забавная мысль, что сам оригинальный MPSC без processingNode является идеальным средством для синхронизации потоков.
Т.е. если рядом с основной MPSC очередью данных положить еще одну MPSC очередь для синхронизации, содержащую некие ThreadNodes, то можно привести pop_front к wait_free виду, используя следующий алгоритм:
1. выделили ThreadNode node; на стеке
2. сделали lockQueue.push_back(&node);
3. ждем пока lockQueue.m_pFront не будет равен указателю на node.
4. вуаля! мы захватили лок.
5. вынимаем элемент используя pop_front из dataQueue
6. отпускаем лок: делаем lockQueue.pop_front()
В данном алгоритме присутствует одна проблема. Поскольку наш MPSC Queue работает с рутовой/фейковой нодой, то в пункт 3 потоки могут и не попасть, если фейковая нода находится на первом месте. Т.е. нужно анализировать m_pFront на предмет наличия в нем указателя на фейковую ноду:
class CLock:private CListImpl
{
BaseNode * m_pRootLock;
public:
CLock()
: m_pRootLock(0)
{
}
void lock(BaseNode * pThreadNode)
{
push_back(pThreadNode);
while(1)
{
if (m_pFirst == pThreadNode)
return; // we won the race!
if (m_pFirst == &m_rootNode)
{ // oops, we need to choose one person who
// will kill the dragon
if (!CompareAndSwap(&m_pRootLock, pThreadNode, (BaseNode *)0))
{
if (m_pFirst == &m_rootNode)
{
pop_front(true);
}
Exchange(&m_pRootLock, (BaseNode *)0);
}
}
}
}
void unlock(BaseNode * pTheadNode)
{
pop_front(false);
}
};
Т.е из MPSC очереди получился прекрасный лок для использования в MPMC очереди.Данный примитив мне чем-то напоминает Bakery Lock, но в отличие от него позволяет синхронизировать заранее неизвестное количество потоков.
Тогда сама очередь может выглядеть так:
template<class NodeType>
class CIntrusiveList
{
CListImpl m_listImpl;
CLock m_popLock;
public:
CIntrusiveList()
{
}
void push_back(NodeType * pNode)
{
m_listImpl.push_back(pNode);
}
NodeType * pop_front()
{
ScopedLock guard(&m_popLock);
return static_cast<NodeType *>(m_listImpl.pop_front(false));
}
};
По моему, достаточно красиво.
Законченный пример:
https://xp-dev.com/svn/ligen-blog-support/trunk/lock.cpp
https://xp-dev.com/svn/ligen-blog-support/trunk/lat_relacy_test/
No comments:
Post a Comment