Apex · Database Integration · Oracle · PL/SQL Programming

Un ejemplo de Oracle Advanced Queue (DBMS AQ): Callback automático #JoelKallmanDay

Oracle Advanced Queuing (AQ) proporciona un mecanismo poderoso para procesamiento automático de datos basado en eventos con mecanismos de reintento integrados y funciones de callback.

En esta publicación, exploraremos cómo implementar un sistema de sincronización de datos completamente automatizado usando callbacks de DBMS_AQ que procesa automáticamente los mensajes cuando llegan, con manejo inteligente de reintentos para operaciones fallidas.

Código Fuente

¿Qué es Oracle Advanced Queuing?

Oracle Advanced Queuing es un sistema de colas de mensajes integrado en Oracle Database que permite comunicación automática basada en eventos entre aplicaciones. Las características clave incluyen:

  • Procesamiento automático de mensajes vía funciones de callback
  • Mecanismos de reintento integrados para operaciones fallidas
  • Arquitecturas basadas en eventos con procesamiento inmediato
  • Entrega confiable de mensajes con persistencia y recuperación

El Problema: Integración de Sistemas

Las empresas modernas a menudo necesitan sincronizar datos entre múltiples sistemas:

  • Sistemas CRM a almacenes de datos
  • Sistemas ERP a bases de datos analíticas
  • Aplicaciones web a sistemas de respaldo
  • Flujos de datos en tiempo real entre microservicios

La Solución: Sincronización de Datos Basada en AQ

La implementación crea un sistema basado en colas completamente automatizado que maneja tareas de sincronización de forma asíncrona con capacidades de reintento automático y procesamiento inmediato de callbacks.

Resumen de Arquitectura

┌─────────────────┐    ┌──────────────┐    ┌──────────────┐    ┌─────────────────┐
│   Source        │───▶│  AQ Queue    │───▶│ Auto Callback│───▶│   Target        │
│   System        │    │ (with Retry) │    │ (Immediate)  │    │   System        │
└─────────────────┘    └──────────────┘    └──────────────┘    └─────────────────┘
                              │                      │
                              ▼                      ▼
                       ┌──────────────┐
                       │ Retry Logic  │
                       │ (3 attempts) │
                       └──────────────┘

Detalles de Implementación

  1. Configuración del Esquema de Base de Datos

Primero, creamos un esquema dedicado para nuestra implementación AQ:

-- Create user with necessary privileges
create user "AQ_DEMO" identified by "Password123*";
grant create session, create table, create type, create procedure to "AQ_DEMO";
grant execute on dbms_aqadm, dbms_aq to "AQ_DEMO";
grant unlimited tablespace to "AQ_DEMO";
  1. Seguimiento de Operaciones de Sincronización

Mantenemos un historial de todas las operaciones de sincronización:

create table sync_operations (
    id                number generated by default as identity primary key,
    source_system     varchar2(50) not null,
    target_system     varchar2(50) not null,
    sync_type         varchar2(30) not null,
    record_count      number not null,
    sync_status       varchar2(20) default 'COMPLETED',
    created_on        timestamp with local time zone default localtimestamp
);

Características Clave:

  • Columna de identidad para generación automática de ID
  • Pista de auditoría con marcas de tiempo de creación
  • Seguimiento de estado (PENDING, PROCESSING, COMPLETED, FAILED)
  • Clasificación de tipo de sincronización (FULL, INCREMENTAL, DELTA, REALTIME)
  1. Definición del Tipo de Mensaje

Definimos un tipo de objeto personalizado para nuestros mensajes de sincronización:

create or replace type sync_task_type as object (
    source_system    varchar2(50),
    target_system    varchar2(50),
    sync_type        varchar2(30),
    record_count     number,
    priority         number
);
  1. Configuración de Cola

La cola se configura con mecanismos de reintento:

dbms_aqadm.create_queue_table(
    queue_table        => 'AQ_DEMO.SYNC_QUEUE_TABLE',
    queue_payload_type => 'AQ_DEMO.SYNC_TASK_TYPE',
    multiple_consumers => false,
    comment            => 'Tabla de cola para tareas de sincronización de datos'
);

