En mi artículo anterior, se planteaba una solución para la exportación de los datos de auditoría de Dataverse a una cuenta de almacenamiento de Azure Data Lake (Gen2) utilizando el componente de KingswaySoft para Dynamics 365 y a través de un paquete de SQL Integration Services (SSIS).

En este artículo se propone una solución similar, pero en esta ocasión el proceso de exportación se implementará mediante la utilización de un espacio de trabajo de Azure Synapse Analytics y el framework Apache Spark, con el mismo objetivo de liberar espacio de almacenamiento en la base de datos de Dataverse y explotar la información de auditoría alojada en un origen de datos externo a través de un informe de PowerBI.

Requisitos

  • Disponer de una suscripción de Azure activa.
  • Aprovisionar un espacio de trabajo de Azure Synapse Analytics.
  • Grupo de Apache Spark.
  • Habilitar la auditoría en el entorno Dataverse de origen.
  • Cuenta de almacenamiento de Data Lake Gen2.

Arquitectura

En primer lugar, se detalla la arquitectura del proceso ETL a implementar para la exportación periódica de los datos de auditoría desde Dataverse a Data Lake.

Los registros de auditoría en Dataverse se crean constantemente, tanto por la actividad de los usuarios como por los procesos programados. Por ello, es necesario recuperar esta información de forma incremental y periódica (por ejemplo, una vez al día). Para ello, es necesario conservar la fecha de la última extracción de datos, de modo que solo se recuperen los nuevos registros creados en el sistema durante las próximas ejecuciones del proceso.

Con ese objetivo, se propone guardar la fecha de la última extracción de datos para cada entidad auditada en un archivo CSV de control almacenado en el Data Lake. Durante la ejecución del proceso, se recupera este valor y se utiliza como fecha de corte para filtrar los registros de auditoría. Al final del proceso, se actualiza la fecha de la última extracción de datos para futuras ejecuciones.

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Para la organización de los datos de auditoría en Data Lake, se propone crear un contenedor raíz específico (audit-crm) para almacenar esta información, integrado por los siguientes directorios:

  • /Ingest: Utilizado como área de almacenamiento auxiliar para los datos de auditoría incrementales recuperados durante cada ejecución del proceso.
  • /AuditData: Contenedor principal en el cual se almacena el histórico de los datos de auditoría procesados, integrado por una carpeta para cada una de las entidades auditadas en Dataverse.

 

Con respecto a la organización de los datos de auditoría dentro del contenedor principal, se propone realizar el particionado de acuerdo con la estructura ‘AÑO / MES / DÍA’. No obstante, esta es solo una sugerencia. Se puede utilizar cualquier otra estructura de partición, siempre que se ajuste a los requisitos específicos del proyecto o la recurrencia en la ejecución del proceso. Por ejemplo, si la extracción de auditoría se realiza de forma mensual o trimestral, se podría crear una carpeta por cada mes o trimestre, respectivamente.

Para el caso de uso planteado en el artículo, se implementará un proceso para la extracción diaria de la auditoría de la entidad de Contacto en Dataverse, utilizando la siguiente estructura de directorios en Data Lake:

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

El proceso ETL para la exportación de la auditoría se implementa mediante una canalización de Azure Synapse Analytics, integrada por las siguientes fases:

  1. Truncado: Se elimina el contenido del directorio auxiliar de ingesta correspondientes a la última ejecución del proceso.
  1. Obtención de fecha última ejecución: Se recupera la fecha correspondiente a la última extracción por entidad desde el fichero CSV de control.
  1. Ingesta: Se obtienen los datos incrementales de auditoría desde Dataverse generados desde la última extracción, a través de una consulta en formato FetchXML.
  1. Transformación y carga: Se procesan los datos incrementales obtenidos en la fase anterior y se actualiza el histórico de auditoría en Data Lake, mediante la ejecución de un cuaderno de Apache Spark.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

A continuación, se indican los pasos a realizar de forma previa a la implementación del proceso, y posteriormente se detalla cada una de estas fases, especificando los recursos y componentes utilizados en cada una.

Creación de estructura de directorios en Data Lake

El primer paso consiste en la creación de la estructura de directorios utilizada por el proceso para el almacenamiento de la información de auditoría. El contenedor principal, denominado ‘audit-crm’, se creará en la cuenta de almacenamiento de Data Lake (Gen2) principal vinculada al espacio de trabajo de Synapse Analytics.

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

