DBMS_AQ
POLARDB提供消息排队和消息处理。用户定义的消息存储在队列中;队列的集合存储在队列表中。DBMS_AQADM包中的存储过程创建并管理消息队列和队列表。使用DBMS_AQ包可添加消息到队列或者从队列中删除消息,或者注册或注销PL/SQL回调过程。
POLARDB还通过如下SQL命令为DBMS_AQ 包提供扩展(不兼容)功能。
- ALTER QUEUE
- ALTER QUEUE TABLE
- CREATE QUEUE
- CREATE QUEUE TABLE
- DROP QUEUE
- DROP QUEUE TABLE
DBMS_AQ 包为您提供让消息入队/出队和管理回调过程的过程。支持的存储过程包括:
函数/存储过程 | 返回类型 | 说明 |
---|---|---|
ENQUEUE | n/a | 发布消息到队列。 |
DEQUEUE | n/a | 如果有消息可用或者在消息可用时,从队列检索消息。 |
REGISTER | n/a | 注册回调过程。 |
UNREGISTER | n/a | 注销回调过程。 |
POLARDB的DBMS_AQ 实施与 Oracle 的版本相比是部分实施。仅支持上表中列出的那些存储过程。
POLARDB支持使用下面列出的常量:
常量 | 说明 | 用于参数 |
---|---|---|
DBMS_AQ.BROWSE (0) | 读取消息而不锁定。 | dequeue_options_t.dequeue_mode |
DBMS_AQ.LOCKED (1) | 此常量已定义,如果使用会返回错误。 | dequeue_options_t.dequeue_mode |
DBMS_AQ.REMOVE (2) | 读取之后删除消息;该参数为默认值。 | dequeue_options_t.dequeue_mode |
DBMS_AQ.REMOVE_NODATA (3) | 此常量已定义,如果使用会返回错误。 | dequeue_options_t.dequeue_mode |
DBMS_AQ.FIRST_MESSAGE (0) | 返回与搜索词匹配的第一个可用消息。 | dequeue_options_t.navigation |
DBMS_AQ.NEXT_MESSAGE (1) | 返回与搜索词匹配的下一个可用消息。 | dequeue_options_t.navigation |
DBMS_AQ.NEXT_TRANSACTION (2) | 此常量已定义,如果使用会返回错误。 | dequeue_options_t.navigation |
DBMS_AQ.FOREVER (0) | 如果找不到与搜索词匹配的消息,则持续等待,该参数为默认值。 | dequeue_options_t.wait |
DBMS_AQ.NO_WAIT (1) | 如果找不到与搜索词匹配的消息,则不等待。 | dequeue_options_t.wait |
DBMS_AQ.ON_COMMIT (0) | 出队是当前事务的一部分。 | enqueue_options_t.visibility,dequeue_options_t.visibility |
DBMS_AQ.IMMEDIATE (1) | 此常量已定义,如果使用会返回错误。 | enqueue_options_t.visibility,dequeue_options_t.visibility |
DBMS_AQ.PERSISTENT (0) | 此消息应存储在表中。 | enqueue_options_t.delivery_mode |
DBMS_AQ.BUFFERED (1) | 此常量已定义,如果使用会返回错误。 | enqueue_options_t.delivery_mode |
DBMS_AQ.READY (0) | 指定消息已经准备好进行处理。 | message_properties_t.state |
DBMS_AQ.WAITING (1) | 指定消息正在等待处理。 | message_properties_t.state |
DBMS_AQ.PROCESSED (2) | 指定消息已处理。 | message_properties_t.state |
DBMS_AQ.EXPIRED (3) | 指定消息处于异常队列中。 | message_properties_t.state |
DBMS_AQ.NO_DELAY (0) | 此常量已定义,如果使用会返回错误。 | message_properties_t.delay |
DBMS_AQ.NEVER (NULL) | 此常量已定义,如果使用会返回错误。 | message_properties_t.expiration |
DBMS_AQ.NAMESPACE_AQ (0) | 接受来自 DBMS_AQ 队列的通知。 | sys.aq$_reg_info.namespace |
DBMS_AQ.NAMESPACE_ANONYMOUS (1) | 此常量已定义,如果使用会返回错误。 | sys.aq$_reg_info.namespace |
ENQUEUE
ENQUEUE
存储过程将一个条目添加到队列。特征为:
ENQUEUE(
queue_name IN VARCHAR2,
enqueue_options IN DBMS_AQ.ENQUEUE_OPTIONS_T,
message_properties IN DBMS_AQ.MESSAGE_PROPERTIES_T,
payload IN <type_name>,
msgid OUT RAW)
参数
queue_name
现有队列的名称(可能是 schema 限定的)。如果您省略 schema 名称,则服务器将使用在 SEARCH_PATH 中指定的 schema。请注意,与 Oracle 不同,不带引号的标识符在存储之前将转换为小写。要包括特殊字符或者使用区分大小写的名称,请在双引号中引起名称。
enqueue_options
enqueue_options 是类型为 enqueue_options_t 的值:
DBMS_AQ.ENQUEUE_OPTIONS_T IS RECORD( visibility BINARY_INTEGER DEFAULT ON_COMMIT, relative_msgid RAW(16) DEFAULT NULL, sequence_deviation BINARY INTEGER DEFAULT NULL, transformation VARCHAR2(61) DEFAULT NULL, delivery_mode PLS_INTEGER NOT NULL DEFAULT PERSISTENT);
目前,enqueue_options_t 唯一支持的参数值为:
visibility ON_COMMIT delivery_mode PERSISTENT sequence_deviation NULL transformation NULL relative_msgid NULL message_properties
message_properties
是类型为message_properties_t
的值:message_properties_t IS RECORD( priority INTEGER, delay INTEGER, expiration INTEGER, correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”, attempts INTEGER, recipient_list“AQ$_RECIPIENT_LIST_T”, exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”, enqueue_time TIMESTAMP WITHOUT TIME ZONE, state INTEGER, original_msgid BYTEA, transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”, delivery_mode INTEGER DBMS_AQ.PERSISTENT);
message_properties_t
支持的值如下:参数 说明 priority 如果队列表定义包括 sort_list 并引用了 priority,则此参数影响消息出队的顺序。较低的值指示较高的出队优先级。 delay 指定消息在可以出队或 NO_DELAY 之前将经过的秒数。 expiration 使用 expiration 参数指定消息过期的秒数。 correlation 使用 correlation 指定将与条目关联的消息;默认值为 NULL。 attempts 这是系统维护的值,指定消息出队的尝试次数。 recipient_list 不支持此参数。 exception_queue 使用 exception_queue 参数指定异常队列的名称,如果消息过期或者由回退太多次数的事务出队,则消息将移动到该队列。 enqueue_time enqueue_time 是记录添加到队列的时间;此值由系统提供。 state 此参数由 DBMS_AQ 维护,状态可以为: - DBMS_AQ.READY – 未达到延迟。
- DBMS_AQ.WAITING – 队列条目已准备好处理。
- DBMS_AQ.PROCESSED – 队列条目已处理。
- DBMS_AQ.EXPIRED – 队列条目已移动到异常队列。
original_msgid 为了实现兼容性而支持此参数,忽略此参数。 transaction_group 为了实现兼容性而支持此参数,忽略此参数。 delivery_mode 不支持此参数;指定 DBMS_AQ.PERSISTENT 的值。 payload
使用 payload 参数提供将与队列条目关联的数据。有效负载类型必须与创建对应的队列表时指定的类型匹配(参见DBMS_AQADM.CREATE_QUEUE_TABLE)。
msgid
使用 msgid 参数检索唯一(系统生成的)消息标识符。
示例
以下匿名块调用 DBMS_AQ.ENQUEUE,将消息添加到名为 work_order 的队列:
DECLARE
enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle raw(16);
payload work_order;
BEGIN
payload := work_order('Smith', 'system upgrade');
DBMS_AQ.ENQUEUE(
queue_name => 'work_order',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => payload,
msgid => message_handle
);
END;
DEQUEUE
DEQUEUE
存储过程让消息出队。特征为:
DEQUEUE(
queue_name IN VARCHAR2,
dequeue_options IN DBMS_AQ.DEQUEUE_OPTIONS_T,
message_properties OUT DBMS_AQ.MESSAGE_PROPERTIES_T,
payload OUT type_name,
msgid OUT RAW)
参数
queue_name
现有队列的名称(可能是 schema 限定的)。如果您省略 schema 名称,则服务器将使用在 SEARCH_PATH 中指定的 schema。请注意,与 Oracle 不同,不带引号的标识符在存储之前将转换为小写。要包括特殊字符或者使用区分大小写的名称,请在双引号中引起名称。
dequeue_options
dequeue _options
是类型为dequeue_options_t
的值:DEQUEUE_OPTIONS_T IS RECORD( consumer_name CHARACTER VARYING(30), dequeue_mode INTEGER, navigation INTEGER, visibility INTEGER, wait INTEGER, msgid BYTEA, correlation CHARACTER VARYING(128), deq_condition CHARACTER VARYING(4000), transformation CHARACTER VARYING(61), delivery_mode INTEGER);
目前,dequeue_options_t 支持的参数值为:
参数 说明 consumer_name 必须为 NULL。 dequeue_mode 出队操作的锁定行为。必须为: - DBMS_AQ.BROWSE – 读取消息而不获取锁定。
- DBMS_AQ.LOCKED – 获取锁定之后读取消息。
- DBMS_AQ.REMOVE – 删除消息之前读取消息。
- DBMS_AQ.REMOVE_NODATA – 读取消息,但不删除消息。
navigation 标识将检索的消息。必须为: - FIRST_MESSAGE – 队列中与搜索词匹配的第一条消息。
- NEXT_MESSAGE – 与第一个词语匹配的下一条可用消息。
visibility 必须为 ON_COMMIT – 如果您回退当前事务,出队项目将保持在队列中。 wait 必须为大于 0 的数字,或者: - DBMS_AQ.FOREVER – 无限期等待。
- DBMS_AQ.NO_WAIT – 不等待。
msgid 将出队消息的 ID。 correlation 为了实现兼容性而提供的支持,将被忽略。 deq_condition 一个 VARCHAR2 表达式,求值为 BOOLEAN 值,指示消息是否应出队。 transformation 为了实现兼容性而提供的支持,将被忽略。 delivery_mode 必须为 PERSISTENT;此时不支持缓冲的消息。 message_properties
message_properties
是类型为message_properties_t
的值:message_properties_t IS RECORD( priority INTEGER, delay INTEGER, expiration INTEGER, correlation CHARACTER VARYING(128) COLLATE pg_catalog.”C”, attempts INTEGER, recipient_list“AQ$_RECIPIENT_LIST_T”, exception_queue CHARACTER VARYING(61) COLLATE pg_catalog.”C”, enqueue_time TIMESTAMP WITHOUT TIME ZONE, state INTEGER, original_msgid BYTEA, transaction_group CHARACTER VARYING(30) COLLATE pg_catalog.”C”, delivery_mode INTEGER DBMS_AQ.PERSISTENT);
message_properties_t 支持的值为:
参数 说明 priority 如果队列表定义包括 sort_list 并引用了 priority,则此参数影响消息出队的顺序。较低的值指示较高的出队优先级。 delay 指定消息在可以出队或 NO_DELAY 之前将经过的秒数。 expiration 使用 expiration 参数指定消息过期的秒数。 correlation 使用 correlation 指定将与条目关联的消息;默认值为 NULL。 attempts 这是系统维护的值,指定消息出队的尝试次数。 recipient_list 不支持此参数。 exception_queue 使用 exception_queue 参数指定异常队列的名称,如果消息过期或者由回退太多次数的事务出队,则消息将移动到该队列。 enqueue_time enqueue_time 是记录添加到队列的时间;此值由系统提供。 state 此参数由 DBMS_AQ 维护,状态可以为: - DBMS_AQ.WAITING – 未达到延迟。
- DBMS_AQ.READY – 队列条目已准备好处理。
- DBMS_AQ.PROCESSED – 队列条目已处理。
- DBMS_AQ.EXPIRED – 队列条目已移动到异常队列。
original_msgid 为了实现兼容性而支持此参数,但被忽略。 transaction_group 为了实现兼容性而支持此参数,但被忽略。 delivery_mode 不支持此参数;指定 DBMS_AQ.PERSISTENT 的值。 payload
使用
payload
参数检索具有出队操作的消息的有效负载。有效负载类型必须与创建队列表时指定的类型匹配。msgid
使用
msgid
参数检索唯一消息标识符。
示例
以下匿名块调用DBMS_AQ.DEQUEUE
,从队列和有效负载中检索消息:
DECLARE
dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle raw(16);
payload work_order;
BEGIN
dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
DBMS_AQ.DEQUEUE(
queue_name => 'work_queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => payload,
msgid => message_handle
);
DBMS_OUTPUT.PUT_LINE(
'The next work order is [' || payload.subject || '].'
);
END;
有效负载由DBMS_OUTPUT.PUT_LINE
显示。
REGISTER
使用 REGISTER 存储过程注册在项目入队或出队时接收通知的电子邮件地址、过程或 URL。特征为:
REGISTER(
reg_list IN SYS.AQ$_REG_INFO_LIST,
count IN NUMBER)
参数
reg_list
reg_list 是类型为 AQ$_REG_INFO_LIST 的列表,提供有关您要注册的各个订阅的信息。列表中每个条目的类型都是 AQ$_REG_INFO,可以包含:
属性 类型 说明 name VARCHAR2 (128) 订阅的名称(可能是 schema 限定的)。 namespace NUMERIC 唯一支持的值为 DBMS_AQ.NAMESPACE_AQ (0) callback VARCHAR2 (4000) 说明将对通知执行的操作。目前,仅支持调用 PL/SQL 存储过程。调用应采取以下形式: plsql://schema.procedure
其中:- schema:指定存储过程所在的 schema。
- procedure:指定将通知的存储过程的名称。
context RAW (16) 回调过程需要的任何用户定义的值。 count
count
是reg_list
中的条目数。
示例
以下匿名块调用 DBMS_AQ.REGISTER,注册在队列中添加或删除项目时将通知到的存储过程。为在 DECLARE 部分中标识的每个订阅提供一组属性(类型为 sys.aq$_reg_info
):
DECLARE
subscription1 sys.aq$_reg_info;
subscription2 sys.aq$_reg_info;
subscription3 sys.aq$_reg_info;
subscriptionlist sys.aq$_reg_info_list;
BEGIN
subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));
subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
dbms_aq.register(subscriptionlist, 3);
commit;
END;
/
subscriptionlist
的类型为sys.aq$_reg_info_list
,包含以前描述的sys.aq$_reg_info
对象。列表名称和对象计数传递到dbms_aq.register
。
UNREGISTER
使用UNREGISTER
存储过程关闭与入队和出队相关的通知。特征为:
UNREGISTER(
reg_list IN SYS.AQ$_REG_INFO_LIST,
count
IN NUMBER)
参数
reg_list
reg_list
是类型为AQ$_REG_INFO_LIST
的列表,提供有关您要注册的各个订阅的信息。列表中每个条目的类型都是AQ$_REG_INFO
,可以包含:属性 类型 说明 name VARCHAR2 (128) 订阅的名称(可能是 schema 限定的)。 namespace NUMERIC 唯一支持的值为 DBMS_AQ.NAMESPACE_AQ (0) callback VARCHAR2 (4000) 说明将对通知执行的操作。目前,仅支持调用 PL/SQL 存储过程。调用应采取以下形式: plsql://schema.procedure
其中:- schema:指定存储过程所在的 schema。
- procedure:指定将通知的存储过程的名称。
context RAW (16) 该存储过程需要的任何用户定义的值。 count
count
是reg_list
中的条目数。
示例
以下匿名块调用DBMS_AQ.UNREGISTER
,禁用在示例中为DBMS_AQ.REGISTER
指定的通知:
DECLARE
subscription1 sys.aq$_reg_info;
subscription2 sys.aq$_reg_info;
subscription3 sys.aq$_reg_info;
subscriptionlist sys.aq$_reg_info_list;
BEGIN
subscription1 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://assign_worker?PR=0',HEXTORAW('FFFF'));
subscription2 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://add_to_history?PR=1',HEXTORAW('FFFF'));
subscription3 := sys.aq$_reg_info('q', DBMS_AQ.NAMESPACE_AQ, 'plsql://reserve_parts?PR=2',HEXTORAW('FFFF'));
subscriptionlist := sys.aq$_reg_info_list(subscription1, subscription2, subscription3);
dbms_aq.unregister(subscriptionlist, 3);
commit;
END;
/
subscriptionlist
的类型为sys.aq$_reg_info_list
,包含以前描述的sys.aq$_reg_info
对象。列表名称和对象计数传递到dbms_aq.unregister
。
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。
评论