软件世界网 购物 网址 三丰软件 | 小说 美女秀 图库大全 游戏 笑话 | 下载 开发知识库 新闻 开发 图片素材
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
移动开发 架构设计 编程语言 Web前端 互联网
开发杂谈 系统运维 研发管理 数据库 云计算 Android开发资料
  软件世界网 -> 架构设计 -> Libevent源码分析(六) -> 正文阅读

[架构设计]Libevent源码分析(六)


上一节说过,libevent提供六种bufferevent类型,后面会详细分析其中的两个:bufferevent_sock和bufferevent_async.下面是bufferevent的详细定义:
/**
 * The user-visible part of every bufferevent: the owning event_base, the
 * per-implementation operations table, the read/write events, the two
 * evbuffers, and the user callbacks with their shared argument.
 */
struct bufferevent {
    /** Event base for which this bufferevent was created. */
    struct event_base *ev_base;
    /** Pointer to a table of function pointers to set up how this
        bufferevent behaves. */
    const struct bufferevent_ops *be_ops;

    /** A read event that triggers when a timeout has happened or a socket
        is ready to read data.  Only used by some subtypes of
        bufferevent. */
    struct event ev_read;
    /** A write event that triggers when a timeout has happened or a socket
        is ready to write data.  Only used by some subtypes of
        bufferevent. */
    struct event ev_write;

    /** An input buffer. Only the bufferevent is allowed to add data to
        this buffer, though the user is allowed to drain it. */
    struct evbuffer *input;

    /** An output buffer. Only the bufferevent is allowed to drain data
        from this buffer, though the user is allowed to add it. */
    struct evbuffer *output;

    /** Watermarks for the input buffer.
        NOTE(review): how they are applied is not visible here; see
        read_watermarks_cb in struct bufferevent_private. */
    struct event_watermark wm_read;
    /** Watermarks for the output buffer.  bufferevent_writecb only runs
        the user's write callback once the output length is at or below
        wm_write.low. */
    struct event_watermark wm_write;

    /** User callback for reads; called as readcb(bufev, cbarg). */
    bufferevent_data_cb readcb;
    /** User callback for writes; called as writecb(bufev, cbarg). */
    bufferevent_data_cb writecb;
    /* This should be called 'eventcb', but renaming it would break
     * backward compatibility */
    bufferevent_event_cb errorcb;
    /** Opaque argument handed to readcb, writecb and errorcb. */
    void *cbarg;

    /** Read timeout; cleared (timeouts disabled) by
        bufferevent_init_common. */
    struct timeval timeout_read;
    /** Write timeout; cleared (timeouts disabled) by
        bufferevent_init_common. */
    struct timeval timeout_write;

    /** Events that are currently enabled: currently EV_READ and EV_WRITE
        are supported. */
    short enabled;
};

其中ev_base指向bufferevent所属的event_base,ev_read和ev_write是读写事件,timeout_read和timeout_write是读写超时时间,readcb、writecb和errorcb分别是读回调、写回调和事件回调(事件回调本应命名为eventcb,为了保持向后兼容才沿用errorcb这个名字)。input和output是evbuffer类型的读写缓存。enabled表示当前已启用的事件,目前只支持EV_READ和EV_WRITE。wm_read和wm_write代表读写缓存的水位阈值(watermark),be_ops是指向bufferevent_ops的指针,bufferevent_ops的定义如下:
/**
 * Table of operations that one bufferevent implementation provides.
 * Generic code dispatches through bufferevent::be_ops, e.g.
 * bufferevent_enable() calls be_ops->enable().
 */
struct bufferevent_ops {
    /** The name of the bufferevent's type. */
    const char *type;
    /** At what offset into the implementation type will we find a
        bufferevent structure?

        Example: if the type is implemented as
        struct bufferevent_x {
           int extra_data;
           struct bufferevent bev;
        }
        then mem_offset should be offsetof(struct bufferevent_x, bev)
    */
    off_t mem_offset;

    /** Enables one or more of EV_READ|EV_WRITE on a bufferevent.  Does
        not need to adjust the 'enabled' field.  Returns 0 on success, -1
        on failure.
     */
    int (*enable)(struct bufferevent *, short);

    /** Disables one or more of EV_READ|EV_WRITE on a bufferevent.  Does
        not need to adjust the 'enabled' field.  Returns 0 on success, -1
        on failure.
     */
    int (*disable)(struct bufferevent *, short);

    /** Free any storage and deallocate any extra data or structures used
        in this implementation.
     */
    void (*destruct)(struct bufferevent *);

    /** Called when the timeouts on the bufferevent have changed.*/
    int (*adj_timeouts)(struct bufferevent *);

    /** Called to flush data. */
    int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode);

    /** Called to access miscellaneous fields. */
    int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);

};

bufferevent_ops的实现和eventop比较类似,都是定义了一组函数指针,不同的bufferevent实现可以自定义这些具体操作。比如bufferevent_sock的对应定义:
/* Operations table for socket-based bufferevents.  The initializers are
 * positional, so they must stay in the field order of struct
 * bufferevent_ops. */
const struct bufferevent_ops bufferevent_ops_socket = {
    "socket",                                           /* type */
    evutil_offsetof(struct bufferevent_private, bev),   /* mem_offset */
    be_socket_enable,                                   /* enable */
    be_socket_disable,                                  /* disable */
    be_socket_destruct,                                 /* destruct */
    be_socket_adj_timeouts,                             /* adj_timeouts */
    be_socket_flush,                                    /* flush */
    be_socket_ctrl,                                     /* ctrl */
};

接下来分析一下bufferevent_sock的实现方式。
/**
 * Convenience constructor: create a socket bufferevent for fd with no
 * event_base and no options, then install the user's callbacks.
 *
 * @return the new bufferevent, or NULL if bufferevent_socket_new() fails.
 */
struct bufferevent *
bufferevent_new(evutil_socket_t fd,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg)
{
    struct bufferevent *bev = bufferevent_socket_new(NULL, fd, 0);

    if (bev != NULL)
        bufferevent_setcb(bev, readcb, writecb, eventcb, cbarg);

    return bev;
}

/**
 * Create a socket-based bufferevent on fd.
 *
 * On WIN32, when base has an IOCP port attached, creation is delegated to
 * bufferevent_async_new() instead.
 *
 * @param base    event_base the read/write events are assigned to.
 * @param fd      socket to watch.
 * @param options option flags forwarded to bufferevent_init_common().
 * @return the new bufferevent, or NULL on allocation/initialisation failure.
 */
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
    struct bufferevent_private *priv;
    struct bufferevent *bev;

#ifdef WIN32
    if (base && event_base_get_iocp(base))
        return bufferevent_async_new(base, fd, options);
#endif

    priv = mm_calloc(1, sizeof(struct bufferevent_private));
    if (priv == NULL)
        return NULL;

    if (bufferevent_init_common(priv, base, &bufferevent_ops_socket,
                    options) < 0) {
        mm_free(priv);
        return NULL;
    }

    bev = &priv->bev;
    evbuffer_set_flags(bev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

    /* Both events watch the same fd and stay registered once added
     * (EV_PERSIST); they dispatch to the internal read/write handlers. */
    event_assign(&bev->ev_read, bev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bev);
    event_assign(&bev->ev_write, bev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);

    /* Get notified when data is appended to the output buffer. */
    evbuffer_add_cb(bev->output, bufferevent_socket_outbuf_cb, bev);

    /* NOTE(review): presumably these enforce "only the bufferevent adds to
     * input / drains output"; bufferevent_writecb lifts the output freeze
     * only around its own write. */
    evbuffer_freeze(bev->input, 0);
    evbuffer_freeze(bev->output, 1);

    return bev;
}

bufferevent_socket_new方法用于创建一个bufferevent_socket类型的bufferevent。如果是在win32环境下,并且传入的event_base启用了iocp(event_base_get_iocp返回非空),则会改为调用iocp的对应实现,即bufferevent_async_new。否则该方法会创建一个bufferevent_private变量,下面是它的定义:
/**
 * Internal bookkeeping that accompanies every bufferevent.  The public
 * struct bufferevent is embedded as the member 'bev'; code that only holds
 * a struct bufferevent pointer recovers this structure with
 * EVUTIL_UPCAST(bufev, struct bufferevent_private, bev).
 */
struct bufferevent_private {
    /** The underlying bufferevent structure. */
    struct bufferevent bev;

    /** Evbuffer callback to enforce watermarks on input. */
    struct evbuffer_cb_entry *read_watermarks_cb;

    /** If set, we should free the lock when we free the bufferevent. */
    unsigned own_lock : 1;
    /** Flag: set if we have deferred callbacks and a read callback is
     * pending. */
    unsigned readcb_pending : 1;
    /** Flag: set if we have deferred callbacks and a write callback is
     * pending. */
    unsigned writecb_pending : 1;
    /** Flag: set if we are currently busy connecting. */
    unsigned connecting : 1;
    /** Flag: set if a connect failed prematurely; this is a hack for
     * getting around the bufferevent abstraction. */
    unsigned connection_refused : 1;
    /** Set to the events pending if we have deferred callbacks and
     * an events callback is pending. */
    short eventcb_pending;

    /** If set, read is suspended until one or more conditions are over.
     * The actual value here is a bitfield of those conditions; see the
     * BEV_SUSPEND_* flags above. */
    bufferevent_suspend_flags read_suspended;

    /** If set, writing is suspended until one or more conditions are over.
     * The actual value here is a bitfield of those conditions; see the
     * BEV_SUSPEND_* flags above. */
    bufferevent_suspend_flags write_suspended;

    /** Set to the current socket errno if we have deferred callbacks and
     * an events callback is pending. */
    int errno_pending;

    /** The DNS error code for bufferevent_socket_connect_hostname */
    int dns_error;

    /** Used to implement deferred callbacks */
    struct deferred_cb deferred;

    /** The options this bufferevent was constructed with */
    enum bufferevent_options options;

    /** Current reference count for this bufferevent. */
    int refcnt;

    /** Lock for this bufferevent.  Shared by the inbuf and the outbuf.
     * If NULL, locking is disabled. */
    void *lock;

    /** Rate-limiting information for this bufferevent */
    struct bufferevent_rate_limit *rate_limiting;
};

bufferevent_private结构的第一个变量是bufferevent类型的,其实每一个bufferevent都对应一个bufferevent_private变量,只是对用户来说是透明的,用户只需要了解bufferevent即可。bufferevent_private的变量基本都是用于记录状态。
继续分析bufferevent_socket_new函数,创建bufferevent_private之后调用bufferevent_init_common函数,这个函数定义在bufferevent.c文件中,该文件中的函数都是各个bufferevent类型通用的函数。
/**
 * Initialise the parts of a bufferevent that are shared by every
 * implementation: the input/output evbuffers, reference count, base,
 * operations table, default enabled events, optional locking and the
 * deferred-callback hook.
 *
 * @param bufev_private the (zeroed or partially pre-filled) private
 *                      structure to initialise.
 * @param base          event_base the bufferevent belongs to.
 * @param ops           operations table of the concrete implementation.
 * @param options       BEV_OPT_* flags.  BEV_OPT_UNLOCK_CALLBACKS is only
 *                      valid together with BEV_OPT_DEFER_CALLBACKS.
 * @return 0 on success, -1 on failure.  On failure nothing allocated by
 *         this function is left behind.
 */
int bufferevent_init_common(struct bufferevent_private *bufev_private,
    struct event_base *base,
    const struct bufferevent_ops *ops,
    enum bufferevent_options options)
{
    struct bufferevent *bufev = &bufev_private->bev;

    /*
     * Reject an invalid option combination before acquiring anything.
     * Checking this only after the evbuffers (and possibly the lock) had
     * been created leaked them, because callers just free the private
     * structure when we return -1.
     */
    if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
        == BEV_OPT_UNLOCK_CALLBACKS) {
        event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
        return -1;
    }

    /* Only create the buffers the caller did not already provide. */
    if (!bufev->input) {
        if ((bufev->input = evbuffer_new()) == NULL)
            return -1;
    }

    if (!bufev->output) {
        if ((bufev->output = evbuffer_new()) == NULL) {
            evbuffer_free(bufev->input);
            /* Do not leave a dangling pointer behind. */
            bufev->input = NULL;
            return -1;
        }
    }

    bufev_private->refcnt = 1;
    bufev->ev_base = base;

    /* Disable timeouts. */
    evutil_timerclear(&bufev->timeout_read);
    evutil_timerclear(&bufev->timeout_write);

    bufev->be_ops = ops;

    /*
     * Set to EV_WRITE so that using bufferevent_write is going to
     * trigger a callback.  Reading needs to be explicitly enabled
     * because otherwise no data will be available.
     */
    bufev->enabled = EV_WRITE;

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
    if (options & BEV_OPT_THREADSAFE) {
        if (bufferevent_enable_locking(bufev, NULL) < 0) {
            /* cleanup */
            evbuffer_free(bufev->input);
            evbuffer_free(bufev->output);
            bufev->input = NULL;
            bufev->output = NULL;
            return -1;
        }
    }
#endif
    if (options & BEV_OPT_DEFER_CALLBACKS) {
        /* The two runners differ only in whether the lock is held while
         * the user's callbacks execute. */
        if (options & BEV_OPT_UNLOCK_CALLBACKS)
            event_deferred_cb_init(&bufev_private->deferred,
                bufferevent_run_deferred_callbacks_unlocked,
                bufev_private);
        else
            event_deferred_cb_init(&bufev_private->deferred,
                bufferevent_run_deferred_callbacks_locked,
                bufev_private);
    }

    bufev_private->options = options;

    evbuffer_set_parent(bufev->input, bufev);
    evbuffer_set_parent(bufev->output, bufev);

    return 0;
}

bufferevent_init_common比较简单,需要注意bufferevent默认只开启EV_WRITE,EV_READ需要手动打开;如果指定了BEV_OPT_THREADSAFE,则会为bufferevent启用锁;另外在目前的版本中,BEV_OPT_UNLOCK_CALLBACKS必须和BEV_OPT_DEFER_CALLBACKS一起使用,否则初始化失败。
继续分析bufferevent_socket_new函数:
    /* Excerpt from bufferevent_socket_new(): flag the output buffer, bind
     * the read and write events to fd, and hook the output buffer. */
    evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
    /* Read readiness on fd is handled by bufferevent_readcb. */
    event_assign(&bufev->ev_read, bufev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
    /* Write readiness on fd is handled by bufferevent_writecb. */
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

    /* bufferevent_socket_outbuf_cb re-arms the write event when data is
     * added to the output buffer. */
    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

调用bufferevent_init_common之后会给bufev->output加上EVBUFFER_FLAG_DRAINS_TO_FD标记,表示output中的数据只会被写出到文件描述符(这样evbuffer_add_file等接口可以使用sendfile之类的优化)。接下来设置读写事件的回调,最后把bufferevent_socket_outbuf_cb注册为output的回调(callback):当有数据被加入output时会调用bufferevent_socket_outbuf_cb。这样就不必一直监听写事件——bufferevent_writecb在数据全部写出之后可以移除写事件,然后由bufferevent_socket_outbuf_cb判断是否需要重新添加写事件。
初始化bufferevent之后需要调用be_socket_setfd设置网络套接字来监听读写事件:
/**
 * Point a socket bufferevent at a (new) file descriptor.
 *
 * Removes the read and write events, re-assigns them to fd, and, if fd is
 * a valid descriptor (>= 0), re-enables whatever events were enabled on
 * the bufferevent.  Runs with the bufferevent lock held.
 */
static void
be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd)
{
    BEV_LOCK(bufev);
    /* Only valid on bufferevents created by the socket implementation. */
    EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket);

    /* Remove the events before they are re-assigned below. */
    event_del(&bufev->ev_read);
    event_del(&bufev->ev_write);

    event_assign(&bufev->ev_read, bufev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

    /* A negative fd means "no socket yet": leave the events un-added. */
    if (fd >= 0)
        bufferevent_enable(bufev, bufev->enabled);

    BEV_UNLOCK(bufev);
}
/**
 * Enable the given events (EV_READ and/or EV_WRITE) on a bufferevent.
 *
 * The requested events are always recorded in bufev->enabled, but a
 * direction that is currently suspended is not passed on to the
 * implementation's enable hook.
 *
 * @return 0 on success, -1 if the implementation's enable hook fails.
 */
int bufferevent_enable(struct bufferevent *bufev, short event)
{
    struct bufferevent_private *priv =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    short to_arm = event;
    int rc = 0;

    _bufferevent_incref_and_lock(bufev);

    /* Mask out directions that are suspended right now. */
    if (priv->read_suspended)
        to_arm &= ~EV_READ;
    if (priv->write_suspended)
        to_arm &= ~EV_WRITE;

    bufev->enabled |= event;

    if (to_arm) {
        if (bufev->be_ops->enable(bufev, to_arm) < 0)
            rc = -1;
    }

    _bufferevent_decref_and_unlock(bufev);
    return rc;
}

be_socket_setfd中设置的套接字应当是已经建立好连接(或正在连接)的socket。如果是client端,bufferevent_sock提供了bufferevent_socket_connect用于发起连接;如果是server端,bufferevent_sock本身没有提供listen相关的函数,不过libevent在listener.c中提供了监听并接受连接的功能。
/**
 * Start connecting a socket-based bufferevent to the address sa.
 *
 * If the bufferevent has no file descriptor yet, a non-blocking stream
 * socket is created for sa->sa_family.  The outcome of the connect is
 * delivered later through the write event (see bufferevent_writecb).
 *
 * @param bev     the bufferevent to connect.
 * @param sa      address to connect to; may be NULL only when bev already
 *                has a file descriptor.
 * @param socklen length of sa.
 * @return 0 if the connect was launched (or has already completed),
 *         -1 on failure.
 */
int
bufferevent_socket_connect(struct bufferevent *bev,
    struct sockaddr *sa, int socklen)
{
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bev, struct bufferevent_private, bev);

    evutil_socket_t fd;
    int r = 0;
    int result=-1;
    int ownfd = 0;

    _bufferevent_incref_and_lock(bev);

    if (!bufev_p)
        goto done;

    fd = bufferevent_getfd(bev);
    if (fd < 0) {
        if (!sa)
            goto done;
        fd = socket(sa->sa_family, SOCK_STREAM, 0);
        if (fd < 0)
            goto done;
        if (evutil_make_socket_nonblocking(fd)<0) {
            /* We just created this socket and nothing else refers to
             * it yet: close it here instead of leaking it. */
            evutil_closesocket(fd);
            goto done;
        }
        ownfd = 1;
    }
    if (sa) {
#ifdef WIN32
        if (bufferevent_async_can_connect(bev)) {
            bufferevent_setfd(bev, fd);
            r = bufferevent_async_connect(bev, fd, sa, socklen);
            if (r < 0)
                goto freesock;
            bufev_p->connecting = 1;
            result = 0;
            goto done;
        } else
#endif
        r = evutil_socket_connect(&fd, sa, socklen);
        if (r < 0)
            goto freesock;
    }
#ifdef WIN32
    /* ConnectEx() isn't always around, even when IOCP is enabled.
     * Here, we borrow the socket object's write handler to fall back
     * on a non-blocking connect() when ConnectEx() is unavailable. */
    if (BEV_IS_ASYNC(bev)) {
        event_assign(&bev->ev_write, bev->ev_base, fd,
            EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);
    }
#endif
    bufferevent_setfd(bev, fd);
    if (r == 0) {
        /* Connect in progress: wait for the socket to become writable. */
        if (! be_socket_enable(bev, EV_WRITE)) {
            bufev_p->connecting = 1;
            result = 0;
            goto done;
        }
    } else if (r == 1) {
        /* The connect succeeded already. How very BSD of it. */
        result = 0;
        bufev_p->connecting = 1;
        event_active(&bev->ev_write, EV_WRITE, 1);
    } else {
        /* The connect failed already.  How very BSD of it. */
        bufev_p->connection_refused = 1;
        bufev_p->connecting = 1;
        result = 0;
        event_active(&bev->ev_write, EV_WRITE, 1);
    }

    goto done;

freesock:
    _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
    /* Only close sockets this function created itself. */
    if (ownfd)
        evutil_closesocket(fd);
    /* do something about the error? */
done:
    _bufferevent_decref_and_unlock(bev);
    return result;
}

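win32的connect使用iocp实现,将在下一节iocp章节详细分析,这里只看其他平台的实现方式。evutil_socket_connect是libevent提供的工具函数,从上面的代码可以看到它的返回值分四种情况处理:小于0代表连接出错,此时直接跳到freesock,触发BEV_EVENT_ERROR事件回调并关闭自己创建的套接字;0代表正在连接,此时需要调用be_socket_enable激活写事件,连接完成后会调用bufferevent_writecb回调;1代表已经连接成功,此时调用event_active手动触发写事件,由bufferevent_writecb处理连接;其余的返回值(libevent中为2)代表连接被立即拒绝,此时设置connection_refused标记,同样调用event_active交给bufferevent_writecb处理连接失败的情况。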
下面是bufferevent_writecb的具体实现:
/**
 * Write-event handler for socket bufferevents.
 *
 * Does two jobs: (1) while bufev_p->connecting is set, it finishes a
 * pending connect and reports BEV_EVENT_CONNECTED or BEV_EVENT_ERROR;
 * (2) otherwise it writes buffered output to fd and, once the output
 * buffer is at or below the low write watermark, runs the user's write
 * callback.
 *
 * @param fd    the socket that triggered the event.
 * @param event the event flags that fired (EV_WRITE and/or EV_TIMEOUT).
 * @param arg   the struct bufferevent this event belongs to.
 */
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    int res = 0;
    short what = BEV_EVENT_WRITING;
    int connected = 0;
    ev_ssize_t atmost = -1;

    _bufferevent_incref_and_lock(bufev);

    if (event == EV_TIMEOUT) {
        /* Note that we only check for event==EV_TIMEOUT. If
         * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
         * timeout, since the socket has become writable */
        what |= BEV_EVENT_TIMEOUT;
        goto error;
    }
    if (bufev_p->connecting) {
        int c = evutil_socket_finished_connecting(fd);
        /* we need to fake the error if the connection was refused
         * immediately - usually connection to localhost on BSD */
        if (bufev_p->connection_refused) {
          bufev_p->connection_refused = 0;
          c = -1;
        }

        /* c == 0: the connect has not finished yet; keep waiting. */
        if (c == 0)
            goto done;

        bufev_p->connecting = 0;
        if (c < 0) {
            /* Connect failed: stop watching the socket and report it. */
            event_del(&bufev->ev_write);
            event_del(&bufev->ev_read);
            _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
            goto done;
        } else {
            connected = 1;
#ifdef WIN32
            if (BEV_IS_ASYNC(bufev)) {
                event_del(&bufev->ev_write);
                bufferevent_async_set_connected(bufev);
                _bufferevent_run_eventcb(bufev,
                        BEV_EVENT_CONNECTED);
                goto done;
            }
#endif
            _bufferevent_run_eventcb(bufev,
                    BEV_EVENT_CONNECTED);
            /* The write event was only armed to detect the end of the
             * connect; drop it if writing is not actually wanted. */
            if (!(bufev->enabled & EV_WRITE) ||
                bufev_p->write_suspended) {
                event_del(&bufev->ev_write);
                goto done;
            }
        }
    }

    atmost = _bufferevent_get_write_max(bufev_p);

    if (bufev_p->write_suspended)
        goto done;

    if (evbuffer_get_length(bufev->output)) {
        /* The output buffer is unfrozen only around this write. */
        evbuffer_unfreeze(bufev->output, 1);
        res = evbuffer_write_atmost(bufev->output, fd, atmost);
        evbuffer_freeze(bufev->output, 1);
        if (res == -1) {
            int err = evutil_socket_geterror(fd);
            if (EVUTIL_ERR_RW_RETRIABLE(err))
                goto reschedule;
            what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
            /* eof case
               XXXX Actually, a 0 on write doesn't indicate
               an EOF. An ECONNRESET might be more typical.
             */
            what |= BEV_EVENT_EOF;
        }
        if (res <= 0)
            goto error;

        _bufferevent_decrement_write_buckets(bufev_p, res);
    }

    /* Nothing left to send: stop polling for writability.  The output
     * buffer callback re-adds the event when new data is queued. */
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }

    /*
     * Invoke the user callback if our buffer is drained or below the
     * low watermark.
     */
    if ((res || !connected) &&
        evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
        _bufferevent_run_writecb(bufev);
    }

    goto done;

 reschedule:
    /* Retriable write error: keep the event unless the buffer is empty. */
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }
    goto done;

 error:
    /* Timeout, EOF or hard error: stop writing and tell the user. */
    bufferevent_disable(bufev, EV_WRITE);
    _bufferevent_run_eventcb(bufev, what);

 done:
    _bufferevent_decref_and_unlock(bufev);
}

bufferevent_writecb需要处理两件事,一个是连接逻辑,一个是发送数据逻辑,注意当处理连接逻辑时,如果当前没有激活EV_WRITE或者处于write_suspended状态,需要删除写监听,因为当前的写监听事件只用于处理连接。另外当处理数据逻辑时如果output的数据长度为0,同样需要删除写事件,因为output的callback在有数据之后会重新添加写事件:
/**
 * evbuffer callback installed on the output buffer of a socket
 * bufferevent.  When data has been added and writing is wanted but the
 * write event is not pending, (re-)add the write event so the data gets
 * flushed.
 *
 * @param buf    the output evbuffer (unused).
 * @param cbinfo describes the change; only n_added is consulted.
 * @param arg    the owning struct bufferevent.
 */
static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
    struct bufferevent *bev = arg;
    struct bufferevent_private *priv =
        EVUTIL_UPCAST(bev, struct bufferevent_private, bev);

    /* Nothing new was queued. */
    if (!cbinfo->n_added)
        return;
    /* The user does not want to write at the moment. */
    if (!(bev->enabled & EV_WRITE))
        return;
    /* Already waiting for writability. */
    if (event_pending(&bev->ev_write, EV_WRITE, NULL))
        return;
    /* Writing is suspended. */
    if (priv->write_suspended)
        return;

    /* Somebody added data to the buffer, and we would like to
     * write, and we were not writing.  So, start writing.
     * A failure to add the event is deliberately ignored here
     * (should we log this?). */
    (void)be_socket_add(&bev->ev_write, &bev->timeout_write);
}

bufferevent_writecb和bufferevent_readcb还需要调用用户自定义的读写回调:
/**
 * Run the user's write callback, either immediately or, when the
 * bufferevent was created with BEV_OPT_DEFER_CALLBACKS, by marking it
 * pending and scheduling the deferred callback.
 *
 * Requires that the caller holds the lock and a reference.
 */
void
_bufferevent_run_writecb(struct bufferevent *bufev)
{
    struct bufferevent_private *priv =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

    if (!bufev->writecb)
        return;

    if (!(priv->options & BEV_OPT_DEFER_CALLBACKS)) {
        /* Immediate mode: call straight into user code. */
        bufev->writecb(bufev, bufev->cbarg);
        return;
    }

    /* Deferred mode: remember the callback and queue the runner once. */
    priv->writecb_pending = 1;
    if (!priv->deferred.queued)
        SCHEDULE_DEFERRED(priv);
}
/* Queue the deferred-callback runner of 'bevp' (a struct
 * bufferevent_private *) on its event_base's deferred-callback queue.
 * A reference is taken first; the deferred runners end with
 * _bufferevent_decref_and_unlock(). */
#define SCHEDULE_DEFERRED(bevp)                     \
    do {                                \
        bufferevent_incref(&(bevp)->bev);           \
        event_deferred_cb_schedule(             \
            event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
            &(bevp)->deferred);             \
    } while (0)

如果bufferevent的选项包含BEV_OPT_DEFER_CALLBACKS则需要使用延迟调用方式,延迟调用是在event_base的loop循环中统一处理的。如果p->deferred.queued尚未被置位(即还没有排队),则通过SCHEDULE_DEFERRED增加引用计数并把它加入ev_base的延迟调用队列。bufferevent_init_common中设置过deferred对应的回调:
    /* Excerpt from bufferevent_init_common(): pick the deferred-callback
     * runner according to BEV_OPT_UNLOCK_CALLBACKS. */
    if (options & BEV_OPT_DEFER_CALLBACKS) {
        if (options & BEV_OPT_UNLOCK_CALLBACKS)
            /* Runner that releases the lock around each user callback. */
            event_deferred_cb_init(&bufev_private->deferred,
                bufferevent_run_deferred_callbacks_unlocked,
                bufev_private);
        else
            /* Runner that keeps the lock held during user callbacks. */
            event_deferred_cb_init(&bufev_private->deferred,
                bufferevent_run_deferred_callbacks_locked,
                bufev_private);
    }

这两个回调只是使用锁的方式不同:
/**
 * Deferred-callback runner used when BEV_OPT_UNLOCK_CALLBACKS is not set:
 * delivers every pending user callback while holding the bufferevent
 * lock.  Order: CONNECTED event, read callback, write callback, then any
 * remaining events.  Ends by dropping a reference and the lock.
 *
 * @param _   the deferred_cb being run (unused).
 * @param arg the struct bufferevent_private whose callbacks are pending.
 */
static void
bufferevent_run_deferred_callbacks_locked(struct deferred_cb *_, void *arg)
{
    struct bufferevent_private *priv = arg;
    struct bufferevent *bev = &priv->bev;

    BEV_LOCK(bev);

    /* The "connected" happened before any reads or writes, so
       send it first. */
    if (bev->errorcb && (priv->eventcb_pending & BEV_EVENT_CONNECTED)) {
        priv->eventcb_pending &= ~BEV_EVENT_CONNECTED;
        bev->errorcb(bev, BEV_EVENT_CONNECTED, bev->cbarg);
    }

    if (priv->readcb_pending && bev->readcb) {
        priv->readcb_pending = 0;
        bev->readcb(bev, bev->cbarg);
    }

    if (priv->writecb_pending && bev->writecb) {
        priv->writecb_pending = 0;
        bev->writecb(bev, bev->cbarg);
    }

    if (priv->eventcb_pending && bev->errorcb) {
        /* Snapshot and clear the pending state before calling out, and
         * restore the saved socket error for the callback to inspect. */
        short what = priv->eventcb_pending;
        int err = priv->errno_pending;

        priv->eventcb_pending = 0;
        priv->errno_pending = 0;
        EVUTIL_SET_SOCKET_ERROR(err);
        bev->errorcb(bev, what, bev->cbarg);
    }

    _bufferevent_decref_and_unlock(bev);
}

/**
 * Deferred-callback runner used with BEV_OPT_UNLOCK_CALLBACKS: same
 * delivery order as the locked variant (CONNECTED event, read, write,
 * remaining events), but the bufferevent lock is released while each user
 * callback runs and re-acquired afterwards.
 *
 * @param _   the deferred_cb being run (unused).
 * @param arg the struct bufferevent_private whose callbacks are pending.
 */
static void
bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)
{
    struct bufferevent_private *bufev_private = arg;
    struct bufferevent *bufev = &bufev_private->bev;

    BEV_LOCK(bufev);
/* Run stmt with the bufferevent lock temporarily released. */
#define UNLOCKED(stmt) \
    do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0)

    if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
        bufev->errorcb) {
        /* The "connected" happened before any reads or writes, so
           send it first. */
        /* Callback and argument are copied to locals while the lock is
         * still held, then invoked with the lock released. */
        bufferevent_event_cb errorcb = bufev->errorcb;
        void *cbarg = bufev->cbarg;
        bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
        UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
    }
    if (bufev_private->readcb_pending && bufev->readcb) {
        bufferevent_data_cb readcb = bufev->readcb;
        void *cbarg = bufev->cbarg;
        bufev_private->readcb_pending = 0;
        UNLOCKED(readcb(bufev, cbarg));
    }
    if (bufev_private->writecb_pending && bufev->writecb) {
        bufferevent_data_cb writecb = bufev->writecb;
        void *cbarg = bufev->cbarg;
        bufev_private->writecb_pending = 0;
        UNLOCKED(writecb(bufev, cbarg));
    }
    if (bufev_private->eventcb_pending && bufev->errorcb) {
        bufferevent_event_cb errorcb = bufev->errorcb;
        void *cbarg = bufev->cbarg;
        /* Snapshot and clear the pending events and saved socket error
         * before calling out. */
        short what = bufev_private->eventcb_pending;
        int err = bufev_private->errno_pending;
        bufev_private->eventcb_pending = 0;
        bufev_private->errno_pending = 0;
        EVUTIL_SET_SOCKET_ERROR(err);
        UNLOCKED(errorcb(bufev,what,cbarg));
    }
    /* Drops a reference and releases the lock taken by BEV_LOCK above. */
    _bufferevent_decref_and_unlock(bufev);
#undef UNLOCKED
}

以上就是bufferevent_sock的实现方式,下一节我们将分析IOCP的实现方式。
......显示全文...
    点击查看全文


上一篇文章      下一篇文章      查看所有文章
2016-04-01 16:50:07  
架构设计 最新文章
Opengl教程之读取obj并绘制在picturecontro
读《企业应用架构模式》第五章并发
StepbyStepintoSpring(IOC)
设计模式(2)用例图之一
使用实体组件系统(ECS)开发”吃方块”游戏实
编程学习之简单工厂模式与策略模式
Invalidprojectdescription.
基于Redis实现分布式消息队列(2)
《开源框架那点事儿15》:借船下海还是造船
原型模式——浅复制和深复制
360图书馆 软件开发资料 文字转语音 购物精选 软件下载 美食菜谱 新闻资讯 电影视频 小游戏 Chinese Culture 股票 租车
生肖星座 三丰软件 视频 开发 短信 中国文化 网文精选 搜图网 美图 阅读网 多播 租车 短信 看图 日历 万年历 2018年1日历
2018-1-17 15:02:17
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  软件世界网 --