-- Crear cola con configuración de reintento
dbms_aqadm.create_queue(
    queue_name     => 'AQ_DEMO.SYNC_QUEUE',
    queue_table    => 'AQ_DEMO.SYNC_QUEUE_TABLE',
    max_retries    => 3,        -- Reintentar mensajes fallidos 3 veces
    retry_delay    => 30,       -- Esperar 30 segundos entre reintentos
    retention_time => 3600,     -- Mantener mensajes por 1 hora
    comment        => 'Cola para tareas de sincronización con configuración de reintento'
);

Beneficios de la Configuración de Reintento:

  • Reintento automático para mensajes fallidos
  • Retrasos configurables para evitar sobrecargar sistemas
  • Retención de mensajes para depuración y auditoría
  1. Procesamiento Automático de Mensajes

El procedimiento de callback procesa mensajes automáticamente:

create or replace procedure sync_callback(
    context   in raw
  , reginfo   in sys.aq$_reg_info
  , descr     in sys.aq$_descriptor
  , payloadl  in number
  , payload   in varchar2
) as
  l_message sync_task_type;
  l_dequeue_options dbms_aq.dequeue_options_t;
  l_message_props   dbms_aq.message_properties_t;
  l_message_id      raw(16);

  l_sync_type       varchar2(100);
begin
  dbms_output.put_line('Sync callback triggered at ' || to_char(systimestamp, 'HH24:MI:SS'));

  -- Configure dequeue options
  -- WAIT: Controls how long to wait for messages
  --   • NO_WAIT: Return immediately if no message available
  --   • FOREVER: [default] Wait indefinitely for a message
  --   • Number: Wait specified seconds (0-4294967295)
  l_dequeue_options.wait := dbms_aq.no_wait;
  
  -- NAVIGATION: Which message to retrieve from queue
  --   • FIRST_MESSAGE: [default] Get first available message
  --   • NEXT_MESSAGE: Get next message in sequence
  --   • FIRST_MESSAGE_MULTI_GROUP: First message across consumer groups
  --   • NEXT_MESSAGE_MULTI_GROUP: Next message across consumer groups
  --l_dequeue_options.navigation := dbms_aq.first_message;
  
  -- VISIBILITY: When dequeue operation becomes visible to other transactions
  --   • ON_COMMIT: [default] Changes visible only after transaction commit
  --   • IMMEDIATE: Changes visible immediately
  l_dequeue_options.visibility := dbms_aq.on_commit;
  
  -- DEQUEUE_MODE: What happens to the message after dequeue
  --   • BROWSE: Read message but keep it in queue
  --   • LOCKED: Lock message for exclusive access
  --   • REMOVE: [default] Delete message after reading
  --   • REMOVE_NODATA: Delete message but don't return payload
  l_dequeue_options.dequeue_mode := dbms_aq.remove;
  
  -- DELIVERY_MODE: How messages are stored and delivered
  --   • PERSISTENT: [default] Messages stored in database tables (durable)
  --   • BUFFERED: Messages kept in memory (faster)
  --   • PERSISTENT_OR_BUFFERED: Use either mode as appropriate
  --l_dequeue_options.delivery_mode := dbms_aq.persistent;
  
  -- Additional dequeue options available:
  -- l_dequeue_options.consumer_name := 'consumer_name';  -- Target specific consumer in multi-consumer queues
  -- l_dequeue_options.msgid := raw_message_id;           -- Dequeue specific message by its unique ID
  -- l_dequeue_options.correlation := 'correlation_id';   -- Filter messages by correlation identifier
  -- l_dequeue_options.deq_condition := 'priority > 5';   -- SQL WHERE condition to filter which messages to dequeue
  -- l_dequeue_options.transformation := 'transform_name'; -- Apply transformation function to message payload before returning

  -- Available descriptor properties for logging/debugging:
  -- descr.msg_id          - Message ID (RAW)
  -- descr.queue_name      - Queue name where message is located
  -- descr.consumer_name   - Consumer name (for multi-consumer queues)
  -- Note: msg_priority and msg_state are not directly available on descriptor
  
  
  --logger.log('msg_id: ' || descr.msg_id);
  --logger.log('queue_name: ' || descr.queue_name);
  --logger.log('consumer_name: ' || descr.consumer_name);
  --logger.log('payload: ' || payload);
  --logger.log('payloadl: ' || payloadl);
 
  l_dequeue_options.msgid := descr.msg_id;

  -- Dequeue the message
  dbms_aq.dequeue(
      queue_name         => 'AQ_DEMO.SYNC_QUEUE'
    , dequeue_options    => l_dequeue_options
    , message_properties => l_message_props
    , payload            => l_message
    , msgid              => l_message_id
  );
  l_sync_type := l_message.sync_type;

  -- Process the sync task
  insert into sync_operations (source_system, target_system, sync_type, record_count, sync_status)
  values (l_message.source_system, l_message.target_system, l_message.sync_type, l_message.record_count, 'COMPLETED');

  dbms_output.put_line('Sync operation completed: ' || l_message.source_system || ' -> ' || l_message.target_system);
  --commit;