La creación del contenedor y las carpetas subyacentes se puede realizar directamente desde el explorador de datos del área de trabajo de Synapse Analytics, desde la sección ‘Contenedores’ del recurso correspondiente a la cuenta de almacenamiento en el portal de Azure, o bien utilizando la herramienta gratuita Microsoft Azure Storage Explorer.

Creación de ficheros de control incremental

Una vez creada la estructura de directorios, lo siguiente en crear el fichero de control, utilizado para almacenar la fecha correspondiente a la última extracción.

Para ello, es necesario crear un nuevo fichero con extensión CSV para cada una de las entidades auditadas (Contacto en este caso) con la fecha a partir de la cual se desea recuperar la auditoría, con la siguiente estructura:

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

NOTA: Se recomienda utilizar el formato de fecha ‘yyyy-MM-dd HH:mm:ss.000′ para evitar errores al realizar la consulta a Dataverse durante la fase de ingesta desde la canalización de Azure Synapse Analytics.

A continuación, subir el fichero de control en el directorio raíz (audit-crm) de la cuenta de almacenamiento principal.

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Implementación de canalización de Azure Synapse Analytics

Tras generar los ficheros de control incremental, el siguiente paso consiste en la creación de la canalización correspondiente al proceso principal en Azure Synapse Analytics, el cual actúa como orquestador sobre todas las fases indicadas previamente.

A continuación, se detallan los pasos necesarios:

  1. Desde el recurso correspondiente al espacio de trabajo de Synapse en Azure, abrir Synapse Studio.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Si consultamos los servicios vinculados (Linked services) desde la sección de conexiones externas al seleccionar la opción ‘Manage’ en el menú principal:
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Observamos que ya existe un servicio vinculado de tipo Data Lake Gen2 llamado ‘XXXXXX-WorkspaceDefaultStorage’, correspondiente a la cuenta de almacenamiento por defecto del área de trabajo de Synapse Analytics. Si se quisiera utilizar una cuenta de almacenamiento diferente o de otro tipo (como Azure blob storage, SFTP, Amazon S3, etc.) para almacenar los ficheros, habría que crear y configurar un nuevo servicio vinculado de forma previa a la creación de la canalización. En este caso se utilizará la cuenta de almacenamiento principal del área de trabajo de Synapse Analytics para almacenar el incremental durante la fase de ingesta y el histórico de auditoría.

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. El siguiente paso consiste en la creación de la canalización principal. Para ello, desde la sección ‘Integrate’ del menú principal, pinchamos en el botón + y seleccionamos la opción ‘Canalización’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Introducimos el nombre y la descripción de la canalización principal. También se pueden añadir anotaciones para ayudarnos a clasificar e identificar las canalizaciones en el área de trabajo de Synapse Analytics.

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Fase 1: Truncado

La primera fase del proceso consiste en la eliminación del contenido en el directorio de ingesta (/Ingest), utilizado como área de almacenamiento temporal para la extracción de la información incremental de auditoría a procesar durante cada ejecución. Para ello, se deben seguir los siguientes pasos:

1. Desde la sección ‘General’ del panel de actividades, seleccionar y arrastrar una actividad de tipo Eliminar al área de trabajo

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En la pestaña ‘General’ de la actividad, establecer el nombre y la descripción, verificando que el estado de la actividad es ‘Activado’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En la pestaña ‘Origen’, seleccionar ‘+ Nuevo’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Seleccionar ‘Azure Data Lake Storage Gen2’ y Continuar.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En las propiedades del conjunto de datos, introducir un nombre descriptivo y seleccionar el servicio vinculado en el que se encuentra la estructura de ficheros generada anteriormente (en este caso en la cuenta de almacenamiento principal del área de trabajo de Synapse Analytics).
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. A continuación, seleccionar el directorio audit-crm/Ingest desde el explorador de archivos y hacer click en Aceptar.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En el diálogo ‘Establecer propiedades’, pinchar en Aceptar y comprobar que el conjunto de datos recién creado se encuentra seccionado en el campo ‘Conjunto de datos’ de la sección Origen en las propiedades de la actividad Eliminar. Seleccionar la opción ‘Ruta de acceso de archivo comodín’ y en el campo ‘Wildcard file name’ introducir el valor ‘*’, para especificar que se debe eliminar todo el contenido del directorio.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Finalmente, hacer click en ‘Publicar todo’ para guardar los cambios en la canalización.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Fase 2: Obtención de fecha última ejecución

