场景

各个系统模块之间的数据库是独立的,而我开发的功能需要使用到业务库中员工和角色的信息,因此使用了cannal将业务库中的员工和角色表信息实时同步到自己的库中。

思路

  1. canal监听数据库二进制日志(binlog)的变动
  2. 解析生成sql语句
  3. 通过目标库的数据库连接Connection,创建Statement
  4. 执行对应的sql语句,完成数据同步

步骤

安装mysql

首先需要安装好 mysql,用来作为数据提供方(muster),具体安装步骤网上很多,不再赘述。

部署Canal

下载Canal:Canal,可以选择自己需要的版本进行下载。我使用的的版本是:canal.deployer-1.1.4.tar.gz。下载完成后解压,并对配置文件进行修改。

  1. 首先修改conf/canal.properties:
  • canal.ip = 127.0.0.1 #客户端连接时的ip,需要指定ip,否则客户端可能会连接失败
  • canal.port = 11111 #客户端连接时的端口
  • canal.destinations = example #客户端连接指定的destination
  1. conf/example/instance.properties:对应一个数据库实例:
  • canal.instance.mysql.slaveId=123 #模拟slave的身份标志,不能重复
  • canal.instance.master.address=192.168.0.30:3306//监听的数据库连接
  • canal.instance.master.journal.name=mysql-bin.000001
  • canal.instance.master.position=154
  • canal.instance.dbUsername=canal
  • canal.instance.dbPassword=Canal@123456
  1. 启动: 进入bin/startup.bat启动。windows10启动会失败,需要修改启动文件:删除-Dlogback.configuretionFile的那个配置项。然后启动。如图表示启动成功:

    至此canal服务搭建完成。

编写Java代码

  1. 项目使用springboot+mysql+druid。引入相关依赖:pom.xml
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.data</groupId>
			<artifactId>spring-data-commons</artifactId>
			<version>2.0.8.RELEASE</version>
			<scope>compile</scope>
		</dependency>

		<dependency>
			<groupId>com.alibaba.otter</groupId>
			<artifactId>canal.client</artifactId>
			<version>1.1.4</version>
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.47</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>druid</artifactId>
			<version>1.1.10</version>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.16</version>
			<optional>true</optional>
		</dependency>

		<!-- 工具类 -->
		<dependency>
			<groupId>cn.hutool</groupId>
			<artifactId>hutool-all</artifactId>
			<version>5.5.2</version>
		</dependency>
	</dependencies>

2.容器启动完成后,进行数据同步:
SyncDataListener .class

@Service
public class SyncDataListener implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    public CannalHandler cannalHandler;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if(event.getApplicationContext().getParent() == null)//root application context 没有parent,他就是老大.
        {
            //需要执行的逻辑代码,当spring容器初始化完成后就会执行该方法。
            cannalHandler.start();
        }
    }
}

canal具体处理类:CannalHandler.class

public class CannalHandler {

    private final static int BATCH_SIZE = 1000;

    @Autowired
    public SqlExecutor sqlExecutor;

    @Autowired
    public CanalProperties canalProperties;

