source: nanovis/branches/1.2/ResponseQueue.cpp @ 5406

Last change on this file since 5406 was 4937, checked in by ldelgass, 9 years ago

merge threading

  • Property svn:eol-style set to native
File size: 3.2 KB
Line 
1/* -*- mode: c++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 * Copyright (C) 2004-2012  HUBzero Foundation, LLC
4 *
5 * Author: George A. Howlett <gah@purdue.edu>
6 */
7
8#include <pthread.h>
9#include <semaphore.h>
10#include <cerrno>
11#include <cstring>
12#include <cstdlib>
13#include <list>
14
15#include "Trace.h"
16#include "ResponseQueue.h"
17
18using namespace nv;
19
20ResponseQueue::ResponseQueue()
21{
22    pthread_mutex_init(&_idle, NULL);
23    if (sem_init(&_ready, 0, 0) < 0) {
24        ERROR("can't initialize semaphore: %s", strerror(errno));
25    }
26}
27
28ResponseQueue::~ResponseQueue()
29{
30    TRACE("Deleting ResponseQueue");
31    pthread_mutex_destroy(&_idle);
32    if (sem_destroy(&_ready) < 0) {
33        ERROR("can't destroy semaphore: %s", strerror(errno));
34    }
35    for (std::list<Response *>::iterator itr = _list.begin();
36         itr != _list.end(); ++itr) {
37        delete *itr;
38    }
39    _list.clear();
40}
41
42void
43ResponseQueue::enqueue(Response *response)
44{
45    if (pthread_mutex_lock(&_idle) != 0) {
46        ERROR("can't lock mutex: %s", strerror(errno));
47    }   
48    /* Examine the list and remove any queued responses of the same type. */
49    TRACE("Before # of elements is %d", _list.size());
50    for (std::list<Response *>::iterator itr = _list.begin();
51         itr != _list.end();) {
52        /* Remove any duplicate image or legend responses. There should be no
53         * more than one.  Note that if the client starts using multiple legends
54         * for different fields, this optimization should be disabled for legends.
55         * We may also need to differentiate between screen images and "hardcopy"
56         * images.
57         */
58        if ((response->type() == Response::IMAGE ||
59             response->type() == Response::LEGEND) &&
60            (*itr)->type() == response->type()) {
61            TRACE("Removing duplicate response of type: %d", (*itr)->type());
62            delete *itr;
63            itr = _list.erase(itr);
64        } else {
65            TRACE("Found queued response of type %d", (*itr)->type());
66            ++itr;
67        }
68    }
69    /* Add the new response to the end of the list. */
70    _list.push_back(response);
71    TRACE("After # of elements is %d", _list.size());
72    if (sem_post(&_ready) < 0) {
73        ERROR("can't post semaphore: %s", strerror(errno));
74    }
75    if (pthread_mutex_unlock(&_idle) != 0) {
76        ERROR("can't unlock mutex: %s", strerror(errno));
77    }   
78}
79
80Response *
81ResponseQueue::dequeue()
82{
83    Response *response = NULL;
84
85    int ret;
86    while ((ret = sem_wait(&_ready)) < 0 && errno == EINTR)
87        continue;
88    if (ret < 0) {
89        ERROR("can't wait on semaphore: %s", strerror(errno));
90    }
91    if (pthread_mutex_lock(&_idle) != 0) {
92        ERROR("can't lock mutex: %s", strerror(errno));
93    }
94    /* List may be empty if semaphore value was incremented beyond list
95     * size in the case of an enqueue operation deleting from the queue
96     * This is not an error, so just unlock and return to wait loop
97     */
98    if (_list.empty()) {
99        TRACE("Empty queue");
100    } else {
101        response = _list.front();
102        _list.pop_front();
103        TRACE("Dequeued response of type %d", response->type());
104    }
105    if (pthread_mutex_unlock(&_idle) != 0) {
106        ERROR("can't unlock mutex: %s", strerror(errno));
107    }   
108    return response;
109}
Note: See TracBrowser for help on using the repository browser.