• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

01MySQL内核分析-The Skeleton of the Server Code

开发技术 开发技术 1周前 (06-23) 16次浏览

摘要

这个官方文档一段对MySQL内核分析的一个向导。是对MySQL一条insert语句写入到MySQL数据库的分析。
但是,对于MySQL 5.7版本来说,基本上都是写入到innodb引擎。但也还是有借鉴意义,大的框架没有太大变化。
后面的文档,会通过mysqld –debug 和gdb等工具,通过分析mysqld.trace来分析insert语句在MySQL 5.7中怎么写入数据库

官方文档给出的一段结构,如下:

/sql/mysqld.cc
/sql/sql_parse.cc
/sql/sql_prepare.cc
/sql/sql_insert.cc
/sql/ha_myisam.cc
/myisam/mi_write.c

上述梳理一个过程,是说从客户段执行一条简单的insert语句,然后到达MySQL服务器端,并通过MyISAM存储层。写入到MyISAM文件的过程。

由于,我们现在的主流都是InnoDB存储引擎,所以我们分析的写入到存储层应该是InnoDB的源代码。但是上述的一个框架也有借鉴意义。虽然,走的是InnoDB存储引擎插入数据,但是也还是需要通过SQL层的ha_*这样的接口进行接入。

正题开始!!!!!!!!!!!!!!!!!!!!!!!

第一步,进入MySQL大门的地方。梦开始的地方。众所周知,C语言都是需要main方法作为主入口。而MySQL的主入口如下:

代码位置 /sql/mysqld.cc

  int main(int argc, char **argv)
  {
    _cust_check_startup();
    (void) thr_setconcurrency(concurrency);
    init_ssl();
    server_init();                             // 'bind' + 'listen'
    init_server_components();
    start_signal_handler();
    acl_init((THD *)0, opt_noacl);
    init_slave();
    create_shutdown_thread();
    create_maintenance_thread();
    handle_connections_sockets(0);             // !  这里也代表着我们进入下一个门的地方
    DBUG_PRINT("quit",("Exiting main thread"));
    exit(0);
  }

这里可以看到很多的init_*或者server_init()。通过名字我们可以猜测出,这里做了很多初始化的工作。例如:启动过程中一些初始化的检查和MySQL配置变量的加载和一些组件的初始化等。

这里重要的函数是handle_connections_sockets

