Wednesday, November 9, 2011

lock free intrusive FIFO

Вдохновившись постами remark'а с rsdn, решил попробовать сделать что-либо самому в модной сфере lock free, чтобы глубже разобраться в вопросе, так сказать. В результате трудов вечерком создал уродца, который можно использовать, например так:

CList<ValueNode<int> > list;
list.push_back(new ValueNode<int>(100500));
ValueNode<int> * pNode = 0;
if (list.pop_front(&pNode))
{
   std::cout<<pNode->value<<"\n";
   delete pNode;
}

Теперь задаюсь вопросом, как бы так надежно верифицировать сие чудо и найти в нем ляпы.
Приходят в голову следующие варианты:
1) написать дикий многопоточный тест и расставить рандомные слипы в самом листе;
2) расписать все состояния примитива и переходы между ними.
3) использовать чудо тулзу Relacy с 1024cores.net.

Поскольку было уже несколько поздно, успел реализовал только пункт 1: все вроде ок, потестил все ветки, ведем себя корректно. Очередь заполняется, очищается, ресурсы чистятся, ABA проблемы вроде бы нет, можно удалять элементы сразу после pop_front, или использовать повторно.
Вариант номер № 2 мне особенно близок, правда, есть твердая уверенность, что он может потребовать уйму времени. Интересно посмотреть на Relacy, думаю в ближайшее время таки попробую в ней разобраться.

Также интересно было бы попробовать, можно ли данный примитив привести к виду wait-free. В приведенной ниже реализации производители особо не мешают друг другу, но потребители перманентно "толкаются", и могут вхолостую использовать ресурсы процессора, что видимо, не есть гуд.

Собственно чудо:
#include "windows.h"

template<class Type>
Type * Exchange(Type ** ppDestinationType * pData)
{
    return (Type * )InterlockedExchangePointer((void**)ppDestination, pData);
}

struct BaseNode
{
    BaseNode * pNext;

    BaseNode()
        : pNext(0)
    {
    }

private:
    BaseNode(const BaseNode &);
    BaseNode & operator = (const BaseNode &);
};

template<class Type>
struct ValueNode:public BaseNode
{
    Type value;
public:
    ValueNode(Type obj = 0)
        : value(obj)
    {
    }
    ~ValueNode()
    {
    }
};

template <class NodeType>
class CList
{
    CList(const CList &);
    CList & operator = (const CList &);

    BaseNode   m_processingNode;
    BaseNode   m_fakeNode;
    BaseNode * m_pFirst;
    BaseNode * m_pLast;

    long m_fakeNodeAdded;
protected:
    void push_back_impl(BaseNode * pNode)
    {
        BaseNode * pRealLast = Exchange(& m_pLast, pNode);
        // if pRealLast is fakenode, only after that node will be ready to be fetched:
        pRealLast->pNext = pNode;
    }

    bool pop_front_impl(BaseNode ** ppNode)
    {
       *ppNode = 0;

       while(1)
       {
           // take a first node
           BaseNode * pFirst = Exchange(&m_pFirst, &m_processingNode);
           if (pFirst == &m_processingNode)
                continue;

           // pFirst is valid
           if (!pFirst->pNext)
           {
               // there is no second element
               if (pFirst == &m_fakeNode)
               {
                   // cleanup
                   Exchange(&m_pFirst, pFirst);
                   return false;
               }

               // push back fake node
               if (InterlockedIncrement(&m_fakeNodeAdded) != 1)
               {
                   InterlockedDecrement(&m_fakeNodeAdded);

                   // cleanup
                   Exchange(&m_pFirst, pFirst);
                   continue;
               }

               push_back_impl(&m_fakeNode);

               // still not added
               if (!pFirst->pNext)
               {
                   // cleanup
                   Exchange(&m_pFirst, pFirst);
                   continue;
               }
           }

           *ppNode = pFirst;
           Exchange(&m_pFirst, pFirst->pNext);
           pFirst->pNext = 0;
           return true;
       }
    }

public:
    CList()
        : m_pFirst(0), m_pLast(0), m_fakeNodeAdded(1)
    {
        m_pFirst = m_pLast = &m_fakeNode;
    }

    // interface:
    void push_back(NodeType * pNode)
    {
        push_back_impl(pNode);
    }

    bool pop_front(NodeType ** pNode)
    {
        bool res = pop_front_impl((BaseNode **)pNode);
        if (!res)
            return res;

        if (*pNode == &m_fakeNode)
        {
            InterlockedDecrement(&m_fakeNodeAdded);
            return pop_front_impl((BaseNode **)pNode);
        }
        return res;
    }
};

No comments: