Adding Logic Flow Control to Your Programming

1. The Problem

In a computer program, besides the real execution flow, logic flows are also very common. Sometimes the execution flow is the same as the logic flow, but in many cases they differ. For example, for a Web server to read an HTTP request from a socket, if you use a synchronous socket, you will write something like this:

   // Reads one complete HTTP request: the header first, then the body.
   // The body reader is handed the header that was just read.
   void read(HTTP_REQUEST& http_request)
   {
      read(http_request.header);
      read(http_request.body, http_request.header);
   }

   // Reads the request header. The first line is parsed into method, URI and
   // version; each following line is parsed as a header field until an empty
   // line is read.
   void read(HTTP_REQUEST_HEADER& header)
   {
      string line = read_line();
      parse_request_link(line, header.method, header.uri,
                         header.version);

      while (TRUE)
      {
         line = read_line();
         if (line.empty())
            break;   // an empty line ends the header section

         parse_header_field(line, header);
      }
   }

   // Reads the request body into `body`. The already-parsed header decides
   // whether the body is Content-Length delimited or chunked.
   // NOTE: simplified for illustration - real HTTP chunk sizes are
   // hexadecimal and each chunk's data is followed by a CRLF.
   void read(BYTE[]& body, HTTP_REQUEST_HEADER& header)
   {
      string transfer_encoding = header.fields["Transfer-Encoding"];
      if (transfer_encoding != "chunked")
         body = read_bytes(header.fields["Content-Length"]);
      else
      {
         while (TRUE)
         {
            string chunk_header = read_line();
            DWORD chunk_size = atoi(chunk_header);
            if (chunk_size == 0)
               break;   // a zero-sized chunk terminates the body
            BYTE[] chunk_body = read_bytes(chunk_size);
            body += chunk_body;
         }
      }
   }

   // Returns the next line, reading from the socket until a newline is
   // present in read_buffer.
   string read_line()
   {
      int n;   // declared outside the loop because it is used after it
      while (TRUE)
      {
         // presumably strpos returns a negative value when "\n" is absent
         n = strpos(read_buffer, "\n", read_buffer.size());
         if (n >= 0)   // position 0 is valid: it is the empty line that ends the header
            break;
         read_buffer += socket.read();
      }
      // assumes extract(n) removes the line and its terminator from read_buffer
      return read_buffer.extract(n);
   }

   Byte[] read_bytes(int sz)
   {
      while (TRUE)
      {
         if (sz <= read_buffer.size())
            break;
         read_buffer += socket.read();
      }
      return read_buffer.extract(sz);
   }

In this code, execution flow and logic flow are the same. However, if you use an asynchronous socket where you receive events passively, you have to write something like this:

// Asynchronous version: invoked each time the socket has data. All progress
// is kept in `state` (plus content_length / chunk_size) so that parsing can
// pick up where the previous event left off.
when socket is available to read()
{
   read_buffer += socket.read();
   if (state == read_request_line)
   {
      if (!read_line(line))
         return;   // request line not complete yet
      parse_request_link(line, method, uri, version);
      state = read_header_field;
   }
   while (state == read_header_field)
   {
      if (!read_line(line))
         return;   // header line not complete yet
      if (line.empty())
      {
         // an empty line ends the header: decide how the body is delimited
         transfer_encoding = header.fields["Transfer-Encoding"];
         if (transfer_encoding != "chunked")
         {
            content_length = header.fields["Content-Length"];
            state = read_body;
         }
         else
            state = read_chunk_header;
      }
      else
         parse_header_field(line, header);
   }
   if (state == read_body)
   {
      request_body += read_buffer;
      read_buffer.clear();
      if (request_body.size() >= content_length)
         state = read_finished;
      return;
   }
   if (state == read_chunk_header)
   {
      if (!read_line(line))
         return;
      chunk_size = atoi(line);
      if (chunk_size == 0)
      {
         state = read_finished;   // a zero-sized chunk ends the body
         return;
      }
      state = read_chunk_body;
   }
   if (state == read_chunk_body)
   {
      // append at most chunk_size bytes
      // (presumably append() also reduces chunk_size by the amount consumed)
      request_body.append(read_buffer, chunk_size);
      if (chunk_size == 0)
         state = read_chunk_header;
      // NOTE(review): returns even if the next chunk header is already
      // buffered; it is then only processed on the next read event.
      return;
   }
}