En la segunda fase, se recuperar la fecha correspondiente a la última extracción de la auditoría desde Dataverse, utilizada como fecha de corte para la obtención del nuevo incremental de datos. Este valor se almacena en el fichero CSV de control incremental generado previamente, actualizado al final de cada ejecución del proceso. A continuación, se indican los pasos necesarios:

  1. Desde la sección ‘General’ del panel de actividades, seleccionar y arrastrar una actividad de tipo Búsqueda al área de trabajo, y conectarla a la salida de la actividad Eliminar creada en el paso anterior.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En la pestaña ‘General’ de la actividad, establecer el nombre y la descripción, verificando que el estado de la actividad es ‘Activado’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Se procede a crear el conjunto de datos correspondiente al fichero CSV de control, para ello, desde la pestaña ‘Configuración’, seleccionar ‘+ Nuevo’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Seleccionar ‘Azure Data Lake Storage Gen2’ y Continuar.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Seleccionar ‘DelimitedText’ como formato y pinchar en Continuar.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En las propiedades del conjunto de datos, introducir un nombre descriptivo y seleccionar el servicio vinculado en el que se encuentra la estructura de ficheros generada anteriormente (en este caso en la cuenta de almacenamiento principal del área de trabajo de Synapse Analytics). Verificar que se encuentra marcada la opción ‘Primera fila como encabezado’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Desde el explorador de archivos, seleccionar el fichero CSV de control incremental depositado previamente en el directorio raíz de la estructura de carpetas definida anteriormente (audit-crm/) y hacer click en Aceptar.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En la pestaña ‘Configuración’ de la actividad de búsqueda, comprobar que se encuentra seleccionado el origen de datos, la opción ‘Solo primera fila’ está marcada y en el campo ‘Tipo de ruta de acceso de archivo’ se encuentra seleccionada la opción ‘Ruta de acceso de archivo del conjunto de datos’, las demás opciones se pueden dejar con el valor por defecto.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Si se pincha en el botón ‘Vista previa de los datos’, se debería visualizar una venta similar a la siguiente:
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Hacer click en ‘Publicar todo’ para guardar los cambios en la canalización.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Fase 3: Ingesta

La tercera fase del proceso corresponde a la ingesta de los datos incrementales de auditoría desde Dataverse (a partir de la fecha de corte recuperada en la fase anterior) en el área de almacenamiento temporal (directorio /Ingest). Para ello, se realiza una consulta en formato FetchXML sobre la entidad de auditoría de Dataverse y se almacenan los resultados de la extracción en un fichero de texto plano con formato CSV, siguiendo los siguientes pasos:

  1. Desde la sección ‘Mover y transformar’ del panel de actividades, seleccionar y arrastrar una actividad de tipo Copiar datos al área de trabajo, y conectarla a la salida de la actividad de Búsqueda creada en el paso anterior.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En la pestaña ‘General’ de la actividad, establecer el nombre y la descripción, verificando que el estado de la actividad es ‘Activado’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Desde la pestaña Origen, se configurará el servicio vinculado correspondiente a la conexión con el entorno de Dataverse desde el cual se extrae la información de auditoría. Para ello, hacer click en ‘+ Nuevo’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Seleccionar ‘Dataverse (Common Data Service para aplicaciones)’ y Continuar.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En el campo ‘Servicio vinculado’, seleccionar la opción ‘+ Nuevo’ para crear la conexión con el entorno de Dataverse, especificando la dirección URL del entorno, el tipo de autenticación y las credenciales.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En la pestaña ‘Origen’, comprobar que se encuentra seleccionado el servicio vinculado recién creado, y en el campo ‘Utilizar consulta’ marcar la opción ‘Consulta’. A continuación, se configurará la consulta en formato FetchXML a ejecutar contra el entorno configurado.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Durante la extracción de datos, se recuperan los siguientes campos de la entidad auditoría de Dataverse:

Campo Descripción Tipo
Createdon Fecha en la que se realiza el cambio en el registro. Fecha
Operation Tipo de modificación realizada sobre el registro. Número entero
Objectid Identificador único del registro modificado (Contacto en este caso). Guid
User Usuario que realiza la modificación en el registro. Texto
Changedata Modificaciones realizadas en los atributos del registro. Texto (formato JSON)

