Oracle Advanced Queuing (AQ) proporciona un mecanismo poderoso para procesamiento automático de datos basado en eventos con mecanismos de reintento integrados y funciones de callback.
En esta publicación, exploraremos cómo implementar un sistema de sincronización de datos completamente automatizado usando callbacks de DBMS_AQ que procesa automáticamente los mensajes cuando llegan, con manejo inteligente de reintentos para operaciones fallidas.
Código Fuente
- Implementación Completa en GitHub – Ejemplo de código funcional completo con mecanismos de reintento y procesamiento de callbacks
¿Qué es Oracle Advanced Queuing?
Oracle Advanced Queuing es un sistema de colas de mensajes integrado en Oracle Database que permite comunicación automática basada en eventos entre aplicaciones. Las características clave incluyen:
- Procesamiento automático de mensajes vía funciones de callback
- Mecanismos de reintento integrados para operaciones fallidas
- Arquitecturas basadas en eventos con procesamiento inmediato
- Entrega confiable de mensajes con persistencia y recuperación
El Problema: Integración de Sistemas
Las empresas modernas a menudo necesitan sincronizar datos entre múltiples sistemas:
- Sistemas CRM a almacenes de datos
- Sistemas ERP a bases de datos analíticas
- Aplicaciones web a sistemas de respaldo
- Flujos de datos en tiempo real entre microservicios
La Solución: Sincronización de Datos Basada en AQ
La implementación crea un sistema basado en colas completamente automatizado que maneja tareas de sincronización de forma asíncrona con capacidades de reintento automático y procesamiento inmediato de callbacks.
Resumen de Arquitectura
┌─────────────────┐ ┌──────────────┐ ┌──────────────┐ ┌─────────────────┐
│ Source │───▶│ AQ Queue │───▶│ Auto Callback│───▶│ Target │
│ System │ │ (with Retry) │ │ (Immediate) │ │ System │
└─────────────────┘ └──────────────┘ └──────────────┘ └─────────────────┘
│ │
▼ ▼
┌──────────────┐
│ Retry Logic │
│ (3 attempts) │
└──────────────┘
Detalles de Implementación
- Configuración del Esquema de Base de Datos
Primero, creamos un esquema dedicado para nuestra implementación AQ:
-- Create user with necessary privileges
create user "AQ_DEMO" identified by "Password123*";
grant create session, create table, create type, create procedure to "AQ_DEMO";
grant execute on dbms_aqadm, dbms_aq to "AQ_DEMO";
grant unlimited tablespace to "AQ_DEMO";
- Seguimiento de Operaciones de Sincronización
Mantenemos un historial de todas las operaciones de sincronización:
create table sync_operations (
id number generated by default as identity primary key,
source_system varchar2(50) not null,
target_system varchar2(50) not null,
sync_type varchar2(30) not null,
record_count number not null,
sync_status varchar2(20) default 'COMPLETED',
created_on timestamp with local time zone default localtimestamp
);
Características Clave:
- Columna de identidad para generación automática de ID
- Pista de auditoría con marcas de tiempo de creación
- Seguimiento de estado (PENDING, PROCESSING, COMPLETED, FAILED)
- Clasificación de tipo de sincronización (FULL, INCREMENTAL, DELTA, REALTIME)
- Definición del Tipo de Mensaje
Definimos un tipo de objeto personalizado para nuestros mensajes de sincronización:
create or replace type sync_task_type as object (
source_system varchar2(50),
target_system varchar2(50),
sync_type varchar2(30),
record_count number,
priority number
);
- Configuración de Cola
La cola se configura con mecanismos de reintento:
dbms_aqadm.create_queue_table(
queue_table => 'AQ_DEMO.SYNC_QUEUE_TABLE',
queue_payload_type => 'AQ_DEMO.SYNC_TASK_TYPE',
multiple_consumers => false,
comment => 'Tabla de cola para tareas de sincronización de datos'
);
-- Crear cola con configuración de reintento
dbms_aqadm.create_queue(
queue_name => 'AQ_DEMO.SYNC_QUEUE',
queue_table => 'AQ_DEMO.SYNC_QUEUE_TABLE',
max_retries => 3, -- Reintentar mensajes fallidos 3 veces
retry_delay => 30, -- Esperar 30 segundos entre reintentos
retention_time => 3600, -- Mantener mensajes por 1 hora
comment => 'Cola para tareas de sincronización con configuración de reintento'
);
Beneficios de la Configuración de Reintento:
- Reintento automático para mensajes fallidos
- Retrasos configurables para evitar sobrecargar sistemas
- Retención de mensajes para depuración y auditoría
- Procesamiento Automático de Mensajes
El procedimiento de callback procesa mensajes automáticamente:
create or replace procedure sync_callback(
context in raw
, reginfo in sys.aq$_reg_info
, descr in sys.aq$_descriptor
, payloadl in number
, payload in varchar2
) as
l_message sync_task_type;
l_dequeue_options dbms_aq.dequeue_options_t;
l_message_props dbms_aq.message_properties_t;
l_message_id raw(16);
l_sync_type varchar2(100);
begin
dbms_output.put_line('Sync callback triggered at ' || to_char(systimestamp, 'HH24:MI:SS'));
-- Configure dequeue options
-- WAIT: Controls how long to wait for messages
-- • NO_WAIT: Return immediately if no message available
-- • FOREVER: [default] Wait indefinitely for a message
-- • Number: Wait specified seconds (0-4294967295)
l_dequeue_options.wait := dbms_aq.no_wait;
-- NAVIGATION: Which message to retrieve from queue
-- • FIRST_MESSAGE: [default] Get first available message
-- • NEXT_MESSAGE: Get next message in sequence
-- • FIRST_MESSAGE_MULTI_GROUP: First message across consumer groups
-- • NEXT_MESSAGE_MULTI_GROUP: Next message across consumer groups
--l_dequeue_options.navigation := dbms_aq.first_message;
-- VISIBILITY: When dequeue operation becomes visible to other transactions
-- • ON_COMMIT: [default] Changes visible only after transaction commit
-- • IMMEDIATE: Changes visible immediately
l_dequeue_options.visibility := dbms_aq.on_commit;
-- DEQUEUE_MODE: What happens to the message after dequeue
-- • BROWSE: Read message but keep it in queue
-- • LOCKED: Lock message for exclusive access
-- • REMOVE: [default] Delete message after reading
-- • REMOVE_NODATA: Delete message but don't return payload
l_dequeue_options.dequeue_mode := dbms_aq.remove;
-- DELIVERY_MODE: How messages are stored and delivered
-- • PERSISTENT: [default] Messages stored in database tables (durable)
-- • BUFFERED: Messages kept in memory (faster)
-- • PERSISTENT_OR_BUFFERED: Use either mode as appropriate
--l_dequeue_options.delivery_mode := dbms_aq.persistent;
-- Additional dequeue options available:
-- l_dequeue_options.consumer_name := 'consumer_name'; -- Target specific consumer in multi-consumer queues
-- l_dequeue_options.msgid := raw_message_id; -- Dequeue specific message by its unique ID
-- l_dequeue_options.correlation := 'correlation_id'; -- Filter messages by correlation identifier
-- l_dequeue_options.deq_condition := 'priority > 5'; -- SQL WHERE condition to filter which messages to dequeue
-- l_dequeue_options.transformation := 'transform_name'; -- Apply transformation function to message payload before returning
-- Available descriptor properties for logging/debugging:
-- descr.msg_id - Message ID (RAW)
-- descr.queue_name - Queue name where message is located
-- descr.consumer_name - Consumer name (for multi-consumer queues)
-- Note: msg_priority and msg_state are not directly available on descriptor
--logger.log('msg_id: ' || descr.msg_id);
--logger.log('queue_name: ' || descr.queue_name);
--logger.log('consumer_name: ' || descr.consumer_name);
--logger.log('payload: ' || payload);
--logger.log('payloadl: ' || payloadl);
l_dequeue_options.msgid := descr.msg_id;
-- Dequeue the message
dbms_aq.dequeue(
queue_name => 'AQ_DEMO.SYNC_QUEUE'
, dequeue_options => l_dequeue_options
, message_properties => l_message_props
, payload => l_message
, msgid => l_message_id
);
l_sync_type := l_message.sync_type;
-- Process the sync task
insert into sync_operations (source_system, target_system, sync_type, record_count, sync_status)
values (l_message.source_system, l_message.target_system, l_message.sync_type, l_message.record_count, 'COMPLETED');
dbms_output.put_line('Sync operation completed: ' || l_message.source_system || ' -> ' || l_message.target_system);
--commit;
exception
when others then
rollback;
dbms_output.put_line('Error in sync callback: ' || sqlerrm);
raise;
end;
- API de Gestión de Cola
Un procedimiento simple para encolar tareas de sincronización:
create or replace procedure queue_sync_task(
p_source_system in varchar2,
p_target_system in varchar2,
p_sync_type in varchar2,
p_record_count in number,
p_priority in number default 5
) as
l_enqueue_options dbms_aq.enqueue_options_t;
l_message_props dbms_aq.message_properties_t;
l_message sync_task_type;
l_msgid raw(16);
begin
dbms_output.put_line('Encolando tarea de sincronización: ' || p_source_system || ' -> ' || p_target_system);
-- Crear mensaje de tarea de sincronización
l_message := sync_task_type(
p_source_system, p_target_system,
p_sync_type, p_record_count, p_priority
);
-- Opciones de encolado: visibilidad ON_COMMIT
l_enqueue_options.visibility := dbms_aq.on_commit;
-- Encolar la tarea
dbms_aq.enqueue(
queue_name => 'AQ_DEMO.SYNC_QUEUE',
enqueue_options => l_enqueue_options,
message_properties => l_message_props,
payload => l_message,
msgid => l_msgid
);
commit;
exception
when others then
rollback;
dbms_output.put_line('Error encolando tarea de sincronización: ' || sqlerrm);
raise;
end;
Ejemplos de Uso
Encolando Tareas de Sincronización
-- Queue different types of sync operations
begin
queue_sync_task('CRM_SYSTEM', 'DATA_WAREHOUSE', 'INCREMENTAL', 1250, 3);
queue_sync_task('ERP_SYSTEM', 'ANALYTICS_DB', 'FULL', 50000, 5);
queue_sync_task('WEB_APP', 'BACKUP_SYSTEM', 'DELTA', 340, 1);
end;
Monitoreo de Operaciones
— Verificar operaciones de sincronización completadas
select
source_system,
target_system,
sync_type,
record_count,
sync_status,
created_on
from sync_operations
order by created_on desc;