The execution flow is completely different whereas the logic flow remains the same. Because you can only receive data piece by piece, you have to remember state and some other variables so that you know what to do when an event comes. The code is usually longer. It's harder to read/write/modify. And it's easier to produce more bugs.

Adding Logic Flow Control to Your Programming

2. The Solution

To reduce this inconvenience, you can create a subroutine alongside the main routine. This subroutine executes some virtual logic. When some condition needs to be met to continue, it stops until the main routine tells it to check again. For the example above, it can be written like this:

class Connection
{
   SOCKET socket;
   Connection(SOCKET s) : socket(s)
   {
      FLOW_START conn.flow_start();
   }

   void flow_start()
   {
      while (TRUE)
      {
         HTTP_REQUEST http_request;
         try {
            read(&http_request);
         }
         catch (EXCEPTION e)
         {
            break;
         }

         FILE file;
         fp = fopen(http_request.uri);
         if (fp == NULL)
         {
            write(fp);
            fclose(file);
         }
         else
            write(504);
      }

      socket.close();
      delete this;
   }

   void read(HTTP_REQUEST* http_request)
   {
      read(&http_request.header);
      read(&http_request.body, &http_request.header);
   }

   void read(HTTP_REQUEST_HEADER* header)
      b&

   void read(BYTE[]& body, HTTP_REQUEST_HEADER& header)
      b&

   string read_line()
   {
      while (TRUE)
      {
         FLOW_WAIT (m_buffer += )
         char* str = strchr(m_buffer, '\n');
         if (!str)
            continue;
      }

      string s(m_buffer, 0, str - m_buffer);
      memcpy(m_buffer, str);
      buf_avail -= str - m_buffer;
      return s;
   }

   BYTE[] read_bytes(int sz)
   {
      while (TRUE)
      {
         WAIT (m_buffer += );
         if (m_buffer.length < sz)
            continue;
      }

      BYTE[] data = m_buffer.extract(0, sz);
      return data;
   }

   void write(FILE* fp)
   {
      int filesize = fp.size();

      string header;
      header << "200 OK\r\nContent-Length: " << filesize << ";\r\n"
         "\r\n";
      write(header.c_str(), header.size());

      int szBulk;
      for (int i = 0; i < filesize; i += szBulk)
      {
         szBulk = min(filesize - i, 8192);
         data = fread(fp, szBulk);
         write(data, szBulk);
      }
   }

   void write(WORD error_status)
   {
      string header;
      header << error_status << " Error\r\n"
         "\r\n";
      write(header.c_str(), header.size());
   }

   void write(BYTE[] data, int len)
   {
      while (len > 0)
      {
         int ret = socket.write(data, len);
         if (ret > 0)
         {
            data += ret;
            len -= ret;
         }
         if (len)
         {
            WAIT (bWritable == TRUE);
         }
      }
   }

   void OnRead()
   {
      int avail = socket.avail();
      m_buffer += socket.read(avail);
   }

   void OnWrite()
   {
      bWritable = TRUE;
   }

   void OnClose()
   {
      delete this;
   }
};

// Program entry (pseudo-code): listen on http_port and hand all socket
// events to socket_callback.
main {
   Socket listen_socket;
   listen_socket.listen(http_port);
   // NULL user data marks the listening socket; socket_callback accepts a
   // new connection when it sees READ with NULL user data.
   socket_add(listen_socket, NULL);

   socket_loop(socket_callback);
}

void socket_callback(void* user_data, SOCKET s, int msg,
                     int lParam, void* pParam)
{
   switch (msg)
   {
   case READ:
      if (user_data == NULL)
      {
         SOCKET s2 = accept(s);
         Connection conn = new Connection(socket);
         socket_add(s2, conn);
         break;
      }
      ((Connection*)user_data)->OnRead();
      break;

   case WRITE:
      ((Connection*)user_data)->OnWrite();
      break;

   case EXCEPT:
      ((Connection*)user_data)->OnExcept();
      break;
   }
}

There are two new primitives involved. One is FLOW_START, which creates a new subroutine (basically a stack). The other is FLOW_WAIT, which tells the system when the subroutine should be resumed to continue the flow. For example, FLOW_WAIT (m_buffer += ) means that the flow resumes after the operator += of m_buffer has been executed. FLOW_WAIT (bWritable = TRUE) means that it resumes when bWritable is set to TRUE.

When a connection object is created, because of the FLOW_START command, a subroutine is also created; execution flow will execute that routine until it meets FLOW_WAIT. Then, it resumes the main routine. When it appends m_buffer or sets bWritable to TRUE, it resumes the subroutine until another FLOW_WAIT is met. Then, it goes back to the main routine again.

Logic Flow vs Thread

A logic flow operates like a virtual thread, but it actually runs in the same thread as the one that creates it. Although each flow has its own independent stack, a logic flow costs less than a thread and you don't need to handle synchronization between flows.

Logic Flows also can be used for exception handling. For example, you can add something like this:

START_FLOW {
   FLOW_WAIT(read_err=);
   b&
}

START_FLOW {
   FLOW_WAIT(current_tick b. last_receive_tick >= RECEIVE_TIMEOUT);
   b&
}

Adding Logic Flow Control to Your Programming

3. Another Example

There's another example that shows how flexible and powerful it can be: Say you want to parse a URL based on the following format:

[scheme://[user:pass@]host[:port]/]uri[?param[#anchor]]

If you want to scan the URL string only once, you'll probably write something like this:

// Splits `url` into the members Scheme, User, Password, Host, Port, NetLoc,
// Path, File, FileType, Params, Query and Fragment in one left-to-right pass.
// `s` accumulates the current word; `p` is the read position.
void URL::ParseString(const string &url)
{
   string s;
   s.reserve(url.length());
   if (Original.empty())
      Original = url;
   OriginalLength = url.length();
   const char *p = url.c_str();

   // parse the scheme [http:]

   while (*p && (*p != '/') && (*p != ':') &&
         (*p != ';') && (*p != '?') &&
         (*p != '#')) s += *p++;

   if (*p == ':')
   {
      Scheme = s;
      p++;
      s.resize(0);
      while (*p && (*p != '/') && (*p != ';') &&
            (*p != '?') && (*p != '#')) s += *p++;
   }

   // parse netloc field    //[user[:pass]@]host[:port]/
   // canonicalize port (if 80 absent, else present)

   if (*p && (*p == '/') && (*(p+1) == '/'))
   {
      p+=2;
      s.resize(0);
      while (*p && (*p != '/') && (*p != ':') &&
            (*p != '@')) s += *p++;
      Host = s;
      // Start the port/password word empty, so a netloc without ':'
      // (e.g. "user@host") does not reuse the host text as the password.
      s.resize(0);
      if (*p == ':')
      {
         p++;   // skip the ':' so s holds only the port digits / password
         while (*p && (*p != '/') && (*p != '@')) s += *p++;
         if (*p != '@') Port = IP_PORT(atol(s.c_str()));
      }

      if (*p == '@')
      {
         // What was read so far was the user info, not the host.
         p++;
         if (Host.length() == 0)
         {
            User = s;
         }
         else
         {
            User = Host;
            Password = s;
            Host.resize(0);
         }
         s.resize(0);
         while (*p && (*p != '/') && (*p != ':')) s += *p++;
         Host = s;
         if (*p == ':')
         {
            p++;
            s.resize(0);
            while (*p && *p != '/') s += *p++;
            Port = IP_PORT(atol(s.c_str()));
         }
      }

      // now reconstruct the netloc as a whole string

      if (User.length())
      {
         NetLoc = User;
         if (Password.length())
         {
            NetLoc += ":";
            NetLoc += Password;
         }
         NetLoc += '@';
      }

      NetLoc += Host;
      if (Port != 80)
      {
         char portstring[15];
         NetLoc += ':';
         sprintf(portstring, "%d", Port);
         NetLoc += portstring;
      }

      s.resize(0);
   }

   // parse the path [/a[/b[..]]/] and file
   // if we are at a '/' and s is non-blank, this is a relative path

   if (s.length() && (*p == '/'))
   {
      p++;
      RelativePath = true;
      Path.push_back(s);
      s.resize(0);
      while (*p && (*p != '/') && (*p != ';') &&
            (*p != '?') && (*p != '#') && (*p != '&')) s += *p++;
   }
   else
   {
      // this is a pure filename with no slashes unless it's just a
      // host name
      if (*p != '/') RelativePath = Host.empty();
      else {
         p++;
         while (*p && (*p != '/') && (*p != ';') &&
               (*p != '?') && (*p != '#') && (*p != '&')) s += *p++;
      }
   }

   // as long as the current word is followed by slash, append it
   // to the path

   while (*p == '/')
   {
      p++;
      //if (s.length())
      Path.push_back(s);    // uri could be something like '...//...'
      s.resize(0);
      while (*p && (*p != '/') && (*p != ';') &&
            (*p != '?') && (*p != '#') && (*p != '&')) s += *p++;
   }

   // now the current word is the filename

   File = s;

   //
   // Grab the file type
   //
   string::size_type pp = File.rfind('.');
   if (pp != string::npos) {
      FileType = File.substr(pp+1);
   }

   // look for the parameters

   if (*p == ';')
   {
      p++;
      s.resize(0);
      while (*p && (*p != '?') && (*p != '#') &&
            (*p != '&')) s += *p++;
      Params = s;
   }

   // look for the query

   // Accept queries that start with a '&' non-standard but people
   // do it....
   if (*p == '?' || *p == '&')
   {
      s = *p;  // Save the leading query character
      p++;
      while (*p && (*p != '#')) s += *p++;
      Query = s;
   }

   // look for the fragment

   if (*p == '#')
   {
      p++;
      s.resize(0);
      while (*p) s += *p++;
      Fragment = s;
   }
}

But if you use Flow, it could be written like this:

class Url
{
   string scheme, host, port, user, pass, uri, param, ankor;
   string* head_token;
   int last_pos, cur_pos;
   char* url;

   parse_url(char* param)
   {
      START_FLOW analyze_url();

      url        = param;
      int len    = strlen(url);
      last_pos   = 0;
      cur_pos    = 0;
      head_token = NULL;

      while (cur_pos < len) {
         cur_pos++;
      }
      if (head_token)
         *head_token = url + last_pos;
   }

   void analyze_url()
   {
      START_FLOW
      {
         read_to_tail(&scheme, "://");

         START_FLOW
            read_from_head(&host, "/");

         START_FLOW
            read_from_head(&port, ":");

         START_FLOW
         {
            string tmp;
            read_from_head(&tmp, "@");

            user = host;
            pass = port;
            host.erase();
            port.erase();

            read_from_head(&port, ":");
            host = tmp;
         }
      }

      START_FLOW
      {
         read_from_head(&uri, "/"));
         START_FLOW
            read_from_head(&param, "?");
         START_FLOW
            read_from_head(&anchor, "#");
      }
   }

   void read_to_tail(string* token, char* end_str)
   {
      head_token = token;
      while (TRUE)
      {
         WAIT (cur_pos=);    // means whenever cur_pos is changed
         if (memcmp(url + cur_pos, end_str, strlen(end_str)) == 0)
            break;
      }

      head_token->assign(url + last_pos, cur_pos - last_pos);
      last_pos = cur_pos = cur_pos + strlen(end_str);
      head_token = NULL;
   }

   void read_from_head(string* token, char* start_str)
   {
      while (TRUE)
      {
         WAIT (cur_pos=);
         if (memcmp(url + cur_pos, end_str, strlen(end_str)) == 0)
            break;
      }
      if (head_token)
         head_token->assign(url + last_pos, cur_pos - last_pos);

      head_token = token;
      last_pos = cur_pos + 1;
   }
};

