Logo
  • Main Page
  • Related Pages
  • Modules
  • Classes
  • Files

mmsthreadserver.cpp

Go to the documentation of this file.
00001 /***************************************************************************
00002  *   Copyright (C) 2005-2007 Stefan Schwarzer, Jens Schneider,             *
00003  *                           Matthias Hardt, Guido Madaus                  *
00004  *                                                                         *
00005  *   Copyright (C) 2007-2008 BerLinux Solutions GbR                        *
00006  *                           Stefan Schwarzer & Guido Madaus               *
00007  *                                                                         *
00008  *   Copyright (C) 2009-2013 BerLinux Solutions GmbH                       *
00009  *                                                                         *
00010  *   Authors:                                                              *
00011  *      Stefan Schwarzer   <stefan.schwarzer@diskohq.org>,                 *
00012  *      Matthias Hardt     <matthias.hardt@diskohq.org>,                   *
00013  *      Jens Schneider     <jens.schneider@diskohq.org>,                   *
00014  *      Guido Madaus       <guido.madaus@diskohq.org>,                     *
00015  *      Patrick Helterhoff <patrick.helterhoff@diskohq.org>,               *
00016  *      René Bählkow       <rene.baehlkow@diskohq.org>                     *
00017  *                                                                         *
00018  *   This library is free software; you can redistribute it and/or         *
00019  *   modify it under the terms of the GNU Lesser General Public            *
00020  *   License version 2.1 as published by the Free Software Foundation.     *
00021  *                                                                         *
00022  *   This library is distributed in the hope that it will be useful,       *
00023  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00024  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00025  *   Lesser General Public License for more details.                       *
00026  *                                                                         *
00027  *   You should have received a copy of the GNU Lesser General Public      *
00028  *   License along with this library; if not, write to the                 *
00029  *   Free Software Foundation, Inc.,                                       *
00030  *   51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA            *
00031  **************************************************************************/
00032 
#include "mmstools/mmsthreadserver.h"
#include <string.h>
#include <cstdio>
#include <cstdlib>
#include <new>
00037 
00038 MMSThreadServer::MMSThreadServer(int queue_size, string identity, bool blocking) : MMSThread(identity) {
00039     // unset id of the server thread
00040     this->server_tid = 0;
00041 
00042     // allocate and setup the queue
00043     if (queue_size < 100)
00044         this->queue_size = 100;
00045     else
00046         this->queue_size = queue_size;
00047     this->queue = (MMSTS_QUEUE_ITEM**)malloc(this->queue_size * sizeof(MMSTS_QUEUE_ITEM*));
00048     memset(this->queue, 0, this->queue_size * sizeof(MMSTS_QUEUE_ITEM*));
00049     this->queue_rp = 0;
00050     this->queue_wp = 0;
00051     this->buffer_full = false;
00052 
00053     // in non-blocking mode the caller of trigger() will get control directly after triggering
00054     // and do not wait until server has finished processData()
00055     this->blocking = blocking;
00056 
00057     // init the mutex and cond variable for the server thread
00058     pthread_mutex_init(&this->mutex, NULL);
00059     pthread_cond_init(&this->cond, NULL);
00060 }
00061 
00062 MMSThreadServer::~MMSThreadServer() {
00063     pthread_mutex_unlock(&this->mutex);
00064     pthread_cond_destroy(&this->cond);
00065     pthread_mutex_destroy(&this->mutex);
00066     free(this->queue);
00067 }
00068 
00069 bool MMSThreadServer::start() {
00070     // lock the server mutex
00071     pthread_mutex_lock(&this->mutex);
00072 
00073     // start the thread
00074     return MMSThread::start();
00075 }
00076 
00077 void MMSThreadServer::threadMain() {
00078     // get id of the server thread
00079     this->server_tid = pthread_self();
00080 
00081     // server loop
00082     while (1) {
00083         if (pthread_cond_wait(&this->cond, &this->mutex) == 0) {
00084             // signal from a client thread received
00085             while (this->queue_rp != this->queue_wp) {
00086                 // processing next item in the queue
00087                 MMSTS_QUEUE_ITEM *item = this->queue[this->queue_rp];
00088                 if (item) {
00089                     if (this->blocking) {
00090                         // blocking mode, now calling the worker routine for data processing
00091                         processData(item->in_data, item->in_data_len, item->out_data, item->out_data_len);
00092 
00093                         // handshake with the client thread
00094                         pthread_mutex_lock(&item->mutex);
00095                         pthread_cond_signal(&item->cond);
00096                         pthread_mutex_unlock(&item->mutex);
00097                     }
00098                     else {
00099                         // non-blocking mode, processing data in parallel to the caller thread of trigger()
00100                         void *in_data       = item->in_data;
00101                         int in_data_len     = item->in_data_len;
00102                         void **out_data     = item->out_data;
00103                         int *out_data_len   = item->out_data_len;
00104 
00105                         // handshake with the client thread
00106                         pthread_mutex_lock(&item->mutex);
00107                         pthread_cond_signal(&item->cond);
00108                         pthread_mutex_unlock(&item->mutex);
00109 
00110                         // now calling the worker routine for data processing
00111                         processData(in_data, in_data_len, out_data, out_data_len);
00112                     }
00113                 }
00114 
00115                 // remove item from queue
00116                 this->queue[this->queue_rp] = NULL;
00117                 if (this->queue_rp + 1 < this->queue_size)
00118                     this->queue_rp++;
00119                 else
00120                     this->queue_rp = 0;
00121                 this->buffer_full = false;
00122             }
00123         }
00124     }
00125 }
00126 
00127 void MMSThreadServer::processData(void *in_data, int in_data_len, void **out_data, int *out_data_len) {
00128     this->onProcessData.emit(in_data, in_data_len, out_data, out_data_len);
00129 }
00130 
00131 bool MMSThreadServer::trigger(void *in_data, int in_data_len, void **out_data, int *out_data_len) {
00132     if (this->server_tid == pthread_self()) {
00133         // caution: server thread will trigger itself!!!
00134         // we calling the worker routine for data processing directly without locking
00135         processData(in_data, in_data_len, out_data, out_data_len);
00136         return true;
00137     }
00138 
00139     // create new queue item and put data to it
00140     MMSTS_QUEUE_ITEM item;
00141     item.in_data        = in_data;
00142     item.in_data_len    = in_data_len;
00143     item.out_data       = out_data;
00144     item.out_data_len   = out_data_len;
00145 
00146     // init queue item's conditional variables
00147     pthread_mutex_init(&item.mutex, NULL);
00148     pthread_cond_init(&item.cond, NULL);
00149     pthread_mutex_lock(&item.mutex);
00150 
00151     // lock the server mutex and push the new item at the end of the queue
00152     pthread_mutex_lock(&this->mutex);
00153     this->queue[this->queue_wp] = &item;
00154     this->queue_wp++;
00155     if (this->queue_wp >= this->queue_size)
00156         this->queue_wp = 0;
00157 
00158     // check if the queue is full now
00159     if (this->queue_rp == this->queue_wp) {
00160         // yes, the ring buffer is full, no next request can be inserted
00161         this->buffer_full = true;
00162         printf("%s - ring buffer is full!\n", this->identity.c_str());
00163         while (this->buffer_full) usleep(10000);
00164     }
00165 
00166     // send signal to the server and unlock the server mutex
00167     pthread_cond_signal(&this->cond);
00168     pthread_mutex_unlock(&this->mutex);
00169 
00170     // waiting for the answer from the server
00171     pthread_cond_wait(&item.cond, &item.mutex);
00172 
00173     // freeing temporary conditional variables
00174     pthread_mutex_unlock(&item.mutex);
00175     pthread_cond_destroy(&item.cond);
00176     pthread_mutex_destroy(&item.mutex);
00177 
00178     return true;
00179 }
00180 

Generated by doxygen