Si bien los datos son un activo fundamental para las empresas modernas, la capacidad de la tecnología para escalar ha resultado en una avalancha de big data. La gestión y el almacenamiento de datos se han convertido en un componente necesario para los procesos operativos modernos.
Snowflake, un almacén de datos en la nube que es elogiado por su capacidad para admitir entornos de infraestructura de múltiples nubes, es una de las plataformas de datos más populares. Snowflake es un almacén de datos que se ejecuta sobre Amazon Web Services o la infraestructura en la nube de Microsoft Azure, lo que permite que el almacenamiento y la computación se escalen de forma independiente.
Integración de la base de datos Snowflake con SAP Data Intelligence
Tanto la versión SAP DI On-Premises como la versión en la nube brindan conectividad con Snowflake Database.
Sin embargo, hasta la versión local de SAP DI 3.2, no hay un operador incorporado disponible en el modelador DI para admitir la carga de datos en la base de datos Snowflake.
En la versión de SAP DI Cloud, hay un operador de Generación 2 Cloud Table Producer que admite la carga de datos en Snowflake Database.
Existen ciertas limitaciones mencionadas a continuación con el operador Cloud Table Producer:
Para superar este tipo de desafíos/requisitos, hay una opción disponible en SAP Data Intelligence para crear un operador Snowflake personalizado para cargar datos.
Construyendo Operador Snowflake Personalizado en SAP Data Intelligence.
Para construir el operador personalizado en SAP DI, primero debemos cumplir algunos pasos.
1. Cree un archivo acoplable en SAP DI para instalar una biblioteca de Python de Snowflake.
Vaya a la pestaña Repositorio y haga clic en crear archivo Docker.
Escriba los comandos como se menciona en la captura de pantalla, guárdelo y haga clic en compilar.
2. Crea un nuevo operador personalizado
Vaya a la pestaña Operador y seleccione Operador de generación 2. Haga clic en el signo Más para crear un nuevo operador. Seleccione Operador base como Python3 (Generación 2)
3. Vaya a la pestaña de configuración y defina los parámetros.
Puede crear parámetros haciendo clic en el botón editar y definir los detalles.
4.Seleccione una imagen de icono de operador y guarde el operador. Puede ver este operador personalizado visible en la pestaña Operador de generación 2.
Cree un gráfico para cargar los datos de las vistas de S4H CDS en la base de datos Snowflake.
Crearemos un gráfico para extraer los datos de la vista S4 Hana CDS y cargarlos en la tabla Snowflake usando Custom Snowflake Operator.
1.Cree el gráfico Gen2 y arrastre el operador «Leer datos de SAP» en el gráfico. Aquí asumimos que para conectarnos con S4 Hana CDS view, hemos creado el tipo de conexión ABAP RFC.
Seleccione la conexión, el nombre del objeto y el modo de transferencia en la configuración del operador.
2. Arrastre el operador Transformación de datos en el gráfico y conéctelo con Leer datos del operador SAP, defina la asignación de columnas de salida en el operador de transformación.
Aquí en Target, he agregado una nueva columna LAST_UPDATE_DATE para completar la marca de tiempo mientras se cargan los datos.
3.Ahora arrastre el operador Snowflake personalizado que hemos creado. Cree un puerto de entrada con Tipo de datos como TABLA.
4.Conecte el puerto de salida del operador de transformación de datos con el puerto de entrada del operador de copo de nieve personalizado y finalmente agregue el terminador gráfico, el operador de terminal a la salida del operador de copo de nieve. Además, agrupe el operador de copo de nieve con el archivo Docker que hemos creado para copo de nieve.
Ahora el gráfico completo se verá así.
5. Abra la configuración del operador Snowflake Gen2 y mencione los detalles requeridos en los parámetros.
6.Haga clic en la opción Script en Snowflake Operator y escriba el código de Python para transformar y cargar los datos de las vistas de CDS en Snowflake Table.
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from io import StringIO
import csv
import json
from datetime import datetime
snowflake.connector.paramstyle="qmark"
if api.config.Truncate == "Yes":
conn_sf = snowflake.connector.connect(
account = api.config.Account,
user = api.config.Username,
warehouse = api.config.Warehouse,
password = api.config.Password,
database = api.config.Database,
schema = api.config.Schema)
table=[api.config.Snowflake_Table_Name]
conn_sf.cursor().execute("delete from IDENTIFIER(?)",table)
else:
conn_sf = snowflake.connector.connect(
account = api.config.Account,
user = api.config.Username,
warehouse = api.config.Warehouse,
password = api.config.Password,
database = api.config.Database,
schema = api.config.Schema)
def on_input(msg_id, header, data):
tbl = data.get()
x=header['com.sap.headers.batch']
y=x[1]
if x[1]==False: # Check Last Batch - False, If True then Send signal to terminate the Graph
api.outputs.log.publish(str("Graph Started"))
tbl_info = api.type_context.get_vtype(tbl.type_ref)
col_names = list(tbl_info.columns.keys())
df = pd.DataFrame(tbl.body,columns = col_names,dtype=str)
table=api.config.Snowflake_Table_Name
api.outputs.log.publish(str("Load Started"))
try:
inserted_rows = write_pandas(conn = conn_sf, df = df,table_name = table,compression="snappy",parallel=90, quote_identifiers = False)
api.outputs.log.publish(str(inserted_rows))
api.outputs.log.publish(str("Load Completed"))
api.logger.info(str(inserted_rows))
except Exception as e:
api.outputs.log.publish(str('Error while loading data'))
date = datetime.now(). strftime("%Y_%m_%d-%I:%M:%S")
df.to_csv(r'/vrep/vflow/Error_File_Name.csv_{}'.format(date), index=False)
api.outputs.log.publish(str('Failed data pushed to error file in path /vrep/vflow/'))
api.outputs.log.publish(str(e))
api.logger.info(str(e))
api.propagate_exception(e)
else:
api.outputs.log2.publish(str('no data'))
Conector de copo de nieve Escribir pandas
Aquí, en este operador personalizado, hemos utilizado la biblioteca estándar de pandas de escritura de copos de nieve. El conector proporciona métodos API para escribir datos en una base de datos Snowflake desde un Pandas DataFrame.
Una de las principales desventajas de este conector es que la tabla de destino debe existir en la base de datos de Snowflake, solo entonces cargará los datos en la tabla. Este conector no crea la tabla automáticamente.
Una vez que se define la tabla de destino, usaremos la función de escribir pandas para agregar los datos, lo que realiza un truco de SQL detrás de escena. Primero carga los datos en una ubicación de almacenamiento temporal, luego usa COPY INTO para mover los datos de esa ubicación a la tabla.
Aquí, no tenemos que especificar la ubicación de almacenamiento externo como lo tenemos que hacer en el operador productor de Table Cloud.
Una vez que ejecute el gráfico, los datos se extraerán de la vista CDS y se cargarán en la tabla Snowflake con TIMESTAMP. A continuación se muestra una instantánea de los datos cargados en la tabla de destino.
Limitaciones de este enfoque
1.Como se mencionó anteriormente, la tabla de destino debe crearse primero en Snowflake antes de ejecutar el gráfico. Sin embargo, también existe un enfoque para crear la tabla a partir del gráfico DI utilizando el paquete SQLAlchemy. Discutiremos eso en nuevos blogs.
2. Requerir un buen conocimiento del script de Python para definir la lógica y el flujo
En conclusión, el operador Custom Snowflake es un buen enfoque si no tenemos una ubicación de preparación externa como S3, WASB para almacenar los archivos y desea cargar datos en la tabla de Snowflake usando la preparación interna. Además, si queremos implementar requisitos adicionales como el mecanismo UPSERT que no está disponible en el operador productor de la nube de tablas, podemos aprovechar esta opción y escribir el código personalizado dentro del operador. También proporciona flexibilidad para escribir un mecanismo de manejo de excepciones de acuerdo con los requisitos.
Por favor, comparta sus comentarios o pensamientos en un comentario.
Para obtener más información sobre SAP Data Intelligence, puede consultar y seguir las siguientes páginas:
Calle Eloy Gonzalo, 27
Madrid, Madrid.
Código Postal 28010
Paseo de la Reforma 26
Colonia Juárez, Cuauhtémoc
Ciudad de México 06600
Real Cariari
Autopista General Cañas,
San José, SJ 40104
Av. Jorge Basadre 349
San Isidro
Lima, LIM 15073