The code is shorter, easier to read/write/modify, and scalable for a more complicated format.

Adding Logic Flow Control to Your Programming

4. Implementing by Using a Real Thread

Unfortunately, these two primitives are not supported by any compiler. If you want to use them now, you have to implement them by using a real thread. It will cost a little more, but should be worth it. Instead of having two primitives, you now use seven macros:

Macro Name Description
VFLOW_EVENT_DECLARE(evt) Declares a variable of an event. An event can be used to wait and signal by virtual flows.
VFLOW_EVENT_INITIALIZE(evt) Initializes an event variable. This macro can be merged with the above one in C++.
VFLOW_WAIT(evt) A virtual flow can call this to wait for an event.
VFLOW_SIGNAL(evt) Signals an event. All virtual flows waiting for the event will be activated one by one. When a flow is activated, it resumes until it meets another VFLOW_WAIT. Then, it's suspended and the next virtual flow waiting in the queue is activated. The flow that calls VFLOW_SIGNAL will not continue until all waiting flows have finished.
VFLOW_TERMINATE(evt) When this is called, all virtual flows that are waiting for the event will exit immediately.
VFLOW_START(routine, param) To start a virtual flow, routine(param) will be called. When it meets the first VFLOW_WAIT, it will give the execution control back to its parent flow.
VFLOW_EXIT This is for a virtual flow to exit in the middle.

The new code is listed below. It can be compiled with the vflow.c and vflow.h files attached to this article, and it works on both Windows and Linux.

//analyze [scheme://[user:pass@]host[:port]/]uri[?param[#anchor]]
#include "vflow.h"
#include <stdio.h>
#include <string.h>
#include <string>
using namespace std;

class Url;

void flow_read_domain(void*);
void flow_read_host(void*);
void flow_read_port(void*);
void flow_read_host_port(void*);
void flow_read_query_string(void*);
void flow_read_param(void*);
void flow_read_anchor(void*);

class Url
{
public:
   Url() {}
   ~Url() {}

   string scheme, host, port, user, pass, uri, param, anchor;
   string* head_token;
   int last_pos, cur_pos;
   char* url;
   VFLOW_EVENT_DECLARE(cur_pos_changed);

   void parse_url(char* param)
   {
      VFLOW_EVENT_INITIALIZE(cur_pos_changed);

      url        = param;
      int len    = strlen(url);
      last_pos   = 0;
      set_pos(0);
      head_token = NULL;

      analyze_url();

      while (cur_pos < len) {
         set_pos(cur_pos+1);
      }
      if (head_token)
         *head_token = url + last_pos;

      VFLOW_TERMINATE(cur_pos_changed);
      uri = "/" + uri;
   }

   void set_pos(int pos)
   {
      cur_pos = pos;
      VFLOW_SIGNAL(cur_pos_changed);
   }

   void analyze_url()
   {
      VFLOW_START(::flow_read_domain, this);
      VFLOW_START(::flow_read_query_string, this);
   }

   void flow_read_domain()
   {
      read_to_tail(&scheme, "://");

      VFLOW_START(::flow_read_host, this);
      VFLOW_START(::flow_read_port, this);

      VFLOW_START(::flow_read_host_port, this);
   }

   void flow_read_host()
   {
      read_to_tail(&host, "/");
   }

