Вдохновившись постами remark'а с rsdn, решил попробовать сделать что-либо самому в модной сфере lock free, чтобы глубже разобраться в вопросе, так сказать. В результате трудов вечерком создал уродца, который можно использовать, например так:
CList<ValueNode<int> > list;
list.push_back(new ValueNode<int>(100500));
ValueNode<int> * pNode = 0;
if (list.pop_front(&pNode))
{
std::cout<<pNode->value<<"\n";
delete pNode;
}
Теперь задаюсь вопросом, как бы так надежно верифицировать сие чудо и найти в нем ляпы.
Приходят в голову следующие варианты:
1) написать дикий многопоточный тест и расставить рандомные слипы в самом листе;
2) расписать все состояния примитива и переходы между ними.
3) использовать чудо тулзу Relacy с 1024cores.net.
Поскольку было уже несколько поздно, успел реализовал только пункт 1: все вроде ок, потестил все ветки, ведем себя корректно. Очередь заполняется, очищается, ресурсы чистятся, ABA проблемы вроде бы нет, можно удалять элементы сразу после pop_front, или использовать повторно.
Вариант номер № 2 мне особенно близок, правда, есть твердая уверенность, что он может потребовать уйму времени. Интересно посмотреть на Relacy, думаю в ближайшее время таки попробую в ней разобраться.
Также интересно было бы попробовать, можно ли данный примитив привести к виду wait-free. В приведенной ниже реализации производители особо не мешают друг другу, но потребители перманентно "толкаются", и могут вхолостую использовать ресурсы процессора, что видимо, не есть гуд.
Собственно чудо:
#include "windows.h"
template<class Type>
Type * Exchange(Type ** ppDestination, Type * pData)
{
return (Type * )InterlockedExchangePointer((void**)ppDestination, pData);
}
struct BaseNode
{
BaseNode * pNext;
BaseNode()
: pNext(0)
{
}
private:
BaseNode(const BaseNode &);
BaseNode & operator = (const BaseNode &);
};
template<class Type>
struct ValueNode:public BaseNode
{
Type value;
public:
ValueNode(Type obj = 0)
: value(obj)
{
}
~ValueNode()
{
}
};
template <class NodeType>
class CList
{
CList(const CList &);
CList & operator = (const CList &);
BaseNode m_processingNode;
BaseNode m_fakeNode;
BaseNode * m_pFirst;
BaseNode * m_pLast;
long m_fakeNodeAdded;
protected:
void push_back_impl(BaseNode * pNode)
{
BaseNode * pRealLast = Exchange(& m_pLast, pNode);
// if pRealLast is fakenode, only after that node will be ready to be fetched:
pRealLast->pNext = pNode;
}
bool pop_front_impl(BaseNode ** ppNode)
{
*ppNode = 0;
while(1)
{
// take a first node
BaseNode * pFirst = Exchange(&m_pFirst, &m_processingNode);
if (pFirst == &m_processingNode)
continue;
// pFirst is valid
if (!pFirst->pNext)
{
// there is no second element
if (pFirst == &m_fakeNode)
{
// cleanup
Exchange(&m_pFirst, pFirst);
return false;
}
// push back fake node
if (InterlockedIncrement(&m_fakeNodeAdded) != 1)
{
InterlockedDecrement(&m_fakeNodeAdded);
// cleanup
Exchange(&m_pFirst, pFirst);
continue;
}
push_back_impl(&m_fakeNode);
// still not added
if (!pFirst->pNext)
{
// cleanup
Exchange(&m_pFirst, pFirst);
continue;
}
}
*ppNode = pFirst;
Exchange(&m_pFirst, pFirst->pNext);
pFirst->pNext = 0;
return true;
}
}
public:
CList()
: m_pFirst(0), m_pLast(0), m_fakeNodeAdded(1)
{
m_pFirst = m_pLast = &m_fakeNode;
}
// interface:
void push_back(NodeType * pNode)
{
push_back_impl(pNode);
}
bool pop_front(NodeType ** pNode)
{
bool res = pop_front_impl((BaseNode **)pNode);
if (!res)
return res;
if (*pNode == &m_fakeNode)
{
InterlockedDecrement(&m_fakeNodeAdded);
return pop_front_impl((BaseNode **)pNode);
}
return res;
}
};
No comments:
Post a Comment