Generic Notifiers: Message Passing in C++

Notifiers make anonymous communication between objects in a system possible. Because the communication is anonymous, the objects have no knowledge of one another and are therefore independent of each other. Notifiers are also easy to understand, which eases the transition as new developers join a project. Other languages (notably Smalltalk) have this feature built in; C++ gives users the freedom to create their own.

Design

Those interested in sending messages to subscribers do so through the notifier class methods; those interested in receiving messages implement the subscriber interface and register with the appropriate notifier. The notifier is responsible for message routing; a different notifier exists for each type of message. In this way the notifier decouples the senders of messages from the receivers (subscribers).
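The collaboration described above can be sketched in a few lines. This is a minimal, single-threaded illustration and not the article's implementation: the names basic_subscriber, basic_notifier, logger, and announce are hypothetical, and the thread safety discussed in the next section is deliberately left out.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical receiver interface: one method, parameterized on the message type.
template <class T_state>
struct basic_subscriber {
    virtual ~basic_subscriber() = default;
    virtual void on_notify(const T_state& state) = 0;
};

// Hypothetical router: one instance exists per message type T_state.
template <class T_state>
class basic_notifier {
public:
    static basic_notifier& instance() {
        static basic_notifier n;
        return n;
    }
    void subscribe(basic_subscriber<T_state>* s) { m_subscribers.push_back(s); }
    void unsubscribe(basic_subscriber<T_state>* s) {
        m_subscribers.erase(
            std::remove(m_subscribers.begin(), m_subscribers.end(), s),
            m_subscribers.end());
    }
    void notify(const T_state& state) {
        for (auto* s : m_subscribers) s->on_notify(state);
    }
private:
    std::vector<basic_subscriber<T_state>*> m_subscribers;
};

// A receiver that records every message it is sent.
struct logger : basic_subscriber<std::string> {
    std::vector<std::string> seen;
    void on_notify(const std::string& msg) override { seen.push_back(msg); }
};

// A sender: it talks only to the notifier and never names a receiver.
inline void announce(const std::string& msg) {
    basic_notifier<std::string>::instance().notify(msg);
}
```

Neither announce nor logger refers to the other; the message type is the only thing they share.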

Figure 1. Notifier collaboration diagram

A Thread-Safe Notifier

The notifier must fulfill three requirements:

  • simple
  • extensible
  • thread-safe

Simplicity is a vague term. I choose to define it in terms of complexity—the less complex a piece of code is, the easier it is to maintain, explain, optimize and test. The less complex a piece of code is, the simpler it is. Convenient?

Extensibility is a difficult metric to gauge. Although there are many definitions, the common theme amongst them is the ability to add and incorporate new functionality, or technical advances, into an existing code base. My ideal notion of extensibility is code that allows me to maintain it over time without major work to the original code. Templates make this possible by parameterizing key types and allowing one to work with those outside of the implementation.

Thread safety means that clients using a notifier in a multithreaded environment should not have to worry about synchronizing access to the notifier itself. Subscribers still need to protect their own state, but the notifier takes care of its own.

The notifier has the following responsibilities:

  • registering subscribers
  • unregistering subscribers
  • notifying subscribers

To do so, a subscriber map is maintained. The subscriber serves as the key; a flag indicating whether or not the subscriber is registered serves as the data. A flag is used so that unregistration does not remove an entry from the subscriber map. This is important if subscribers choose to unsubscribe during synchronous notification, because the map is being iterated at that moment.

Upon registration, the subscriber is inserted into the subscriber map; if it already exists, a check is made to determine whether or not it has been unregistered. If the subscriber has been flagged for unregistration but has not been removed from the subscriber map, its state is reset to ‘registered.’

Notification involves obtaining a lock to the critical section, iterating through the map, and notifying the subscribers. Any subscriber flagged as unregistered will not be notified. Once subscribers have been notified, all subscribers flagged as unregistered are removed from the subscriber map.

Unregistration involves flagging the subscriber for unregistration. The next time notification occurs, the subscriber will be removed from the map.
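The registration, notification, and unregistration rules above can be sketched with the standard library. This is an illustration under stated assumptions rather than the article's code: flag_subscriber, flag_notifier, and once are hypothetical names, and std::recursive_mutex stands in for the Windows critical section so that a subscriber may call unsubscribe from inside on_notify on the same thread.

```cpp
#include <cstddef>
#include <map>
#include <mutex>

template <class T_state> class flag_notifier;

// Hypothetical subscriber base: forwards (un)registration to the notifier.
template <class T_state>
class flag_subscriber {
public:
    virtual ~flag_subscriber() = default;
    virtual void on_notify(const T_state& state) = 0;
    void subscribe()   { flag_notifier<T_state>::instance().subscribe(this); }
    void unsubscribe() { flag_notifier<T_state>::instance().unsubscribe(this); }
};

template <class T_state>
class flag_notifier {
public:
    static flag_notifier& instance() {
        static flag_notifier n;
        return n;
    }
    void subscribe(flag_subscriber<T_state>* s) {
        std::lock_guard<std::recursive_mutex> lock(m_mutex);
        m_subscribers[s] = true;  // insert, or reset a flagged entry to 'registered'
    }
    void unsubscribe(flag_subscriber<T_state>* s) {
        std::lock_guard<std::recursive_mutex> lock(m_mutex);
        auto it = m_subscribers.find(s);
        if (it != m_subscribers.end()) it->second = false;  // flag only, no erase
    }
    void notify(const T_state& state) {
        std::lock_guard<std::recursive_mutex> lock(m_mutex);
        // Pass 1: notify every subscriber still flagged as registered.
        for (auto& entry : m_subscribers)
            if (entry.second) entry.first->on_notify(state);
        // Pass 2: drop the entries that were flagged for unregistration.
        for (auto it = m_subscribers.begin(); it != m_subscribers.end();) {
            if (it->second) ++it;
            else it = m_subscribers.erase(it);
        }
    }
    std::size_t size() {
        std::lock_guard<std::recursive_mutex> lock(m_mutex);
        return m_subscribers.size();
    }
private:
    std::recursive_mutex m_mutex;
    std::map<flag_subscriber<T_state>*, bool> m_subscribers;
};

// Example subscriber that unsubscribes from inside its own handler.
struct once : flag_subscriber<int> {
    int calls = 0;
    void on_notify(const int&) override { ++calls; unsubscribe(); }
};
```

Because unsubscribe only flips a flag, the first loop in notify never sees an element disappear underneath it. Insertion into a std::map does not invalidate iterators either, so subscribing during notification is also safe in this sketch.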

Asynchronous Notification

Asynchronous notification is straightforward using the thread pool created in "Windows Thread Pooling." A work unit is defined whose only job, when processed, is to notify subscribers of the event.

   template<class T_state>
   void async_notifier<T_state>::process() {
      // runs on a pool thread; forwards the stored state to all subscribers
      notifier<T_state>::instance().notify(m_state);
   }

That is all there is to it!
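For readers without the thread pool at hand, the same idea can be sketched with the standard library. Everything here is a hypothetical stand-in: work_unit_sketch plays the role of core::work_unit, async_notify_sketch the role of async_notifier, and queue_request_sketch replaces the pool's queue_request by launching the unit with std::async. The notify call is passed in as a callable so that the example stays self-contained.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <utility>

// Hypothetical stand-in for the article's core::work_unit.
struct work_unit_sketch {
    virtual ~work_unit_sketch() = default;
    virtual void process() = 0;
};

// A work unit that stores a copy of the state and notifies when processed.
template <class T_state>
class async_notify_sketch : public work_unit_sketch {
public:
    using notify_fn = std::function<void(const T_state&)>;
    async_notify_sketch(notify_fn notify, T_state state)
        : m_notify(std::move(notify)), m_state(std::move(state)) {}
    void process() override { m_notify(m_state); }
private:
    notify_fn m_notify;  // e.g. a lambda that forwards to a notifier's notify()
    T_state   m_state;   // copied, so it outlives the caller's stack frame
};

// Stand-in for queueing on a thread pool: run the unit on another thread.
inline std::future<void> queue_request_sketch(std::shared_ptr<work_unit_sketch> unit) {
    return std::async(std::launch::async, [unit] { unit->process(); });
}
```

The state is copied into the work unit because the caller may have returned long before a pool thread gets around to processing it.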

Demonstration

Our need for notifiers arose when we created a service that starts a socket listening. While waiting for incoming connections, the calling thread sleeps as it awaits notification:

   // wait for accept
   WSANETWORKEVENTS c_events = {0};
   wait_for_event(e_event_accept, c_events, INFINITE);

   // accept
   SOCKADDR_IN sa_client = {0};
   int n_sa = sizeof(sa_client);
   smart_socket_handle sp_conn(accept((SOCKADDR*)&sa_client,
                               n_sa));

If the service is to shut down cleanly, we need a way to signal the waiting socket to shut down. Moreover, there were several objects that required notification of the service shutdown. Notifiers turned out to be the best solution because we could quickly bind them to the shutdown event.

   namespace events
   {
      enum service
      {
         service_start,
         service_pause,
         service_resume,
         service_stop
      };

      typedef notifier<service> service_notifier;
   }

events::service is the notification data; publishers simply need to inform our subscriber:

template<class T_connection>
struct listener :
       socket,
       core::work_unit,
       subscriber<events::service>

of the event:

void on_shutdown()
{
   // notify our listener(s) to stop
   events::service_notifier::instance().
           notify(events::service_stop);

The handler in listener takes care of the dirty work:

   void on_notify(const state& event)
   {
      switch (event)
      {
         case events::service_start:
         case events::service_resume:
            start_listening();
            break;

         case events::service_stop:
         case events::service_pause:
            stop_listening();
            break;

         default: throw std::invalid_argument("listener::on_notify");
      }
   }

Using the Code

To create a subscriber, derive your class from subscriber. You must implement the on_notify method.

struct work : core::work_unit, subscriber<events>
{
   work();
   void process();
   void on_notify(const events& e);
   bool   m_work;
   double m_data;
};

To receive notification of events, call subscriber::subscribe.

void work::process()
{
   // We subscribe in process because we are created during
   // notification. If we were to subscribe in the constructor
   // (work::work), we would be added to the notifier map while
   // it is being iterated, and spawning would never end!
   subscribe();

To discontinue notification of events, call subscriber::unsubscribe.

void work::on_notify(const events& e)
{
   switch (e)
   {
      case die:
         unsubscribe();
         std::cout << "dying: " << this << std::endl;
         m_work = false;
         break;

To synchronously notify subscribers, call notifier::notify.

void input::process()
{
   std::cout << "(S)pawn/(D)ie: ";
   switch (::tolower(::getch()))
   {
      // die and leave
      case 'd':
         notifier<events>::instance().notify(die);

To asynchronously notify subscribers, create and queue an instance of async_notifier.

   // and then kill it asynchronously
   global::thread_pool::instance().queue_request(
           new async_notifier<events>(die));

About the Demo Project

The demo project models the scenario above. Workers that perform work until notified of shutdown are created. Upon notification, the workers die and the system gracefully shuts down. More workers may be spawned to simulate a heavy load on the code.

  • The thread pool is initialized.
  • Work is queued and is killed asynchronously using the notifier.
  • A spawn work unit and an input work unit are queued.
  • The main thread waits on an event signal when the input work unit dies.

There are some important considerations to keep in mind when going through the testbed. Our main thread waits on an event that is signaled when input is dying. Depending on the amount of work pending, exiting the application immediately could be disastrous.

To understand, consider the following scenario: The user has instructed input to die; input synchronously notifies all subscribers to die and signals our main thread to die.

For the sake of argument, assume there are 100 work units queued and pending. Threads in the thread pool will not die until all work is extinguished and they have received the shutdown flag. Meanwhile, main is unwinding and destroys the process heap. Our work units are finally released as the last bit of work trickles out. Without a heap, what happens?

To keep things simple, I pause the main thread while the machinery gently shuts down. More elaborate synchronization mechanisms may be employed. However, during shutdown I am not encumbered by performance requirements and prefer the simple implementation. Your mileage may vary.
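As one example of a more elaborate mechanism, the main thread could block until a count of pending work units drops to zero instead of sleeping for a fixed time. The sketch below is not part of the demo project; pending_work_sketch and drain_demo are hypothetical names, and plain std::thread objects stand in for the thread pool.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical counter of outstanding work units.
class pending_work_sketch {
public:
    void add() {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_count;
    }
    void done() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (--m_count == 0) m_idle.notify_all();
    }
    // Blocks until every unit that was added has reported done().
    void wait_idle() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_idle.wait(lock, [this] { return m_count == 0; });
    }
private:
    std::mutex m_mutex;
    std::condition_variable m_idle;
    int m_count = 0;
};

// Queues n trivial work units and returns how many had finished
// by the time wait_idle() let the caller continue.
inline int drain_demo(int n) {
    pending_work_sketch pending;
    std::atomic<int> finished{0};
    std::vector<std::thread> workers;
    for (int i = 0; i < n; ++i) {
        pending.add();  // count the unit before it can possibly finish
        workers.emplace_back([&] { ++finished; pending.done(); });
    }
    pending.wait_idle();  // main would wait here instead of pausing
    int seen = finished;
    for (auto& t : workers) t.join();
    return seen;
}
```

The trade-off is the one noted above: this removes the guesswork of a fixed pause, at the cost of threading the counter through every place where work is queued and completed.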

Conclusions

Notifiers are a great way of connecting parts of a system that are interested in state changes but should not be bound to one another. Templates provide a natural mechanism for reusing the notification subsystem with different state data. Synchronous and asynchronous notification together provide the flexibility required for all but the most esoteric situations.

Whether you choose to use a freely available implementation or to roll your own, notifiers make building complex solutions in any language more manageable.

Happy Coding!
