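A Study of the OceanBase Upgrade Process (4.2.1.6 to 4.2.1.8)

Simulating the workload

A benchmark tool was used to load 10 warehouses of data to simulate a business workload.

Upgrade method

The cluster was upgraded using the rolling upgrade method. This method requires the OB cluster to follow the officially specified high-availability architecture: with fewer than 3 zones, the remaining zones cannot form a majority during a rolling upgrade. A rolling upgrade upgrades the zones one at a time. Because OB is a distributed cluster, the other zones keep serving requests while one zone is being upgraded, so the business is not affected.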
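The 3-zone requirement comes down to majority arithmetic. The sketch below checks whether the zones that stay online still form a Paxos majority while one zone is being upgraded. It assumes each zone holds exactly one full replica, which is the usual 3-zone layout; the function names are illustrative and not part of any OceanBase tool.

```python
def majority(replica_count: int) -> int:
    """Smallest number of replicas that forms a Paxos majority."""
    return replica_count // 2 + 1


def survives_one_zone_down(zone_count: int) -> bool:
    """Assuming one full replica per zone: while one zone is offline for its
    upgrade, do the remaining zones still form a majority?"""
    if zone_count < 1:
        raise ValueError("zone_count must be positive")
    return zone_count - 1 >= majority(zone_count)


for zones in (1, 2, 3, 5):
    print(zones, majority(zones), survives_one_zone_down(zones))
# 1 1 False
# 2 2 False
# 3 2 True
# 5 3 True
```

With 2 zones, taking one down leaves 1 of 2 replicas, which is not a majority, so the cluster would stop accepting writes; with 3 zones, 2 of 3 remain and service continues.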

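Task types

An upgrade produces one main task and several subtasks. The main task generates one subtask for each upgrade node that requires replacing the binary version, and these subtasks progressively replace the binaries that need to be upgraded.

  • Main task: turns each upgrade node on the upgrade path that requires a binary replacement into a subtask, so the binaries are replaced step by step. As shown in the figure below, Submit cluster upgrade task generates one subtask and Wait dag success waits for that subtask to finish. This Submit cluster upgrade task / Wait dag success pair may run several times; the number of runs depends on how many binary-replacing nodes the upgrade path contains.

  • Subtask: upgrades the OceanBase cluster version. It mainly runs the upgrade scripts: the pre-check script, the upgrade pre script, OBServer node replacement, the upgrade post script, version checks, and so on.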
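The relationship between upgrade-path nodes and the Submit/Wait pairs can be sketched as a tiny planner. This is a simplified model under stated assumptions: `PathNode`, its `replaces_binary` flag, and the node labels are hypothetical, and only the two step names come from the task view described above.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PathNode:
    """Hypothetical model of one node on the upgrade path."""
    label: str
    replaces_binary: bool  # True if reaching this node requires a new observer binary


def plan_main_task(path: List[PathNode]) -> List[str]:
    """Emit one 'Submit cluster upgrade task' / 'Wait dag success' pair for
    every binary-replacing node, in upgrade-path order."""
    steps: List[str] = []
    for node in path:
        if node.replaces_binary:
            steps.append(f"Submit cluster upgrade task -> {node.label}")
            steps.append(f"Wait dag success -> {node.label}")
    return steps


# Hypothetical path: node-B needs no binary swap, so it produces no subtask.
path = [PathNode("node-A", True), PathNode("node-B", False), PathNode("node-C", True)]
for step in plan_main_task(path):
    print(step)
```

Two binary-replacing nodes yield two Submit/Wait pairs (four steps), matching the statement that the pair repeats once per such node.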

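Upgrade task (subtask)

Because this is a rolling upgrade, the number of upgrade tasks within a subtask depends on the number of zones.

Pre-upgrade checks

All tasks before the Set zone context task are pre-upgrade checks: they verify tenant credentials and run the upgrade check scripts in the etc directory of the installation directory to confirm that the cluster can be upgraded. Because these tasks only perform checks, they can be rolled back if they fail. Once they succeed, the zone upgrades begin.

upgrade_checker.py: runs the pre-upgrade checks; if the script succeeds, the upgrade can continue.

upgrade_pre.py: performs a series of preparation steps before the upgrade to make sure the cluster is in a state suitable for upgrading.

upgrade_health_checker.py: runs cluster-level health checks to make sure the cluster is healthy before the upgrade.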
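upgrade_checker.py, listed next, does not stop at the first failed check: each check appends reasons to a global `fail_list`, and `check_fail_list()` raises once at the end, so a single run reports every blocker. The sketch below restates that pattern in Python 3 with stub checks; the stub functions and their results are hypothetical stand-ins for the SQL-backed checks.

```python
from typing import Callable, List

# A check returns a list of failure reasons; an empty list means it passed.
Check = Callable[[], List[str]]


class PreCheckError(Exception):
    pass


def collect_failures(checks: List[Check]) -> List[str]:
    """Run every check, even after one fails, and gather all reasons."""
    reasons: List[str] = []
    for check in checks:
        reasons.extend(check())
    return reasons


def raise_if_failed(reasons: List[str]) -> None:
    """Fail once at the end, like check_fail_list() in upgrade_checker.py."""
    if reasons:
        detail = ", ".join(f"[{r}]" for r in reasons)
        raise PreCheckError(f"upgrade checker failed with {len(reasons)} reasons: {detail}")


# Stub checks standing in for the SQL-backed ones (hypothetical results).
def no_ddl_running() -> List[str]:
    return []


def tenants_normal() -> List[str]:
    return ["has abnormal tenant, should stop"]


try:
    raise_if_failed(collect_failures([no_ddl_running, tenants_normal]))
except PreCheckError as exc:
    print(exc)  # upgrade checker failed with 1 reasons: [has abnormal tenant, should stop]
```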

Contents of upgrade_checker.py:

[root@sdw1 etc]# cat upgrade_checker.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-


import sys
import os
import mysql.connector
from mysql.connector import errorcode
import logging
import getopt
import time


class UpgradeParams:
  log_filename = 'upgrade_checker.log'
  old_version = '4.0.0.0'
#### --------------start : my_error.py --------------
class MyError(Exception):
  def __init__(self, value):
    self.value = value
  def __str__(self):
    return repr(self.value)
#### --------------start : actions.py------------
class Cursor:
  __cursor = None
  def __init__(self, cursor):
    self.__cursor = cursor
  def exec_sql(self, sql, print_when_succ = True):
    try:
      self.__cursor.execute(sql)
      rowcount = self.__cursor.rowcount
      if True == print_when_succ:
        logging.info('succeed to execute sql: %s, rowcount = %d', sql, rowcount)
      return rowcount
    except mysql.connector.Error, e:
      logging.exception('mysql connector error, fail to execute sql: %s', sql)
      raise e
    except Exception, e:
      logging.exception('normal error, fail to execute sql: %s', sql)
      raise e
  def exec_query(self, sql, print_when_succ = True):
    try:
      self.__cursor.execute(sql)
      results = self.__cursor.fetchall()
      rowcount = self.__cursor.rowcount
      if True == print_when_succ:
        logging.info('succeed to execute query: %s, rowcount = %d', sql, rowcount)
      return (self.__cursor.description, results)
    except mysql.connector.Error, e:
      logging.exception('mysql connector error, fail to execute sql: %s', sql)
      raise e
    except Exception, e:
      logging.exception('normal error, fail to execute sql: %s', sql)
      raise e


def set_parameter(cur, parameter, value):
  sql = """alter system set {0} = '{1}'""".format(parameter, value)
  logging.info(sql)
  cur.execute(sql)
  wait_parameter_sync(cur, parameter, value)


def wait_parameter_sync(cur, key, value):
  sql = """select count(*) as cnt from oceanbase.__all_virtual_sys_parameter_stat
           where name = '{0}' and value != '{1}'""".format(key, value)
  times = 10
  while times > 0:
    logging.info(sql)
    cur.execute(sql)
    result = cur.fetchall()
    if len(result) != 1 or len(result[0]) != 1:
      logging.error('result cnt not match')
      raise MyError('result cnt not match')
    elif result[0][0] == 0:
      logging.info("""{0} is sync, value is {1}""".format(key, value))
      break
    else:
      logging.info("""{0} is not sync, value should be {1}""".format(key, value))


    times -= 1
    if times == 0:
      logging.error("""check {0}:{1} sync timeout""".format(key, value))
      raise MyError("""check {0}:{1} sync timeout""".format(key, value))
    time.sleep(5)


#### --------------start :  opt.py --------------
help_str = \
"""
Help:
""" +\
sys.argv[0] + """ [OPTIONS]""" +\
'\n\n' +\
'-I, --help          Display this help and exit.\n' +\
'-V, --version       Output version information and exit.\n' +\
'-h, --host=name     Connect to host.\n' +\
'-P, --port=name     Port number to use for connection.\n' +\
'-u, --user=name     User for login.\n' +\
'-t, --timeout=name  Cmd/Query/Inspection execute timeout(s).\n' +\
'-p, --password=name Password to use when connecting to server. If password is\n' +\
'                    not given it\'s empty string "".\n' +\
'-m, --module=name   Modules to run. Modules should be a string combined by some of\n' +\
'                    the following strings: ddl, normal_dml, each_tenant_dml,\n' +\
'                    system_variable_dml, special_action, all. "all" represents\n' +\
'                    that all modules should be run. They are splitted by ",".\n' +\
'                    For example: -m all, or --module=ddl,normal_dml,special_action\n' +\
'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
'\n\n' +\
'Maybe you want to run cmd like that:\n' +\
sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'


version_str = """version 1.0.0"""


class Option:
  __g_short_name_set = set([])
  __g_long_name_set = set([])
  __short_name = None
  __long_name = None
  __is_with_param = None
  __is_local_opt = None
  __has_value = None
  __value = None
  def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
    if short_name in Option.__g_short_name_set:
      raise MyError('duplicate option short name: {0}'.format(short_name))
    elif long_name in Option.__g_long_name_set:
      raise MyError('duplicate option long name: {0}'.format(long_name))
    Option.__g_short_name_set.add(short_name)
    Option.__g_long_name_set.add(long_name)
    self.__short_name = short_name
    self.__long_name = long_name
    self.__is_with_param = is_with_param
    self.__is_local_opt = is_local_opt
    self.__has_value = False
    if None != default_value:
      self.set_value(default_value)
  def is_with_param(self):
    return self.__is_with_param
  def get_short_name(self):
    return self.__short_name
  def get_long_name(self):
    return self.__long_name
  def has_value(self):
    return self.__has_value
  def get_value(self):
    return self.__value
  def set_value(self, value):
    self.__value = value
    self.__has_value = True
  def is_local_opt(self):
    return self.__is_local_opt
  def is_valid(self):
    return None != self.__short_name and None != self.__long_name and True == self.__has_value and None != self.__value


g_opts =\
[\
Option('I', 'help', False, True),\
Option('V', 'version', False, True),\
Option('h', 'host', True, False),\
Option('P', 'port', True, False),\
Option('u', 'user', True, False),\
Option('t', 'timeout', True, False, 0),\
Option('p', 'password', True, False, ''),\
# Which modules to run; all modules by default
Option('m', 'module', True, False, 'all'),\
# Log file path; each script's main function overrides it with its own default
Option('l', 'log-file', True, False)
]\


def change_opt_defult_value(opt_long_name, opt_default_val):
  global g_opts
  for opt in g_opts:
    if opt.get_long_name() == opt_long_name:
      opt.set_value(opt_default_val)
      return


def has_no_local_opts():
  global g_opts
  no_local_opts = True
  for opt in g_opts:
    if opt.is_local_opt() and opt.has_value():
      no_local_opts = False
  return no_local_opts


def check_db_client_opts():
  global g_opts
  for opt in g_opts:
    if not opt.is_local_opt() and not opt.has_value():
      raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'\
          .format(opt.get_short_name(), sys.argv[0]))


def parse_option(opt_name, opt_val):
  global g_opts
  for opt in g_opts:
    if opt_name in (('-' + opt.get_short_name()), ('--' + opt.get_long_name())):
      opt.set_value(opt_val)


def parse_options(argv):
  global g_opts
  short_opt_str = ''
  long_opt_list = []
  for opt in g_opts:
    if opt.is_with_param():
      short_opt_str += opt.get_short_name() + ':'
    else:
      short_opt_str += opt.get_short_name()
  for opt in g_opts:
    if opt.is_with_param():
      long_opt_list.append(opt.get_long_name() + '=')
    else:
      long_opt_list.append(opt.get_long_name())
  (opts, args) = getopt.getopt(argv, short_opt_str, long_opt_list)
  for (opt_name, opt_val) in opts:
    parse_option(opt_name, opt_val)
  if has_no_local_opts():
    check_db_client_opts()


def deal_with_local_opt(opt):
  if 'help' == opt.get_long_name():
    global help_str
    print help_str
  elif 'version' == opt.get_long_name():
    global version_str
    print version_str


def deal_with_local_opts():
  global g_opts
  if has_no_local_opts():
    raise MyError('no local options, can not deal with local options')
  else:
    for opt in g_opts:
      if opt.is_local_opt() and opt.has_value():
        deal_with_local_opt(opt)
        # Handle only one local option
        return


def get_opt_host():
  global g_opts
  for opt in g_opts:
    if 'host' == opt.get_long_name():
      return opt.get_value()


def get_opt_port():
  global g_opts
  for opt in g_opts:
    if 'port' == opt.get_long_name():
      return opt.get_value()


def get_opt_user():
  global g_opts
  for opt in g_opts:
    if 'user' == opt.get_long_name():
      return opt.get_value()


def get_opt_password():
  global g_opts
  for opt in g_opts:
    if 'password' == opt.get_long_name():
      return opt.get_value()


def get_opt_timeout():
  global g_opts
  for opt in g_opts:
    if 'timeout' == opt.get_long_name():
      return opt.get_value()


def get_opt_module():
  global g_opts
  for opt in g_opts:
    if 'module' == opt.get_long_name():
      return opt.get_value()


def get_opt_log_file():
  global g_opts
  for opt in g_opts:
    if 'log-file' == opt.get_long_name():
      return opt.get_value()
#### ---------------end----------------------


#### --------------start :  do_upgrade_pre.py--------------
def config_logging_module(log_filenamme):
  logging.basicConfig(level=logging.INFO,\
      format='[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s',\
      datefmt='%Y-%m-%d %H:%M:%S',\
      filename=log_filenamme,\
      filemode='w')
  # Define the log format
  formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s', '%Y-%m-%d %H:%M:%S')
  #######################################
  # Define a handler that prints INFO-and-above logs to sys.stdout
  stdout_handler = logging.StreamHandler(sys.stdout)
  stdout_handler.setLevel(logging.INFO)
  # Set the log format
  stdout_handler.setFormatter(formatter)
  # Add stdout_handler to the root logger
  logging.getLogger('').addHandler(stdout_handler)
#### ---------------end----------------------




fail_list=[]


def get_version(version_str):
  versions = version_str.split(".")


  if len(versions) != 4:
    logging.error("""version:{0} is invalid""".format(version_str))
    raise MyError("""version:{0} is invalid""".format(version_str))


  major = int(versions[0])
  minor = int(versions[1])
  major_patch = int(versions[2])
  minor_patch = int(versions[3])


  if major > 0xffffffff or minor > 0xffff or major_patch > 0xff or minor_patch > 0xff:
    logging.error("""version:{0} is invalid""".format(version_str))
    raise MyError("""version:{0} is invalid""".format(version_str))


  version = (major << 32) | (minor << 16) | (major_patch << 8) | (minor_patch)
  return version


#### START ####
# 1. Check the source (pre-upgrade) version
def check_observer_version(query_cur, upgrade_params):
  (desc, results) = query_cur.exec_query("""select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'""")
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif cmp(results[0][0], upgrade_params.old_version) < 0 :
    fail_list.append('old observer version is expected equal or higher than: {0}, actual version:{1}'.format(upgrade_params.old_version, results[0][0]))
  logging.info('check observer version success, version = {0}'.format(results[0][0]))


def check_data_version(query_cur, input_min_cluster_version):
  min_cluster_version = 0
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif len(results[0]) != 1:
    fail_list.append('column cnt not match')
  else:
    min_cluster_version = get_version(results[0][0])
    input_min_cluster_version[0] = min_cluster_version


    # check data version
    if min_cluster_version < get_version("4.1.0.0"):
      # last barrier cluster version should be 4.1.0.0
      fail_list.append('last barrier cluster version is 4.1.0.0. prohibit cluster upgrade from cluster version less than 4.1.0.0')
    else:
      data_version_str = ''
      data_version = 0
      # check compatible is same
      sql = """select distinct value from oceanbase.__all_virtual_tenant_parameter_info where name='compatible'"""
      (desc, results) = query_cur.exec_query(sql)
      if len(results) != 1:
        fail_list.append('compatible is not sync')
      elif len(results[0]) != 1:
        fail_list.append('column cnt not match')
      else:
        data_version_str = results[0][0]
        data_version = get_version(results[0][0])


        if data_version < get_version("4.1.0.0"):
          # last barrier data version should be 4.1.0.0
          fail_list.append('last barrier data version is 4.1.0.0. prohibit cluster upgrade from data version less than 4.1.0.0')
        else:
          # check target_data_version/current_data_version
          sql = "select count(*) from oceanbase.__all_tenant"
          (desc, results) = query_cur.exec_query(sql)
          if len(results) != 1 or len(results[0]) != 1:
            fail_list.append('result cnt not match')
          else:
            tenant_count = results[0][0]


            sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0}".format(data_version)
            (desc, results) = query_cur.exec_query(sql)
            if len(results) != 1 or len(results[0]) != 1:
              fail_list.append('result cnt not match')
            elif 2 * tenant_count != results[0][0]:
              fail_list.append('target_data_version/current_data_version not match with {0}, tenant_cnt:{1}, result_cnt:{2}'.format(data_version_str, tenant_count, results[0][0]))
            else:
              logging.info("check data version success, all tenant's compatible/target_data_version/current_data_version is {0}".format(data_version_str))


# 2. Check whether paxos replicas are in sync and whether any paxos replica is missing
def check_paxos_replica(query_cur):
  # 2.1 Check whether paxos replicas are in sync
  (desc, results) = query_cur.exec_query("""select count(1) as unsync_cnt from GV$OB_LOG_STAT where in_sync = 'NO'""")
  if results[0][0] > 0 :
    fail_list.append('{0} replicas unsync, please check'.format(results[0][0]))
  # 2.2 Check whether any paxos replica is missing (TODO)
  logging.info('check paxos replica success')


# 3. Check whether a balance task or locality change is in progress
def check_rebalance_task(query_cur):
  # 3.1 Check whether a locality change is in progress
  (desc, results) = query_cur.exec_query("""select count(1) as cnt from DBA_OB_TENANT_JOBS where job_status='INPROGRESS' and result_code is null""")
  if results[0][0] > 0 :
    fail_list.append('{0} locality tasks is doing, please check'.format(results[0][0]))
  # 3.2 Check whether a balance task is in progress
  (desc, results) = query_cur.exec_query("""select count(1) as rebalance_task_cnt from CDB_OB_LS_REPLICA_TASKS""")
  if results[0][0] > 0 :
    fail_list.append('{0} rebalance tasks is doing, please check'.format(results[0][0]))
  logging.info('check rebalance task success')


# 4. Check the cluster status
def check_cluster_status(query_cur, min_cluster_version):
  # 4.1 Check that no major compaction (merge) is in progress
  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""")
  if results[0][0] > 0 :
    fail_list.append('{0} tenant is merging, please check'.format(results[0][0]))
  if len(min_cluster_version) > 0 and min_cluster_version[0] < get_version("4.2.0.0"):
    (desc, results) = query_cur.exec_query("""select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""")
    if results[0][0] > 0 :
      fail_list.append('{0} tablet is merging, please check'.format(results[0][0]))
    logging.info('check cluster tablet major status success, cluster_version={0}'.format(min_cluster_version))
  else:
    logging.info('skip check cluster tablet major status, cluster_version={0}'.format(min_cluster_version))


# 5. Check for abnormal tenants (creating, delayed drop, restoring)
def check_tenant_status(query_cur):


  # check tenant schema
  (desc, results) = query_cur.exec_query("""select count(*) as count from DBA_OB_TENANTS where status != 'NORMAL'""")
  if len(results) != 1 or len(results[0]) != 1:
    fail_list.append('results len not match')
  elif 0 != results[0][0]:
    fail_list.append('has abnormal tenant, should stop')
  else:
    logging.info('check tenant status success')


  # check tenant info
  # don't support restore tenant upgrade
  (desc, results) = query_cur.exec_query("""select count(*) as count from oceanbase.__all_virtual_tenant_info where tenant_role != 'PRIMARY' and tenant_role != 'STANDBY'""")
  if len(results) != 1 or len(results[0]) != 1:
    fail_list.append('results len not match')
  elif 0 != results[0][0]:
    fail_list.append('has abnormal tenant info, should stop')
  else:
    logging.info('check tenant info success')


# 6. Check that there is no restore job
def check_restore_job_exist(query_cur):
  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_RESTORE_PROGRESS""")
  if len(results) != 1 or len(results[0]) != 1:
    fail_list.append('failed to restore job cnt')
  elif results[0][0] != 0:
      fail_list.append("""still has restore job, upgrade is not allowed temporarily""")
  logging.info('check restore job success')


def check_is_primary_zone_distributed(primary_zone_str):
  semicolon_pos = len(primary_zone_str)
  for i in range(len(primary_zone_str)):
    if primary_zone_str[i] == ';':
      semicolon_pos = i
      break
  comma_pos = len(primary_zone_str)
  for j in range(len(primary_zone_str)):
    if primary_zone_str[j] == ',':
      comma_pos = j
      break
  if comma_pos < semicolon_pos:
    return True
  else:
    return False


# 7. Before the upgrade, each tenant must have only one primary zone
def check_tenant_primary_zone(query_cur):
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif len(results[0]) != 1:
    fail_list.append('column cnt not match')
  else:
    min_cluster_version = get_version(results[0][0])
    if min_cluster_version < get_version("4.1.0.0"):
      (desc, results) = query_cur.exec_query("""select tenant_name,primary_zone from DBA_OB_TENANTS where  tenant_id != 1""");
      for item in results:
        if cmp(item[1], "RANDOM") == 0:
          fail_list.append('{0} tenant primary zone random before update not allowed'.format(item[0]))
        elif check_is_primary_zone_distributed(item[1]):
          fail_list.append('{0} tenant primary zone distributed before update not allowed'.format(item[0]))
      logging.info('check tenant primary zone success')


# 8. Raise the permanent offline time so replicas are not lost during the upgrade
def modify_server_permanent_offline_time(cur):
  set_parameter(cur, 'server_permanent_offline_time', '72h')


# 9. Check whether any DDL task is running
def check_ddl_task_execute(query_cur):
  (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_ddl_task_status""")
  if 0 != results[0][0]:
    fail_list.append("There are DDL task in progress")
  logging.info('check ddl task execution status success')


# 10. Check that there is no backup job
def check_backup_job_exist(query_cur):
  # Backup jobs cannot be in-progress during upgrade.
  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_BACKUP_JOBS""")
  if len(results) != 1 or len(results[0]) != 1:
    fail_list.append('failed to backup job cnt')
  elif results[0][0] != 0:
    fail_list.append("""still has backup job, upgrade is not allowed temporarily""")
  else:
    logging.info('check backup job success')


# 11. Check that there is no archive job
def check_archive_job_exist(query_cur):
  min_cluster_version = 0
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif len(results[0]) != 1:
    fail_list.append('column cnt not match')
  else:
    min_cluster_version = get_version(results[0][0])


    # Archive jobs cannot be in-progress before upgrade from 4.0.
    if min_cluster_version < get_version("4.1.0.0"):
      (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_ARCHIVELOG where status!='STOP'""")
      if len(results) != 1 or len(results[0]) != 1:
        fail_list.append('failed to archive job cnt')
      elif results[0][0] != 0:
        fail_list.append("""still has archive job, upgrade is not allowed temporarily""")
      else:
        logging.info('check archive job success')


# 12. Check that the archive destination has been cleared
def check_archive_dest_exist(query_cur):
  min_cluster_version = 0
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif len(results[0]) != 1:
    fail_list.append('column cnt not match')
  else:
    min_cluster_version = get_version(results[0][0])
    # archive dest need to be cleaned before upgrade from 4.0.
    if min_cluster_version < get_version("4.1.0.0"):
      (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_ARCHIVE_DEST""")
      if len(results) != 1 or len(results[0]) != 1:
        fail_list.append('failed to archive dest cnt')
      elif results[0][0] != 0:
        fail_list.append("""still has archive destination, upgrade is not allowed temporarily""")
      else:
        logging.info('check archive destination success')


# 13. Check that the backup destination has been cleared
def check_backup_dest_exist(query_cur):
  min_cluster_version = 0
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif len(results[0]) != 1:
    fail_list.append('column cnt not match')
  else:
    min_cluster_version = get_version(results[0][0])
    # backup dest need to be cleaned before upgrade from 4.0.
    if min_cluster_version < get_version("4.1.0.0"):
      (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_BACKUP_PARAMETER where name='data_backup_dest' and (value!=NULL or value!='')""")
      if len(results) != 1 or len(results[0]) != 1:
        fail_list.append('failed to data backup dest cnt')
      elif results[0][0] != 0:
        fail_list.append("""still has backup destination, upgrade is not allowed temporarily""")
      else:
        logging.info('check backup destination success')


def check_server_version(query_cur):
    sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server""";
    (desc, results) = query_cur.exec_query(sql);
    if len(results) != 1:
      fail_list.append("servers build_version not match")
    else:
      logging.info("check server version success")


# 14. Check that every server is in service
def check_observer_status(query_cur):
  (desc, results) = query_cur.exec_query("""select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status != "active")""")
  if results[0][0] > 0 :
    fail_list.append('{0} observer not available , please check'.format(results[0][0]))
  logging.info('check observer status success')


# 15. Check that schemas have been refreshed successfully
def check_schema_status(query_cur):
  (desc, results) = query_cur.exec_query("""select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b""")
  if results[0][0] != 1 :
    fail_list.append('{0} schema not available, please check'.format(results[0][0]))
  logging.info('check schema status success')


# 16. Check whether a tenant named all/all_user/all_meta exists
def check_not_supported_tenant_name(query_cur):
  names = ["all", "all_user", "all_meta"]
  (desc, results) = query_cur.exec_query("""select tenant_name from oceanbase.DBA_OB_TENANTS""")
  for i in range(len(results)):
    if results[i][0].lower() in names:
      fail_list.append('a tenant named all/all_user/all_meta (case insensitive) cannot exist in the cluster, please rename the tenant')
      break
  logging.info('check special tenant name success')


# last check of do_check, make sure no function execute after check_fail_list
def check_fail_list():
  if len(fail_list) != 0 :
     error_msg ="upgrade checker failed with " + str(len(fail_list)) + " reasons: " + ", ".join(['['+x+"] " for x in fail_list])
     raise MyError(error_msg)


def set_query_timeout(query_cur, timeout):
  if timeout != 0:
    sql = """set @@session.ob_query_timeout = {0}""".format(timeout * 1000 * 1000)
    query_cur.exec_sql(sql)


# Run the pre-upgrade checks
def do_check(my_host, my_port, my_user, my_passwd, timeout, upgrade_params):
  try:
    conn = mysql.connector.connect(user = my_user,
                                   password = my_passwd,
                                   host = my_host,
                                   port = my_port,
                                   database = 'oceanbase',
                                   raise_on_warnings = True)
    conn.autocommit = True
    cur = conn.cursor(buffered=True)
    min_cluster_version = [0]
    try:
      query_cur = Cursor(cur)
      set_query_timeout(query_cur, timeout)
      check_observer_version(query_cur, upgrade_params)
      check_data_version(query_cur, min_cluster_version)
      check_paxos_replica(query_cur)
      check_rebalance_task(query_cur)
      check_cluster_status(query_cur, min_cluster_version)
      check_tenant_status(query_cur)
      check_restore_job_exist(query_cur)
      check_tenant_primary_zone(query_cur)
      check_ddl_task_execute(query_cur)
      check_backup_job_exist(query_cur)
      check_archive_job_exist(query_cur)
      check_archive_dest_exist(query_cur)
      check_backup_dest_exist(query_cur)
      check_observer_status(query_cur)
      check_schema_status(query_cur)
      check_server_version(query_cur)
      check_not_supported_tenant_name(query_cur)
      # all check func should execute before check_fail_list
      check_fail_list()
      modify_server_permanent_offline_time(cur)
    except Exception, e:
      logging.exception('run error')
      raise e
    finally:
      cur.close()
      conn.close()
  except mysql.connector.Error, e:
    logging.exception('connection error')
    raise e
  except Exception, e:
    logging.exception('normal error')
    raise e


if __name__ == '__main__':
  upgrade_params = UpgradeParams()
  change_opt_defult_value('log-file', upgrade_params.log_filename)
  parse_options(sys.argv[1:])
  if not has_no_local_opts():
    deal_with_local_opts()
  else:
    check_db_client_opts()
    log_filename = get_opt_log_file()
    upgrade_params.log_filename = log_filename
    # Logging is configured here so that earlier steps do not overwrite the log file
    config_logging_module(upgrade_params.log_filename)
    try:
      host = get_opt_host()
      port = int(get_opt_port())
      user = get_opt_user()
      password = get_opt_password()
      timeout = int(get_opt_timeout())
      logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", timeout=\"%s\", log-file=\"%s\"',\
          host, port, user, password, timeout, log_filename)
      do_check(host, port, user, password, timeout, upgrade_params)
    except mysql.connector.Error, e:
      logging.exception('mysql connector error')
      raise e
    except Exception, e:
      logging.exception('normal error')
      raise e
[root@sdw1 etc]#
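The version gates in the listing rely on get_version(), which packs the four version fields into one integer (major in the bits above 32, minor in 16 bits, and 8 bits for each patch field) so that versions compare numerically. Below is a Python 3 restatement for experimentation; it raises ValueError where the original's bare `raise e` referred to an undefined name. The last two prints illustrate why numeric packing is safer than comparing version strings directly, which check_observer_version() does with cmp(); for the 4.2.1.x versions in this upgrade, that string comparison happens to give the right answer.

```python
def get_version(version_str: str) -> int:
    """Pack 'major.minor.major_patch.minor_patch' into one integer,
    using the same bit layout as get_version() in upgrade_checker.py."""
    parts = version_str.split(".")
    if len(parts) != 4:
        raise ValueError(f"version:{version_str} is invalid")
    major, minor, major_patch, minor_patch = (int(p) for p in parts)
    if major > 0xFFFFFFFF or minor > 0xFFFF or major_patch > 0xFF or minor_patch > 0xFF:
        raise ValueError(f"version:{version_str} is invalid")
    return (major << 32) | (minor << 16) | (major_patch << 8) | minor_patch


print(get_version("4.2.1.8"))                            # 17180000520
print(get_version("4.2.1.8") > get_version("4.2.1.6"))   # True
# Plain string comparison is lexicographic and can misorder versions:
print("4.10.0.0" < "4.9.0.0")                            # True (wrong order)
print(get_version("4.10.0.0") < get_version("4.9.0.0"))  # False (correct)
```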



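Zone upgrade process

Because a rolling upgrade is used, the upgrade task keeps the business stable by upgrading one zone first and moving on to the next zone only after that one has finished. Even while one zone is being upgraded, or if its upgrade fails, the other zones continue to serve requests, so the business keeps running.

Each zone upgrade task goes through the following stages:

Set zone context (start) -> Check zone (end)

Zone upgrade tasks

Before a zone is upgraded, the zone is stopped and its observer process is shut down. The target version's dependencies (Install dependencies) and the OB installation package (Install OB rpm) are then downloaded.

After the dependencies and the package have been downloaded, the observer process is started and the task waits for the observer version to refresh (Wait observer version refreshed).

Once the observer has started, the task checks the clog status (Check clog stat), whether the observer has refreshed its schema (Check observer refresh schema), and cluster health (Execute upgrade health checker script), and then starts the zone.

Note: once the current zone starts successfully, that zone has been upgraded. The same steps are then repeated for each remaining zone, so the number of upgrade tasks depends on the number of zones and tenants.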
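The per-zone sequence and the one-zone-at-a-time rule can be modelled as a small simulation. This is a simplified sketch, not the upgrade platform's actual implementation: step names shown in parentheses in the text are used as-is, while "Stop zone", "Stop observer", "Start observer" and "Start zone" are hypothetical labels for steps the text describes without naming. `run_step` is an assumed callback standing in for whatever really executes a step.

```python
from typing import Callable, Dict, List

ZONE_STEPS = [
    "Set zone context",
    "Stop zone",                    # hypothetical label
    "Stop observer",                # hypothetical label
    "Install dependencies",
    "Install OB rpm",
    "Start observer",               # hypothetical label
    "Wait observer version refreshed",
    "Check clog stat",
    "Check observer refresh schema",
    "Execute upgrade health checker script",
    "Start zone",                   # hypothetical label
    "Check zone",
]


def rolling_upgrade(zones: List[str], run_step: Callable[[str, str], bool]) -> Dict[str, str]:
    """Upgrade zones strictly one at a time. On the first failed step, stop:
    the failed zone is left for investigation and later zones are untouched."""
    status = {zone: "pending" for zone in zones}
    for zone in zones:
        status[zone] = "upgrading"
        for step in ZONE_STEPS:
            if not run_step(zone, step):
                status[zone] = f"failed at: {step}"
                return status
        status[zone] = "upgraded"
    return status


def fake_runner(zone: str, step: str) -> bool:
    # Simulated executor: every step succeeds except the clog check in zone2.
    return not (zone == "zone2" and step == "Check clog stat")


print(rolling_upgrade(["zone1", "zone2", "zone3"], fake_runner))
# {'zone1': 'upgraded', 'zone2': 'failed at: Check clog stat', 'zone3': 'pending'}
```

Stopping at the first failure keeps at most one zone out of service at any time, so with 3 zones the other two still form a majority while the failed zone is investigated.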

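Checking the workload

Data initialization completed, showing that the rolling upgrade did not affect the running workload.

Summary

1. Before the upgrade, cluster checks are run to confirm that the current OB cluster meets the upgrade requirements.

2. The number of upgrade tasks depends on the number of zones.

3. During the upgrade, zones and observers are stopped, and this operation cannot be rolled back. If any earlier check task fails, investigate the cause; do not skip it.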
