队列相关(Queue)

发布时间:2022-06-26 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了队列相关(Queue)脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

Queue(队列)

1、函数介绍

Queue()函数底层通过app_queue.c中queue_exec函数实现。

static char *app = "Queue";
err |= ast_register_application_xml(app, queue_exec);
static int queue_exec(struct ast_channel *chan, const char *data);

该函数调用过程主要分成六个步骤:(如下图)

2、源码解析

1)函数声明:

/*
 * chan:主叫通道
 * data:设置队列属性的相关参数
 */
static int queue_exec(struct ast_channel *chan, const char *data);

2)解析:

0x1、解析调用Queue()时指定的选项。

/*
 * 将data通过','分割开解析至args
 */
char *parse;

AST_DECLARE_APP_ARGS(args,
	AST_APP_ARG(queuename);
	AST_APP_ARG(options);
	AST_APP_ARG(url);
	AST_APP_ARG(announceoverride);
	AST_APP_ARG(queuetimeoutstr);
	AST_APP_ARG(agi);
	AST_APP_ARG(macro);
	AST_APP_ARG(gosub);
	AST_APP_ARG(rule);
	AST_APP_ARG(position);
);

parse = ast_strdupa(data);
AST_STANDARD_APP_ARGS(args, parse);

/*
 * 将args.options对应queue_exec_options设置对应的标志位结构(opts)里,opt_args是对应的数据
 */
struct ast_flags opts = { 0, };
char *opt_args[OPT_ARG_ARRAY_SIZE];

if (!ast_strlen_zero(args.options)) {
	ast_app_parse_options(queue_exec_options, &opts, opt_args, args.options);
}

struct queue_ent qe = { 0 };
/* 设置开始时间 */
qe.start = time(NULL);
/* 设置超时时间 */
if (!ast_strlen_zero(args.queuetimeoutstr)) {
	qe.expire = qe.start + atoi(args.queuetimeoutstr);
} else {
	qe.expire = 0;
}

qe.chan = chan;	// 主叫通道
qe.prio = prio;	// 优先级
qe.max_penalty = max_penalty;	// 最大惩罚数
qe.min_penalty = min_penalty;	// 最小惩罚数
qe.raise_penalty = raise_penalty;	// 
qe.last_pos_said = 0;	// 我们告诉用户的最后一个位置
qe.last_pos = 0;	// 我们上次播放位置的时间
qe.last_periodic_announce_time = time(NULL);	// 我们上次播放定期公告的时间
qe.last_periodic_announce_sound = 0;	//
qe.valid_digits = 0;	// 输入的数字对应于有效的扩展名。退出
......

0x2.加入队列

enum queue_result reason = QUEUE_UNKNOWN;

if (args.position) {
	position = atoi(args.position);
	if (position < 0) {
		ast_log(LOG_WARNING, "Invalid position '%s' given for call to queue '%s'. Assuming no preference for positionn", args.position, args.queuename);
        position = 0;
	}
}

if (join_queue(args.queuename, &qe, &reason, position)) {
	ast_log(LOG_WARNING, "Unable to join queue '%s'n", args.queuename);
	set_queue_result(chan, reason);
	return 0;
}
ast_assert(qe.parent != NULL);
0x21.join_queue展开解析:
获取命名队列引用
if (!(q = find_load_queue_rt_friendly(queuename))) {
	return res;
}

/*
 * 返回对命名队列的引用,如果队列是实时的也会加载队列
 */
static struct call_queue *find_load_queue_rt_friendly(const char *queuename);

/* 内部拆解 */

/* Find the queue in the in-core list first. */
q = ao2_t_find(queues, &tmpq, OBJ_POINTER, "Look for queue in memory first");

/* 更新实时队列成员状态 */
update_realtime_members
判断进入队列为空的条件
if (q->joinempty) {
	int status = 0;
    /* 获取座席状态 */
	if ((status = get_member_status(q, qe->max_penalty, qe->min_penalty, qe->raise_penalty, q->joinempty, 0))) {
		*reason = QUEUE_JOINEMPTY;
		ao2_unlock(q);
		queue_t_unref(q, "Done with realtime queue");
		return res;
	}
}

/*
 * 检查是否存在活跃的成员
 * 存在返回0,没有成员属于活跃则返回-1
 * q:队列的引用
 * conditions:判断成员是否活跃的条件
 * devstate:判断成员状态的获取方式
 	1:ast_device_state(member->state_interface)
 	0:member->status
 */
static int get_member_status(struct call_queue *q, int max_penalty, int min_penalty, int raise_penalty, enum empty_conditions conditions, int devstate);

if (!devstate && (conditions & QUEUE_EMPTY_RINGING)) {
    /* member state still may be RINGING due to lag in event message - check again with device state */
    return get_member_status(q, max_penalty, min_penalty, raise_penalty, conditions, 1);
}
判断优先级
/*
 * 读取queue.conf的配置文件设置队列属性
 */