   void flow_read_port()
   {
      read_from_head(&port, ":");
   }

   void flow_read_host_port()
   {
      string tmp;
      read_from_head(&tmp, "@");

      user = host;
      pass = port;
      host.erase();
      port.erase();

      read_from_head(&port, ":");
      host = tmp;
   }

   void flow_read_query_string()
   {
      read_from_head(&uri, "/");
      VFLOW_START(::flow_read_param, this);
      VFLOW_START(::flow_read_anchor, this);
   }

   void flow_read_param()
   {
      read_from_head(&param, "?");
   }

   void flow_read_anchor()
   {
      read_from_head(&anchor, "#");
   }

   void read_to_tail(string* token, char* end_str)
   {
      head_token = token;
      while (1)
      {
         VFLOW_WAIT(cur_pos_changed);
         if (memcmp(url + cur_pos, end_str, strlen(end_str)) == 0)
            break;
      }

      head_token->assign(url + last_pos, cur_pos - last_pos);
      last_pos = cur_pos = cur_pos + strlen(end_str);
      head_token = NULL;
   }

   void read_from_head(string* token, char* start_str)
   {
      while (1)
      {
         VFLOW_WAIT(cur_pos_changed);
         if (memcmp(url + cur_pos, start_str, strlen(start_str)) == 0)
            break;
      }
      if (head_token)
         head_token->assign(url + last_pos, cur_pos - last_pos);

      head_token = token;
      last_pos = cur_pos + 1;
   }
};

void flow_read_domain(void* param)
   { ((Url*)param)->flow_read_domain(); }
void flow_read_host(void* param)
   { ((Url*)param)->flow_read_host(); }
void flow_read_port(void* param)
   { ((Url*)param)->flow_read_port(); }
void flow_read_host_port(void* param)
   { ((Url*)param)->flow_read_host_port(); }
void flow_read_query_string(void* param)
   { ((Url*)param)->flow_read_query_string(); }
void flow_read_param(void* param)
   { ((Url*)param)->flow_read_param(); }
void flow_read_anchor(void* param)
   { ((Url*)param)->flow_read_anchor(); }

// Parses a sample URL and prints each component on its own line.
int main(int argc, char* argv[])
{
   Url url;
   // Adjacent literals are concatenated by the compiler; this keeps the
   // line short without putting a line break inside the string.
   url.parse_url("http://user:pass@abc.com:80/abc/def/"
                 "ghi.php?jklmn=1234&opq=567#rstuvw");

   printf("schema=%s\nuser=%s\npass=%s\nhost=%s\nport=%s\nuri=%s\n"
          "param=%s\nanchor=%s\n",
          url.scheme.c_str(), url.user.c_str(), url.pass.c_str(),
          url.host.c_str(), url.port.c_str(), url.uri.c_str(),
          url.param.c_str(), url.anchor.c_str());
   return 0;
}


About the Author

Stone Yan

My blog: http://stoneyan.blogspot.com/

Downloads

Comments

  • There are no comments yet. Be the first to comment!

Leave a Comment
  • Your email address will not be published. All fields are required.

Top White Papers and Webcasts

  • Live Event Date: December 11, 2014 @ 1:00 p.m. ET / 10:00 a.m. PT Market pressures to move more quickly and develop innovative applications are forcing organizations to rethink how they develop and release applications. The combination of public clouds and physical back-end infrastructures are a means to get applications out faster. However, these hybrid solutions complicate DevOps adoption, with application delivery pipelines that span across complex hybrid cloud and non-cloud environments. Check out this …

  • Due to internal controls and regulations, the amount of long term archival data is increasing every year. Since magnetic tape does not need to be periodically operated or connected to a power source, there will be no data loss because of performance degradation due to the drive actuator. Read this white paper to learn about a series of tests that determined magnetic tape is a reliable long-term storage solution for up to 30 years.

Most Popular Programming Stories

More for Developers

RSS Feeds