    public void start() {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalProperties.getHostName(), canalProperties.getPort()), canalProperties.getDestination(), "", "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            System.out.println(">>>>"+canalProperties.getSubscribe());
            String subscribe=canalProperties.getSubscribe();
            connector.subscribe(subscribe);
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        //线程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //如果有数据,处理数据
                    printEntry(message.getEntries());
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * 打印canal server解析binlog获得的实体类信息
     */
    private  void printEntry(List<CanalEntry.Entry> entrys) {
        System.out.println("printEntry>>>>>>开始");
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                //开启/关闭事务的实体类型,跳过
                continue;
            }
            //RowChange对象,包含了一行数据变化的所有特征
            //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
            CanalEntry.RowChange rowChage;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            //获取操作类型:insert/update/delete类型
            CanalEntry.EventType eventType = rowChage.getEventType();
            //打印Header信息
            System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            String dbName=entry.getHeader().getSchemaName();
            String tableName= entry.getHeader().getTableName();
            //判断是否是DDL语句
            if (rowChage.getIsDdl()) {
                System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
                sqlExecutor.executorSql(rowChage.getSql());
            }
            //获取RowChange对象里的每一行数据,打印出来
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                String sql=parseCRUDSql(tableName,rowData.getBeforeColumnsList(),rowData.getAfterColumnsList(),eventType);
                //执行sql语句
                sqlExecutor.executorSql(sql);
            }
        }
        System.out.println("printEntry>>>>>>结束");
    }
	//解析sql语句,代码可以进一步优化
    private  String parseCRUDSql( String tableName,List<CanalEntry.Column> beforeColumns, List<CanalEntry.Column> afterColumns, CanalEntry.EventType eventType) {
        String sql="";
        if (eventType == CanalEntry.EventType.DELETE) { //删除
            //DELETE FROM table_name WHERE ID=1 AND YEAR=2
            StringBuffer deleteBuffer=new StringBuffer();
            deleteBuffer.append("DELETE FROM "+tableName+" WHERE ");
            for (CanalEntry.Column column : beforeColumns) {
                String columnName=column.getName();
                String columnValue="'"+ column.getValue()+"'";
                if(StringUtils.isNotEmpty(columnValue)){
                    deleteBuffer.append(columnName+"="+columnValue);
                    deleteBuffer.append(" AND ");
                }
            }
            //截取掉最后一个AND
            sql = deleteBuffer.substring(0, deleteBuffer.lastIndexOf("AND"));
        } else if (eventType == CanalEntry.EventType.INSERT) {  //如果是新增语句
            StringBuffer sqlBeforeBuffer=new StringBuffer();//sql前一部分
            StringBuffer sqlAfterBuffer=new StringBuffer();//sql后一部分
            // INSERT INTO `canal.sys_holiday` (`year`, `date_str`, `day_type`) VALUES ('2020', '2020-10-01', '0');
            sqlBeforeBuffer.append("INSERT INTO "+tableName);
            sqlBeforeBuffer.append('(');//前半部分
            sqlAfterBuffer.append(" VALUES (");//后半部分
            for (CanalEntry.Column column : afterColumns) {
                String columnName=column.getName();
                String columnValue="'"+column.getValue()+"'";
                sqlBeforeBuffer.append(columnName+",");
                sqlAfterBuffer.append(columnValue+",");
            }
            String beforeStr=sqlBeforeBuffer.substring(0,sqlBeforeBuffer.length()-1);
            String afterStr=sqlAfterBuffer.substring(0,sqlAfterBuffer.length()-1);
            beforeStr=beforeStr+")";
            afterStr=afterStr+")";
            sql=beforeStr+afterStr;//拼接前一部分和后一部分

        } else {  //如果是更新的语句
            //UPDATE runoob_tbl SET id=1,runoob_title='学习 C++' WHERE runoob_id=3 AND TITLE=45;
            StringBuffer updateBuffer=new StringBuffer();
            StringBuffer whereBuffer=new StringBuffer();
            updateBuffer.append("UPDATE "+tableName+" SET ");
            for (CanalEntry.Column column : afterColumns) {
                String columnName=column.getName();
                String columnValue="'"+ column.getValue()+"'";
                boolean updated = column.getUpdated();
                if(updated){//表示更新的字段
                    updateBuffer.append(columnName+"="+columnValue+",");
                }else{
                    whereBuffer.append(" "+columnName+"="+columnValue+" AND");
                }
            }
            String beforeStr=updateBuffer.substring(0,updateBuffer.length()-1);
            String whereStr=whereBuffer.substring(0, whereBuffer.lastIndexOf("AND"));
            sql=beforeStr+" WHERE "+whereStr;
        }
        return sql;
    }
}

SqlExecutor.class

public class SqlExecutor {

    @Resource
    private DataSource dataSource;

    public void executorSql(String sql){
        Connection connection = null;
        Statement statement = null;
        try {
            connection = dataSource.getConnection();
            statement = connection.createStatement();
            statement.execute(sql);
            System.out.println(sql+">>>>>>>>执行完毕");
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            try {
                statement.close();
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

注入bean的信息:

    @Bean(name = "dataSource")
    public DataSource druidDataSource(){
        return new DruidDataSource();
    }

    @Bean("sqlExecutor")
    public SqlExecutor sqlExecutor(){
        return  new SqlExecutor();
    }

    @Bean("cannalHandler")
    public CannalHandler cannalHandler(){
        return new CannalHandler();
    }

CanalProperties.java

@Component
@ConfigurationProperties(prefix = "canal")
@Data
public class CanalProperties {

    private String hostName;

    private Integer port;

    private String destination;

    private String subscribe;
}
spring:
  application:
    name: canal
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.jdbc.Driver
    # 目标数据库Slaver
    url: jdbc:mysql://127.0.0.1:3306/canal? useUnicode=true&characterEncoding=utf-8&useSSL=false
    username: root
    password: root

canal:
  hostName: 127.0.0.1
  port: 11111
  destination: example
  # 订阅所有表
  #subscribe: .*\..*
  # 订阅canal库中,表名sys_开始的表
  subscribe: canal\.sys_.*

配置完成后,启动项目。启动成功后,修改数据源,然后查看slaver中是否发生变化。
在过程中,遇到了一个问题:程序同步数据时,只同步了一部分就不再执行,重启后依然如此。经过排查,由于每执行一个sql,获取一个连接,同时执行后并未关闭这个连接,导致最初连接被用完,程序一直在等待数据库连接。
文中哪里有不妥之处,请指正。希望对你能够有所帮助!

更多推荐

Canal进行mysql数据同步