static void queue_set_param(struct call_queue *q, const char *param, const char *val, int linenum, int failunknown);

/*
 * 判断队列等待数是否超过最大值
 */
if (*reason == QUEUE_UNKNOWN && q->maxlen && (q->count >= q->maxlen)) {
	*reason = QUEUE_FULL;
/*
 * 优先级大于当前优先级,加入队列
 */
if ((!inserted) && (qe->prio > cur->prio)) {
	insert_entry(q, prev, qe, &pos);
	inserted = 1;
}

/*
 * 优先级大于等于当前优先级判断是否可以加入指定位置,加入队列
 */
if (!inserted && (qe->prio >= cur->prio) && position && (position <= pos + 1)) {
	insert_entry(q, prev, qe, &pos);
	inserted = 1;
	/*pos is incremented inside insert_entry, so don't need to add 1 here*/
	if (position < pos) {
		ast_log(LOG_NOTICE, "Asked to be inserted at position %d but forced into position %d due to higher priority callersn", position, pos);
	}
}

/*
 * 优先级最低,加入队尾
 */
/* No luck, join at the end of the queue */
if (!inserted) {
	insert_entry(q, prev, qe, &pos);
}
加入队列
/*
 * 在队列 'q' 的 'prev' 条目之后插入 'new' 条目
 */
static inline void insert_entry(struct call_queue *q, struct queue_ent *prev, struct queue_ent *new, int *pos);

new->parent = q;

0x3、循环等待,直到轮到我们尝试调用队列成员

int ringing=0;

if (ast_test_flag(&opts, OPT_RINGING)) {
	ringing = 1;
}

res = wait_our_turn(&qe, ringing, &reason);
if (res) {
	goto stop;
}

/*
 * 不主动呼叫成员的呼叫者的等候区
 * 该函数为大循环,
 * retval:
 	0:轮到来电
 	-1:退出队列
 */
static int wait_our_turn(struct queue_ent *qe, int ringing, enum queue_result *reason);
0x31、wait_our_turn展开解析:
1、检测是否应该开始尝试呼叫队列成员
if (is_our_turn(qe)) {
	break;
}

/* 内部拆解 */

/* 获取成员数 */
avl = num_available_members(qe->parent);

int idx = 0;
ch = qe->parent->head;
/* 计算未接听的呼叫者数量 */
while ((idx < avl) && (ch) && (ch != qe)) {
    if (!ch->pending) {
		idx++;
	}
	ch = ch->next;
}

ao2_unlock(qe->parent);
/* If the queue entry is within avl [the number of available members] calls from the top ...
 * Autofill and position check added to support autofill=no (as only calls
 * from the front of the queue are valid when autofill is disabled)
 */
if (ch && idx < avl && (qe->parent->autofill || qe->pos == 1)) {
	ast_debug(1, "It's our turn (%s).n", ast_channel_name(qe->chan));
	res = 1;
} else {
	ast_debug(1, "It's not our turn (%s).n", ast_channel_name(qe->chan));
	res = 0;
}
2、判断为空时离开条件
if (qe->parent->leavewhenempty) {
	int status = 0;

	if ((status = get_member_status(qe->parent, qe->max_penalty, qe->min_penalty, qe->raise_penalty, qe->parent->leavewhenempty, 0))) {
		record_abandoned(qe);
		*reason = QUEUE_LEAVEEMPTY;
		ast_queue_log(qe->parent->name, ast_channel_uniqueid(qe->chan), "NONE", "EXITEMPTY", "%d|%d|%ld", qe->pos, qe->opos, (long) (time(NULL) - qe->start));
		res = -1;
		qe->handled = -1;
		break;
	}
}
3、队列信息播报
/* Make a position announcement, if enabled */
if (qe->parent->announcefrequency &&
	(res = say_position(qe,ringing))) {
	break;
}

/* Make a periodic announcement, if enabled */
if (qe->parent->periodicannouncefrequency &&
	(res = say_periodic_announcement(qe,ringing)))
	break;
4、判断是否更新规则
/* see if we need to move to the next penalty level for this queue */
while (qe->pr && ((time(NULL) - qe->start) >= qe->pr->time)) {
	update_qe_rule(qe);
}
5、等待按键
/* Wait a second before checking again */
if ((res = ast_waitfordigit(qe->chan, RECHECK * 1000))) {
	if (res > 0 && !valid_exit(qe, res)) {
		res = 0;
	} else {
		break;
	}
}

0x4、尝试调用队列成员

/* Try calling all queue members for 'timeout' seconds */
res = try_calling(&qe, opts, opt_args, args.announceoverride, args.url, &tries, &noption, args.agi, args.macro, args.gosub, ringing);
if (res) {
	goto stop;
}
0x41、try_calling展开解析
1、处理传递给Queue()应用程序的选项
/* 各类ast_test_flag->ast_set_flag */
if (ast_test_flag(&opts, OPT_CALLEE_TRANSFER)) {
	ast_set_flag(&(bridge_config.features_callee), AST_FEATURE_REDIRECT);
}
......
2、遍历队列的成员,创建与每个成员对应的callattempt
struct callattempt *outgoing = NULL; /* the list of calls we are building */
struct member *cur;
struct ao2_iterator memi;

memi = ao2_iterator_init(qe->parent->members, 0);
while ((cur = ao2_iterator_next(&memi))) {
	struct callattempt *tmp = ast_calloc(1, sizeof(*tmp));
	if (!tmp) {
		ao2_ref(cur, -1);
		ao2_iterator_destroy(&memi);
		ao2_unlock(qe->parent);
		goto out;
	}

	/*
	 * Seed the callattempt's connected line information with previously
	 * acquired connected line info from the queued channel.  The
	 * previously acquired connected line info could have been set
	 * through the CONNECTED_LINE dialplan function.
	 */
	ast_channel_lock(qe->chan);
	ast_party_connected_line_copy(&tmp->connected, ast_channel_connected(qe->chan));
	ast_channel_unlock(qe->chan);

	tmp->block_connected_update = block_connected_line;
	tmp->stillgoing = 1;
	tmp->member = cur; /* Place the reference for cur into callattempt. */
	ast_copy_string(tmp->interface, cur->interface, sizeof(tmp->interface));
	/* Calculate the metric for the appropriate strategy. */
	/* 计算适当策略的指标,根据策略计算成员应答的顺序 */
    /* 计算出一个数字度量,低度量的成员在高度量成员之前响铃 */
	if (!calc_metric(qe->parent, cur, x++, qe, tmp)) {
		/* Put them in the list of outgoing thingies...  We're ready now.
		   XXX If we're forcibly removed, these outgoing calls won't get
		   hung up XXX */
		tmp->q_next = outgoing;
		outgoing = tmp;
	} else {
		callattempt_free(tmp);
	}
}
ao2_iterator_destroy(&memi);
3、调用ring_one呼叫合适的成员
/* Call the queue members with the best metric now. */
ring_one(qe, outgoing, &numbusies);

/* 为每个成员计算出度量后,使用此函数呼叫适当的成员(或多个成员)
 */
static int ring_one(struct queue_ent *qe, struct callattempt *outgoing, int *busies);

/* 内部解析 */
/* 查找最大度量的成员 */
struct callattempt *best = find_best(outgoing);
if (!best) {
	ast_debug(1, "Nobody left to try ringing in queuen");
	break;
}

/* 根据响铃策略呼叫成员 */
if (qe->parent->strategy == QUEUE_STRATEGY_RINGALL) {
	/* Ring everyone who shares this best metric (for ringall) */
	/* 全部响铃 */
	for (cur = outgoing; cur; cur = cur->q_next) {
		if (cur->stillgoing && !cur->chan && cur->metric <= best->metric) {
			ast_debug(1, "(Parallel) Trying '%s' with metric %dn", cur->interface, cur->metric);
			ret |= ring_entry(qe, cur, busies);
			if (qe->predial_callee && cur->chan) {
				ast_autoservice_start(cur->chan);
			}
		}
	}
} else {
	/* Ring just the best channel */
	ast_debug(1, "Trying '%s' with metric %dn", best->interface, best->metric);
	ret = ring_entry(qe, best, busies);
	if (qe->predial_callee && best->chan) {
		ast_autoservice_start(best->chan);
	}
}

/* 
 * ring_one的第二部分
 */
static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies);