La consulta en formato FetchXML realizada tiene la siguiente estructura, donde ‘ENTIDAD’ corresponde al identificador numérico de la entidad en Dataverse (2 en el caso de contacto) y ‘FECHA_ÚLTIMA_EXTRACCIÓN’ a la fecha de última extracción recuperada durante las fases anteriores del proceso:

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En el campo ‘Consulta’, introducir la siguiente consulta FetchXML, reemplazando el valor de la condición de fecha de creación (createdon) por el valor correspondiente a la primera fila recuperado del fichero de control incremental en la actividad de búsqueda de la canalización, con objeto de parametrizar la fecha de extracción.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Consulta FetchXML final:

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Una vez configurado el origen de datos, el siguiente paso consiste en crear el servicio vinculado correspondiente al directorio de ingesta en el que se van a almacenar los ficheros CSV recuperados. Para ello, desde la pestaña Receptor, pinchar en ‘+ Nuevo’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Seleccionar ‘Azure Data Lake Storage Gen2’ y Continuar.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Seleccionar ‘DelimitedText’ como formato y pinchar en Continuar.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Desde el diálogo ‘Establecer propiedades’, introducir el nombre de destino de datos, el servicio vinculado y, desde el explorador, seleccionar el directorio audit-crm/Ingest/. Una vez establecido el directorio de ingesta, introducir el nombre de los ficheros CSV a generar durante la extracción, es importante especificar la extensión (.csv) al final para garantizar que el los ficheros se generen en Data Lake con el formato adecuado. Marcar la opción ‘Primera fila como encabezado’ y pinchar en Aceptar.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

En la pestaña Receptor de la actividad de copia de datos, comprobar que se encuentra seleccionado el servicio vinculado recién creado, las opciones restantes se pueden dejar con el valor por defecto.

  1. El último paso consiste en la asignación de campos entre los campos de la consulta sobre los datos de auditoría devuelta por Dataverse y las columnas de los ficheros CSV a almacenar durante la fase de ingesta. Para configurar el mapeo, pinchar en el botón ‘Importar esquemas’ desde la pestaña ‘Asignación’ de la actividad de copiado de datos.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Puesto que la fecha de creación se encuentra parametrizada en la consulta, para recuperar el esquema de origen se requiere introducir una fecha, podemos utilizar una fecha cualquiera en formato ‘YYYY-MM-dd’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Una vez importado el esquema de datos de origen, configurar la asignación como se indica en la siguiente imagen (algunos de los campos devueltos por la consulta no son necesarios y se pueden eliminar del mapeo):
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Hacer click en ‘Publicar todo’ para guardar los cambios en la canalización.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Antes de proceder a la implementación de la última fase, podemos verificar que la extracción de datos y la configuración de las fases anteriores es correcta. Para ello, una vez publicada la canalización, podemos ejecutarla de forma manual desde el botón ‘Agregar desencadenador’ y seleccionando la opción ‘Desencadenar ahora’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Desde la sección Monitor del área de trabajo de Synapse Analytics, podemos comprobar el estado de ejecución.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Si la ejecución es correcta, se debería haber generado un nuevo fichero CSV en el directorio de ingesta (audit-crm/Ingest) con la siguiente estructura:
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Fase 4: Transformación y carga

En la última fase del proceso, se procesan los ficheros CSV obtenidos durante las fases anteriores, para realizar las transformaciones correspondientes en el dato y actualizar el histórico de auditoría en la cuenta de almacenamiento de Data Lake.

Estas transformaciones se realizan a través de un Notebook de PySpark ejecutado desde la canalización principal de Azure Synapse Analytics.

Como punto de partida, se hace necesario crear un nuevo grupo de Apache Spark para la ejecución del Notebook, siguiendo los siguientes pasos:

  1. Desde la sección Manage del espacio de trabajo de Synapse Analytics, pinchar en ‘Apache Spark pools’ y hacer click en ‘+ Nuevo’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. A continuación se establecen los parámetros de la configuración del grupo de Spark, como el nombre del grupo, el tamaño y el número mínimo y máximo de nodos a utilizar en el caso de habilitar la escalabilidad automática. Para este escenario, se utiliza un tamaño de nodo pequeño y un máximo de 3 nodos, pero la configuración a escoger dependerá de las necesidades de los recursos necesarios por parte de la solución a implementar, el volumen de datos a procesar y otros factores. En la parte inferior se visualiza el coste estimado por hora en función de la configuración seleccionada. Una vez definidos los parámetros, pinchar en el botón ‘Revisar y crear’ para proceder a la creación de grupo de Spark.