exception
  when others then
    rollback;
    dbms_output.put_line('Error in sync callback: ' || sqlerrm);
    raise;
end;
  1. API de Gestión de Cola

Un procedimiento simple para encolar tareas de sincronización:

create or replace procedure queue_sync_task(
    p_source_system in varchar2,
    p_target_system in varchar2,
    p_sync_type     in varchar2,
    p_record_count  in number,
    p_priority      in number default 5
) as
    l_enqueue_options dbms_aq.enqueue_options_t;
    l_message_props   dbms_aq.message_properties_t;
    l_message         sync_task_type;
    l_msgid           raw(16);
begin
    dbms_output.put_line('Encolando tarea de sincronización: ' || p_source_system || ' -> ' || p_target_system);

    -- Crear mensaje de tarea de sincronización
    l_message := sync_task_type(
        p_source_system, p_target_system,
        p_sync_type, p_record_count, p_priority
    );

    -- Opciones de encolado: visibilidad ON_COMMIT
    l_enqueue_options.visibility := dbms_aq.on_commit;

    -- Encolar la tarea
    dbms_aq.enqueue(
        queue_name         => 'AQ_DEMO.SYNC_QUEUE',
        enqueue_options    => l_enqueue_options,
        message_properties => l_message_props,
        payload            => l_message,
        msgid              => l_msgid
    );

    commit;
exception
    when others then
        rollback;
        dbms_output.put_line('Error encolando tarea de sincronización: ' || sqlerrm);
        raise;
end;

Ejemplos de Uso

Encolando Tareas de Sincronización

-- Queue different types of sync operations
begin
    queue_sync_task('CRM_SYSTEM', 'DATA_WAREHOUSE', 'INCREMENTAL', 1250, 3);
    queue_sync_task('ERP_SYSTEM', 'ANALYTICS_DB', 'FULL', 50000, 5);
    queue_sync_task('WEB_APP', 'BACKUP_SYSTEM', 'DELTA', 340, 1);
end;

Monitoreo de Operaciones

— Verificar operaciones de sincronización completadas

select 
    source_system,
    target_system,
    sync_type,
    record_count,
    sync_status,
    created_on
from sync_operations
order by created_on desc;

Monitoreo del Estado de la Cola

-- Monitor queue status and retry counts
select
    treat(user_data as sync_task_type).source_system,
    treat(user_data as sync_task_type).target_system,
    enq_time,
    deq_time,
    retry_count,
    msg_state
from aq$sync_queue_table
order by enq_time desc;

Beneficios de Este Enfoque

  1. Confiabilidad
  • Mecanismos de reintento automático para operaciones fallidas
  • La persistencia de mensajes asegura que no se pierdan datos
  • Operaciones seguras por transacciones
  1. Escalabilidad
  • El procesamiento asíncrono no bloquea sistemas origen
  • Múltiples consumidores pueden procesar mensajes en paralelo
  • La arquitectura basada en colas maneja picos de carga

Aplicaciones del Mundo Real

Este patrón es particularmente útil para:

  • Procesos ETL: Activando actualizaciones de almacenes de datos
  • Microservicios: Comunicación basada en eventos
  • Replicación de Datos: Manteniendo sistemas sincronizados
  • Operaciones de Respaldo: Disparadores de respaldo asíncronos
  • Analíticas: Procesamiento de flujos de datos en tiempo real

Conclusión

Oracle Advanced Queuing proporciona una base robusta para construir sistemas de sincronización de datos confiables. La combinación de mecanismos de reintento automático, persistencia de mensajes y configuración flexible lo convierte en una excelente opción para escenarios de integración empresarial.

La implementación mostrada aquí demuestra cómo crear un sistema de sincronización listo para producción que puede manejar varios tipos de tareas de sincronización de datos mientras proporciona capacidades integrales de monitoreo y manejo de errores.

Referencias y Lectura Adicional

Documentación Oficial de Oracle
Tutoriales y Ejemplos
Solución de Problemas

Leave a comment