Files correlati : Commento : Spostamento in libraries delle librerie esterne di Campo per una maggiore pulizia e organizzazione git-svn-id: svn://10.65.10.50/branches/R_10_00@24150 c028cbd2-c16b-5b4b-a496-9718f37d4682
		
			
				
	
	
		
			192 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			192 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 | 
						|
    Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
 | 
						|
 | 
						|
    This file is part of 0MQ.
 | 
						|
 | 
						|
    0MQ is free software; you can redistribute it and/or modify it under
 | 
						|
    the terms of the GNU Lesser General Public License as published by
 | 
						|
    the Free Software Foundation; either version 3 of the License, or
 | 
						|
    (at your option) any later version.
 | 
						|
 | 
						|
    0MQ is distributed in the hope that it will be useful,
 | 
						|
    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
    GNU Lesser General Public License for more details.
 | 
						|
 | 
						|
    You should have received a copy of the GNU Lesser General Public License
 | 
						|
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
						|
*/
 | 
						|
 | 
						|
#include <string.h>
 | 
						|
 | 
						|
#include "xpub.hpp"
 | 
						|
#include "pipe.hpp"
 | 
						|
#include "err.hpp"
 | 
						|
#include "msg.hpp"
 | 
						|
 | 
						|
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
 | 
						|
    socket_base_t (parent_, tid_, sid_),
 | 
						|
    verbose(false),
 | 
						|
    more (false)
 | 
						|
{
 | 
						|
    options.type = ZMQ_XPUB;
 | 
						|
}
 | 
						|
 | 
						|
zmq::xpub_t::~xpub_t ()
 | 
						|
{
 | 
						|
}
 | 
						|
 | 
						|
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
 | 
						|
{
 | 
						|
    zmq_assert (pipe_);
 | 
						|
    dist.attach (pipe_);
 | 
						|
 | 
						|
    //  If subscribe_to_all_ is specified, the caller would like to subscribe
 | 
						|
    //  to all data on this pipe, implicitly.
 | 
						|
    if (subscribe_to_all_)
 | 
						|
        subscriptions.add (NULL, 0, pipe_);
 | 
						|
 | 
						|
    //  The pipe is active when attached. Let's read the subscriptions from
 | 
						|
    //  it, if any.
 | 
						|
    xread_activated (pipe_);
 | 
						|
}
 | 
						|
 | 
						|
void zmq::xpub_t::xread_activated (pipe_t *pipe_)
 | 
						|
{
 | 
						|
    //  There are some subscriptions waiting. Let's process them.
 | 
						|
    msg_t sub;
 | 
						|
    while (pipe_->read (&sub)) {
 | 
						|
        //  Apply the subscription to the trie
 | 
						|
        unsigned char *const data = (unsigned char *) sub.data ();
 | 
						|
        const size_t size = sub.size ();
 | 
						|
        if (size > 0 && (*data == 0 || *data == 1)) {
 | 
						|
            bool unique;
 | 
						|
            if (*data == 0)
 | 
						|
                unique = subscriptions.rm (data + 1, size - 1, pipe_);
 | 
						|
            else
 | 
						|
                unique = subscriptions.add (data + 1, size - 1, pipe_);
 | 
						|
 | 
						|
            //  If the subscription is not a duplicate store it so that it can be
 | 
						|
            //  passed to used on next recv call. (Unsubscribe is not verbose.)
 | 
						|
            if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) {
 | 
						|
                pending_data.push_back (blob_t (data, size));
 | 
						|
                pending_flags.push_back (0);
 | 
						|
            }
 | 
						|
        }
 | 
						|
        else {
 | 
						|
            //  Process user message coming upstream from xsub socket
 | 
						|
            pending_data.push_back (blob_t (data, size));
 | 
						|
            pending_flags.push_back (sub.flags ());
 | 
						|
        }
 | 
						|
        sub.close ();
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
 | 
						|
{
 | 
						|
    dist.activated (pipe_);
 | 
						|
}
 | 
						|
 | 
						|
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
 | 
						|
    size_t optvallen_)
 | 
						|
{
 | 
						|
    if (option_ != ZMQ_XPUB_VERBOSE) {
 | 
						|
        errno = EINVAL;
 | 
						|
        return -1;
 | 
						|
    }
 | 
						|
    if (optvallen_ != sizeof (int) || *static_cast <const int*> (optval_) < 0) {
 | 
						|
        errno = EINVAL;
 | 
						|
        return -1;
 | 
						|
    }
 | 
						|
    verbose = (*static_cast <const int*> (optval_) != 0);
 | 
						|
    return 0;
 | 
						|
}
 | 
						|
 | 
						|
