Tuesday, April 9, 2013

lock free intrusive FIFO - Part3

В продолжении истории о Lock Free Intrusive FIFO, задался целью улучшить поведение pop_front из MPMC-версии очереди.

Описание проблемы из предыдущего поста:
Также интересно было бы попробовать, можно ли данный примитив привести к виду wait-free. В приведенной ниже реализации производители особо не мешают друг другу, но потребители перманентно "толкаются", и могут вхолостую использовать ресурсы процессора, что видимо, не есть гуд.
Решение, приведенное в нем, действительно обладает недостатком: в нем каждый поток пытается захватить &m_processingNode, чтобы впоследствии эксклюзивно вынимать m_pFront - первую ноду. Хотелось бы добиться того, чтобы каждый поток мог гарантированно получить ноду за конечное количество шагов.
При работе над этой проблемой, у меня возникла забавная мысль, что сам оригинальный MPSC без processingNode является идеальным средством для синхронизации потоков.
Т.е. если рядом с основной MPSC очередью данных положить еще одну MPSC очередь для синхронизации, содержащую некие ThreadNodes, то можно привести pop_front к wait_free виду, используя следующий алгоритм:
1. выделили ThreadNode node; на стеке
2. сделали lockQueue.push_back(&node);
3. ждем пока lockQueue.m_pFront не будет равен указателю на node.
4. вуаля! мы захватили лок.
5. вынимаем элемент используя pop_front из dataQueue
6. отпускаем лок: делаем lockQueue.pop_front()
В данном алгоритме присутствует одна проблема. Поскольку наш MPSC Queue работает с рутовой/фейковой нодой, то в пункт 3 потоки могут и не попасть, если фейковая нода находится на первом месте. Т.е. нужно анализировать m_pFront на предмет наличия в нем указателя на фейковую ноду:

class CLock:private CListImpl
{
    BaseNode * m_pRootLock;
public:
    CLock()
       : m_pRootLock(0)
    {
    }
    void lock(BaseNode * pThreadNode)
    {
        push_back(pThreadNode);

        while(1)
        {
            if (m_pFirst == pThreadNode)
               return; // we won the race!

            
            if (m_pFirst == &m_rootNode)
            {  // oops, we need to choose one person who 

               // will kill the dragon
               if (!CompareAndSwap(&m_pRootLock, pThreadNode, (BaseNode *)0))
               {
                   if (m_pFirst == &m_rootNode)
                   {
                       pop_front(true);
                   }
                   Exchange(&m_pRootLock, (BaseNode *)0);
               }
            }
        }
    }
    void unlock(BaseNode * pTheadNode)
    {
        pop_front(false);
    }
};



Т.е из MPSC очереди получился прекрасный лок для использования в MPMC очереди.Данный примитив мне чем-то напоминает Bakery Lock, но в отличие от него позволяет синхронизировать заранее неизвестное количество потоков.
Тогда сама очередь может выглядеть так:


template<class NodeType>
class CIntrusiveList
{
    CListImpl m_listImpl;
    CLock m_popLock;
public:
    CIntrusiveList()
    {
    }
    void push_back(NodeType * pNode)
    {
       m_listImpl.push_back(pNode);
    }
    NodeType * pop_front()
    {
       ScopedLock guard(&m_popLock);
       return static_cast<NodeType *>(m_listImpl.pop_front(false));
    }
};



По моему, достаточно красиво.
Законченный пример:
https://xp-dev.com/svn/ligen-blog-support/trunk/lock.cpp
https://xp-dev.com/svn/ligen-blog-support/trunk/lat_relacy_test/ 

No comments: