Debezium and pt-online-schema-change or gh-ost


When I first started working with Debezium, I needed to monitor tables in a Percona MySQL cluster, and we used Percona’s pt-online-schema-change tool to make online schema changes, that is, schema changes without downtime.

pt-online-schema-change and gh-ost (from the GitHub infrastructure team) work in a similar fashion: they apply the change to a temporary ("ghost") table while keeping its data in sync with the original table. In brief, the schema change happens only on the temporary/ghost table, which is later renamed to replace the original one.
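
To make that sequence concrete, here is a simplified model of the statements a pt-online-schema-change-style migration runs, written as a small Java program that only builds SQL strings. It is a sketch, not the tool's actual output: trigger names and bodies are elided, the row copy is shown as one statement although the real tool copies in chunks, and _mytable_new/_mytable_old are pt-online-schema-change's default names. gh-ost keeps the copy in sync by reading the binary log instead of using triggers, but the shadow table and final swap follow the same idea. Notice that the ALTER only ever touches the temporary table; the original table gets its new definition only through the final RENAME TABLE.

```java
import java.util.List;

// Illustrative sketch only: the core statement sequence of a pt-online-schema-change-style
// migration. Trigger names and bodies are elided, and the real tool copies rows in chunks.
class ShadowTableMigration {

    static List<String> plan(String table, String alterClause) {
        String shadow = "_" + table + "_new"; // pt-online-schema-change's default new-table name
        String old = "_" + table + "_old";    // its default name for the swapped-out table
        return List.of(
                // 1. Create an empty copy and apply the schema change to the copy only.
                "CREATE TABLE " + shadow + " LIKE " + table,
                "ALTER TABLE " + shadow + " " + alterClause,
                // 2. Triggers on the original table replay ongoing writes into the copy.
                "CREATE TRIGGER ... AFTER INSERT ON " + table + " FOR EACH ROW ...",
                "CREATE TRIGGER ... AFTER UPDATE ON " + table + " FOR EACH ROW ...",
                "CREATE TRIGGER ... AFTER DELETE ON " + table + " FOR EACH ROW ...",
                // 3. Copy the existing rows (chunk by chunk in the real tool).
                "INSERT IGNORE INTO " + shadow + " (...) SELECT ... FROM " + table,
                // 4. Atomic swap: the copy takes over the original table's name.
                "RENAME TABLE " + table + " TO " + old + ", " + shadow + " TO " + table,
                // 5. Clean up (the real tool also drops the triggers).
                "DROP TABLE " + old);
    }

    public static void main(String[] args) {
        plan("mytable", "ADD COLUMN note VARCHAR(255)").forEach(System.out::println);
    }
}
```

A connector that captures only mytable never sees step 1's ALTER, which is exactly the gap described next.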

Unfortunately, Debezium does not see these changes, because it monitors only the original tables. This leaves the database schema and Debezium's schema store out of sync, as I explained in this Debezium bug ticket. Since then, a short notice has been added to the official documentation:

When MySQL connector monitors a table to which a schema change tool like Gh-ost or pt-online-schema-change is applied then helper tables created during migration process need to be included among whitelisted tables.

If the downstream systems do not need the messages generated by the temporary table then a simple message transform can be written and applied to filter them out.

Let’s see how this can be applied. If we want to monitor a table called, say, mydatabase.mytable, and we also use pt-online-schema-change on it, then we need to monitor both:

  • mydatabase.mytable
  • mydatabase._mytable_new, the (default) temporary table that pt-online-schema-change will use
"table.whitelist": "mydatabase.mytable,mydatabase._mytable_new"
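
When several tables are migrated this way, the whitelist value can be generated instead of written by hand. The helper below is hypothetical (it is not part of Debezium) and assumes the tools' default helper-table names: _<table>_new for pt-online-schema-change, and _<table>_gho (ghost) plus _<table>_ghc (changelog), assumed here to be gh-ost's defaults. Check both against the tool version and options you actually use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Debezium: builds a table.whitelist value that covers each
// table plus its schema-change helper tables, assuming default naming (_<table><suffix>).
class WhitelistBuilder {

    static String whitelist(List<String> qualifiedTables, List<String> helperSuffixes) {
        List<String> entries = new ArrayList<>();
        for (String qualified : qualifiedTables) {
            int dot = qualified.indexOf('.');
            if (dot <= 0 || dot == qualified.length() - 1) {
                throw new IllegalArgumentException("Expected <database>.<table>: " + qualified);
            }
            String database = qualified.substring(0, dot);
            String table = qualified.substring(dot + 1);
            entries.add(qualified); // the original table
            for (String suffix : helperSuffixes) {
                entries.add(database + "._" + table + suffix); // e.g. mydatabase._mytable_new
            }
        }
        return String.join(",", entries);
    }

    public static void main(String[] args) {
        // pt-online-schema-change default helper table
        System.out.println(whitelist(List.of("mydatabase.mytable"), List.of("_new")));
        // gh-ost, with its assumed default ghost and changelog table names
        System.out.println(whitelist(List.of("mydatabase.mytable"), List.of("_gho", "_ghc")));
    }
}
```

The first call reproduces the table.whitelist value shown above.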

However, we are not really interested in CDC events from mydatabase._mytable_new, especially since a large number of SQL statements will be run against that table to keep its data in sync with mydatabase.mytable.

As the documentation suggests, we can add a custom single message transform (SMT) to discard those events.

Such an SMT is very straightforward to code:

package com.gyg.kafka.connect.transforms;

import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.Matcher;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.kafka.connect.transforms.util.RegexValidator;


/**
 * This SMT drops a record (by returning null from {@link #apply}) when its topic
 * fully matches a configured regular expression.
 * @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
 * @author Emmanuel Brard
 */
public class DropRecord<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DropRecord.class);

    private static final String PATTERN_CONFIG = "pattern";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(PATTERN_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(), ConfigDef.Importance.HIGH, "Pattern of the topic to drop records from");

    private static final String PURPOSE = "drop records";

    private Pattern topicPattern;

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        topicPattern = Pattern.compile(config.getString(PATTERN_CONFIG));
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

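    @Override
    public R apply(final R record) {
        // Kafka Connect discards a record when a transformation returns null,
        // so every record whose topic fully matches the pattern is dropped here.
        Matcher matcher = topicPattern.matcher(record.topic());
        if (matcher.matches()) {
            LOGGER.debug("Dropping record from topic {}", record.topic());
            return null;
        }
        return record;
    }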

    @Override
    public void close() {
    }

}
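
Returning null works because Kafka Connect treats a null result from a transformation as "drop this record": it is neither passed to the remaining transformations nor written to Kafka. The stdlib-only model below illustrates that contract without depending on Kafka Connect; TransformChainModel and its string "records" (reduced to topic names) are simplifications made up for this sketch, not Connect's real classes.

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;

// Simplified model of Kafka Connect's contract for transformations (not its real classes):
// a null result drops the record, so later transformations never see it.
// A "record" is reduced to its topic name here.
class TransformChainModel {

    static String applyAll(String record, List<UnaryOperator<String>> transforms) {
        String current = record;
        for (UnaryOperator<String> transform : transforms) {
            current = transform.apply(current);
            if (current == null) {
                return null; // dropped: not passed on and not written to Kafka
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile(".*\\.mydatabase\\._.*_new.*");
        // Same decision as DropRecord.apply(): null when the topic fully matches the pattern.
        UnaryOperator<String> dropRecord = topic -> pattern.matcher(topic).matches() ? null : topic;
        // Stand-in for any SMT configured after DropRecord; it would throw on a null input.
        UnaryOperator<String> nextSmt = String::toUpperCase;
        System.out.println(applyAll("dbserver1.mydatabase.mytable", List.of(dropRecord, nextSmt)));
        System.out.println(applyAll("dbserver1.mydatabase._mytable_new", List.of(dropRecord, nextSmt)));
    }
}
```

Because the chain stops at the first null, transforms listed after DropRecord never have to handle dropped records.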

We can then configure it in the connector as follows:

"transforms": "DropTmpTable",
"transforms.DropTmpTable.type": "com.gyg.kafka.connect.transforms.DropRecord",
"transforms.DropTmpTable.pattern": ".*\\.mydatabase\\._.*_new.*",
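
Two details of the pattern are easy to get wrong. It is matched against the topic name, which for the Debezium MySQL connector has the form <logical server name>.<database>.<table>, and DropRecord uses matches(), so the whole topic name must match (hence the leading .* for the server name). Also, inside the JSON value, \\. decodes to the regex \. (a literal dot). The check below uses a pattern of this shape written for the mydatabase example; dbserver1 is a made-up server name.

```java
import java.util.List;
import java.util.regex.Pattern;

// Checks which topic names a DropRecord-style pattern would drop.
// Assumes Debezium MySQL topic naming: <logical server name>.<database>.<table>.
class TopicPatternCheck {

    // In Java source the regex \. (literal dot) is written "\\.", just as in the JSON config.
    static final Pattern DROP = Pattern.compile(".*\\.mydatabase\\._.*_new.*");

    static boolean dropped(String topic) {
        // matches() requires the entire topic name to match, as in DropRecord.apply()
        return DROP.matcher(topic).matches();
    }

    public static void main(String[] args) {
        for (String topic : List.of("dbserver1.mydatabase.mytable",
                                    "dbserver1.mydatabase._mytable_new",
                                    "dbserver1.otherdb._mytable_new")) {
            System.out.println(topic + " -> " + (dropped(topic) ? "dropped" : "kept"));
        }
    }
}
```

Only the helper-table topic in mydatabase is dropped; the original table's topic and helper tables in other databases are kept.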

With this in place, Debezium monitors both tables and therefore stays aware of the schema changes. CDC events are still emitted for the temporary/ghost table, but they are identified and dropped by the custom Kafka Connect SMT.