/* 确定是否可以振铃队列条目 */
/* on entry here, we know that tmp->chan == NULL */
if (!can_ring_entry(qe, tmp)) {
    tmp->stillgoing = 0;
    ++*busies;
    return 0;
}

/* 请求一个通道 */
tmp->chan = ast_request(tech, nativeformats, NULL, qe->chan, location, &status);

/* 从父通道继承变量 */
/* Inherit specially named variables from parent channel */
/* 从父通道继承特殊命名的变量 */
ast_channel_inherit_variables(qe->chan, tmp->chan);
ast_channel_datastore_inherit(qe->chan, tmp->chan);
ast_max_forwards_decrement(tmp->chan);

/* Inherit context and extension */
/* 继承上下文和分机 */
macrocontext = pbx_builtin_getvar_helper(qe->chan, "MACRO_CONTEXT");
ast_channel_dialcontext_set(tmp->chan, ast_strlen_zero(macrocontext) ? ast_channel_context(qe->chan) : macrocontext);
macroexten = pbx_builtin_getvar_helper(qe->chan, "MACRO_EXTEN");
if (!ast_strlen_zero(macroexten)) {
    ast_channel_exten_set(tmp->chan, macroexten);
} else {
    ast_channel_exten_set(tmp->chan, ast_channel_exten(qe->chan));
}