继续跟踪 /sql/mysqld.cc

 handle_connections_sockets (arg __attribute__((unused))
  {
     if (ip_sock != INVALID_SOCKET)
     {
       FD_SET(ip_sock,&clientFDs);
       DBUG_PRINT("general",("Waiting for connections."));
       while (!abort_loop)
       {
         new_sock = accept(sock, my_reinterpret_cast(struct sockaddr*)
           (&cAddr),             &length);
         thd= new THD;
         if (sock == unix_sock)
         thd->host=(char*) localhost;
         create_new_thread(thd);            // !
         }

从简易的思维,忽视其他的判断语句。可以看到这里做的是典型的client/server架构。服务器有一个主线程,它总是侦听来自新客户机的请求。一旦它接收到这样的请求,它将分配资源。特别是,主线程将生成一个新线程来处理连接。然后主服务器将循环并侦听新连接——但我们将保留它并跟踪新线程。

这里创建新线程的方法是:create_new_thread(thd);

继续跟踪 /sql/mysqld.cc

  create_new_thread(THD *thd)
  {
    pthread_mutex_lock(&LOCK_thread_count);
    pthread_create(&thd->real_id,&connection_attrib,
        handle_one_connection,                        // !
        (void*) thd));
    pthread_mutex_unlock(&LOCK_thread_count);
  }

可以看到这里获得一个新线程加入一个互斥锁,避免冲突。

继续跟踪 /sql/mysqld.cc

handle_one_connection(THD *thd)
  {
    init_sql_alloc(&thd->mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
    while (!net->error && net->vio != 0 && !thd->killed)
    {
      if (do_command(thd))            // !
        break;
    }
    close_connection(net);
    end_thread(thd,1);
    packet=(char*) net->read_pos;

从这里开始,我们即将脱离mysqld.cc文件,因为我们获得了thread,且分配一小段内存资源,给与我们来处理我们的SQL语句了。

我们会走向何方呢,可以开始观察do_command(thd)方法。

继续跟踪/sql/sql_parse.cc

bool do_command(THD *thd)
{
  net_new_transaction(net);
  packet_length=my_net_read(net);
  packet=(char*) net->read_pos;
  command = (enum enum_server_command) (uchar) packet[0];
  dispatch_command(command,thd, packet+1, (uint) packet_length);
// !
}

其中从这里可以看到,do_command(THD *thd)把它串联起来的是一个叫作THD的东西,也就是thread。所以后面的工作和行为,基本都是通过thread进行牵线搭桥的。

my_net_read函数位于另一个名为net_servlet .cc的文件中。该函数从客户端获取一个包,解压缩它,并去除头部。

一旦完成,我们就得到了一个名为packet的多字节变量,它包含客户端发送的内容。第一个字节很重要,因为它包含标识消息类型的代码。

说明了packet第一个字节很重要。debug也有证据进行一个佐证。

packet_header: Memory: 0x7f7fc000a4b0  Bytes: (4)
21 00 00 00

然后把packet第一个字节和余下的部分传递给dispatch_command

继续跟踪/sql/sql_parse.cc

bool dispatch_command(enum enum_server_command command, THD *thd,
       char* packet, uint packet_length)
{
  switch (command) {
    case COM_INIT_DB:          ...
    case COM_REGISTER_SLAVE:   ...
    case COM_TABLE_DUMP:       ...
    case COM_CHANGE_USER:      ...
    case COM_EXECUTE:
         mysql_stmt_execute(thd,packet);
    case COM_LONG_DATA:        ...
    case COM_PREPARE:
         mysql_stmt_prepare(thd, packet, packet_length);   // !
    /* and so on for 18 other cases */
    default:
     send_error(thd, ER_UNKNOWN_COM_ERROR);
     break;
    }

这里sql_parser .cc中有一个非常大的switch语句

switch语句中代码有:code for prepare, close statement, query, quit, create database, drop database, dump binary log, refresh, statistics, get process info, kill process, sleep, connect, and several minor commands

除了COM_EXECUTE和COM_PREPARE两种情况外,我们删除了所有情况下的代码细节。

可以看到

  • COM_EXECUTE 会调用mysql_stmt_execute(thd,packet);

  • COM_PREPARE 会调用mysql_stmt_prepare(thd, packet, packet_length);

这里就像一个中转站一般,看我们去向什么地方。这里去的门是:COM_PREPARE:mysql_stmt_prepare

跟踪 /sql/sql_prepare.cc

下面是一段prepare的注释

"Prepare:
Parse the query
Allocate a new statement, keep it in 'thd->prepared statements' pool
Return to client the total number of parameters and result-set
metadata information (if any)"

继续回到主线COM_EXECUTE

跟踪/sql/sql_parse.cc

  bool dispatch_command(enum enum_server_command command, THD *thd,
       char* packet, uint packet_length)
  {
  switch (command) {
    case COM_INIT_DB:          ...
    case COM_REGISTER_SLAVE:   ...
    case COM_TABLE_DUMP:       ...
    case COM_CHANGE_USER:      ...
    case COM_EXECUTE:
         mysql_stmt_execute(thd,packet);                   // !
    case COM_LONG_DATA:        ...
    case COM_PREPARE:
         mysql_stmt_prepare(thd, packet, packet_length);
    /* and so on for 18 other cases */
    default:
     send_error(thd, ER_UNKNOWN_COM_ERROR);
     break;
    }

现在“COM_EXECUTE 中的mysql_stmt_execute`是我们关注的重点,我们来看看

跟踪/sql/sql_prepare.cc代码

  void mysql_stmt_execute(THD *thd, char *packet)
  {
    if (!(stmt=find_prepared_statement(thd, stmt_id, "execute")))
    {
      send_error(thd);
      DBUG_VOID_RETURN;
    }
    init_stmt_execute(stmt);
    mysql_execute_command(thd);           // !
  }

这里做一个判断,看是否是execute,然后初始化语句,并开始执行mysql_execute_command(thd);可以看到,是通过thread来调用动作。

跟踪/sql/sql_parse.cc代码

  void mysql_execute_command(THD *thd)
       switch (lex->sql_command) {
       case SQLCOM_SELECT: ...
       case SQLCOM_SHOW_ERRORS: ...
       case SQLCOM_CREATE_TABLE: ...
       case SQLCOM_UPDATE: ...
       case SQLCOM_INSERT: ...                   // !
       case SQLCOM_DELETE: ...
       case SQLCOM_DROP_TABLE: ...
       }

lex 解析sql语句。然后进入SQLCOM_INSERT。

跟踪/sql/sql_parse.cc代码

case SQLCOM_INSERT:
{
  my_bool update=(lex->value_list.elements ? UPDATE_ACL : 0);
  ulong privilege= (lex->duplicates == DUP_REPLACE ?
                    INSERT_ACL | DELETE_ACL : INSERT_ACL | update);
  if (check_access(thd,privilege,tables->db,&tables->grant.privilege))
    goto error;
  if (grant_option && check_grant(thd,privilege,tables))
    goto error;
  if (select_lex->item_list.elements != lex->value_list.elements)
  {
    send_error(thd,ER_WRONG_VALUE_COUNT);
    DBUG_VOID_RETURN;
  }
  res = mysql_insert(thd,tables,lex->field_list,lex->many_values,
                     select_lex->item_list, lex->value_list,
                     (update ? DUP_UPDATE : lex->duplicates));
// !
  if (thd->net.report_error)
    res= -1;
  break;
}

对于插入数据,我们要做的第一件事情是:检查用户是否具有对表进行插入的适当特权,服务器通过调用check_access和check_grant函数在这里进行检查。

有了权限才可以做【插入】动作。

我们可以导航 /sql 目录,如下:

Program Name          SQL statement type
------------          ------------------
sql_delete.cc         DELETE
sql_do.cc             DO
sql_handler.cc        HANDLER
sql_help.cc           HELP
sql_insert.cc         INSERT            // !
sql_load.cc           LOAD
sql_rename.cc         RENAME
sql_select.cc         SELECT
sql_show.cc           SHOW
sql_update.cc         UPDATE

sql_insert.cc是具体执行插入的操作。

上面的mysql_insert() 的方法具体实现,在sql_insert.cc文件中。

跟踪 /sql/sql_insert.cc代码

 int mysql_insert(THD *thd,TABLE_LIST *table_list, List<Item> &fields,
        List<List_item> &values_list,enum_duplicates duplic)
  {
    table = open_ltable(thd,table_list,lock_type);
    if (check_insert_fields(thd,table,fields,*values,1) ||
      setup_tables(table_list) ||
      setup_fields(thd,table_list,*values,0,0,0))
      goto abort;
    fill_record(table->field,*values);
    error=write_record(table,&info);                 // !
    query_cache_invalidate3(thd, table_list, 1);
    if (transactional_table)
      error=ha_autocommit_or_rollback(thd,error);
    query_cache_invalidate3(thd, table_list, 1);
    mysql_unlock_tables(thd, thd->lock);
    }

这里就要开始,打开一张表。然后各种检查,看插入表的字段是否有问题。不行就abort。

然后,开始填充记录数据。最终调用write_record 写记录的方法。

由于write_record 会对应不同的存储引擎,所以这里有分支的。我这里讲解两种

继续跟踪/sql/sql_insert.cc

  int write_record(TABLE *table,COPY_INFO *info)
  {
    table->file->write_row(table->record[0];           // !
  }

终于,要写文件了。调用那个存储引擎呢?看handler.h

  /* The handler for a table type.
     Will be included in the TABLE structure */

  handler(TABLE *table_arg) :
table(table_arg),active_index(MAX_REF_PARTS),
    ref(0),ref_length(sizeof(my_off_t)),
block_size(0),records(0),deleted(0),
    data_file_length(0), max_data_file_length(0),
index_file_length(0),
    delete_length(0), auto_increment_value(0), raid_type(0),
    key_used_on_scan(MAX_KEY),
    create_time(0), check_time(0), update_time(0), mean_rec_length(0),
    ft_handler(0)
    {}
  ...
  virtual int write_row(byte * buf)=0;

写入之MyISAM的代码路径

官方文档默认调用的是 ha_myisam::write_row

代码 /sql/ha_myisam.cc

如下:

int ha_myisam::write_row(byte * buf)
{
  statistic_increment(ha_write_count,&LOCK_status);
   /* If we have a timestamp column, update it to the current time */
   if (table->time_stamp)
    update_timestamp(buf+table->time_stamp-1);
   /*
  If we have an auto_increment column and we are writing a changed row
    or a new row, then update the auto_increment value in the record.
  */
  if (table->next_number_field && buf == table->record[0])
    update_auto_increment();
  return mi_write(file,buf);     // !
}

这些以字母ha开头的程序是处理程序的接口,而这个程序是myisam处理程序的接口。我们这里就开始调用MyISAM了。

可以看到这里调用了mi_write(file,buf);

跟踪/myisam/mi_write.c

int mi_write(MI_INFO *info, byte *record)
{
  _mi_readinfo(info,F_WRLCK,1);
  _mi_mark_file_changed(info);
  /* Calculate and check all unique constraints */
  for (i=0 ; i < share->state.header.uniques ; i++)
  {
    mi_check_unique(info,share->uniqueinfo+i,record,
      mi_unique_hash(share->uniqueinfo+i,record),
      HA_OFFSET_ERROR);
  }

  ... to be continued in next snippet

这里有很多唯一性的校验,继续看下面

 ... continued from previous snippet

  /* Write all keys to indextree */
  for (i=0 ; i < share->base.keys ; i++)
  {
    share->keyinfo[i].ck_insert(info,i,buff,
      _mi_make_key(info,i,buff,record,filepos)
  }
  (*share->write_record)(info,record);
  if (share->base.auto_key)
    update_auto_increment(info,record);
}

这里就是我们写入到文件的地方。至此,MySQL的插入操作结束。

路径为:

main in /sql/mysqld.cc
handle_connections_sockets in /sql/mysqld.cc
create_new_thread in /sql/mysqld.cc
handle_one_connection in /sql/sql_parse.cc
do_command in /sql/sql_parse.cc
dispatch_command in /sql/sql_parse.cc
mysql_stmt_execute in /sql/sql_prepare.cc
mysql_execute_command in /sql/sql_parse.cc
mysql_insert in /sql/mysql_insert.cc
write_record in /sql/mysql_insert.cc
ha_myisam::write_row in /sql/ha_myisam.cc
mi_write in /myisam/mi_write.c

1.进入主函数入口

2.建立socket connection的请求

3.创建一个新的线程

4.处理线程,分配内存资源

5.do_command,是获取packet第一字节,看做什么操作,并接受余下字节。

6.dispatch_command,分发操作,这里分发的是insert。

7.mysql_stmt_execute,检查是否为execute,初始化,准备做execute动作。

8.mysql_execute_command ,lex解析SQL语句,进入到SQLCOM_INSERT

9.mysql_insert ,开始做插入操作。调用write_record

10.write_record,准备写入,看调用哪个存储引擎,写入前期准备工作

11.ha_myisam::write_row,ha_myisam进行插入写入。

12.mi_write,最后做写入操作。

文献参考:https://dev.mysql.com/doc/internals/en/guided-tour-skeleton.html


喜欢 (0)