00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include "mmstools/mmsthreadserver.h"
00034 #include <cstdlib>
00035 #include <cstdio>
00036 #include <string.h>
00037
00038 MMSThreadServer::MMSThreadServer(int queue_size, string identity, bool blocking) : MMSThread(identity) {
00039
00040 this->server_tid = 0;
00041
00042
00043 if (queue_size < 100)
00044 this->queue_size = 100;
00045 else
00046 this->queue_size = queue_size;
00047 this->queue = (MMSTS_QUEUE_ITEM**)malloc(this->queue_size * sizeof(MMSTS_QUEUE_ITEM*));
00048 memset(this->queue, 0, this->queue_size * sizeof(MMSTS_QUEUE_ITEM*));
00049 this->queue_rp = 0;
00050 this->queue_wp = 0;
00051 this->buffer_full = false;
00052
00053
00054
00055 this->blocking = blocking;
00056
00057
00058 pthread_mutex_init(&this->mutex, NULL);
00059 pthread_cond_init(&this->cond, NULL);
00060 }
00061
00062 MMSThreadServer::~MMSThreadServer() {
00063 pthread_mutex_unlock(&this->mutex);
00064 pthread_cond_destroy(&this->cond);
00065 pthread_mutex_destroy(&this->mutex);
00066 free(this->queue);
00067 }
00068
00069 bool MMSThreadServer::start() {
00070
00071 pthread_mutex_lock(&this->mutex);
00072
00073
00074 return MMSThread::start();
00075 }
00076
00077 void MMSThreadServer::threadMain() {
00078
00079 this->server_tid = pthread_self();
00080
00081
00082 while (1) {
00083 if (pthread_cond_wait(&this->cond, &this->mutex) == 0) {
00084
00085 while (this->queue_rp != this->queue_wp) {
00086
00087 MMSTS_QUEUE_ITEM *item = this->queue[this->queue_rp];
00088 if (item) {
00089 if (this->blocking) {
00090
00091 processData(item->in_data, item->in_data_len, item->out_data, item->out_data_len);
00092
00093
00094 pthread_mutex_lock(&item->mutex);
00095 pthread_cond_signal(&item->cond);
00096 pthread_mutex_unlock(&item->mutex);
00097 }
00098 else {
00099
00100 void *in_data = item->in_data;
00101 int in_data_len = item->in_data_len;
00102 void **out_data = item->out_data;
00103 int *out_data_len = item->out_data_len;
00104
00105
00106 pthread_mutex_lock(&item->mutex);
00107 pthread_cond_signal(&item->cond);
00108 pthread_mutex_unlock(&item->mutex);
00109
00110
00111 processData(in_data, in_data_len, out_data, out_data_len);
00112 }
00113 }
00114
00115
00116 this->queue[this->queue_rp] = NULL;
00117 if (this->queue_rp + 1 < this->queue_size)
00118 this->queue_rp++;
00119 else
00120 this->queue_rp = 0;
00121 this->buffer_full = false;
00122 }
00123 }
00124 }
00125 }
00126
00127 void MMSThreadServer::processData(void *in_data, int in_data_len, void **out_data, int *out_data_len) {
00128 this->onProcessData.emit(in_data, in_data_len, out_data, out_data_len);
00129 }
00130
00131 bool MMSThreadServer::trigger(void *in_data, int in_data_len, void **out_data, int *out_data_len) {
00132 if (this->server_tid == pthread_self()) {
00133
00134
00135 processData(in_data, in_data_len, out_data, out_data_len);
00136 return true;
00137 }
00138
00139
00140 MMSTS_QUEUE_ITEM item;
00141 item.in_data = in_data;
00142 item.in_data_len = in_data_len;
00143 item.out_data = out_data;
00144 item.out_data_len = out_data_len;
00145
00146
00147 pthread_mutex_init(&item.mutex, NULL);
00148 pthread_cond_init(&item.cond, NULL);
00149 pthread_mutex_lock(&item.mutex);
00150
00151
00152 pthread_mutex_lock(&this->mutex);
00153 this->queue[this->queue_wp] = &item;
00154 this->queue_wp++;
00155 if (this->queue_wp >= this->queue_size)
00156 this->queue_wp = 0;
00157
00158
00159 if (this->queue_rp == this->queue_wp) {
00160
00161 this->buffer_full = true;
00162 printf("%s - ring buffer is full!\n", this->identity.c_str());
00163 while (this->buffer_full) usleep(10000);
00164 }
00165
00166
00167 pthread_cond_signal(&this->cond);
00168 pthread_mutex_unlock(&this->mutex);
00169
00170
00171 pthread_cond_wait(&item.cond, &item.mutex);
00172
00173
00174 pthread_mutex_unlock(&item.mutex);
00175 pthread_cond_destroy(&item.cond);
00176 pthread_mutex_destroy(&item.mutex);
00177
00178 return true;
00179 }
00180