NOTA: Esta configuración de puede modificar posteriormente tras la creación del grupo de Spark.

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Una vez creado el grupo de Apache Spark, podemos proceder a la implementación del Notebook para el procesamiento de los ficheros incrementales, siguiendo los pasos detallados a continuación.

  1. Desde la sección ‘Develop’ del menú principal en el área de trabajo de Synapse Analytics, pinchar en el botón ‘+’ y seleccionar la opción ‘Notebook’. Esto abrirá una nueva pestaña con el editor, desde el cual podemos añadir el nombre y descripción del Notebook.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En la parte superior del editor, seleccionar el grupo de Spark creado anteriormente en el campo ‘Asociar a’, y verificar que el lenguaje seleccionado es ‘PySpark (Python)’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Los Notebook se organizan por celdas ejecutadas una detrás de otra, una celda puede ser de tipo comentario (Markdown), no interpretadas por el clúster de Spark; o de código, en las que se aceptan los lenguajes PySpark (Python), Scala, Java y SQL. En este caso se utiliza PySpark para la implementación del Notebook, creando una (o varias) celda/s independiente/s para cada uno de los pasos que conforman el proceso, detallados a continuación. Para crear una nueva celda de código dentro del Notebook, hace click en el botón ‘+ Code’, podemos ejecutar de forma manual cada una de las celdas de manera independiente, o bien ejecutar el notebook completo.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

3.1 Lectura de fichero incremental de auditoría

El primer paso consiste en leer el fichero incremental alojado en el directorio /audit-crm/Ingest/ y guardar su contenido en un dataframe. Un dataframe es una estructura de datos organizada en columnas utilizada frecuentemente en Spark (similar a una tabla SQL) optimizada para el procesamiento distribuido. Para ello, se crea una nueva celda y se añade el siguiente código, donde xxxxxx hace referencia al nombre de la cuenta de almacenamiento en la que se encuentra el fichero CSV extraído durante la fase de ingesta.

%%pyspark
df = spark.read.load(‘abfss://audit-crm@xxxxxx.dfs.core.windows.net/Ingest/stg_contactAuditData.csv’, format=’csv’, header=True,delimiter=’;’)

A través de la función display, al ejecutar la celda podemos obtener una vista previa del contenido cargado en el dataframe, correspondiente con la estructura del fichero CSV incremental de auditoría.

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

El campo Operation extraído de Dataverse, correspondiente al tipo de modificación realizada sobre el registro (Creación, Actualización, etc.) se devuelve como valor numérico. Para obtener el literal correspondiente a cada tipo de operación, se crea un dataframe adicional con la equivalencia, cruzando con el dataframe principal mediante el siguiente código PySpark a través de la función join:

crmOperationValues = [(«Crear», 1), («Actualizar», 2), («Eliminar», 3), («Acceder», 4), («Upsert», 5), («Archivar», 115), («Retener», 116), («DeshacerRetener», 117), («OperaciónPersonalizada», 200)] df_crmOperationValues = spark.createDataFrame(crmOperationValues, [«OperationName», «Value»])
from pyspark.sql.functions import col
df = df.join(df_crmOperationValues, col(«Operation») == col(«Value»), «inner»).drop(«Value»)

Si ejecutamos la función display sobre el dataframe principal, podemos observar que ya disponemos del literal correspondiente al campo operación (OperationName).

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

3.2 Procesar DataFrame

El campo ChangedFields (correspondiente al campo changedata en Dataverse), se devuelve en formato JSON, y puede contener uno o varios objetos correspondientes a los cambios en los atributos de la tabla (para cada uno se devuelve el valor anterior y el valor nuevo). De cara a facilitar la explotación del dato de auditoría (por ejemplo, desde un informe de PowerBI), se precisa obtener una fila por cada uno de los campos modificados de la entidad en Dataverse, obteniendo para cada fila el nombre del campo modificado, el valor anterior y el valor nuevo. Cada uno de los objetos JSON contiene la siguiente estructura:

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

A través del siguiente código PySpark, se realizan las modificaciones necesarias sobre el dataframe principal.

  • Convertir el campo ‘ChangedFields’ en estructura JSON a través de la función from_json definiendo la estructura del esquema previamente.
  • Obtener una línea por cada objeto JSON dentro del Array mediante la utilización de la función explode.
  • Eliminar la parte final ‘.0000000’ del campo de fecha (createdon) utilizando una expresión regular y la función regexp_replace, para evitar errores de formato durante la ejecución de las celdas posteriores del Notebook.
Copy to Clipboard

Si mostramos el dataframe actualizado tras las transformaciones a través de la función display, podemos observar que ya se incluye el desglose de los campos procedentes de cada objeto JSON para cada una de las filas procesadas.

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

3.3 Formatear fecha de creación

Para convertir el campo de fecha (createdon) al formato ‘yyyy-MM-dd’, añadiendo una columna adicional (Auditdate) al dataframe principal, se utiliza la función to_date.

Copy to Clipboard
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

3.4 Particionado

Como se comentaba en el apartado de arquitectura, de cara a organizar los ficheros históricos de auditoría en Data Lake, se propone utilizar la estructura ‘AÑO/MES/DÍA’. Para ello, se hace uso de la función partitionBy de PySpark, la cual permite distribuir los datos de un dataframe en particiones en función de los campos especificados:

df_final = df_expanded.select(«*»,year(«AuditDate»).alias(«year»),month(«AuditDate»).alias(«month») ,dayofmonth(«AuditDate»).alias(«day»))

3.5 Guardado del resultado en Data Lake

Una vez particionado el dataframe, se procede al almacenar el contenido en el directorio /AuditData de la cuenta de almacenamiento de Data Lake. Para ello, se hace uso de la función write, especificando los siguientes parámetros:

  • La escritura de datos en Data Lake se realiza de forma particionada por año, mes y día.
  • En el caso de existir ficheros en el directorio de destino, se actualizan (overwrite) en lugar de generar nuevos ficheros.
  • La primera fila del fichero incluye las columnas del dataframe.
  • Para la escritura en Data Lake se utiliza la codificación UTF-8.
  • Los ficheros generados se almacenan en el directorio audit-crm/AuditData/csv

 

df_final.write.partitionBy(«year»,»month»,»day»).mode(«overwrite»).option(«header», «true»).option(«encoding»,»UTF-8″).csv(«abfss://audit-crm@xxxxxx.dfs.core.windows.net/AuditData/csv»)

3.6 Actualización fecha de ejecución

Finalmente, se actualiza la fecha de última extracción en el fichero de control incremental, con objeto de recuperar solamente los datos de auditoría modificados a futuro desde la última extracción durante las próximas ejecuciones del proceso.

Se toma como fecha de última extracción el valor más reciente del campo de fecha (createdon) sobre el incremental del datos procesado en el Notebook. Para ello, se utilizan las funciones max y collect() y se almacena la fecha en la variable maxsq. A continuación, se lee el fichero de control incremental del directorio raíz y se almacena en un nuevo dataframe, añadiendo como columna adicional el valor almacenado en la variable.

maxsq = df_final.select(max(col(«Createdon»).cast(«string»))).collect()[0][0] df_lastSyncDate = spark.read.load(‘abfss://audit-crm@xxxxx.dfs.core.windows.net/contact_LastSyncDate.csv’, format=’csv’, header=True,delimiter=’;’)
df_lastSyncDate=df_lastSyncDate.withColumn(«LastSyncDate»,lit(maxsq))

Por último, utilizando la librería Pandas y la función to_csv se procede a actualizar el fichero CSV de control incremental en la cuenta de almacenamiento de Data Lake con la última fecha de extracción.

pandas_df=df_lastSyncDate.toPandas()
pandas_df.to_csv(‘abfss://audit-crm@xxxxx.dfs.core.windows.net/contact_LastSyncDate.csv’,index=False)

4. Una vez implementadas todas las celdas anteriores, publicar el notebook pinchando en el botón ‘Publicar’ para guardar los cambios.

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

. Para verificar que el proceso funciona correctamente, antes de proceder a su automatización desde la canalización principal, se puede ejecutar el Notebook completo pinchando en el botón ‘ ▶ Ejecutar todo’ ubicado al inicio de la botonera superior.

NOTA: La primera vez que se ejecuta el Notebook puede tardar unos minutos en iniciar hasta que se inicia el clúster de Spark.

Si la ejecución del Notebook es correcta, al finalizar se debería generar la estructura de carpetas particionada por Año, mes y día y los ficheros CSV de auditoría correspondientes.

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Si examinamos uno de los ficheros particionados generados, se deberían visualizar la siguiente estructura de columnas:

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Finalmente, se procede a añadir la ejecución del Notebook como fase final de la canalización principal de Synapse Analytics, a través de los siguientes pasos.

  1. Desde la sección ‘Synapse’ del panel de actividades, seleccionar y arrastrar una actividad de tipo Bloc de notas al área de trabajo, y conectarla a la salida de la actividad de Copiar datos creada anteriormente para la ingesta de datos.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En la pestaña ‘General’ de la actividad, establecer el nombre y la descripción, verificando que el estado de la actividad es ‘Activado’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. En la pestaña Configuración, seleccionar el Notebook creado anteriormente en el campo ‘Bloc de notas’ y el Grupo de Spark creado inicialmente, los demás campos se pueden dejar con el valor por defecto.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Hacer click en ‘Publicar todo’ para guardar los cambios en la canalización.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Con esto, queda implementadas todas las fases que conforman la canalización principal de Synapse Analytics. Se podría ejecutar el proceso completo a petición pinchando en el botón ‘Agregar desencadenador’ y seleccionando la opción ‘Ejecutar ahora’, o bien crear un nuevo desencadenador para la ejecución del proceso de forma automática con una recurrencia específica.

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

Explotación de auditoría desde PowerBI

Al igual que el artículo anterior, se propone la elaboración de un reporte de Power BI para facilitar la consulta y la explotación de los datos históricos de auditoría de Dataverse almacenados en Data Lake. A continuación, se indican los pasos necesarios para establecer la conexión con la cuenta de almacenamiento y elaborar un informe de auditoría sencillo.

  1. Desde la herramienta PowerBI Desktop, seleccionar la opción ‘Azure Data Lake Storage Gen2’ desde el menú ‘Obtener datos’.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Introducir la dirección URL (dfs) del contenedor audit-crm utilizado para almacenar el histórico de auditoría en Data Lake. la cual tiene el siguiente formato (donde ‘XXXXX’ es el nombre de la cuenta de almacenamiento):

https://XXXXX.dfs.core.windows.net/audit-crm/AuditData/csv

 

Se solicitarán las credenciales de Azure o la clave compartida de la cuenta de almacenamiento para realizar la autenticación.

  1. En la siguiente pantalla, pinchar en ‘Transformar datos‘, ya que son necesarios algunos ajustes previos a la carga de datos en el reporte.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Desde el editor de Power Query, se deben excluir ficheros de metadatos, por lo que se aplica el filtro ‘Termina con .csv’ sobre la columna ‘Name’ para incluir exclusivamente los ficheros con extensión CSV correspondientes a los datos históricos de auditoría.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. A continuación, pinchar en el botón ‘Combinar archivos’, ubicado en la parte derecha del encabezado de la columna ‘Content’. En el diálogo ‘Combinar archivos’, pinchar en Aceptar y proceder a la carga de datos en el reporte pinchando en el botón ‘Cerrar y aplicar’ situado al inicio de la cinta de opciones.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
  1. Después de cargar los datos, se pueden agregar visualizaciones al área de trabajo. Estas visualizaciones pueden ser tablas, gráficos, segmentaciones u otros elementos visuales. La selección de las columnas, los filtros y otros ajustes se realizan según las necesidades del informe.
Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure

NOTA: Si se desea publicar el informe en el servicio de PowerBI, es necesario disponer de una licencia de PowerBI y los privilegios necesarios asignados.

Conclusión

A lo largo del artículo, hemos visto cómo implementar una solución paso a paso para exportar los datos de auditoría de la entidad de contacto de Dataverse a una cuenta de almacenamiento de Data Lake (Gen2), mediante la implementación de un proceso ETL utilizando Azure Synapse Analytics y Apache Spark, y la explotación del histórico de auditoría desde un informe de Power BI. Esta solución ofrece una serie de ventajas, como liberar espacio en la base de datos de Dataverse reduciendo costes en el almacenamiento, mejorar la flexibilidad y el análisis de los datos de auditoría, y la escalabilidad para adaptarse a entornos de gran tamaño.

Espero que haya resultado interesante y sirva de ayuda o como guía de referencia si os plantean un escenario similar en el que pueda aplicarse la solución propuesta.

About the Author: Alberto Cotelo

Exportar la auditoría de Dataverse a Data Lake mediante Azure Synapse y Spark Axazure
Consultor técnico Dynamics 365 CE

¿Quieres compartir?