BOOST Multi-Index Container and Transaction Storage

Introduction

The STL provides a wide range of sequence and sorted collections, such as vector, list, set and map. Each of these collections is ordered or sorted according to a single criterion, which must be specified at design time. However flexible the STL is in the choice of the sorting criterion, each collection is still confined to a single sort order.

Sometimes it is necessary to have more than one search option for the same collection. Consider the example of a request-reply asynchronous protocol. The pending transactions must be sorted by transaction ID, which allows an arrived reply to be matched efficiently with its pending request. But it is often also required to report all pending transactions for a given user, which requires the collection to be sorted by user ID as well. To solve the problem, you can use two sorted collections pointing to the same data: one sorted by transaction ID, the other sorted by user ID. Such an approach, however, requires a lot of effort to keep both collections synchronized.

Fortunately, the BOOST library provides a very convenient solution to the problem: the boost::multi_index_container class. The class has a rich interface, so it is worth walking through its commonly used techniques on a model example.

Transaction Manager as Multi-Index Container

Our model example is an implementation of a request-reply asynchronous protocol. "Asynchronous" means that the application is not blocked while waiting for the reply; instead it is notified when the reply arrives.

The application issues requests to different servers. Each server is identified by its endpoint - a host and port pair. Each issued request is stored as a transaction in a container. When a reply from a server arrives, the container is searched for the matching pending transaction. It is assumed (as in most protocols) that the matching is done by transaction ID. It is also assumed that a transaction ID is unique only with respect to a given server, so only the combination of transaction ID + endpoint is unique.

This is why the container should be sorted by the "transaction ID + endpoint" key.

A best-effort transport model (UDP-like) is assumed: messages are usually delivered to their destination, but there is some probability that a message is lost. Hence a pending request could wait for its reply forever. To avoid container overflow, pending requests must have a limited lifetime. If a request is not answered within a certain period, it is re-sent. If the re-sent request is still not answered after a certain timeout, it is dropped as expired.

To implement this timeout function the application needs to scan the transaction collection periodically. Normally the transaction collection contains thousands of pending transactions with only a small fraction of them expired. Hence scanning the whole collection sequentially would be inefficient, because the scanning time grows linearly with the collection size. It is better to sort the collection by expiration time and scan only the transactions that have really expired. Note that because a new transaction expires later than any already stored transaction, it is always inserted at the end of the collection. Hence we can use "hinted" insertion, which takes amortized constant time. Therefore the collection should be sorted by expiration time as well.

The collection should also be ordered by user ID; this last index comes from the requirement to produce reports sorted by user ID. As a result, we need a sorted collection with three indices: by expiration time, by user ID and by transaction ID. To define such a multi-index collection, use the boost::multi_index_container class:

/** Stored transaction entry */
struct entry
{
  /** possible states */
  enum state_type {pending, resent, succeeded, failed};

  endpoint_type peer; /** network peer */
  tx_id_type tx_id; /** transaction id */
  boost::system_time expiration; /** expiration time */
  user_info info; /** user specific data */
  mutable state_type state; /** current transaction state */

};

/** compares 2 entries for use as identity key */
inline bool operator<(const entry& l, const entry& r)
{
  if(l.info.group_id==r.info.group_id) 
    return l.info.user_id<r.info.user_id;

  return l.info.group_id<r.info.group_id;
}

/** Multi index storage */
class mi_map_type : 
  public boost::multi_index::multi_index_container
<
  // stored class
  entry,
  // all indices enumeration:
  boost::multi_index::indexed_by
  <
    ...// index view definition #0
    ...// index view definition #1
    ...// index view definition #2
  >
>
{};

To define an indexed view, first of all, you should specify its type. The BOOST library supports the following index types:

  • Ordered (unique and non-unique) indices sort the elements like std::set/std::multiset.
  • Sequenced indices arrange the elements as if in std::list.
  • Random access indices provide an interface similar to std::vector, although without memory contiguity.
  • Hashed indices provide fast access to the elements through hashing techniques.

    Because our collection has more than one index view, we need a way to specify exactly which index view is being used. For example, the find() method returns different results depending on the selected index view.

    The index view can be specified by its ordinal number. As usual, the numbering starts from zero. For example:

    mi_map_type::nth_index<2>::type::iterator it=
      m_map.get<2>().find();
    

    It means searching by the index view #2 - by transaction id.

    As a better option, the indices can be assigned tags. With assigned tags the example above can be re-written in more readable form:

    mi_map_type::index<full_tx_id>::type::iterator it=
      m_map.get<full_tx_id >().find();
    

    The examples below demonstrate different ways of indexed view definitions.

    The First Indexed View - By Expiration Time

    Our first indexed view is ordered by expiration time and uses the expiration field as its key. The index is non-unique because two different transactions can have exactly the same expiration time. The index should be sorted in ascending order, because you want the oldest transaction to come first.

    The first index definition looks like:

    
    /** tag structures */
    struct expiration_time{};
    
    // 1st index: non unique index by expiration time
    // used for time-out checking
    
    boost::multi_index::ordered_non_unique
    <
      // tag
      boost::multi_index::tag<expiration_time>,
      // member as the key
      boost::multi_index::member
      <
        entry, // referred structure
        boost::system_time, // member type
        &entry::expiration // pointer to the member
      >,
      // ascending order; the default predicate can be omitted
      std::less<boost::system_time> 
    >
    
    

    The ordered_non_unique template has three parameters. The first parameter is optional and assigns a "tag" to the view. Generally any C++ type can be used as a tag, but it is more convenient to use a dedicated structure, struct expiration_time in our case. The second parameter is the key specification, described in detail below. The third parameter is optional and specifies the comparison predicate, which must order the keys in a less-than fashion. The default predicate is std::less.

    To specify a field as the sorting key we use the multi_index::member template. This template in turn has 3 parameters. The first parameter is the referred structure type, entry in our case. The second parameter is the type of the referred structure field, boost::system_time in our case. The last parameter is a pointer to the referred structure field, in our case a pointer to the entry::expiration field.

    // member as the key
    boost::multi_index::member
    <
      entry, // referred structure
      boost::system_time, // member type
      &entry::expiration // pointer to the member
    >
    

    The Second Indexed View - By Full User Id

    Our second indexed view is ordered by full user id. Conveniently, the entry structure has an operator<() that sorts objects by the full user id (group id first, then user id). To use this operator<() as the sorting criterion we use the boost::multi_index::identity template, as shown below:

    /** tag structures */
    struct identity_id {};
    
    // 2nd index: non unique index using operator<()
    // used for sorted reports
    
    boost::multi_index::ordered_non_unique
    <
      // tag
      boost::multi_index::tag<identity_id>,
      // object as index
      boost::multi_index::identity<entry>
    >
    

    The Third Indexed View - By Full Transaction Id

    The last indexed view is ordered by full transaction id, which is composed of the destination endpoint and the transaction id itself. To define the indexed view we will use the composite_key template. The template combines several keys into one; composite keys are compared lexicographically, element by element, so every participating key must have an ordering of its own. In our case entry::peer has type endpoint_type, which defines no operator<, so it cannot be used as a key element directly. Instead we will combine entry::peer.host, entry::peer.port and entry::tx_id, whose types (ip_type, port_type and tx_id_type) can be ordered.

    The member key extractor accepts only direct members of the referred structure, such as entry::tx_id. It does not allow a "member of a member", such as entry::peer.host or entry::peer.port. To get access to the peer.port and peer.host fields we will use the const_mem_fun template, which uses the return value of a constant member function as a sorting key. Therefore we must define the following constant field accessors:

    struct entry
    {
    ...
      /** host accessor */
      endpoint_type::ip_type host() const
      {
        return peer.host;
      }
    
      /** port accessor */
      endpoint_type::port_type port() const
      {
        return peer.port;
      }
    ...
    };
    

    The const_mem_fun template has 3 parameters. The first parameter is the referred structure, the second is the return type and the last is a pointer to the member function. The resulting indexed view definition is shown below:

    /** tag structures */
    struct full_tx_id {};
    
    // 3rd index: unique index using host+port+transaction id
    // used for reply-request matching
    
    boost::multi_index::ordered_unique
    <
      // tag
      boost::multi_index::tag<full_tx_id>,
      // composed key
      boost::multi_index::composite_key
      <
        entry, // referred structure
        // composed key
        boost::multi_index::const_mem_fun<entry, 
          endpoint_type::ip_type, &entry::host>,
        boost::multi_index::const_mem_fun<entry, 
          endpoint_type::port_type, &entry::port>,
        boost::multi_index::member<entry, 
          entry::tx_id_type, &entry::tx_id>
      >
    >
    

    Having defined the transaction storage class we can now define transaction manager that uses this storage.

    Transaction Manager Interface

    To meet all the requirements imposed on the transaction manager, it is convenient to give it an interface with the following methods:

  • request() - called when the application needs to place a request. It generates a unique transaction id, issues the request and stores the newly created transaction in the collection.
  • reply() - called when a reply arrives. It looks for the matching pending transaction and moves the resolved transaction from the collection into the resolved transaction list. This method is often called from a different thread than the request() method.
  • update() - invoked periodically. It scans expired transactions, checks timeouts and re-sends if needed. If a re-sent transaction expires again, the method moves it from the collection into the resolved transaction list as failed.
  • resolve() - called when the application needs to get all resolved transactions. It just moves all resolved/failed transactions from the inner list to an external container.

    Using the following data structures:

    /** User request
    possible status: granted, denied
    */
    
    struct user_info
    {
      /** basic types */
      typedef unsigned char group_id_type;
      typedef unsigned int  user_id_type;
      enum user_status_type {unknown, granted, denied};
      
      /** Ctor creates user info from group id and user id.
      Sets the status to unknown
      */
      
      user_info(group_id_type grp, user_id_type usr) 
        : group_id(grp), user_id(usr), status(unknown)
      {}
    
      /** not unique key group id */
      group_id_type group_id;
      /** not unique key user id */
      user_id_type user_id;
      /** granted or denied */
      user_status_type status;
    };
    
    /** Ip+port */
    struct endpoint_type
    {
      /** net types */
      typedef unsigned int ip_type;
      typedef unsigned short port_type;
    
      /** destination host */
      ip_type host;
      /** destination port */
      port_type port;
    };
    
    

    The transaction manager interface can be written as:

    class storage_manager : public boost::noncopyable
    {
    public:
    ...
      /** Ctor accept reference to sender */
      storage_manager(sender_type& sender);
    
      /** Processes reply arrived from peer
      @param from peer endpoint
      @param id reply transaction id
      */
      
      void reply(const endpoint_type& from, 
        entry::tx_id_type id, user_info::user_status_type st);
    
      /** Allocates new transaction id and stores transaction 
      data into map
      @param info user info data
      @param to destination address
      @return true on success and false on failure
      */
      
      bool request(const user_info& info, const endpoint_type& to);
    
      /** Checks timeout, resends and removes time-outed 
      transactions
      */
      
      void update();
    
      /** Copy all resolved transaction 
      in external container
      */
      
      template<class Container>
      size_t resolve(Container& dest);
    
      /** Report */
      std::ostream& dump(std::ostream& os) const;
    ...
    private:
    ...
      /** pending tx map */
      mi_map_type m_map;
      /** reference to external sender object */
      sender_type& m_sender;
    ...
    };
    

    We derived the storage_manager class from boost::noncopyable to prevent it from being copied. boost::noncopyable declares a private, inaccessible copy constructor and assignment operator. Hence any attempt to use the implicitly generated copy constructor or assignment operator of a derived class causes a compilation error.

    Transaction Manager Implementation

    The transaction manager needs to send and resend messages. We do not want the transaction manager to be responsible for message transportation, so we delegate the job to a sender_type class that implements a send() method. The transaction manager holds a reference to an external object of type sender_type. The reference is initialized in the transaction manager constructor.

    Such delegation is very convenient but it has one significant drawback: the transaction manager does not know how the send() method is implemented. In particular, it does not know whether the method is blocking or whether it uses some synchronization objects. Because our class methods can be called from different threads, our class must be thread-safe. Hence it must use locks to protect its members from simultaneous access by different threads. However, calling a blocking send() method while holding a lock can (and will) cause serious deadlock problems. Hence we must call the send() method outside of the guarded section.

    The dump() method

    The simplest method is dump(). It just scans all pending transactions through the identity view and prints them to an output stream. It is implemented as:

    /** Printing functor 
    Works with any class that has operator<<
    */
    
    template<typename T>
    struct prn_op
    {
      prn_op(std::ostream& os) : m_os(os){}
      void operator()(const T& e){m_os<<e<<std::endl;}
    private:
      std::ostream& m_os;
    };
    
    /** Dump */
    std::ostream& dump(std::ostream& os) const
    {
      // lock map till end of scope
      boost::mutex::scoped_lock lock(m_map_guard);
    
      // print sorted as group+user
    
      std::for_each(m_map.get<identity_id>().begin(), 
        m_map.get<identity_id>().end(), prn_op<entry>(os));
    
      return os;
    }
    

    For transaction scanning we used the std::for_each algorithm with the prn_op functor. The prn_op functor is a template class that works with any data type that has an operator<< for std::ostream.

    As mentioned above, our class must be thread-safe. We use the boost::mutex m_map_guard member to protect all interface methods. The boost::mutex::scoped_lock lock acquires the mutex in its constructor and releases it in its destructor, so the guarded section coincides with the lifetime of the lock variable.

    Besides the boost::mutex class defined in the <boost/thread/mutex.hpp> include file, the BOOST library has a variety of in-process and inter-process synchronization objects. Worth mentioning are boost::shared_mutex, which implements a multiple-reader/single-writer mutex (<boost/thread/shared_mutex.hpp>), and boost::interprocess::shared_memory_object (<boost/interprocess/shared_memory_object.hpp>), which manipulates a shared memory segment mapped by several processes.

    The resolve() method

    The next method is resolve(). It just moves all resolved/failed transactions from the inner list to an external container. It is implemented as:

    /** Copy all resolved transaction 
    in external container
    @param dest reference to destination external container
    @return number of resolved transactions
    */
    
    template<class Container>
    size_t resolve(Container& dest)
    {
      // lock map till end of scope
      boost::mutex::scoped_lock lock(m_map_guard);
    
      dest.clear();
      // inserter works with any container type 
      // that has insert(where, what) method
      
      std::copy(m_resolved.begin(), m_resolved.end(), 
        std::inserter(dest, dest.end()));
      size_t rc=m_resolved.size();
    
      m_resolved.clear();
      return rc;
    }
    

    We used a template function because we do not want to force applications to use a specific type of container. The std::inserter adapter works with any container that has an insert(where, what) method, therefore our template method works with any destination container that has clear(), insert(where, what) and end() methods, i.e. with STL-like containers such as std::vector, std::list, std::set and std::deque.

    Note that it is important to insert at the end of the external container because insertion at another position (the beginning, for example) is inefficient for std::vector (for the other STL containers it does not matter).

    Note that all STL-like containers have the insert(where, what) method, but not all have the insert(what) variation.
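
The container independence of std::inserter can be shown without any BOOST dependency. In the sketch below drain_into is a hypothetical free function that mirrors the body of resolve(), with a std::list<int> standing in for the resolved transaction list and without the locking.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <list>
#include <set>
#include <vector>

// Moves everything from src into dest and returns the number of moved items.
// Container only needs clear(), end() and insert(where, what).
template<class Container>
std::size_t drain_into(std::list<int>& src, Container& dest)
{
  dest.clear();
  std::copy(src.begin(), src.end(), std::inserter(dest, dest.end()));
  std::size_t n = src.size();
  src.clear();
  assert(dest.size() == n);  // holds here because the sample values are distinct
  return n;
}
```

The same template body compiles for a std::vector<int>, where the elements keep their original order, and for a std::set<int>, where the position argument is only a hint and the elements end up sorted.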

    The method uses the same m_map_guard mutex to protect its data from simultaneous access by concurrent threads.

    The request() method

    The request() method accepts user data and destination endpoint. It generates transaction id, stores new transaction object in the storage container and sends a message with assistance from the m_sender member. The method implementation looks like:

    bool storage_manager::request(const user_info& info, 
           const endpoint_type& to)
    {
      typedef mi_map_type::index<expiration_time>::type time_view;
    
      // allocate new transaction id
      // can not allocate new - return INVALID_TR_ID
      
      entry::tx_id_type id=m_id_generator.allocate_id(to);
      if(entry::INVALID_TR_ID==id) return false; 
      {
        // guards the map within the current {...} block
        boost::mutex::scoped_lock lock(m_map_guard);
    
        // hinted insert pending transaction into the map end()
        time_view::iterator it=m_map.get<expiration_time>().insert(
          m_map.get<expiration_time>().end(), 
          entry(info, to, id, m_timeout));
    
        // there is no reason to fail except a bug
        BOOST_ASSERT(m_map.get<expiration_time>().end()!=it);
      }
    
      // send 
      // the call must be outside the guarded scope
      // to avoid possible deadlocks
      // and must be after inserting into storage
      // to avoid race condition
      
      m_sender.send(info, to, id);
    
      return true;
    }
    

    The method uses the same m_map_guard mutex to protect the m_map storage from simultaneous access by concurrent threads.

    The method works with the indexed view that is sorted by transaction expiration time. It inserts the newly created transaction where it belongs: at the end of the sorted collection. For this view such hinted insertion has amortized constant complexity instead of logarithmic (the other views are still updated in logarithmic time).

    The method uses the BOOST_ASSERT macro to check that the insertion succeeded. The BOOST library has another useful macro, BOOST_VERIFY. It is the same as BOOST_ASSERT, except for one thing: the expression passed to BOOST_VERIFY is always evaluated, even in a "release" build. Besides, it also eliminates warnings about unused variables when the only use of the variable is inside the assertion.

    The code below generates an 'unused variable ok' warning and does not call must_be_called() in a release build:

    bool ok=must_return_true();
    BOOST_ASSERT(ok);
    BOOST_ASSERT(must_be_called());
    

    The following code does not have the mentioned problems:

    bool ok=must_return_true();
    BOOST_VERIFY (ok);
    BOOST_VERIFY (must_be_called());
    

    The method sends the message with the help of the m_sender member. As mentioned above, the send() method must be invoked outside of the guarded section, because calling an external method within the guarded section can cause serious deadlock problems. The send() method must also be invoked after the transaction has been stored. Otherwise we can face a race condition: the reply could arrive before the transaction is actually inserted into the storage.

    The reply() method

    The reply() method is called upon reply arrival. It looks for the matching pending transaction in the transaction storage. If the transaction is found, the method removes it from the collection and puts it into the resolved transaction list. The method implementation looks like:

    inline void storage_manager::reply(const endpoint_type& from, 
      entry::tx_id_type id, user_info::user_status_type st)
    {
      typedef mi_map_type::index<full_tx_id>::type tx_view;
    
      // lock map till end of scope
      boost::mutex::scoped_lock lock(m_map_guard);
    
      // look for request match
      tx_view::iterator it=m_map.get<full_tx_id>().find(
        boost::make_tuple(from.host, from.port, id));
    
      // not found - do nothing
      if(m_map.get<full_tx_id>().end()==it) return;
    
      // remove from storage
      // and add to resolved tx list
      
      m_resolved.push_back(user_info(it->info));
      BOOST_ASSERT(!m_resolved.empty());
      m_resolved.back().status=st;
    
      m_map.get<full_tx_id>().erase(it);
    }
    

    The method uses the same m_map_guard mutex to protect the m_map storage from simultaneous access by concurrent threads.

    The method works with the indexed view that is sorted by full transaction id: host+port+id. To build the search key from host, port and id the method uses the boost::make_tuple template.

    The update() method

    The update() method is invoked periodically. It scans the transaction storage for expired transactions and re-sends them if needed. If a re-sent transaction expires again, the method removes it from the storage and puts it into the resolved transaction list as failed. The method implementation looks like:

    
    inline void storage_manager::update()
    {
      // time now
      boost::system_time now(boost::get_system_time());
      
      // data needed for resending
      // (placeholder values, overwritten by need_to_resend_())
      user_info info(0, 0); 
      endpoint_type to; 
      entry::tx_id_type id;
    
      // continue while there are requests to
      // be resent
      
      while(need_to_resend_(info, to, id, now))
      {
        // call send() outside the guarded section
        // and after the corresponding transaction
        // was stored
        
        m_sender.send(info, to, id);
      }
    }
    

    The function calls need_to_resend_() to check whether there is a transaction to resend. If there is one, the function resends it with the help of m_sender.send(). Note that m_sender.send() is called outside of the guarded section.

    The need_to_resend_() function is implemented as:

    /** Scans transaction storage for request that should be resent or failed
    @param info user info for resending
    @param to destination for resending
    @param id id for resending
    @param now current time
    @return true if the request should be resent and false when 
    there is no more expired transactions
    */
    
    inline bool storage_manager::need_to_resend_(user_info& info, 
      endpoint_type& to, entry::tx_id_type& id, 
      const boost::system_time& now)
    {
      typedef mi_map_type::index<expiration_time>::type time_view;
    
      // lock map till end of scope
      boost::mutex::scoped_lock lock(m_map_guard);
    
      time_view::iterator it=m_map.get<expiration_time>().begin();
    
      // until the end of the expired transactions is reached
      while(m_map.get<expiration_time>().end()!=it && it->expiration<now)
      {
        // timed-out transaction 
        // should be re-sent
        
        if(entry::pending==it->state)
        {
          // data for resending
          // we can not return pointer
          // to stored transaction because
          // due to race condition it can
          // be removed
          
          info=it->info;
          to=it->peer;
          id=it->tx_id;
    
          // modify at place
          BOOST_VERIFY(m_map.get<expiration_time>().modify(it, 
            entry::set_expiration(now+m_timeout)));
    
          // can modify because it is not
          // a key and mutable
          
          it->state=entry::resent;
    
          // now we are ready to leave the guarded section
          // for resending
          
          return true;
        }
    
        // the request was already re-sent once, so we do not
        // resend it again and can continue to scan the storage
        // within the guarded section
    
        // put into resolved list as failed
        
        m_resolved.push_back(user_info(it->info));
        // there is no reason to fail except a bug
        BOOST_ASSERT(!m_resolved.empty());
        m_resolved.back().status=user_info::denied;
    
        // remove from storage. 
        // The construction erase(it++) first moves to the next item 
        // and then deletes the previous position. 
        // Note that erase() invalidates only the removed 
        // iterator itself, it leaves all others intact.
        
        m_map.get<expiration_time>().erase(it++);
      }
    
      // nothing to resend
      return false;
    }
    

    The function scans the transaction storage starting from the beginning until either the end is reached or the current transaction is not expired. If the expired transaction has pending state, it will be resent: the function will break the scanning routine and return transaction data to calling procedure. If the expired transaction is already in resent state, it will be removed from the storage and moved into resolved transaction list. Note that the function will not leave the guarded section unless it needs to resend message.

    Erasing from the container:

    while(m_map.get<expiration_time>().end()!=it && it->expiration<now)
    {
      m_map.get<expiration_time>().erase(it++);
    }
    

    The postfix form it++ advances the iterator and returns its previous value, so erase() receives the old position. This code is equivalent to:

    while(m_map.get<expiration_time>().end()!=it && it->expiration<now)
    {
      time_view::iterator it_old=it;
      ++it;
      m_map.get<expiration_time>().erase(it_old);
    }
    

    It works because in associative containers the erase(it) method invalidates only it itself and does not affect other iterators.

    Resending and modifying entry:

    // modify at place
    m_map.get<expiration_time>().modify(it, 
      entry::set_expiration(now+m_timeout));
    
    // can modify because it is not
    // a key and mutable
    
    it->state=entry::resent;
    

    As with std::set, a multi-index iterator refers to a constant object: *it has type const entry. We can change the entry state with the expression it->state=entry::resent because the state field is declared mutable. The state field is not a key, so the change does not affect the container ordering.

    What can we do if we need to change a key field? Just declaring the key field as mutable and modifying it (as we did with the state field) would obviously destroy the integrity of the multi-indexed collection.

    As an option we can remove the entry entirely, modify it and re-insert it.

    entry tmp=*it;
    tmp.expiration=now+m_timeout;
    m_map.get<expiration_time>().erase(it);
    m_map.get<expiration_time>().insert(tmp);
    

    The problem here is that the insertion requires logarithmic time for every indexed view even if only one key was changed.

    To cope with the problem, the BOOST library provides a convenient interface for this procedure, the replace method:

    entry tmp=*it;
    tmp.expiration=now+m_timeout;
    m_map.get<expiration_time>().replace(it, tmp);
    

    This form requires only constant time for the unchanged indices; only the changed index requires logarithmic time. But the approach still makes two unnecessary copies: from the container to tmp and back to the container. To cope with this problem, the BOOST library provides the modify method. The method uses a functor (or a pointer to a function) that accepts a reference to the element to be modified.

    /** functor used by modify */
    struct set_expiration
    {
      set_expiration(const boost::system_time& t) : new_exp(t){}
      void operator ()(entry& e) {e.expiration=new_exp;}
    private:
      boost::system_time new_exp; // new expiration time
    };
    
    // modify at place
    m_map.get<expiration_time>().modify(it, 
      entry::set_expiration(now+m_timeout));
    

    Like the replace method, the modify method requires constant time for unchanged indices and logarithmic time for updated indices. Unlike the replace method, the modify method avoids unnecessary copying.

    But here we run into another problem: what happens if the modified element cannot be inserted into the collection because it has the same key value as another element stored in the collection (supposing the sorting key is unique)? The replace method just keeps the original element, but the modify method works with a modifying functor and cannot restore the original element. The BOOST library uses the following policy: in case of a collision the modify method erases the element and returns false.

    This is why we must check the return code of the modify method using BOOST_VERIFY (the passed expression is evaluated even in a "release" build).

    The modify method has another signature that accepts a rollback functor, which is called in case of a collision.

    // store "old" time value
    old_time=it->expiration;
    // modify at place
    m_map.get<expiration_time>().modify(it, 
      entry::set_expiration(now+m_timeout), 
      entry::set_expiration(old_time));
      
    

    There is also a variation of the modify method, the modify_key method. The difference is that the modifying functor accepts a reference to the key member instead of the whole object.

    Unfortunately, the BOOST library does not provide a signature for "hinted" modification.

    Now consider the following section of the need_to_resend_() function:

    info=it->info;
    to=it->peer;
    id=it->tx_id;
    
    BOOST_VERIFY(m_map.get<expiration_time>().modify(it, 
      entry::set_expiration(now+m_timeout)));
    
    it->state=entry::resent;
    return true;
    

    Why should we copy the data into the auxiliary variables info, to and id? Why can we not just return a pointer to the stored entry? After all, the modified entry stays in the container, and the pointer stays valid even when other entries are inserted or deleted (the same guarantee that associative containers give):

    const entry* storage_manager::need_to_resend_(user_info& info, 
      endpoint_type& to, entry::tx_id_type& id, 
      const boost::system_time& now)
    {
      ...
      BOOST_VERIFY(m_map.get<expiration_time>().modify(it, 
        entry::set_expiration(now+m_timeout)));
    
      it->state=entry::resent;
      return &*it;
    }
    
    // continue while there are requests to
    // be resent
    const entry* e;
    if(e=need_to_resend_(info, to, id, now))
    {
      m_sender.send(e->info, e->peer, e->tx_id);
    }
    

    The reason is a possible race condition. Suppose the need_to_resend_ method has returned a valid pointer to an entry. At the same time, another thread receives a "late" reply that has the same transaction ID as the referred entry. Because the first thread has already released the mutex, the second thread can remove the entry from the storage before the first thread calls the send() method. The first thread then dereferences a dangling pointer, which will most likely crash the application.

    By using the auxiliary variables info, to and id, our application will at worst unnecessarily re-send a request that has already been replied to (a common issue for protocols with timeouts, such as RADIUS), but it avoids the crash.
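    The copy-out pattern can be sketched independently of the multi-index container. The example below is an assumption-laden simplification: it uses std::map and std::mutex (C++11) in place of the article's multi_index_container and BOOST mutex, and the class name storage and its members are hypothetical.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Hypothetical, simplified entry: strings stand in for the
// article's endpoint and user_info types.
struct entry
{
  int         tx_id;
  std::string peer;
  std::string info;
};

class storage
{
public:
  void add(const entry& e)
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_map[e.tx_id] = e;
  }

  // Called when a reply arrives; returns true if the entry existed.
  bool remove(int tx_id)
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    return m_map.erase(tx_id) != 0;
  }

  // Copies the fields needed for re-sending while the lock is held.
  // The caller never sees a pointer into the container, so a later
  // remove() from another thread cannot invalidate its data.
  bool need_to_resend(int tx_id, std::string& info, std::string& to)
  {
    std::lock_guard<std::mutex> lock(m_mutex);
    std::map<int, entry>::const_iterator it = m_map.find(tx_id);
    if (it == m_map.end()) return false;
    info = it->second.info;
    to   = it->second.peer;
    return true;
  }

private:
  std::mutex           m_mutex;
  std::map<int, entry> m_map;
};
```

    If a reply removes the entry right after need_to_resend returns, the caller still holds valid copies of info and to; the only cost is one redundant re-send.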

    Test Application

    The test application loads the transaction manager with requests for 100 seconds. To simulate message loss, the application uses a simple_sender object that is configured to drop every 100th packet (1%). The other 99% of the requests are replied to, and the replies are put into a reply queue. The application launches a dedicated thread to process the reply queue and prints out the request counters.

    The application counts the number of issued, dropped, denied and replied requests. If the transaction manager works properly, the number of replied requests (granted and denied together) must equal the number of issued requests; the number of requests dropped in simple_sender must be 1% of the issued and resent requests; and the number of denied requests must be approximately 0.01% of the issued requests, because each denied request was dropped twice: first as an original request and then as a resent request.
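    These expectations can be written down as a small helper. The sketch below mirrors the formulas used by the test application; the struct name prediction and the function name predict are hypothetical.

```cpp
#include <cassert>
#include <cmath>

// Expected counters for a test run (whole requests, rounded down).
struct prediction
{
  double dropped;  // packets dropped by the sender
  double denied;   // requests dropped twice and therefore denied
};

// sent:      number of original requests issued
// drop_rate: the sender drops every drop_rate-th packet
prediction predict(double sent, unsigned drop_rate)
{
  assert(drop_rate > 0);
  const double p = 1.0 / drop_rate;  // drop probability per packet
  prediction r;
  // Originals (sent) plus re-sends (sent*p), each dropped with
  // probability p.
  r.dropped = std::floor(sent * p * (1.0 + p));
  // A request is denied only if both the original and the re-send
  // were dropped.
  r.denied = std::floor(sent * p * p);
  return r;
}
```

    For example, for 20200827 issued requests and a drop rate of 100, the helper gives 204028 dropped packets and 2020 denied requests.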

    transaction::simple_sender sender(100);
    transaction::storage store(sender);
    bool continue_flag=true;
    ...
    // uses reference
    boost::thread reply_thread(reply_functor(continue_flag, store, sender));
    ...
    // stop the thread
    // and wait until it is done
    
    continue_flag=false;
    reply_thread.join();
    
    double drop_fraction=1.0/(double)drop_rate;
    // expected number of dropped packets (sent + re-sent requests)
    double drop_predict=(double)sent*drop_fraction*(1.0+drop_fraction);
    // expected number of denied requests (dropped twice)
    double deny_predict=(double)sent*drop_fraction*drop_fraction;
    
    std::cout<<"test run for "<<name<<std::endl;
    std::cout<<"sent: "<<sent<<std::endl;
    // dropped packets among sent and re-sent requests
    std::cout<<"dropped (each "<<drop_rate<<"th): "
      <<sender.dropped()<<" expected: "<<floor(drop_predict)
      <<std::endl;
    
    // must be same as sent amount
    std::cout<<"total replied: "<<replied<<" expected: "
      <<sent<<std::endl;
    std::cout<<"denied="<<failed<<" expected: "
      <<floor(deny_predict)<<std::endl;
    

    Results of a 100 second test run are shown below:

    G:\Work\Projects\C++\boost\src\Release>multindex.exe
    test run for default
    sent: 20200827
    dropped (each 100th): 204028 expected: 204028
    total replied: 20200827 expected: 20200827
    denied=2054 expected: 2020
    

    We can see that the request counters meet our expectations.

    Besides the source files the zip archive contains the MS Visual Studio 2008 project file and the KDevelop Make.am.

    To build the project, the BOOST library must be installed and configured on your computer. The library can be downloaded from the BOOST site.

    Conclusion

    The BOOST multi_index_container is a very powerful and flexible tool for implementing containers that maintain one or more indices with different sorting and access semantics.

    References:

    1. Boost Multi-index Containers Library

