Transaction Manager Implementation
The transaction manager needs to send and resend messages. We do not want the transaction manager to be responsible for message transport, so we delegate that job to a sender_type class that implements a send() method. The transaction manager holds a reference to an external object of type sender_type. The reference is initialized in the transaction manager constructor.
Such delegation is very convenient, but it has one significant drawback: the transaction manager does not know how the send() method is implemented. In particular, it does not know whether the method blocks or whether it uses synchronization objects of its own. Because our class methods can be called from different threads, our class must be thread-safe, so it must use locks to protect its members from simultaneous access. However, calling a blocking send() method while holding a lock can (and will) cause serious deadlock problems. Hence we must call the send() method outside the guarded section.
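The locking discipline described above can be sketched in a few lines. This is only an illustration under stated assumptions: manager_sketch, its members and the std::function-based sender are hypothetical names, and std::mutex stands in for boost::mutex. Shared state is touched inside a short guarded scope, and the external call is made after the lock has been released.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for the transaction manager: it only shows the
// locking discipline, not the real storage or retry logic.
class manager_sketch
{
public:
    // The sender is any callable; we know nothing about what it does inside.
    typedef std::function<void(const std::string&)> sender_type;

    explicit manager_sketch(sender_type sender) : m_sender(sender) {}

    void request(const std::string& msg)
    {
        {
            // Guarded section: touch shared state only.
            std::lock_guard<std::mutex> lock(m_guard);
            m_pending.push_back(msg);
        }   // the lock is released here
        // The external call happens with no lock held, so the sender is
        // free to block or to call back into this object.
        m_sender(msg);
    }

    std::size_t pending() const
    {
        std::lock_guard<std::mutex> lock(m_guard);
        return m_pending.size();
    }

private:
    mutable std::mutex m_guard;          // mutable: locked in const methods too
    std::vector<std::string> m_pending;
    sender_type m_sender;
};
```

If the sender calls back into pending() while request() is running, the call succeeds because the mutex is already free; had send been invoked inside the guarded scope, the same callback would try to lock a non-recursive mutex twice.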
The dump() method
The simplest method is the dump() method. It just scans all pending transactions through the identity view and prints them to an output stream. It is implemented as:
template<typename T>
struct prn_op
{
prn_op(std::ostream& os) : m_os(os){}
void operator()(const T& e){m_os<<e<<std::endl;}
private:
std::ostream& m_os;
};
std::ostream& dump(std::ostream& os) const
{
boost::mutex::scoped_lock lock(m_map_guard);
std::for_each(m_map.get<identity_id>().begin(),
m_map.get<identity_id>().end(), prn_op<entry>(os));
return os;
}
To scan the transactions we use the std::for_each algorithm with the prn_op functor. The prn_op functor is a template class that works with any data type that can be written to a stream with operator<<.
As mentioned above, our class must be thread-safe. We use the boost::mutex m_map_guard member to protect all interface methods. The boost::mutex::scoped_lock object acquires the mutex in its constructor and releases it in its destructor, so the guarded section is exactly the scope of the lock variable.
Besides the boost::mutex class defined in <boost/thread/mutex.hpp>, the BOOST library offers a variety of in-process and inter-process synchronization objects. Worth mentioning are boost::shared_mutex (<boost/thread/shared_mutex.hpp>), which implements a multiple-reader/single-writer mutex, and boost::interprocess::shared_memory_object (<boost/interprocess/shared_memory_object.hpp>), which manages a shared memory segment that several processes can map.
The resolve() method
The next method is the resolve() method. It just moves all resolved/failed transactions from the inner list to an external container. It is implemented as:
template<class Container>
size_t resolve(Container& dest)
{
boost::mutex::scoped_lock lock(m_map_guard);
dest.clear();
std::copy(m_resolved.begin(), m_resolved.end(),
std::inserter(dest, dest.begin()));
size_t rc=m_resolved.size();
m_resolved.clear();
return rc;
}
We use a template function because we do not want to force applications to use a specific type of container. The std::inserter adaptor works with any container that has an insert(where, what) method, so our template method works with any container that has clear(), begin() and insert(where, what) methods, i.e. with all STL-like containers: std::vector, std::list, std::set and std::deque.
Note that it is important to insert at the end of the external container. Because dest has just been cleared, dest.begin() is the same position as dest.end(), and std::inserter steps past every element it inserts, so each element is appended. Insertion at another position (the beginning, for example) is inefficient for std::vector; for std::list and std::set the position does not matter.
Note that all STL-like containers have the insert(where, what) method, but not all have the insert(what) variation.
The method uses the same m_map_guard mutex, here to protect the m_resolved list from simultaneous access by concurrent threads.
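To make the container independence concrete, here is a minimal sketch of the same copy-and-clear idea as a free function. The name drain_into and the use of std::list<int> as the source are assumptions made for the example; locking is left out so that only the container handling is visible.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <iterator>
#include <list>
#include <set>
#include <vector>

// Hypothetical free-function version of resolve(): it moves everything from
// 'resolved' into 'dest' and reports how many items were moved.
template<class Container>
std::size_t drain_into(std::list<int>& resolved, Container& dest)
{
    dest.clear();
    // After clear(), dest.begin() == dest.end(), so every element is
    // inserted at the current end of dest.
    std::copy(resolved.begin(), resolved.end(),
              std::inserter(dest, dest.begin()));
    std::size_t rc = resolved.size();
    resolved.clear();
    return rc;
}
```

The same template instantiates for std::vector<int>, std::deque<int>, std::list<int> and std::set<int>; sequence containers keep the source order, while std::set stores the elements in sorted order.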
The request() method
The request() method accepts user data and a destination endpoint. It generates a transaction id, stores a new transaction object in the storage container and sends a message with the help of the m_sender member. The method implementation looks like:
bool storage_manager<S>::request(const user_info& info,
const endpoint_type& to)
{
typedef mi_map_type::index<expiration_time>::type time_view;
entry::tx_id_type id=m_id_generator.allocate_id(to);
if(entry::INVALID_TR_ID==id) return false;
{
boost::mutex::scoped_lock lock(m_map_guard);
time_view::iterator it=m_map.get<expiration_time>().insert(
m_map.get<expiration_time>().end(),
entry(info, to, id, m_timeout));
BOOST_ASSERT(m_map.get<expiration_time>().end()!=it);
}
m_sender.send(info, to, id);
return true;
}
The method uses the same m_map_guard mutex to protect the m_map storage from simultaneous access by concurrent threads.
The method works with the indexed view that is sorted by transaction expiration time. It inserts the newly created transaction where it belongs: at the end of the sorted collection. With a correct hint the insertion into this view takes amortized constant time instead of logarithmic time.
The method uses the BOOST_ASSERT macro to check that the insertion did not fail. The BOOST library has another useful macro, BOOST_VERIFY. It is the same as BOOST_ASSERT except for one thing: the expression passed to BOOST_VERIFY is always evaluated, even in a "release" build. It also eliminates warnings about unused variables when the only use of the variable is inside the assertion.
The code below will generate an 'unused variable ok' warning and will not call must_be_called() in the release build:
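The effect of the hint can be illustrated with a standard container. In this sketch a std::multiset of plain numbers plays the role of the expiration-time view; deadline_set and add_deadline are hypothetical names, and the assumption is that new deadlines are normally later than the stored ones, as they are when each deadline is "now plus a fixed timeout".

```cpp
#include <set>

// Hypothetical stand-in for the expiration_time view.
typedef std::multiset<long> deadline_set;

deadline_set::iterator add_deadline(deadline_set& s, long expires_at)
{
    // insert(hint, value): amortized constant time when the value belongs
    // right before the hint, logarithmic otherwise. The result is correct
    // either way; the hint only affects speed.
    return s.insert(s.end(), expires_at);
}
```

A wrong hint is harmless: a deadline that is earlier than the stored ones still ends up in its sorted position, only without the speed-up.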
bool ok=must_return_true();
BOOST_ASSERT(ok);
BOOST_ASSERT(must_be_called());
The following code does not have the mentioned problems:
bool ok=must_return_true();
BOOST_VERIFY (ok);
BOOST_VERIFY (must_be_called());
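The difference between the two macros in a release build can be reproduced with two small stand-ins. SKETCH_ASSERT and SKETCH_VERIFY below are hypothetical macros written for this illustration; they mimic the release-build behavior described above and are not the BOOST definitions themselves.

```cpp
// Release-build behavior only: the first macro discards its argument, the
// second one evaluates it and ignores the result.
#define SKETCH_ASSERT(expr) ((void)0)
#define SKETCH_VERIFY(expr) ((void)(expr))

int g_calls = 0;   // counts how often must_be_called() really runs

bool must_be_called()
{
    ++g_calls;
    return true;
}

int calls_under_assert()
{
    g_calls = 0;
    SKETCH_ASSERT(must_be_called());   // compiled out: the call never happens
    return g_calls;
}

int calls_under_verify()
{
    g_calls = 0;
    SKETCH_VERIFY(must_be_called());   // evaluated, result ignored
    return g_calls;
}
```

Any expression with a side effect that the program relies on therefore belongs in the verify form, never in the assert form.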
The method sends the message with the help of the m_sender member. As mentioned above, the send() method must be invoked outside the guarded section, because calling an external method inside the guarded section can cause serious deadlock problems. The send() method must also be invoked after the transaction has been stored. Otherwise we face a race condition: the reply could arrive before the transaction is actually inserted into the storage.
The reply() method
The reply() method is called when a reply arrives. It looks for the matching pending transaction in the transaction storage. If the transaction is found, the method removes it from the collection and puts it into the resolved transaction list. The method implementation looks like:
inline void storage_manager::reply(const endpoint_type& from,
entry::tx_id_type id, user_info::user_status_type st)
{
typedef mi_map_type::index<full_tx_id>::type tx_view;
boost::mutex::scoped_lock lock(m_map_guard);
tx_view::iterator it=m_map.get<full_tx_id>().find(
boost::make_tuple(from.host, from.port, id));
if(m_map.get<full_tx_id>().end()==it) return;
m_resolved.push_back(user_info(it->info));
BOOST_ASSERT(!m_resolved.empty());
m_resolved.back().status=st;
m_map.get<full_tx_id>().erase(it);
}
The method uses the same m_map_guard mutex to protect both the m_map storage and the m_resolved list from simultaneous access by concurrent threads.
The method works with the indexed view that is sorted by the full transaction id: host+port+id. To build the search key from host, port and id the method uses the boost::make_tuple function template.
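The composite-key lookup can be sketched with standard types. Here a std::map keyed by a std::tuple stands in for the full_tx_id view, and std::make_tuple for boost::make_tuple; full_key, tx_map and take_reply are hypothetical names chosen for the example.

```cpp
#include <map>
#include <string>
#include <tuple>

// std::tuple compares lexicographically, which gives the host+port+id
// ordering described in the text.
typedef std::tuple<std::string, unsigned short, unsigned> full_key;
typedef std::map<full_key, std::string> tx_map;   // key -> user name

// Returns true and erases the entry when a matching transaction exists.
bool take_reply(tx_map& pending, const std::string& host,
                unsigned short port, unsigned id)
{
    tx_map::iterator it = pending.find(std::make_tuple(host, port, id));
    if (pending.end() == it) return false;   // late or unknown reply
    pending.erase(it);
    return true;
}
```

A second reply with the same host, port and id finds nothing and is silently ignored, which mirrors the early return in reply().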
The update() method
The update() method is invoked periodically. It scans the transaction storage for expired transactions and re-sends them if needed. If a re-sent transaction expires again, the method removes it from the storage and puts it into the resolved transaction list as failed. The method implementation looks like:
inline void storage_manager::update()
{
boost::system_time now(boost::get_system_time());
user_info info;
endpoint_type to;
entry::tx_id_type id;
if(need_to_resend_(info, to, id, now))
{
m_sender.send(info, to, id);
}
}
The function checks whether there is a transaction to resend by calling need_to_resend_(). If there is such a transaction, the function resends it with m_sender.send(). Note that m_sender.send() is called outside the guarded section.
The need_to_resend_() function is implemented as:
inline bool storage_manager::need_to_resend_(user_info& info,
endpoint_type& to, entry::tx_id_type& id,
const boost::system_time& now)
{
typedef mi_map_type::index<expiration_time>::type time_view;
boost::mutex::scoped_lock lock(m_map_guard);
time_view::iterator it=m_map.get<expiration_time>().begin();
while(m_map.get<expiration_time>().end()!=it && it->expiration<now)
{
if(entry::pending==it->state)
{
info=it->info;
to=it->peer;
id=it->tx_id;
BOOST_VERIFY(m_map.get<expiration_time>().modify(it,
entry::set_expiration(now+m_timeout)));
it->state=entry::resent;
return true;
}
m_resolved.push_back(user_info(it->info));
BOOST_ASSERT(!m_resolved.empty());
m_resolved.back().status=user_info::denied;
m_map.get<expiration_time>().erase(it++);
}
return false;
}
The function scans the transaction storage from the beginning until either the end is reached or the current transaction has not expired yet. If the expired transaction is in the pending state, it will be resent: the function stops scanning and returns the transaction data to the caller. If the expired transaction is already in the resent state, it is removed from the storage and moved into the resolved transaction list. Note that the function does not leave the guarded section unless it needs to resend a message.
Erasing from the container:
while(m_map.get<expiration_time>().end()!=it && it->expiration<now)
{
m_map.get<expiration_time>().erase(it++);
}
The postfix operator it++ advances the iterator and returns its previous value. This code is equivalent to:
while(m_map.get<expiration_time>().end()!=it && it->expiration<now)
{
time_view::iterator it_old=it;
++it;
m_map.get<expiration_time>().erase(it_old);
}
It works because in associative containers the erase(it) method invalidates only the erased iterator and does not affect other iterators.
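The erase-while-scanning idiom can be tried out on a standard associative container. In this sketch a std::multimap ordered by expiration time stands in for the expiration_time view; expiry_map and purge_expired are hypothetical names.

```cpp
#include <cstddef>
#include <map>

typedef std::multimap<long, int> expiry_map;   // expiration -> transaction id

// Removes every entry that expired before 'now'; returns how many were removed.
std::size_t purge_expired(expiry_map& m, long now)
{
    std::size_t removed = 0;
    expiry_map::iterator it = m.begin();
    while (m.end() != it && it->first < now)
    {
        // it++ advances 'it' first and hands the old position to erase(),
        // so 'it' never refers to the erased node.
        m.erase(it++);
        ++removed;
    }
    return removed;
}
```

Since C++11 the standard associative containers also return the next iterator from erase(), so it = m.erase(it) is an equivalent way to write the loop body.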
Resending and modifying entry:
m_map.get<expiration_time>().modify(it,
entry::set_expiration(now+m_timeout));
it->state=entry::resent;
As with std::set, a multi-index iterator refers to a constant object: *it has type const entry. We can still change the entry state with the expression it->state=entry::resent because the state field is declared mutable. The state field is not a key, so the change does not affect the ordering of the container.
What can we do if we need to change a key field? Just declaring the key field mutable and modifying it in place (as we do for the state field) would obviously destroy the integrity of the multi-indexed collection.
One option is to remove the entry entirely, modify it and insert it back:
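The mutable trick works the same way with std::set, which makes it easy to try in isolation. The type entry_sketch and the function mark_resent below are hypothetical; only the idea (a non-key field declared mutable and changed through a const element) comes from the text.

```cpp
#include <set>

// 'id' is the sorting key, 'state' is not.
struct entry_sketch
{
    enum state_type { pending, resent };

    explicit entry_sketch(int i) : id(i), state(pending) {}

    int id;                     // key: must not change while stored
    mutable state_type state;   // not a key: may change through a const reference

    bool operator<(const entry_sketch& other) const { return id < other.id; }
};

// Marks the entry with the given id as resent; returns false if it is absent.
bool mark_resent(std::set<entry_sketch>& s, int id)
{
    std::set<entry_sketch>::iterator it = s.find(entry_sketch(id));
    if (s.end() == it) return false;
    it->state = entry_sketch::resent;   // *it is const, 'state' is mutable
    return true;
}
```

The comparison operator never looks at state, so the set stays correctly ordered no matter how often the field changes.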
entry tmp=*it;
tmp.expiration=now+m_timeout;
m_map.get<expiration_time>().erase(it);
m_map.get<expiration_time>().insert(tmp);
The problem here is that the insertion requires logarithmic time for every indexed view even if only one key was changed.
To cope with this problem the BOOST library provides a convenient interface for the procedure, the replace method:
entry tmp=*it;
tmp.expiration=now+m_timeout;
m_map.get<expiration_time>().replace(it, tmp);
This form requires only constant time for the unchanged indices; only the changed index requires logarithmic time. But the approach still makes two unnecessary copies: from the container to tmp and back to the container. To cope with this problem, the BOOST library provides the modify method. The method takes a functor (or a pointer to a function) that accepts a reference to the element to be modified.
struct set_expiration
{
set_expiration(const boost::system_time& t) : new_exp(t){}
void operator ()(entry& e) {e.expiration=new_exp;}
private:
boost::system_time new_exp;
};
m_map.get<expiration_time>().modify(it,
entry::set_expiration(now+m_timeout));
Like the replace method, the modify method requires constant time for unchanged indices and logarithmic time for updated indices. Unlike the replace method, the modify method avoids unnecessary copying.
But here we run into another problem: what happens if the modified element cannot be kept in the collection because it now has the same key value as another stored element (supposing the sorting key is unique)? The replace method just keeps the original element, but the modify method works through the modifying functor and cannot restore the original element. The BOOST library uses the following policy: in case of a collision the modify method erases the element and returns false.
This is why we must check the return value of the modify method with BOOST_VERIFY (the passed expression is evaluated even in a "release" build).
The modify method has another signature that accepts a rollback functor, which is called in case of a collision:
boost::system_time old_time=it->expiration;
m_map.get<expiration_time>().modify(it,
entry::set_expiration(now+m_timeout),
entry::set_expiration(old_time));
There is also a variation of the modify method: the modify_key method. The difference is that the modifying functor accepts a reference to the key member instead of the whole object.
Unfortunately the BOOST library does not provide a signature for "hinted" modification.
Now consider the following section of the need_to_resend_() function:
info=it->info;
to=it->peer;
id=it->tx_id;
BOOST_VERIFY(m_map.get<expiration_time>().modify(it,
entry::set_expiration(now+m_timeout)));
it->state=entry::resent;
return true;
Why should we copy the data into the auxiliary variables info, to and id? Why could we not just return a pointer to the stored entry? After all, the modified entry stays in the container and the pointer stays valid even when other entries are inserted or deleted (as in any associative container):
const entry* storage_manager::need_to_resend_(user_info& info,
endpoint_type& to, entry::tx_id_type& id,
const boost::system_time& now)
{
...
BOOST_VERIFY(m_map.get<expiration_time>().modify(it,
entry::set_expiration(now+m_timeout)));
it->state=entry::resent;
return &*it;
}
const entry* e;
if(e=need_to_resend_(info, to, id, now))
{
m_sender.send(e->info, e->peer, e->tx_id);
}
The reason is a possible race condition. Suppose the need_to_resend_ method has returned a valid pointer to an entry. At the same time another thread receives a "late" reply that has the same transaction id as the referenced entry. Because the first thread has already released the mutex, the second thread can remove the entry from the storage before the first thread calls the send() method. The first thread then dereferences a dangling pointer, which is undefined behavior and will most likely crash the application.
With the auxiliary variables info, to and id, the worst case is that our application unnecessarily re-sends a request that has already been answered. This is a common issue for protocols with timeouts, such as RADIUS, but it avoids the crash.
Test Application
The test application loads the transaction manager with requests for 100 seconds. To simulate message loss the application uses the simple_sender object, which is configured to drop every 100th packet (1%). The other 99% of the requests are answered and the replies are put into a reply queue. The application launches a dedicated thread that processes the reply queue and prints out the request counters.
The application counts the number of issued, dropped, denied and replied requests. If the transaction manager works properly, the number of replied requests (granted and denied together) must equal the number of issued requests; the number of requests dropped in simple_sender must be 1% of the issued and resent requests together; and the number of denied requests must be approximately 0.01% of the issued requests, because each denied request was dropped twice: the first time as an original request and the second time as a resent request.
transaction::simple_sender sender(100);
transaction::storage store(sender);
bool continue_flag=true;
...
boost::thread reply_thread(reply_functor(continue_flag, store, sender));
...
continue_flag=false;
reply_thread.join();
double drop_fraction=1.0/(double)drop_rate;
double drop_predict=(double)sent*drop_fraction*(1.0+drop_fraction);
double deny_predict=(double)sent*drop_fraction*drop_fraction;
std::cout<<"test run for "<<name<<std::endl;
std::cout<<"sent: "<<sent<<std::endl;
std::cout<<"dropped (each "<<drop_rate<<"th): "
<<sender.dropped()<<" expected: "<<floor(drop_predict)
<<std::endl;
std::cout<<"total replied: "<<replied<<" expected: "
<<sent<<std::endl;
std::cout<<"denied="<<failed<<" expected: "
<<floor(deny_predict)<<std::endl;
Results of a 100 second test run are shown below:
G:\Work\Projects\C++\boost\src\Release>multindex.exe
test run for default
sent: 20200827
dropped (each 100th): 204028 expected: 204028
total replied: 20200827 expected: 20200827
denied=2054 expected: 2020
We can see that the request counters meet our expectations.
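The expected values in this run can be re-derived by hand. With N issued requests and a drop probability p = 1/drop_rate, about N*p originals are dropped and resent, and about N*p*p of those resends are dropped again, so the sender drops roughly N*p*(1+p) packets and N*p*p requests end up denied. The sketch below isolates that arithmetic; prediction and predict are hypothetical names, not part of the test program.

```cpp
#include <cmath>

struct prediction
{
    double dropped;   // originals dropped plus resends dropped
    double denied;    // requests dropped twice in a row
};

// 'sent' is the number of issued requests; 'drop_rate' means
// "every drop_rate-th packet is dropped".
prediction predict(double sent, double drop_rate)
{
    const double p = 1.0 / drop_rate;      // drop probability per packet
    prediction r;
    r.dropped = std::floor(sent * p * (1.0 + p));
    r.denied  = std::floor(sent * p * p);
    return r;
}
```

For the run above, predict(20200827, 100) gives 204028 dropped and 2020 denied, matching the "expected" columns of the output.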
Besides the source files the zip archive contains the MS Visual Studio 2008 project file and the KDevelop Make.am.
To build the project the BOOST library must be installed and configured on your computer. The library can be downloaded from the BOOST Site.
Conclusion
The BOOST multi_index_container is a very powerful and flexible tool for implementing containers that maintain one or more indices with different sorting and access semantics.
References:
- Boost Multi-index Containers Library