/* Place the call, but don't wait on the answer */
/* 拨打,但是不等待应答*/
if ((res = ast_call(tmp->chan, location, 0))) {
    /* Again, keep going even if there's an error */
    ast_verb(3, "Couldn't call %sn", tmp->interface);
    do_hang(tmp);
    ++*busies;
    return 0;
}
4、调用wait_for_answer等待应答,如果无人应答则返回
lpeer = wait_for_answer(qe, outgoing, &to, &digit, numbusies,
		ast_test_flag(&(bridge_config.features_caller), AST_FEATURE_DISCONNECT),
		forwardsallowed);

/* 
 * 等待一个成员接听通话
 */
static struct callattempt *wait_for_answer(struct queue_ent *qe, struct callattempt *outgoing, int *to, char *digit, int prebusies, int caller_disconnect, int forwardsallowed);
5、处理任何等待时间通知、会员延迟或在呼叫被应答后发生的其他选项
/* Play announcement to the caller telling it's his turn if defined */
if (!ast_strlen_zero(qe->parent->sound_callerannounce)) {
	if (play_file(qe->chan, qe->parent->sound_callerannounce)) {
		ast_log(LOG_WARNING, "Announcement file '%s' is unavailable, continuing anyway...n", qe->parent->sound_callerannounce);
	}
}
6、如果设置了该选项,则启动监视器或混音监视器
/* Begin Monitoring */
if (*qe->parent->monfmt) {
	if (!qe->parent->montype) {
		const char *monexec;
		ast_debug(1, "Starting Monitor as requested.n");
		ast_channel_lock(qe->chan);
		if ((monexec = pbx_builtin_getvar_helper(qe->chan, "MONITOR_EXEC")) || pbx_builtin_getvar_helper(qe->chan, "MONITOR_EXEC_ARGS")) {
			which = qe->chan;
			monexec = monexec ? ast_strdupa(monexec) : NULL;
		} else {
			which = peer;
		}
		ast_channel_unlock(qe->chan);
		if (monitorfilename) {
			ast_monitor_start(which, qe->parent->monfmt, monitorfilename, 1, X_REC_IN | X_REC_OUT, NULL);
		} else if (qe->chan) {
			ast_monitor_start(which, qe->parent->monfmt, ast_channel_uniqueid(qe->chan), 1, X_REC_IN | X_REC_OUT, NULL);
		} else {
			/* Last ditch effort -- no channel, make up something */
			snprintf(tmpid, sizeof(tmpid), "chan-%lx", (unsigned long)ast_random());
			ast_monitor_start(which, qe->parent->monfmt, tmpid, 1, X_REC_IN | X_REC_OUT, NULL);
		}
		if (!ast_strlen_zero(monexec)) {
			ast_monitor_setjoinfiles(which, 1);
		}
	} else {
		setup_mixmonitor(qe, monitorfilename);
	}
}
7、将来电者从队列中移除以允许其他来电者前进
/* Drop out of the queue at this point, to prepare for next caller */
leave_queue(qe);
if (!ast_strlen_zero(url) && ast_channel_supports_html(peer)) {
	ast_debug(1, "app_queue: sendurl=%s.n", url);
	ast_channel_sendurl(peer, url);
}
8、桥接呼叫
ao2_lock(qe->parent);
time(&member->starttime);
starttime = member->starttime;
ao2_unlock(qe->parent);
/* As a queue member may end up in multiple calls at once if a transfer occurs with
* a Local channel in the mix we pass the current call information (starttime) to the
* Stasis subscriptions so when they update the queue member data it becomes a noop
* if this call is no longer between the caller and the queue member.
*/
setup_stasis_subs(qe, peer, member, qe->start, starttime, callcompletedinsl);
bridge = ast_bridge_call_with_flags(qe->chan, peer, &bridge_config,
AST_BRIDGE_FLAG_MERGE_INHIBIT_FROM | AST_BRIDGE_FLAG_MERGE_INHIBIT_TO | AST_BRIDGE_FLAG_SWAP_INHIBIT_FROM);

res = bridge ? bridge : 1;
ao2_ref(member, -1);
9、通话断开后进行任何后期处理
hangupcalls(qe, outgoing, NULL, qe->cancel_answered_elsewhere);

0x5、如果4.没有导致桥接呼叫,则检查通话设置,比如定期公告等

0x6、除非某些条件(例如过期时间)导致我们重新尝试4退出队列

脚本宝典总结

以上是脚本宝典为你收集整理的队列相关(Queue)全部内容,希望文章能够帮你解决队列相关(Queue)所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。
标签: