Diseno y desarrollo de flujos en NiFi
Disenar en NiFi no consiste en arrastrar procesadores hasta que algo funcione. Un flujo mantenible debe expresar responsabilidades, controlar errores y permitir operar sin miedo.
Modelo mental
Piensa cada flujo como una serie de etapas:
entrada -> normalizacion -> validacion -> routing -> salida -> erroresCada etapa deberia poder explicarse con una frase. Si un process group necesita una pagina entera para entenderse, probablemente esta mezclando demasiadas responsabilidades.
Nombres y estructura
Usa nombres orientados a negocio:
Ingesta de pedidos desde SFTP
Validacion de esquema de pedidos
Publicacion en Kafka ventas.pedidos
Errores de pedidos invalidosEvita nombres genericos:
GetFile 1
ProcessGroup final
Test nuevo
Processor copiaProcess groups recomendados
Una estructura habitual:
01 Entrada
02 Validacion
03 Enriquecimiento
04 Publicacion
90 Reintentos
99 ErroresNo hace falta seguir exactamente esa numeracion, pero ayuda a que el canvas tenga lectura de izquierda a derecha.
Relationships
Cada processor tiene salidas posibles llamadas relationships. Algunas comunes:
successfailureretryoriginalmatchedunmatched
Una relationship sin conectar puede bloquear el processor, salvo que se marque como auto-terminated. Auto-terminar una relacion significa descartar esos FlowFiles, asi que debe ser una decision explicita.
Routing por atributos
RouteOnAttribute permite enviar FlowFiles a rutas distintas segun sus atributos.
Ejemplo:
${mime.type:equals('application/json')}
${filename:endsWith('.csv')}
${source:equals('crm')}Uso tipico:
entrada -> RouteOnAttribute
-> json -> flujo JSON
-> csv -> flujo CSV
-> unmatched -> erroresRouting por contenido
Cuando la decision depende del contenido, puedes usar processors como:
EvaluateJsonPathEvaluateXPathExtractTextQueryRecordValidateRecord
Patron comun:
EvaluateJsonPath -> RouteOnAttribute -> salida especificaExtrae lo minimo necesario a atributos. No conviertas cada campo de negocio en atributo si el contenido es grande.
Formato Record
La familia de processors Record permite trabajar con CSV, JSON, Avro, Parquet y otros formatos de manera mas estructurada.
Ejemplos:
ConvertRecordValidateRecordQueryRecordPutDatabaseRecordPublishKafkaRecord_2_6
Necesitan lectores y escritores configurados como controller services.
Ejemplo: ingesta de CSV
GetFile
-> UpdateAttribute
-> ValidateRecord
-> valid -> ConvertRecord -> PutDatabaseRecord
-> invalid -> PutFile cuarentenaControles recomendados:
- Guardar ruta y nombre original.
- Anadir identificador de ejecucion.
- Validar esquema.
- Separar registros invalidos.
- Publicar metricas de volumen.
Ejemplo: API a Kafka
GenerateTableFetch
-> ExecuteSQLRecord
-> ConvertRecord
-> PublishKafkaRecord_2_6O para HTTP:
InvokeHTTP
-> EvaluateJsonPath
-> RouteOnAttribute
-> PublishKafkaRecord_2_6Documentacion del flujo
Cada process group importante deberia documentar:
- Objetivo.
- Fuente.
- Destino.
- Frecuencia.
- Formato esperado.
- Relaciones de error.
- Parametros necesarios.
- Propietario.
NiFi permite anadir comentarios y labels en el canvas. Usalos para decisiones que no sean obvias.
Checklist de desarrollo
- El flujo tiene grupos por responsabilidad.
- Los nombres explican intencion.
- Todas las relationships estan conectadas o terminadas de forma consciente.
- Hay rutas separadas para exito, retry y fallo definitivo.
- Las rutas, hosts y credenciales son parametros.
- Los controller services estan reutilizados.
- La salida final es idempotente o deduplicable.
- El flujo queda versionado en Registry.