void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
 | 
						|
{
 | 
						|
    //  Remove the pipe from the trie. If there are topics that nobody
 | 
						|
    //  is interested in anymore, send corresponding unsubscriptions
 | 
						|
    //  upstream.
 | 
						|
    subscriptions.rm (pipe_, send_unsubscription, this);
 | 
						|
 | 
						|
    dist.pipe_terminated (pipe_);
 | 
						|
}
 | 
						|
 | 
						|
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
 | 
						|
{
 | 
						|
    xpub_t *self = (xpub_t*) arg_;
 | 
						|
    self->dist.match (pipe_);
 | 
						|
}
 | 
						|
 | 
						|
int zmq::xpub_t::xsend (msg_t *msg_)
 | 
						|
{
 | 
						|
    bool msg_more = msg_->flags () & msg_t::more ? true : false;
 | 
						|
 | 
						|
    //  For the first part of multi-part message, find the matching pipes.
 | 
						|
    if (!more)
 | 
						|
        subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
 | 
						|
            mark_as_matching, this);
 | 
						|
 | 
						|
    //  Send the message to all the pipes that were marked as matching
 | 
						|
    //  in the previous step.
 | 
						|
    int rc = dist.send_to_matching (msg_);
 | 
						|
    if (rc != 0)
 | 
						|
        return rc;
 | 
						|
 | 
						|
    //  If we are at the end of multi-part message we can mark all the pipes
 | 
						|
    //  as non-matching.
 | 
						|
    if (!msg_more)
 | 
						|
        dist.unmatch ();
 | 
						|
 | 
						|
    more = msg_more;
 | 
						|
 | 
						|
    return 0;
 | 
						|
}
 | 
						|
 | 
						|
bool zmq::xpub_t::xhas_out ()
 | 
						|
{
 | 
						|
    return dist.has_out ();
 | 
						|
}
 | 
						|
 | 
						|
int zmq::xpub_t::xrecv (msg_t *msg_)
 | 
						|
{
 | 
						|
    //  If there is at least one 
 | 
						|
    if (pending_data.empty ()) {
 | 
						|
        errno = EAGAIN;
 | 
						|
        return -1;
 | 
						|
    }
 | 
						|
 | 
						|
    int rc = msg_->close ();
 | 
						|
    errno_assert (rc == 0);
 | 
						|
    rc = msg_->init_size (pending_data.front ().size ());
 | 
						|
    errno_assert (rc == 0);
 | 
						|
    memcpy (msg_->data (),
 | 
						|
        pending_data.front ().data (),
 | 
						|
        pending_data.front ().size ());
 | 
						|
    msg_->set_flags (pending_flags.front ());
 | 
						|
    pending_data.pop_front ();
 | 
						|
    pending_flags.pop_front ();
 | 
						|
    return 0;
 | 
						|
}
 | 
						|
 | 
						|
bool zmq::xpub_t::xhas_in ()
 | 
						|
{
 | 
						|
    return !pending_data.empty ();
 | 
						|
}
 | 
						|
 | 
						|
void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
 | 
						|
    void *arg_)
 | 
						|
{
 | 
						|
    xpub_t *self = (xpub_t*) arg_;
 | 
						|
 | 
						|
    if (self->options.type != ZMQ_PUB) {
 | 
						|
        //  Place the unsubscription to the queue of pending (un)sunscriptions
 | 
						|
        //  to be retrived by the user later on.
 | 
						|
        blob_t unsub (size_ + 1, 0);
 | 
						|
        unsub [0] = 0;
 | 
						|
        memcpy (&unsub [1], data_, size_);
 | 
						|
        self->pending_data.push_back (unsub);
 | 
						|
        self->pending_flags.push_back (0);
 | 
						|
    }
 | 
						|
}
 |