# Schema Inferer
### Overview

Schema Inferer is a library, built on Spark Streaming, that infers the schema of a given Kafka data stream and continuously monitors the stream for schema changes. A schema is essentially the union of all fields observed in a data stream over a period of time. The inferred schemas (the initial one and subsequent modified ones) are stored in a `SchemaStore`, which can be backed by systems such as Zookeeper. Depending on the `SchemaStore` implementation, it can also be accessed externally to obtain the current schema for a given data stream or the schema at a certain point in time. Schema Inferer also exposes an interface that lets users define an action to perform when a schema change is observed, such as creating/updating a Hive table or emitting the change to a changelog.
Schema Inferer is built using Baryon, a library that consumes data from Kafka and provides an interface to operate on the consumed data.
### Common use-cases
- Understand the structure of a Kafka data stream based on the inferred schema.
- Track schema changes and understand how a schema has evolved over time. Every time the schema for a stream changes, the `SchemaStore` captures and stores the observed schema.
- Monitor a schema and take action as soon as an invalid or unexpected change in schema is observed.
- Publish schema changes to other systems for further analysis. Schema changes can be stored in HDFS for long term analysis or even sent to systems like Kafka, from which other real time applications can consume the change logs and perform relevant actions.
- If the Kafka data stream is persisted to a long term storage like HDFS, then Schema Inferer can be used to automatically create Hive EXTERNAL tables for that data.
### Components
- `SchemaStore`: The `SchemaStore` trait is responsible for storing the latest schema inferred from a data stream. Schema Inferer has implementations for two stores - Zookeeper and In-Memory.
  - `ZookeeperSchemaStore` - The `ZookeeperSchemaStore` creates a new znode every time a new schema is inferred. The Zookeeper location can be adjusted and will be of the format `/$ZK_ROOT/$TOPIC_NAME/$TOPIC_NAME_$TIMESTAMP`. Every topic also has a `current` node which contains the latest schema for that topic's data stream. The `ZookeeperSchemaStore` thus contains the entire history of schemas for a data stream. On application restarts, a Schema Inferer using the `ZookeeperSchemaStore` fetches the previously inferred schema and continues monitoring the data stream using the stored schema as the baseline.
  - `InMemorySchemaStore` - The `InMemorySchemaStore` saves the inferred schemas in an in-memory cache. Unlike the `ZookeeperSchemaStore`, the `InMemorySchemaStore` does not provide external APIs to fetch the schemas inferred over a period of time. The other disadvantage is that after each application restart the baseline schema has to be inferred from scratch, since there is no previously stored schema to start with. Note: the `ZookeeperSchemaStore` internally uses the `InMemorySchemaStore` as its cache.
- `Publisher`: The `Publisher` trait is responsible for taking an action when a schema is inferred for the first time or when a subsequent schema change occurs. Schema Inferer currently ships with `HiveSchemaPublisher` and `Log4jSchemaPublisher` implementations. A rough sketch of both interfaces follows this list.
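The exact trait definitions ship with the library; the sketch below only illustrates the general shape of these two interfaces. The names, signatures, and the `Schema` stand-in are assumptions, not the library's actual API.

```scala
// Hypothetical sketch only: the real SchemaStore and Publisher traits ship
// with the library and may differ in names and signatures.

// Stand-in for the library's custom Schema type (see "Design" below).
case class Schema(topic: String, fieldsAsJson: String)

// A SchemaStore keeps the latest inferred schema per topic so that later
// runs can diff new observations against a stored baseline.
trait SchemaStore {
  def get(topic: String): Option[Schema]   // latest known schema, if any
  def put(schema: Schema): Unit            // persist a newly merged schema
}

// A Publisher reacts when a schema is first inferred or subsequently changes,
// e.g. by (re)creating a Hive table or logging the change.
trait Publisher {
  def publish(schema: Schema): Unit
}
```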
### Design

The Schema Inferer plugin operates on the RDDs generated by the Baryon receiver. The schema for a data stream is generated by iteratively inferring the schema of these RDDs. The RDDs of type `WrappedMessage` obtained from Baryon are first converted to RDDs of type `TopicAndEvent` by an `Extractor` and then converted to a SchemaRDD of type String. Spark SQL APIs are then used to extract the schema of the RDD. The fetched schema is converted to a custom type called `Schema` and passed on to the corresponding `SchemaStore` and `Publisher` implementations. The `Schema` object generated during each run is compared with the cached `Schema` object to detect changes. In the event of a change, the newly generated `Schema` object is merged with the cached one, and the merged `Schema` is saved in the appropriate `SchemaStore` and processed using the provided `Publisher` implementation. A sketch of the Spark SQL inference step follows.
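The snippet below is a minimal, self-contained sketch of the Spark SQL step only: it infers a schema from an RDD of JSON strings the way a single micro-batch might be processed. It assumes Spark 1.x; the Baryon-side types (`WrappedMessage`, `TopicAndEvent`, the `Extractor`) and the surrounding wiring belong to the library and are only mocked here.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// Minimal sketch of the Spark SQL inference step (Spark 1.x).
// In the real plugin the RDD[String] of JSON events is derived from Baryon's
// WrappedMessage/TopicAndEvent RDDs; here it is mocked with two sample events.
object InferSchemaSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("schema-inferer-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // Stand-in for one micro-batch of JSON events from a Kafka topic.
    val jsonEvents: RDD[String] = sc.parallelize(Seq(
      """{"body": {"request": "GET /", "page": {"id": 42}}, "timestamp": "t1"}""",
      """{"body": {"hostname": "web-01", "page": {"app": "search"}}, "timestamp": "t2"}"""
    ))

    // jsonRDD samples the events (cf. rdd.sampling.ratio) and infers a schema.
    val inferred = sqlContext.jsonRDD(jsonEvents, 1.0)
    inferred.printSchema()
    // inferred.schema is the StructType that the library would convert into
    // its own Schema type, diff against the cached schema, and merge on change.

    sc.stop()
  }
}
```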
### Resolving schema merge conflicts

If the cached and newly generated `Schema` objects have a field at the same nested level but with different types, a merge conflict occurs. To resolve merge conflicts, a precedence order is defined so that the type with greater precedence is chosen over the one with lower precedence. Precedence order:

"struct", "array", "string", "double", "float", "long", "integer", "short", "byte", "boolean"

Here struct has the highest precedence and boolean the lowest.
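A minimal sketch of this precedence-based resolution, assuming a simple string representation of types (the object and method names are illustrative, not the library's internals):

```scala
// Illustrative only: resolve a type conflict between the cached and newly
// inferred schema by keeping the type with higher precedence.
object TypePrecedence {
  // Highest precedence first, matching the order documented above.
  private val precedence = Seq(
    "struct", "array", "string", "double", "float",
    "long", "integer", "short", "byte", "boolean")

  // Assumes both type names appear in the precedence list.
  def resolve(cachedType: String, newType: String): String = {
    if (precedence.indexOf(cachedType) <= precedence.indexOf(newType)) cachedType
    else newType
  }
}
```

In the example below, the `id` field is an integer in the cached schema but a string in the newly inferred one; `resolve("integer", "string")` returns `"string"`, which is why the merged schema keeps `id: string`.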
### Example of schema merge
- Cached schema:

  ```
  root
  |-- body: struct <
  | |-- request: string
  | |-- hostname: string
  | |-- page: struct <
  | | |-- parentEventId: string
  | | |-- country: string
  | | |-- division: string
  | | |-- id: integer
  | | |-- app: string
  | |-- >
  |-- timestamp: string
  ```
- Schema inferred during a run:

  ```
  root
  |-- body: struct <
  | |-- request: string
  | |-- hostname: string
  | |-- page: struct <
  | | |-- id: string
  | | |-- app: string
  | | |-- platform: string
  | |-- >
  | |-- layer: string
  |-- timestamp: string
  ```
- Difference between cached and newly generated schema:

  ```
  root
  |-- body: struct <
  | |-- page: struct <
  | | |-- id: string -- integer
  | | |-- platform: string
  | |-- >
  | |-- layer: string
  ```
- New merged schema:

  ```
  root
  |-- body: struct <
  | |-- request: string
  | |-- hostname: string
  | |-- page: struct <
  | | |-- parentEventId: string
  | | |-- country: string
  | | |-- division: string
  | | |-- id: string
  | | |-- app: string
  | | |-- platform: string
  | |-- >
  | |-- layer: string
  |-- timestamp: string
  ```
### Configurations

The following configurations need to be used along with the Baryon configs. They can be hard-coded in your driver or provided via a config file (an example properties file follows the table).
| Config name | Default | Description |
|---|---|---|
| topic.warmup.policy | TIME | Policy that decides how long to wait before the initial publish/store of a schema. Valid values: [TIME] |
| topic.warmup.time.sec | 60 | Time in seconds to wait before publishing/storing a schema for the first time |
| spark.num.receivers | 3 | Number of Spark Streaming receivers |
| store.type | HASH | The type of schema store to use: ZOOKEEPER or HASH (in-memory) |
| store.zk.connect | NONE | (Required if store.type is ZOOKEEPER) The Zookeeper connect string |
| store.zk.root | NONE | (Required if store.type is ZOOKEEPER) The Zookeeper root path |
| publisher.type | NONE | (Required) The type of publisher: HIVE/KAFKA/DEFAULT/FILE/LOG |
| publisher.hive.db | Default | The Hive database to connect to |
| publisher.hive.serde.jar | NONE | (Required if publisher.type is HIVE) The Hive SerDe jar |
| publisher.hive.serde.class | NONE | (Required if publisher.type is HIVE) The Hive SerDe class |
| publisher.hive.blacklist | NONE | Topics that should not be published to Hive |
| rdd.sampling.ratio | 0.25 | The sampling ratio to use when inferring the schema |
| consumer.type | baryon | The consumer to use: direct/baryon |
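For instance, a driver could load these from a properties file alongside the Baryon configs; all values below are illustrative placeholders, not recommended settings:

```properties
# Schema Inferer settings (illustrative values; Baryon configs go alongside).
topic.warmup.policy=TIME
topic.warmup.time.sec=120
spark.num.receivers=3
store.type=ZOOKEEPER
store.zk.connect=zkhost1:2181,zkhost2:2181
store.zk.root=/schema-inferer
publisher.type=HIVE
publisher.hive.db=default
publisher.hive.serde.jar=/path/to/json-serde.jar
publisher.hive.serde.class=com.example.hive.JsonSerDe
rdd.sampling.ratio=0.25
consumer.type=baryon
```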
### Maven
```xml
<dependency>
  <groupId>com.groupon.dse</groupId>
  <artifactId>schema-inferer</artifactId>
  <version>1.0</version>
</dependency>
```
### Sample Usage

Refer to a sample inferer to understand the usage.
### Limitations
- Schema Inferer only supports Kafka data streams consisting of JSON-formatted events.
- The `HiveSchemaPublisher` currently performs a `DROP TABLE` followed by a `CREATE TABLE` during a schema change. As a result, existing Hive partitions have to be re-created.