From f430ecc8ef5d5c611f076a69c1511b2627330b03 Mon Sep 17 00:00:00 2001 From: dfcao Date: Mon, 11 Dec 2017 15:39:09 +0800 Subject: [PATCH] standardize code format to comply with pep8 --- binlog2sql/binlog2sql.py | 178 +++++++++++++++++-------------- binlog2sql/binlog2sql_util.py | 195 +++++++++++++++++++--------------- tests/test_binlog2sql_util.py | 52 ++++----- 3 files changed, 232 insertions(+), 193 deletions(-) diff --git a/binlog2sql/binlog2sql.py b/binlog2sql/binlog2sql.py index c74d72f..89e641f 100755 --- a/binlog2sql/binlog2sql.py +++ b/binlog2sql/binlog2sql.py @@ -1,7 +1,9 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import os, sys, datetime +import os +import sys +import datetime import pymysql from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import ( @@ -10,114 +12,130 @@ from pymysqlreplication.row_event import ( DeleteRowsEvent, ) from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent -from binlog2sql_util import command_line_args, concat_sql_from_binlogevent, create_unique_file, reversed_lines +from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, reversed_lines + class Binlog2sql(object): - def __init__(self, connectionSettings, startFile=None, startPos=None, endFile=None, endPos=None, startTime=None, - stopTime=None, only_schemas=None, only_tables=None, nopk=False, flashback=False, stopnever=False): - ''' - connectionSettings: {'host': 127.0.0.1, 'port': 3306, 'user': slave, 'passwd': slave} - ''' - if not startFile: - raise ValueError('lack of parameter,startFile.') + def __init__(self, connection_settings, start_file=None, start_pos=None, end_file=None, end_pos=None, + start_time=None, stop_time=None, only_schemas=None, only_tables=None, no_pk=False, + flashback=False, stop_never=False): + """ + conn_setting: {'host': 127.0.0.1, 'port': 3306, 'user': user, 'passwd': passwd, 'charset': 'utf8'} + """ - self.connectionSettings = connectionSettings - self.startFile = startFile - self.startPos = startPos if startPos else 4 # use binlog v4 - self.endFile = endFile if endFile else startFile - self.endPos = endPos - self.startTime = datetime.datetime.strptime(startTime, "%Y-%m-%d %H:%M:%S") if startTime else datetime.datetime.strptime('1970-01-01 00:00:00', "%Y-%m-%d %H:%M:%S") - self.stopTime = datetime.datetime.strptime(stopTime, "%Y-%m-%d %H:%M:%S") if stopTime else datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S") + if not start_file: + raise ValueError('Lack of parameter: start_file') + + self.conn_setting = connection_settings + self.start_file = start_file + self.start_pos = start_pos if start_pos else 4 # use binlog v4 + self.end_file = end_file if end_file else start_file + self.end_pos = end_pos + if start_time: + self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") + else: + self.start_time = datetime.datetime.strptime('1970-01-01 00:00:00', "%Y-%m-%d %H:%M:%S") + if stop_time: + self.stop_time = datetime.datetime.strptime(stop_time, "%Y-%m-%d %H:%M:%S") + else: + self.stop_time = datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S") self.only_schemas = only_schemas if only_schemas else None self.only_tables = only_tables if only_tables else None - self.nopk, self.flashback, self.stopnever = (nopk, flashback, stopnever) + self.no_pk, self.flashback, self.stop_never = (no_pk, flashback, stop_never) self.binlogList = [] - self.connection = pymysql.connect(**self.connectionSettings) - try: - cur = self.connection.cursor() - cur.execute("SHOW MASTER STATUS") - self.eofFile, self.eofPos = cur.fetchone()[:2] - cur.execute("SHOW MASTER LOGS") - binIndex = [row[0] for row in cur.fetchall()] - if self.startFile not in binIndex: - raise ValueError('parameter error: startFile %s not in mysql server' % self.startFile) + self.connection = pymysql.connect(**self.conn_setting) + with self.connection as cursor: + cursor.execute("SHOW MASTER STATUS") + self.eof_file, self.eof_pos = cursor.fetchone()[:2] + cursor.execute("SHOW MASTER LOGS") + bin_index = [row[0] for row in cursor.fetchall()] + if self.start_file not in bin_index: + raise ValueError('parameter error: start_file %s not in mysql server' % self.start_file) binlog2i = lambda x: x.split('.')[1] - for bin in binIndex: - if binlog2i(bin) >= binlog2i(self.startFile) and binlog2i(bin) <= binlog2i(self.endFile): - self.binlogList.append(bin) + for binary in bin_index: + if binlog2i(self.start_file) <= binlog2i(binary) <= binlog2i(self.end_file): + self.binlogList.append(binary) - cur.execute("SELECT @@server_id") - self.serverId = cur.fetchone()[0] - if not self.serverId: - raise ValueError('need set server_id in mysql server %s:%s' % (self.connectionSettings['host'], self.connectionSettings['port'])) - finally: - cur.close() + cursor.execute("SELECT @@server_id") + self.server_id = cursor.fetchone()[0] + if not self.server_id: + raise ValueError('missing server_id in %s:%s' % (self.conn_setting['host'], self.conn_setting['port'])) def process_binlog(self): - stream = BinLogStreamReader(connection_settings=self.connectionSettings, server_id=self.serverId, - log_file=self.startFile, log_pos=self.startPos, only_schemas=self.only_schemas, + stream = BinLogStreamReader(connection_settings=self.conn_setting, server_id=self.server_id, + log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas, only_tables=self.only_tables, resume_stream=True) - cur = self.connection.cursor() - tmpFile = create_unique_file('%s.%s' % (self.connectionSettings['host'],self.connectionSettings['port'])) # to simplify code, we do not use file lock for tmpFile. - ftmp = open(tmpFile ,"w") - flagLastEvent = False - eStartPos, lastPos = stream.log_pos, stream.log_pos + cursor = self.connection.cursor() + # to simplify code, we do not use flock for tmp_file. + tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port'])) + f_tmp = open(tmp_file, "w") + flag_last_event = False + e_start_pos, last_pos = stream.log_pos, stream.log_pos try: - for binlogevent in stream: - if not self.stopnever: - if (stream.log_file == self.endFile and stream.log_pos == self.endPos) or (stream.log_file == self.eofFile and stream.log_pos == self.eofPos): - flagLastEvent = True - elif datetime.datetime.fromtimestamp(binlogevent.timestamp) < self.startTime: - if not (isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)): - lastPos = binlogevent.packet.log_pos + for binlog_event in stream: + if not self.stop_never: + if (stream.log_file == self.end_file and stream.log_pos == self.end_pos) or \ + (stream.log_file == self.eof_file and stream.log_pos == self.eof_pos): + flag_last_event = True + elif datetime.datetime.fromtimestamp(binlog_event.timestamp) < self.start_time: + if not (isinstance(binlog_event, RotateEvent) + or isinstance(binlog_event, FormatDescriptionEvent)): + last_pos = binlog_event.packet.log_pos continue - elif (stream.log_file not in self.binlogList) or (self.endPos and stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos) or (datetime.datetime.fromtimestamp(binlogevent.timestamp) >= self.stopTime): + elif (stream.log_file not in self.binlogList) or \ + (self.end_pos and stream.log_file == self.end_file and stream.log_pos > self.end_pos) or \ + (stream.log_file == self.eof_file and stream.log_pos > self.eof_pos) or \ + (datetime.datetime.fromtimestamp(binlog_event.timestamp) >= self.stop_time): break # else: # raise ValueError('unknown binlog file or position') - if isinstance(binlogevent, QueryEvent) and binlogevent.query == 'BEGIN': - eStartPos = lastPos + if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN': + e_start_pos = last_pos - if isinstance(binlogevent, QueryEvent): - sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, nopk=self.nopk) + if isinstance(binlog_event, QueryEvent): + sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, + flashback=self.flashback, no_pk=self.no_pk) if sql: - print sql - elif isinstance(binlogevent, WriteRowsEvent) or isinstance(binlogevent, UpdateRowsEvent) or isinstance(binlogevent, DeleteRowsEvent): - for row in binlogevent.rows: - sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, nopk=self.nopk, eStartPos=eStartPos) + print(sql) + elif isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) or\ + isinstance(binlog_event, DeleteRowsEvent): + for row in binlog_event.rows: + sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, no_pk=self.no_pk, + row=row, flashback=self.flashback, e_start_pos=e_start_pos) if self.flashback: - ftmp.write(sql + '\n') + f_tmp.write(sql + '\n') else: - print sql + print(sql) - if not (isinstance(binlogevent, RotateEvent) or isinstance(binlogevent, FormatDescriptionEvent)): - lastPos = binlogevent.packet.log_pos - if flagLastEvent: + if not (isinstance(binlog_event, RotateEvent) or isinstance(binlog_event, FormatDescriptionEvent)): + last_pos = binlog_event.packet.log_pos + if flag_last_event: break - ftmp.close() + f_tmp.close() if self.flashback: - self.print_rollback_sql(tmpFile) + self.print_rollback_sql(filename=tmp_file) finally: - os.remove(tmpFile) - cur.close() + os.remove(tmp_file) + cursor.close() stream.close() return True - def print_rollback_sql(self, fin): - '''print rollback sql from tmpfile''' - with open(fin) as ftmp: - sleepInterval = 1000 + @staticmethod + def print_rollback_sql(filename): + """print rollback sql from tmp_file""" + with open(filename) as f_tmp: + sleep_interval = 1000 i = 0 - for line in reversed_lines(ftmp): - print line.rstrip() - if i >= sleepInterval: - print 'SELECT SLEEP(1);' + for line in reversed_lines(f_tmp): + print(line.rstrip()) + if i >= sleep_interval: + print('SELECT SLEEP(1);') i = 0 else: i += 1 @@ -129,9 +147,9 @@ class Binlog2sql(object): if __name__ == '__main__': args = command_line_args(sys.argv[1:]) - connectionSettings = {'host':args.host, 'port':args.port, 'user':args.user, 'passwd':args.password} - binlog2sql = Binlog2sql(connectionSettings=connectionSettings, startFile=args.startFile, - startPos=args.startPos, endFile=args.endFile, endPos=args.endPos, - startTime=args.startTime, stopTime=args.stopTime, only_schemas=args.databases, - only_tables=args.tables, nopk=args.nopk, flashback=args.flashback, stopnever=args.stopnever) + conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'} + binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos, + end_file=args.end_file, end_pos=args.end_pos, start_time=args.start_time, + stop_time=args.stop_time, only_schemas=args.databases, only_tables=args.tables, + no_pk=args.no_pk, flashback=args.flashback, stop_never=args.stop_never) binlog2sql.process_binlog() diff --git a/binlog2sql/binlog2sql_util.py b/binlog2sql/binlog2sql_util.py index 6059192..b10a7e1 100755 --- a/binlog2sql/binlog2sql_util.py +++ b/binlog2sql/binlog2sql_util.py @@ -1,15 +1,16 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import os, sys, argparse, datetime -import pymysql -from pymysqlreplication import BinLogStreamReader +import os +import sys +import argparse +import datetime from pymysqlreplication.row_event import ( WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, ) -from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent +from pymysqlreplication.event import QueryEvent def is_valid_datetime(string): @@ -19,23 +20,25 @@ def is_valid_datetime(string): except: return False + def create_unique_file(filename): version = 0 - resultFile = filename + result_file = filename # if we have to try more than 1000 times, something is seriously wrong - while os.path.exists(resultFile) and version<1000: - resultFile = filename + '.' + str(version) + while os.path.exists(result_file) and version < 1000: + result_file = filename + '.' + str(version) version += 1 if version >= 1000: raise OSError('cannot create unique file %s.[0-1000]' % filename) - return resultFile + return result_file -def parse_args(args): + +def parse_args(): """parse args for binlog2sql""" parser = argparse.ArgumentParser(description='Parse MySQL binlog to SQL you want', add_help=False) connect_setting = parser.add_argument_group('connect setting') - connect_setting.add_argument('-h','--host', dest='host', type=str, + connect_setting.add_argument('-h', '--host', dest='host', type=str, help='Host the MySQL database server located', default='127.0.0.1') connect_setting.add_argument('-u', '--user', dest='user', type=str, help='MySQL Username to log in as', default='root') @@ -43,23 +46,31 @@ def parse_args(args): help='MySQL Password to use', default='') connect_setting.add_argument('-P', '--port', dest='port', type=int, help='MySQL port to use', default=3306) - range = parser.add_argument_group('range filter') - range.add_argument('--start-file', dest='startFile', type=str, - help='Start binlog file to be parsed') - range.add_argument('--start-position', '--start-pos', dest='startPos', type=int, - help='Start position of the --start-file', default=4) - range.add_argument('--stop-file', '--end-file', dest='endFile', type=str, - help="Stop binlog file to be parsed. default: '--start-file'", default='') - range.add_argument('--stop-position', '--end-pos', dest='endPos', type=int, - help="Stop position of --stop-file. default: latest position of '--stop-file'", default=0) - range.add_argument('--start-datetime', dest='startTime', type=str, - help="Start reading the binlog at first event having a datetime equal or posterior to the argument; the argument must be a date and time in the local time zone, in any format accepted by the MySQL server for DATETIME and TIMESTAMP types, for example: 2004-12-25 11:25:56 (you should probably use quotes for your shell to set it properly).", default='') - range.add_argument('--stop-datetime', dest='stopTime', type=str, - help="Stop reading the binlog at first event having a datetime equal or posterior to the argument; the argument must be a date and time in the local time zone, in any format accepted by the MySQL server for DATETIME and TIMESTAMP types, for example: 2004-12-25 11:25:56 (you should probably use quotes for your shell to set it properly).", default='') - parser.add_argument('--stop-never', dest='stopnever', action='store_true', - help='Wait for more data from the server. default: stop replicate at the last binlog when you start binlog2sql', default=False) + interval = parser.add_argument_group('interval filter') + interval.add_argument('--start-file', dest='start_file', type=str, help='Start binlog file to be parsed') + interval.add_argument('--start-position', '--start-pos', dest='start_pos', type=int, + help='Start position of the --start-file', default=4) + interval.add_argument('--stop-file', '--end-file', dest='end_file', type=str, + help="Stop binlog file to be parsed. default: '--start-file'", default='') + interval.add_argument('--stop-position', '--end-pos', dest='end_pos', type=int, + help="Stop position. default: latest position of '--stop-file'", default=0) + interval.add_argument('--start-datetime', dest='start_time', type=str, + help="Start reading the binlog at first event having a datetime equal or posterior " + "to the argument; the argument must be a date and time in the local time zone," + " in any format accepted by the MySQL server for DATETIME and TIMESTAMP types," + " for example: 2004-12-25 11:25:56 (you should probably use quotes for your " + "shell to set it properly).", default='') + interval.add_argument('--stop-datetime', dest='stop_time', type=str, + help="Stop reading the binlog at first event having a datetime equal or posterior " + "to the argument; the argument must be a date and time in the local time zone," + " in any format accepted by the MySQL server for DATETIME and TIMESTAMP types," + " for example: 2004-12-25 11:25:56 (you should probably use quotes for your " + "shell to set it properly).", default='') + parser.add_argument('--stop-never', dest='stop_never', action='store_true', + help="Wait for more data from the server. default: stop replicate at the last binlog" + " when you start binlog2sql", default=False) - parser.add_argument('--help', dest='help', action='store_true', help='help infomation', default=False) + parser.add_argument('--help', dest='help', action='store_true', help='help information', default=False) schema = parser.add_argument_group('schema filter') schema.add_argument('-d', '--databases', dest='databases', type=str, nargs='*', @@ -68,26 +79,28 @@ def parse_args(args): help='tables you want to process', default='') # exclusive = parser.add_mutually_exclusive_group() - parser.add_argument('-K', '--no-primary-key', dest='nopk', action='store_true', - help='Generate insert sql without primary key if exists', default=False) + parser.add_argument('-K', '--no-primary-key', dest='no_pk', action='store_true', + help='Generate insert sql without primary key if exists', default=False) parser.add_argument('-B', '--flashback', dest='flashback', action='store_true', - help='Flashback data to start_postition of start_file', default=False) + help='Flashback data to start_position of start_file', default=False) return parser + def command_line_args(args): - needPrintHelp = False if args else True - parser = parse_args(args) + need_print_help = False if args else True + parser = parse_args() args = parser.parse_args(args) - if args.help or needPrintHelp: + if args.help or need_print_help: parser.print_help() sys.exit(1) - if not args.startFile: - raise ValueError('Lack of parameter: startFile') - if args.flashback and args.stopnever: + if not args.start_file: + raise ValueError('Lack of parameter: start_file') + if args.flashback and args.stop_never: raise ValueError('Only one of flashback or stop-never can be True') - if args.flashback and args.nopk: - raise ValueError('Only one of flashback or nopk can be True') - if (args.startTime and not is_valid_datetime(args.startTime)) or (args.stopTime and not is_valid_datetime(args.stopTime)): + if args.flashback and args.no_pk: + raise ValueError('Only one of flashback or no_pk can be True') + if (args.start_time and not is_valid_datetime(args.start_time)) or \ + (args.stop_time and not is_valid_datetime(args.stop_time)): raise ValueError('Incorrect datetime argument') return args @@ -99,6 +112,7 @@ def compare_items((k, v)): else: return '`%s`=%%s' % k + def fix_object(value): """Fixes python objects so that they can be properly inserted into SQL queries""" if isinstance(value, unicode): @@ -106,96 +120,103 @@ def fix_object(value): else: return value -def concat_sql_from_binlogevent(cursor, binlogevent, row=None, eStartPos=None, flashback=False, nopk=False): - if flashback and nopk: - raise ValueError('only one of flashback or nopk can be True') - if not (isinstance(binlogevent, WriteRowsEvent) or isinstance(binlogevent, UpdateRowsEvent) or isinstance(binlogevent, DeleteRowsEvent) or isinstance(binlogevent, QueryEvent)): - raise ValueError('binlogevent must be WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent or QueryEvent') + +def concat_sql_from_binlog_event(cursor, binlog_event, row=None, e_start_pos=None, flashback=False, no_pk=False): + if flashback and no_pk: + raise ValueError('only one of flashback or no_pk can be True') + if not (isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) + or isinstance(binlog_event, DeleteRowsEvent) or isinstance(binlog_event, QueryEvent)): + raise ValueError('binlog_event must be WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent or QueryEvent') sql = '' - if isinstance(binlogevent, WriteRowsEvent) or isinstance(binlogevent, UpdateRowsEvent) or isinstance(binlogevent, DeleteRowsEvent): - pattern = generate_sql_pattern(binlogevent, row=row, flashback=flashback, nopk=nopk) + if isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) \ + or isinstance(binlog_event, DeleteRowsEvent): + pattern = generate_sql_pattern(binlog_event, row=row, flashback=flashback, no_pk=no_pk) sql = cursor.mogrify(pattern['template'], pattern['values']) - sql += ' #start %s end %s time %s' % (eStartPos, binlogevent.packet.log_pos, datetime.datetime.fromtimestamp(binlogevent.timestamp)) - elif flashback is False and isinstance(binlogevent, QueryEvent) and binlogevent.query != 'BEGIN' and binlogevent.query != 'COMMIT': - if binlogevent.schema: - sql = 'USE {0};\n'.format(binlogevent.schema) - sql += '{0};'.format(fix_object(binlogevent.query)) + time = datetime.datetime.fromtimestamp(binlog_event.timestamp) + sql += ' #start %s end %s time %s' % (e_start_pos, binlog_event.packet.log_pos, time) + elif flashback is False and isinstance(binlog_event, QueryEvent) and binlog_event.query != 'BEGIN' \ + and binlog_event.query != 'COMMIT': + if binlog_event.schema: + sql = 'USE {0};\n'.format(binlog_event.schema) + sql += '{0};'.format(fix_object(binlog_event.query)) return sql -def generate_sql_pattern(binlogevent, row=None, flashback=False, nopk=False): + +def generate_sql_pattern(binlog_event, row=None, flashback=False, no_pk=False): template = '' values = [] if flashback is True: - if isinstance(binlogevent, WriteRowsEvent): + if isinstance(binlog_event, WriteRowsEvent): template = 'DELETE FROM `{0}`.`{1}` WHERE {2} LIMIT 1;'.format( - binlogevent.schema, binlogevent.table, + binlog_event.schema, binlog_event.table, ' AND '.join(map(compare_items, row['values'].items())) ) values = map(fix_object, row['values'].values()) - elif isinstance(binlogevent, DeleteRowsEvent): + elif isinstance(binlog_event, DeleteRowsEvent): template = 'INSERT INTO `{0}`.`{1}`({2}) VALUES ({3});'.format( - binlogevent.schema, binlogevent.table, - ', '.join(map(lambda k: '`%s`'%k, row['values'].keys())), + binlog_event.schema, binlog_event.table, + ', '.join(map(lambda key: '`%s`' % key, row['values'].keys())), ', '.join(['%s'] * len(row['values'])) ) values = map(fix_object, row['values'].values()) - elif isinstance(binlogevent, UpdateRowsEvent): + elif isinstance(binlog_event, UpdateRowsEvent): template = 'UPDATE `{0}`.`{1}` SET {2} WHERE {3} LIMIT 1;'.format( - binlogevent.schema, binlogevent.table, - ', '.join(['`%s`=%%s'%k for k in row['before_values'].keys()]), + binlog_event.schema, binlog_event.table, + ', '.join(['`%s`=%%s' % x for x in row['before_values'].keys()]), ' AND '.join(map(compare_items, row['after_values'].items()))) values = map(fix_object, row['before_values'].values()+row['after_values'].values()) else: - if isinstance(binlogevent, WriteRowsEvent): - if nopk: - # print binlogevent.__dict__ - # tableInfo = (binlogevent.table_map)[binlogevent.table_id] + if isinstance(binlog_event, WriteRowsEvent): + if no_pk: + # print binlog_event.__dict__ + # tableInfo = (binlog_event.table_map)[binlog_event.table_id] # if tableInfo.primary_key: # row['values'].pop(tableInfo.primary_key) - if binlogevent.primary_key: - row['values'].pop(binlogevent.primary_key) + if binlog_event.primary_key: + row['values'].pop(binlog_event.primary_key) template = 'INSERT INTO `{0}`.`{1}`({2}) VALUES ({3});'.format( - binlogevent.schema, binlogevent.table, - ', '.join(map(lambda k: '`%s`'%k, row['values'].keys())), + binlog_event.schema, binlog_event.table, + ', '.join(map(lambda key: '`%s`' % key, row['values'].keys())), ', '.join(['%s'] * len(row['values'])) ) values = map(fix_object, row['values'].values()) - elif isinstance(binlogevent, DeleteRowsEvent): - template ='DELETE FROM `{0}`.`{1}` WHERE {2} LIMIT 1;'.format( - binlogevent.schema, binlogevent.table, - ' AND '.join(map(compare_items, row['values'].items())) - ) + elif isinstance(binlog_event, DeleteRowsEvent): + template = 'DELETE FROM `{0}`.`{1}` WHERE {2} LIMIT 1;'.format( + binlog_event.schema, binlog_event.table, ' AND '.join(map(compare_items, row['values'].items()))) values = map(fix_object, row['values'].values()) - elif isinstance(binlogevent, UpdateRowsEvent): + elif isinstance(binlog_event, UpdateRowsEvent): template = 'UPDATE `{0}`.`{1}` SET {2} WHERE {3} LIMIT 1;'.format( - binlogevent.schema, binlogevent.table, - ', '.join(['`%s`=%%s'%k for k in row['after_values'].keys()]), + binlog_event.schema, binlog_event.table, + ', '.join(['`%s`=%%s' % k for k in row['after_values'].keys()]), ' AND '.join(map(compare_items, row['before_values'].items())) ) values = map(fix_object, row['after_values'].values()+row['before_values'].values()) - return {'template':template, 'values':values} + return {'template': template, 'values': values} -def reversed_lines(file): - "Generate the lines of file in reverse order." + +def reversed_lines(fin): + """Generate the lines of file in reverse order.""" part = '' - for block in reversed_blocks(file): + for block in reversed_blocks(fin): for c in reversed(block): if c == '\n' and part: yield part[::-1] part = '' part += c - if part: yield part[::-1] + if part: + yield part[::-1] -def reversed_blocks(file, blocksize=4096): - "Generate blocks of file's contents in reverse order." - file.seek(0, os.SEEK_END) - here = file.tell() + +def reversed_blocks(fin, block_size=4096): + """Generate blocks of file's contents in reverse order.""" + fin.seek(0, os.SEEK_END) + here = fin.tell() while 0 < here: - delta = min(blocksize, here) + delta = min(block_size, here) here -= delta - file.seek(here, os.SEEK_SET) - yield file.read(delta) + fin.seek(here, os.SEEK_SET) + yield fin.read(delta) diff --git a/tests/test_binlog2sql_util.py b/tests/test_binlog2sql_util.py index a3b740a..34b61be 100755 --- a/tests/test_binlog2sql_util.py +++ b/tests/test_binlog2sql_util.py @@ -4,14 +4,6 @@ import sys import unittest import mock -from pymysql.cursors import Cursor -from pymysql.connections import Connection -from pymysqlreplication.event import BinLogEvent -from pymysqlreplication.row_event import ( - WriteRowsEvent, - UpdateRowsEvent, - DeleteRowsEvent, -) sys.path.append("..") from binlog2sql.binlog2sql_util import * @@ -38,13 +30,13 @@ class TestBinlog2sqlUtil(unittest.TestCase): def test_command_line_args(self): try: - command_line_args([]) + command_line_args(['--flashback', '--no-primary-key']) except Exception as e: - self.assertEqual(str(e), "Lack of parameter: startFile") + self.assertEqual(str(e), "Lack of parameter: start_file") try: command_line_args(['--start-file', 'mysql-bin.000058', '--flashback', '--no-primary-key']) except Exception as e: - self.assertEqual(str(e), "Only one of flashback or nopk can be True") + self.assertEqual(str(e), "Only one of flashback or no_pk can be True") try: command_line_args(['--start-file', 'mysql-bin.000058', '--flashback', '--stop-never']) except Exception as e: @@ -63,35 +55,43 @@ class TestBinlog2sqlUtil(unittest.TestCase): self.assertEqual(fix_object(u'unicode'), u'unicode'.encode('utf-8')) def test_generate_sql_pattern(self): - row = {'values':{'data':'hello','id':1}} + row = {'values': {'data': 'hello', 'id': 1}} mock_write_event = mock.create_autospec(WriteRowsEvent) mock_write_event.schema = 'test' mock_write_event.table = 'tbl' mock_write_event.primary_key = 'id' - pattern = generate_sql_pattern(binlogevent=mock_write_event, row=row, flashback=False, nopk=False) - self.assertEqual(pattern, {'values': ['hello', 1], 'template': 'INSERT INTO `test`.`tbl`(`data`, `id`) VALUES (%s, %s);'}) - pattern = generate_sql_pattern(binlogevent=mock_write_event, row=row, flashback=True, nopk=False) - self.assertEqual(pattern, {'values': ['hello', 1], 'template': 'DELETE FROM `test`.`tbl` WHERE `data`=%s AND `id`=%s LIMIT 1;'}) - pattern = generate_sql_pattern(binlogevent=mock_write_event, row=row, flashback=False, nopk=True) + pattern = generate_sql_pattern(binlog_event=mock_write_event, row=row, flashback=False, no_pk=False) + self.assertEqual(pattern, {'values': ['hello', 1], + 'template': 'INSERT INTO `test`.`tbl`(`data`, `id`) VALUES (%s, %s);'}) + pattern = generate_sql_pattern(binlog_event=mock_write_event, row=row, flashback=True, no_pk=False) + self.assertEqual(pattern, {'values': ['hello', 1], + 'template': 'DELETE FROM `test`.`tbl` WHERE `data`=%s AND `id`=%s LIMIT 1;'}) + pattern = generate_sql_pattern(binlog_event=mock_write_event, row=row, flashback=False, no_pk=True) self.assertEqual(pattern, {'values': ['hello'], 'template': 'INSERT INTO `test`.`tbl`(`data`) VALUES (%s);'}) row = {'values':{'data':'hello','id':1}} mock_delete_event = mock.create_autospec(DeleteRowsEvent) mock_delete_event.schema = 'test' mock_delete_event.table = 'tbl' - pattern = generate_sql_pattern(binlogevent=mock_delete_event, row=row, flashback=False, nopk=False) - self.assertEqual(pattern, {'values': ['hello', 1], 'template': 'DELETE FROM `test`.`tbl` WHERE `data`=%s AND `id`=%s LIMIT 1;'}) - pattern = generate_sql_pattern(binlogevent=mock_delete_event, row=row, flashback=True, nopk=False) - self.assertEqual(pattern, {'values': ['hello', 1], 'template': 'INSERT INTO `test`.`tbl`(`data`, `id`) VALUES (%s, %s);'}) + pattern = generate_sql_pattern(binlog_event=mock_delete_event, row=row, flashback=False, no_pk=False) + self.assertEqual(pattern, {'values': ['hello', 1], + 'template': 'DELETE FROM `test`.`tbl` WHERE `data`=%s AND `id`=%s LIMIT 1;'}) + pattern = generate_sql_pattern(binlog_event=mock_delete_event, row=row, flashback=True, no_pk=False) + self.assertEqual(pattern, {'values': ['hello', 1], + 'template': 'INSERT INTO `test`.`tbl`(`data`, `id`) VALUES (%s, %s);'}) - row = {'before_values':{'data':'hello','id':1}, 'after_values':{'data':'binlog2sql','id':1}} + row = {'before_values': {'data': 'hello', 'id': 1}, 'after_values': {'data': 'binlog2sql', 'id': 1}} mock_update_event = mock.create_autospec(UpdateRowsEvent) mock_update_event.schema = 'test' mock_update_event.table = 'tbl' - pattern = generate_sql_pattern(binlogevent=mock_update_event, row=row, flashback=False, nopk=False) - self.assertEqual(pattern, {'values': ['binlog2sql', 1, 'hello', 1], 'template': 'UPDATE `test`.`tbl` SET `data`=%s, `id`=%s WHERE `data`=%s AND `id`=%s LIMIT 1;'}) - pattern = generate_sql_pattern(binlogevent=mock_update_event, row=row, flashback=True, nopk=False) - self.assertEqual(pattern, {'values': ['hello', 1, 'binlog2sql', 1], 'template': 'UPDATE `test`.`tbl` SET `data`=%s, `id`=%s WHERE `data`=%s AND `id`=%s LIMIT 1;'}) + pattern = generate_sql_pattern(binlog_event=mock_update_event, row=row, flashback=False, no_pk=False) + self.assertEqual(pattern, {'values': ['binlog2sql', 1, 'hello', 1], + 'template': 'UPDATE `test`.`tbl` SET `data`=%s, `id`=%s WHERE `data`=%s AND' + ' `id`=%s LIMIT 1;'}) + pattern = generate_sql_pattern(binlog_event=mock_update_event, row=row, flashback=True, no_pk=False) + self.assertEqual(pattern, {'values': ['hello', 1, 'binlog2sql', 1], + 'template': 'UPDATE `test`.`tbl` SET `data`=%s, `id`=%s WHERE `data`=%s AND' + ' `id`=%s LIMIT 1;'}) if __name__ == '__main__':