Monitoreo del Estado de la Cola
-- Monitor queue status and retry counts
select
treat(user_data as sync_task_type).source_system,
treat(user_data as sync_task_type).target_system,
enq_time,
deq_time,
retry_count,
msg_state
from aq$sync_queue_table
order by enq_time desc;
Beneficios de Este Enfoque
- Confiabilidad
- Mecanismos de reintento automático para operaciones fallidas
- La persistencia de mensajes asegura que no se pierdan datos
- Operaciones seguras por transacciones
- Escalabilidad
- El procesamiento asíncrono no bloquea sistemas origen
- Múltiples consumidores pueden procesar mensajes en paralelo
- La arquitectura basada en colas maneja picos de carga
Aplicaciones del Mundo Real
Este patrón es particularmente útil para:
- Procesos ETL: Activando actualizaciones de almacenes de datos
- Microservicios: Comunicación basada en eventos
- Replicación de Datos: Manteniendo sistemas sincronizados
- Operaciones de Respaldo: Disparadores de respaldo asíncronos
- Analíticas: Procesamiento de flujos de datos en tiempo real
Conclusión
Oracle Advanced Queuing proporciona una base robusta para construir sistemas de sincronización de datos confiables. La combinación de mecanismos de reintento automático, persistencia de mensajes y configuración flexible lo convierte en una excelente opción para escenarios de integración empresarial.
La implementación mostrada aquí demuestra cómo crear un sistema de sincronización listo para producción que puede manejar varios tipos de tareas de sincronización de datos mientras proporciona capacidades integrales de monitoreo y manejo de errores.
Referencias y Lectura Adicional
Documentación Oficial de Oracle
- Oracle Database 21c: Advanced Queuing Types – Comprehensive AQ type definitions
- Oracle Database 21c: DBMS_AQ Package – Complete API reference
- Oracle Database 12c: AQ Operations Guide – Advanced queue operations
Tutoriales y Ejemplos
- Oracle-Base: Advanced Queuing in Oracle Database – Base tutorial that inspired this implementation
- Asynchronous Processing using AQ Callback – Callback implementation patterns
Solución de Problemas
- Stack Overflow: Oracle AQ Dequeue Issues – Common dequeue problems and solutions