Apache Flink 内部提供了内置的常用消息格式的 Schemas
前言
Flink 消费 Kafka 时,要对消息进行格式化
Schemas
Flink 有提供内置的 Schemas
先看 Flink 中初始化 Kafka 数据源代码,其中传入服务器名和 Topic 名就可以了
1 | Properties props = new Properties(); |
上方 SimpleStringSchema 即为 Schemas。如果需要使用其他的,替换掉该参数,并把相应数据类型进行修改
如果想自己实现 Schema ,可以参看Apache-Flink深度解析-DataStream-Connectors之Kafka Simple ETL 部分,
与 SimpleStringSchema 是一样的效果,只是自己实现的 Schema
如果类找不到,请添加依赖
1 | <dependency> |
SimpleStringSchema
SimpleStringSchema 把 message 反序列化为字符串。如果 message 有键, 则忽略键
JSONDeserializationSchema
JSONDeserializationSchema 使用 jackson 将 message 反序列化为 json 格式的消息并返回 com.fasterxml.jackson.databind.node.ObjectNode 对象流。你可以使用 .get(“property”) 方法访问字段。再一次, 键被忽略。
JSONKeyValueDeserializationSchema
JSONKeyValueDeserializationSchema 与前一个非常类似,但处理带有json编码的键和值的消息
返回的 ObjectNode 包含如下字段:
key: 键中存在的所有字段
value: 所有的 message 字段
metadata(可选): 暴露消息的 offset, partition 和 topic(将 true 传递给构造函数以获取元数据)
例如:
1 | kafka-console-producer --broker-list localhost:9092 --topic json-topic \ |
会被解码为
1 | { |
如果消息本身就没有 key、metadata。JSONKeyValueDeserializationSchema 解析出来也是没有key、value
AvroDeserializationSchema
使用静态提供的模式读取使用 Avro 格式序列化的数据
可以从 Avro 生成的类(AvroDeserializationSchema.forSpecific(…))推断出模式,或者它可以与 GenericRecords 一起使用手动提供的模式(使用AvroDeserializationSchema.forGeneric(…))
TypeInformationSerializationSchema
TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema) 它基于Flink的TypeInformation创建模式。 如果数据由Flink写入和读取,这将非常有用
总结
AvroDeserializationSchema 与 TypeInformationSerializationSchema
还没接触过,这里就暂时记录下,如果下次有用到再回来记录
参考链接