Apache Kafka integration

[[Category:Fusion Registry Install]]
[[Category:How_To]]
 
=Compatibility=

{| class="wikitable"
|-
! Product !! Module !! Version !! Support
|-
| Fusion Registry || Core || 10.0 and higher || Kafka producer supporting 'Structure Notification' events
|}
 
=Overview=
Fusion Registry can act as an Apache Kafka Producer where specified events are published on definable Kafka [https://kafka.apache.org/quickstart#quickstart_createtopic topics].
It consists of a generalised ‘producer’ interface capable of publishing any information to definable Kafka topics, and a collection of ‘handlers’ for managing specific events.

At present there is only one event handler, [[#Structure Notification|Structure Notification]], which publishes changes to structures as they occur; this also includes data registrations.

For changes or modifications to structures, the body of the message is always an SDMX structure message. The format is configurable at the event handler level: the structure producer can output any structure format for which the Registry knows the corresponding VND (Accept) header, although the UI offers the following choices:
* Fusion JSON (the JSON dialect that pre-dated the formal SDMX-JSON specification)
* SDMX-ML v2.1

Fusion JSON is recommended if the Fusion Registry is being used to support non-SDMX structures (which cannot be described in SDMX-ML), for example Reporting Templates and Validation Schemes.

A ‘tombstone’ message is used for structure deletions, with the structure URN in the Kafka Message Key and a null body.

The list of supported events that can be published to Kafka will be extended over time.

Other events envisaged include:
* Audit events such as logins, data registrations and API calls
* Errors, for instance where a scheduled data import fails
* Configuration changes
* Changes to Content Security Rules
  
 
=Configuration=
Configuration is performed through the GUI with 'admin' privileges.
  
=== Connection ===
Before Kafka can be used, a connection must be defined from the Fusion Registry to a Kafka server (or servers), taking note of the following.

==== Number of Connections ====
The Registry supports a single connection definition referencing one or more bootstrap servers, separated by commas. The UI displays all connection properties supported by Kafka, together with the default value used when a property is not explicitly set.
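For example, a connection to a two-broker cluster might specify the bootstrap servers as follows (hostnames and ports are illustrative):
<pre>
kafka1.example.org:9092,kafka2.example.org:9092
</pre>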

==== Transactions ====
Whilst Kafka does not enforce the use of transactions, the Fusion Registry does. A connection will default to using the transaction prefix ‘tx’. The Kafka documentation states that this property can be null; that is not the case for the Fusion Registry, which requires a value.

==== SSL ====
SSL is supported provided that:
* Your Kafka system has been set up correctly with a KeyStore, TrustStore and appropriate passwords
* The Registry has access to both the KeyStore and the TrustStore

To enable SSL on the Kafka broker:
* Edit the Kafka configuration file: server.properties
* Locate the address for the listeners. This will either need to be changed to support SSL, or an additional SSL port will need to be added. For example, two ports, one for plaintext and one for SSL, can be defined with:
<pre>
listeners=PLAINTEXT://:9092,SSL://localhost:9093
</pre>

* Add the following section for your SSL configuration:
<pre>
ssl.keystore.location = <location of the keystore>
ssl.keystore.password = <keystore password>
ssl.key.password = <key password>
ssl.truststore.location = <location of the truststore>
ssl.truststore.password = <truststore password>
</pre>

For example:
<pre>
ssl.keystore.location = f:/keystores/kafka.server.keystore.jks
ssl.keystore.password = password
ssl.key.password = password
ssl.truststore.location = f:/keystores/kafka.server.truststore.jks
ssl.truststore.password = password
</pre>
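
The settings above configure the Kafka broker. On the Fusion Registry side, the connection must also be told to use SSL and where to find its TrustStore (and KeyStore, where the broker requires client authentication). A minimal sketch using the standard Kafka client property names, assuming the connection UI exposes them as described above; locations and passwords are placeholders:
<pre>
security.protocol = SSL
ssl.truststore.location = <location of the truststore>
ssl.truststore.password = <truststore password>
ssl.keystore.location = <location of the keystore, if client authentication is required>
ssl.keystore.password = <keystore password>
ssl.key.password = <key password>
</pre>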
  
==== Kerberos ====
Kerberos integration can be achieved by providing a value for the sasl.jaas.config property. This property expects one configuration setting per line, so the UI provides a multi-line text area for it.

Example SASL configuration:
<pre>
com.sun.security.auth.module.Krb5LoginModule required
debug=false
doNotPrompt=true
principal=\xxx@Byyy.zzz.org\
useKeyTab=true
keyTab=path_to_keytab
useTicketCache=true
sasl.kerberos.service.name=kafka
serviceName=kafka;
</pre>
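
The sasl.jaas.config entry is normally used together with the standard Kafka client security properties. A minimal sketch, assuming a Kerberos-secured listener without SSL (use SASL_SSL where the listener is also encrypted); values are illustrative:
<pre>
security.protocol = SASL_PLAINTEXT
sasl.mechanism = GSSAPI
sasl.kerberos.service.name = kafka
</pre>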

==== Max Request Size ====
The connection definition supports the max.request.size property, which enforces a limit on the maximum size of message that can be sent. Additional restrictions can be defined on the Kafka server and on the Kafka topic, both of which are configured outside of the Fusion Registry. If this property is changed in the Fusion Registry, it is important to ensure that the server and topic can also handle large request sizes.

It is important to note that the max request size refers to the size of the message BEFORE any compression is applied. This is a behaviour of Kafka and cannot be modified. When the message is actually sent, the limits on the Kafka server and topic are applied to the compressed message (if compression is enabled), so these can be set to a lower threshold.
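
As an illustration, allowing messages of up to 10 MB typically means aligning three separate settings: the Registry connection, the Kafka broker and the Kafka topic (property names are the standard Kafka ones; the values are examples only):
<pre>
# Fusion Registry Kafka connection
max.request.size = 10485760

# Kafka broker (server.properties)
message.max.bytes = 10485760

# Kafka topic override
max.message.bytes = 10485760
</pre>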

==== Test Connection ====
[[File:Kafka test results.png|thumb|Example connection test results]]
The Fusion Registry ensures that all of the bootstrap servers can be connected to before the connection is saved. However, the test function should be used to verify the following:
* A message can be sent using the security defined in the connection (e.g. Kerberos)
* Transactions are supported by the connection
* Large messages can be sent without issue

'''Note 1''': the last test generates a large message (up to the number of bytes specified). As the message contains a lot of repeated text, compression (if set) will make the final message size very small. If the intention is also to test the message size against the topic and Kafka server, ensure compression is set to none before running the test.

'''Note 2''': the test is run against the applied configuration; remember to apply changes BEFORE running the test.

===Structure Producer===

==== Setting Up ====
[[File:Kafka structure producer.png|thumb|Showing the UI for the Structure Producer]]
The Structure Producer uses the Kafka connection to send all structural changes. However, before the structure producer becomes operational, it must be assigned the Kafka topic to use and given a message format in which to notify structures.

The structure producer will not become active until it is started. The first time the producer is started it can either:
# Start and remain idle until a structure is added/edited/removed
# Start and send all current Fusion Registry structures to its given topic in a single transaction (beware of the maximum message size set on the connection)

==== Behaviour ====
When the Fusion Registry restarts, the Structure Producer will always cross-check the last transaction successfully sent to the topic against the last transaction the Fusion Registry processed. If the Fusion Registry has transactions that have not been sent to the topic, the structure producer will work through each unsent transaction, in sequence, until it is up to date.

'''Note''': if the structure producer has never sent a transaction and the Fusion Registry is restarted, the producer will not replay all transactions; it will remain idle until the next transaction occurs in the Fusion Registry.
  
The transaction Ids shown by the structure producer match those shown in the Fusion Registry RSS feed.

==== Manual Suspension ====
The structure producer can be suspended at any time by clicking on the 'suspend' button. Note that this only suspends the producer for as long as the Fusion Registry is running. On Fusion Registry restart, the producer will always attempt to start if it can. To remove the producer permanently, use the 'delete configuration' option.

==== Forced Suspension and Error Handling ====
Although Kafka is able to detect and handle transitory errors, such as the Kafka server becoming unresponsive for a period of time, there may be occasions where a message simply cannot be sent and no amount of re-attempts will fix this. An example failure is when the message size exceeds the limit set on the connection, server, or topic.

When the structure producer meets a failure condition that cannot be resolved through re-attempts, the structure producer is shut down and the transaction state is recorded as failed. It is strongly recommended to configure the Fusion Registry to use an email server and to enter an admin email address for the Structure Producer. When there is a failure, the Fusion Registry will send an email to this address which includes the error message (also found in the Fusion Registry log files).
 
  
 
=General Behaviour=

===Publication Reliability===
Message publication has been designed to be 100% reliable, with messages sent in the same sequence in which the Fusion Registry processed them.

The Fusion Registry makes use of the Kafka-supplied libraries to place messages on the topic, and any transitory failures are handled by the Kafka library. A transitory failure includes the Fusion Registry being unable to talk to the Kafka server, for example while the Kafka server is being restarted. Possible transitory errors in the Fusion Registry are managed by re-attempting the send (up to 3 times).

When a message simply cannot be sent, for example because the Fusion Registry can no longer authenticate with Kafka or the message size exceeds the limits set, the producer is stopped and emails are sent to notify of the failure. Once the problem is rectified and the producer restarted, the Fusion Registry will replay any pending transactions that occurred while the producer was stopped.

Each Fusion Registry transaction is recorded in the database, along with the Kafka status. On server restart the Fusion Registry will cross-reference the transaction logs with the Kafka logs to ensure all messages have been sent to Kafka.

The Kafka logs are only set to SENT when the Fusion Registry has been notified by the Kafka server that it has successfully received the message.

=Check Structure Notification Status=
The status of the structure producer can be checked with the following web service:
  http(s)://[server]:[port]/[webapp]/ws/secure/settings/kafka/producer/status/structure

Response:
  {
    Running : boolean
  }
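
For example, when the structure producer is running, the response looks similar to the following (illustrative; the exact JSON formatting may vary):
  {
    "Running" : true
  }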
  
 
=Events=
The following Fusion Registry events are available for publication on a Kafka service.

{| class="wikitable"
|-
! Event !! Details
|-
| Structure Notification || Addition / modification or deletion of SDMX structures
|}
  
 
==Structure Notification==
SDMX Structural Metadata is published to definable Kafka topics on Fusion Registry startup and each time structures are added, modified or deleted. Metadata-driven applications can subscribe to the relevant topic(s) to receive notifications of changes to structures, allowing them to maintain an up-to-date replica of the structural metadata they need.

Example use cases include:
* Structural validation applications which require up-to-date structural metadata such as Codelists and DSDs to check whether data is correctly structured.
* Structure mapping applications which require up-to-date copies of relevant Structure Sets and maps to perform data transformations.

[[File:KafkaMetadataNotification.png||700px]]
  
 
===Kafka Message Key===
The SDMX Structure URN is used as the Kafka Message Key. Consumers must therefore be prepared to receive and interpret the key as the Structure URN correctly when processing the message. This is particularly important for structure deletion where the Message Key is the only source of information about which structure is being referred to.

===Kafka Message Body===
The Kafka Message Body is a normal SDMX structure message, except in the case of deletions where the message has no content (a null payload) and the structure to delete is identified exclusively by its URN in the Message Key.

The format of the SDMX structure message is configurable via the web service, or via the UI (which, for simplicity, offers only SDMX-ML 2.1 and Fusion-JSON).
{| class="wikitable"
|-
! Format !! Explanation !! HTTP Accept Header Equivalent
|-
| SDMX-ML 1.0 || SDMX-ML 1.0 XML structure document || application/vnd.sdmx.structure+xml;version=1.0
|-
| SDMX-ML 2.0 || SDMX-ML 2.0 XML structure document || application/vnd.sdmx.structure+xml;version=2.0
|-
| SDMX-ML 2.1 || SDMX-ML 2.1 XML structure document || application/vnd.sdmx.structure+xml;version=2.1
|-
| SDMX-EDI || EDI || application/vnd.sdmx.structure;version=edi
|-
| SDMX-JSON || SDMX-JSON 1.0 || application/vnd.sdmx.json
|-
| XLSX (flat) || Metadata Technology's Excel structure format || application/vnd.xlsx
|-
| Fusion-JSON || Metadata Technology's JSON format with similarities to SDMX-JSON, provided primarily for backward compatibility || application/vnd.fusion.json
|}

Events for certain structures will only be published if the structure type is supported by the chosen format. Only Fusion-JSON supports all possible structures, including Fusion Registry 'extended structures' like [[Validation scheme|Validation Schemes]].
  
 
===Additions and Modifications===
Addition and Modification of structures both result in an SDMX 'replace' message containing the full content of the structures. Deltas (such as the addition of a Code to a Codelist) are not supported.
  
 
===Deletions===
Deletion of structures results in a 'tombstone' message, i.e. one with a null payload but the URN of the deleted structure in the Message Key.
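
As an illustration, a consumer of the structure notification topic needs to branch on whether the record value is null. The following sketch uses the standard Apache Kafka Java client; the broker address, topic name and the two handler methods are hypothetical placeholders for the consumer application's own logic:
<pre>
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StructureNotificationConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1.example.org:9092");  // illustrative broker address
        props.put("group.id", "structure-replica");                 // illustrative consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("structure-notifications")); // illustrative topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    String urn = record.key();                    // the SDMX Structure URN
                    if (record.value() == null) {
                        deleteStructureByUrn(urn);                // tombstone: the structure has been deleted
                    } else {
                        applyStructureMessage(record.value());    // full SDMX structure message
                    }
                }
            }
        }
    }

    private static void deleteStructureByUrn(String urn) { /* application-specific */ }

    private static void applyStructureMessage(String structureMessage) { /* application-specific */ }
}
</pre>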
  
 
===Kafka Transactions===
Additions and modifications to structures in Fusion Registry are published using [https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka Kafka Transactional Messaging] to help guarantee consistency by providing:
* Atomicity - the consumer will not be exposed to uncommitted transactions
* Durability - Kafka brokers guarantee not to lose any committed transactions
* Ordering - transaction-aware consumers receive transactions in the original order
* Non-Duplication - no duplicate messages within transactions

If a single structure is added or changed, the transaction will contain just that structure.

Transactions will contain multiple structures in cases including:
* Where multiple structures are submitted to the Registry in a single SDMX-ML message.
* Where a change to a structure's Agency or ID affects dependent structures.

Subscribers to the Kafka topic must be transaction-aware, and must ensure that each transaction is processed atomically to maintain the referential integrity of their structural metadata replicas.
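
In practice, a transaction-aware consumer sets the standard Kafka isolation.level property to read_committed so that it only ever receives messages from committed transactions; disabling automatic offset commits and committing offsets only after the fetched messages have been applied helps keep the local replica consistent across consumer restarts. A minimal sketch of the relevant consumer settings (property names are standard Kafka client settings):
<pre>
isolation.level = read_committed
enable.auto.commit = false
</pre>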

Structure deletions are never mixed with additions / modifications in the same transaction. This reflects the behaviour of the SDMX REST API whereby additions / modifications use HTTP POST, while deletions require HTTP DELETE.
  
 
===Full Content Publication and Re-synchronisation===
It is possible to send the full Registry structural metadata content to the Kafka topic. This can be done when the structure producer is started for the first time, or after it has been suspended. On restarting the producer, select 'Full Flush'.

'''Note''': be mindful of the maximum message size that the connection has been configured to support. Also note that the size check applied by the Kafka client libraries is performed BEFORE the message is compressed - this is not a behaviour of the Fusion Registry, it is the [https://issues.apache.org/jira/browse/KAFKA-4169 behaviour of the Kafka libraries].
  
 
===Not Supported: Subscription to Specific Structures===
The current implementation does not allow Kafka Consumers to subscribe to changes on specific structures or sets of structures, such as those maintained by a particular Agency. Consumers subscribing to the chosen topic will receive information about all structures and will need to select what they need.
  
===Not Supported: Publication Arbitration on Fusion Registry Load Balanced Clusters===
 IMPORTANT: There is currently no arbitration mechanism between peer Fusion Registry instances configured in a Load Balanced cluster. This means that each member of the cluster independently publishes each structure change notification to Kafka, resulting in duplication of notifications.

Structure changes made on one instance of Fusion Registry in a cluster are replicated to all of the others through either a shared database polling mechanism, or by Rabbit MQ messaging. An instance receiving information that structures have changed elsewhere in the cluster updates its in-memory SDMX information model accordingly. The update event is however trapped by its Kafka notification processor and published as normal.

Changes are required to the Kafka event processing behaviour of Registries operating in a cluster to arbitrate over which instance publishes the notification on Kafka such that a structure change is notified once and only once, as expected.

===Not Supported: Enforcement of Fusion Registry Content Security Rules to Structures Published on Kafka===
Fusion Registry Content Security defines rules restricting access to selected structures and items to specific groups.

The Kafka Structure Notification processor does not enforce Content Security structure rules, meaning that all structures are published to the Kafka broker service on the defined topic(s), irrespective of what restrictions may be in place. The risk here is that people or applications that would otherwise not have access to specific structures may be able to circumvent the rules normally imposed by Fusion Registry's Content Security sub-system by subscribing to the Structure Notification topic on Kafka. Kafka [https://kafka.apache.org/10/documentation/streams/developer-guide/security.html Streams Security] allows control over access to topics. While an all-or-nothing approach of limiting access to all published structures may be sufficient for some applications, it does not solve the use case where a consumer should be allowed access to certain structures, but not others.

The Kafka publish-and-subscribe mechanism means that the Fusion Registry producer has no knowledge of who will be consuming the metadata at the point of publication. Up-front decisions therefore have to be made as to what rules to apply when publishing to Kafka. Options include:
* Only publish 'public' structures - i.e. those without any rules restricting access
* Publish changes for each Content Security group (and 'public') on their own separate Kafka topics - applying Kafka Security to the topics would then ensure the intended access restrictions are enforced

= Registry Database Tables =
The following database tables are used in the Fusion Registry to store Kafka information.

'''kafa_producer_config'''
This table is used to link a Fusion Registry Producer (i.e. the structure producer) to a topic and a message format.
{| class="wikitable"
|-
! Column !! Description
|-
| id || Id of the Registry Kafka Producer
|-
| topic || The Kafka topic the producer sends messages to
|-
| format || The format the message is sent in
|}

'''kafa_producer_config_properties'''
This table is used to store any additional properties for the Fusion Registry Producer.
{| class="wikitable"
|-
! Column !! Description
|-
| id || Id of the Registry Kafka Producer
|-
| line_number || Properties are stored as a list; this is the index in the list
|-
| property || The property, in the format '''prop=value'''
|}

'''kafka_connection_settings'''
This table is used to store the Kafka connection properties.
{| class="wikitable"
|-
! Column !! Description
|-
| conn_key || Kafka connection property, e.g. batch.size
|-
| conn_value || The value of the property, e.g. 1000
|}

'''kafka_producer_tx'''
The producer transaction table.
{| class="wikitable"
|-
! Column !! Description
|-
| tx_id || The id of the transaction (see the '''sdmx_transaction''' table)
|-
| producer_id || The id of the producer
|-
| tx_status || Transaction status:<br>'''0''' Pending<br>'''1''' Sent successfully<br>'''2''' Not sent due to a local failure - the message did not leave the Fusion Registry, typically because it failed local checks with respect to the connection settings<br>'''3''' Sent, but an error notification was received from the Kafka server<br>'''4''' Unexpected Fusion Registry error
|-
| tx_last_updated || The last time this transaction record was updated (UNIX time, milliseconds)
|-
| tx_size || Size of the transaction in bytes (before compression)
|-
| tx_messages || Number of messages in the transaction
|}
