活动主持人

个人签名

169篇博客

FastDCS 分布式计算系统介绍(下)

活动主持人2018-06-11 15:23
 
  // 5.FastDCS会主动调用这个方法,将任务的计算结果导出到外部存储系统
  // 该方法只会由选举为'Primary master'的节点进行调用
  // 开发思路:根据FastDCS导出的计算结果,将单词转换成hash码作为唯一标示,将结果进行保存
  // 如果数据库中已经存在该单词所对应的的hash码,则进行累加处理,如果不存在直接进行保存处理
  bool ExportTaskUDF(vector<FdcsTask> tasks) {
    // 变量tasks中保存了数据结构FdcsTask的数组,变量中的数据结构类似于
    // tasks = {[
    //   FdcsTask = {task_id=id1, key_values_pairs=[{单词1,count1},{单词11,count11}, ...]},
    //   FdcsTask = {task_id=id2, key_values_pairs=[{单词2,count2},{单词22,count22}, ...]},
    //   ...
    // ]}

    // 初始化mysql查询对象
    MysqlQuery query(&mysql_connect_);

    // 循环获取tasks数组中的每一个计算结果
    for (int i = 0; i < tasks.size(); ++i) {
      // 打印计算任务的ID
      LOG(INFO) << "task_id = " << tasks[i].task_id();

      std::vector<KeyValuePair> key_values;

      // 每一个计算任务的结果保存在一对多键值对'key_values_pairs'变量中
      for (int ii = 0; ii < tasks[i].key_values_pairs_size(); ++ii) {
        // key_values_pair 中的数据结构类似于
        // FdcsTask = {task_id=id1, key_values_pairs=[{单词1,count1},{单词11,count11}, ...]}
        KeyValuesPair key_values_pair = tasks[i].key_values_pairs(ii);

        // 通过'ReadRecord'函数将'key_values_pairs'变量中的内容读取到一对一键值对'key_values'数组中
        ReadRecord(&key_values_pair, &key_values);
        for (int iii = 0; iii < key_values.size(); ++iii) {
          // 每一个计算任务结果中通过键值对保存了当前任务的
          // 每一个单词的名称和出现的次数
          // 数据结果类似[key=单词1, value=count1]
          string word = key_values[iii].key();
          word = StringReplace(word, "\"", "\\\"");
          word = StringReplace(word, "'", "\\\'");
          int count = KeyToInt32(key_values[iii].value());

          // 将单词转换成Hash编码
          uint32 word_id = BKDRHash(word);

          // 查找'dict_word'表中已有的数据中是否存在该单词,
          // 如果有进行累加处理,如果没有直接进行保存
          string select_sql;
          SStringPrintf(&select_sql, 
            "SELECT count FROM dict_word WHERE word_id = %lu;", word_id);        
          MysqlResult *sql_result = NULL;
          if (!query.Execute(select_sql.data())) {
            LOG(ERROR) << "SELECT count FROM dict_word faildure!";
            continue;
          }
          sql_result = query.Store();

          int old_count = 0;
          if (sql_result->RowsSize() > 0) {
            MysqlRow row = sql_result->FetchRow();
            old_count = (int)row[0];
          }
          query.FreeResult(sql_result);

          // 拼装SQL语句,如果 dict_word 表中已存在该单词则进行累加处理,否则直接进行保存
          string insert_sql;
          if (0 == old_count) {
            insert_sql = StringPrintf("INSERT INTO dict_word \
              (word_id, word, count, create_time) VALUES (%lu, '%s', %d, now())", \
              word_id, word.data(), count);
          } else {
            insert_sql = StringPrintf("UPDATE dict_word SET count = %d, \
              update_time = now() WHERE word_id = %lu", \
              old_count + count, word_id);
          }
          if ((old_count + count) < 0 || old_count < 0 || count < 0) {
            LOG(INFO) << "old_count = " << old_count;
            LOG(INFO) << "count = " << count;
            LOG(INFO) << "key_values[iii] = " << key_values[iii].value();
            LOG(INFO) << "insert_sql = " << insert_sql;
            continue;
          }

          // 执行SQL语句
          if (!query.TryExecute(insert_sql.data())) {
            LOG(ERROR) << "INSERT dict_word[" << word_id 
                       << ", word = " << word << "] faildure!";
            continue;
          }
        }
      }
    }
    return true;
  };

private:
  // 使用FastDCS提供的mysql封装类
  MysqlConnection mysql_connect_;
};
// 6.注册宏必须填写正确,'DictMaster'是你自定义的Master类名称
REGISTER_FASTDCS_TRACKER(DictMaster);

2.1.3 DictWorker开发说明

// 1.将自定义类'DictWorker'派生在'Worker'之上
class DictWorker : public Worker {
  public:
  // 2.Worker节点启动后,会调用这个方法
  void InitialTracker(struct settings_s settings) {
    // 初始化自定义变量 mysql_connect_,连接mysql数据库
    mysql_connect_.Connect(settings.mysql_database, 
                            settings.mysql_host, 
                            settings.mysql_user, 
                            settings.mysql_passwd, 
                            settings.mysql_port);

    // 必须调用基类中的InitialTracker函数,初始化Worker类
    Worker::InitialTracker(settings);
  }

  // 3.Worker节点退出时,会调用这个方法
  void FinalizeTracker() {
    // 由于自定义变量 mysql_connect_ 是自动释放资源的,所以没有自定义变量需要在这里释放

    // 必须调用基类中的FinalizeTracker函数,释放Worker类
    Worker::FinalizeTracker();
  }

  // 4.FastDCS中有需要计算的任务的时候会主动调用这个方法,开发者自行实现自定义的计算方法
  // 开发思路:根据FastDCS分发的 task 数据中的任务ID,从外部存储系统中获取该任务对应的文本
  // 将文本内容根据单词之间的空格进行拆分,并计算每个单词在该任务的文本中出现的次数
  // 计算结果完成后,FastDCS会自动将这个任务分发给 DictMaster 节点,进行数据更新
  bool ComputingUDF(FdcsTask &task) {
    // task 是由 DictMaster 分发给 DictWorker 的计算任务,task 中的数据结构类似 :
    // task = {task_id=id1, key_values_pairs=[{单词1,count1},{单词11,count11}, ...]}

    // 根据计算任务ID从外部存储系统mysql中获取相应的文本内容
    std::string select_sql, text;
    SStringPrintf(&select_sql, "SELECT text FROM dict_task WHERE task_id = %s;", 
        task.task_id().data());
    LOG(INFO) << select_sql;

    MysqlQuery query(&mysql_connect_);
    if (!query.Execute(select_sql.data())) {
      LOG(ERROR) << "select text from mysql failduer!";
      return false;
    }

    MysqlResult *sql_result = query.Store();
    LOG(INFO) << "sql_result->RowsSize() = " << sql_result->RowsSize();
    if (sql_result->RowsSize() > 0) {
      MysqlRow row = sql_result->FetchRow();

      text.append((char*)row[0]);
    }
    query.FreeResult(sql_result);

    // 根据文本之间的空格,拆分文本,将每个单词和出现的次数保存到'word_count'变量中
    std::map<string /*word*/, int /*count*/> word_count;
    std::vector<std::string> words;
    SplitStringUsing(text, " ", &words);
    for (int i = 0; i < words.size(); ++i) {
      if (true == words[i].empty()) continue;

      if (word_count.end() == word_count.find(words[i])) {
        word_count[words[i]] = 1;
      } else {
        word_count[words[i]] = word_count[words[i]] + 1;    
      }
    }

    // 将每个单词和出现的次数保存到'task'中的一对多键值对'KeyValuePair'变量中
    KeyValuesPair *key_values_pair = task.add_key_values_pairs();
    for (WordCountIter it = word_count.begin(); it != word_count.end(); ++it) {
      // 将每个单词和出现的次数保存到一对一键值对'key_value'中
      KeyValuePair key_value;
      key_value.set_key(it->first);
      string count = Int32ToKey(it->second);
      if (true == count.empty() || it->second < 0 || KeyToInt32(count) < 0) {
        LOG(INFO) << "it" << it->first << ", " << it->second << ", " << KeyToInt32(count);
        abort();
      }
      key_value.set_value(count);

      // 将 key_value 变量保存到'task'中的一对多键值对'KeyValuePair'变量中
      WriteRecord(key_values_pair, key_value);
    }

    // 你可以通过休眠5秒,来模拟较复杂的计算
    // sleep(5);

    // 'task'变量已经保存了你的计算结果,准备发送到'DictMaster'节点中
    return true;
  }
private:
  // 使用FastDCS提供的mysql封装类
  MysqlConnection mysql_connect_;

  typedef std::map<string /*word*/, int /*count*/>::iterator WordCountIter;
};
// 5.注册宏必须填写正确,'DictWorker'是你自定义的Worker类名称
REGISTER_FASTDCS_TRACKER(DictWorker);

2.1.4 DictMock开发说明

// 创建开发实例中的表结构
void CreateDictTable();
// 将dict.txt文件中的内容拆分成计算任务保存到数据库中
void CreateDictData();
// 清空开发实例中的表中的所有数据
void TruncateDictData();
// 清除开发实例中的表结构和数据
void DropDictTable();

// 使用FastDCS提供的mysql封装类
MysqlConnection connection;

int main(int argc, char const *argv[]) {
  // 你可以修改数据库名称、服务IP、用户名、用户密码和数据库端口连接你的mysql数据库
  bool conn = connection.Connect("FastDCS", "localhost", "FastDCS", "fastdcs", 3306);
  if (!conn)
  LOG(ERROR) << "connect mysql faildure!";

  // 接受用户的输入,进行相关的操作
  char cmd[20];
  printf("----- 0:quit!       -----\n");
  printf("----- 1:create dict table  -----\n");
  printf("----- 2:create dict data   -----\n");
  printf("----- 3:truncate dict data -----\n");
  printf("----- 4:drop dict table  -----\n");

  while (strncmp("0", cmd, 1) != 0) {
  fgets(cmd, 20, stdin);
  if (strncmp("1", cmd, 1) == 0) {
    CreateDictTable();
  } else if (strncmp("2", cmd, 1) == 0) {
    CreateDictData();
  } else if (strncmp("3", cmd, 1) == 0) {
    TruncateDictData();
  } else if (strncmp("4", cmd, 1) == 0) {
    DropDictTable();
  }
  }

  return 0;
}

// 创建开发实例中的表结构
void CreateDictTable() {
  MysqlQuery query(&connection);
  if (!query.Execute("SELECT * FROM dict_task;")) {
  query.Execute("CREATE TABLE dict_task (                 \
                task_id VARCHAR(32) NOT NULL PRIMARY KEY, \
                task_status INTEGER default 0,            \
                text VARCHAR(20480),                      \
                create_time DATETIME,                     \
                update_time DATETIME                      \
              )ENGINE=InnoDB DEFAULT CHARSET=utf8;");
  }
  LOG(INFO) << "CREATE TABLE dict_task";

  if (!query.Execute("SELECT * FROM dict_word;")) {
  query.Execute("CREATE TABLE dict_word (                      \
                word_id INTEGER UNSIGNED NOT NULL PRIMARY KEY, \
                word VARCHAR(256) NOT NULL,                    \
                count INTEGER default 0,                       \
                create_time DATETIME,                          \
                update_time DATETIME                           \
              )ENGINE=InnoDB DEFAULT CHARSET=utf8;");
  }
  LOG(INFO) << "CREATE TABLE dict_word";
}

// 清除开发实例中的表结构和数据
void DropDictTable() {
  MysqlQuery query(&connection);
  if (!query.TryExecute("DROP TABLE dict_task;")) {
  LOG(ERROR) << "DROP TABLE dict_task faildure!";
  } else {
  LOG(INFO) << "DROP TABLE dict_task success!";
  }

  if (!query.TryExecute("DROP TABLE dict_word;")) {
  LOG(ERROR) << "DROP TABLE dict_word faildure!";
  } else {
  LOG(INFO) << "DROP TABLE dict_word success!";
  }
}

// 将dict.txt文件中的内容拆分成计算任务保存到数据库中
void CreateDictData() {
  float time_use = 0;
  struct timeval start, end;

  gettimeofday(&start, NULL);
  LOG(INFO) << "start.tv_sec:" << start.tv_sec;
  LOG(INFO) << "start.tv_usec:" << start.tv_usec;

  FILE* file = NULL;
  char* line = NULL;
  size_t len = 0;
  size_t read;

  file = fopen("dict.txt", "r");
  CHECK(file);

  int32_t count = 1;
  std::string sql, line_text;

  MysqlQuery query(&connection);
  while ((read = getline(&line, &len, file)) != -1) {
    line_text = line;
    line_text = StringReplace(line_text, "\"", "\\\"");
    line_text = StringReplace(line_text, "'", "\\\'");

    sql = "INSERT INTO dict_task(task_id, task_status, text, create_time) VALUES ('";
    sql += Int32ToKey(count);
    sql += "' , 1, '";
    sql += line_text;
    sql += "', now())";

    if (!query.TryExecute(sql.data())) {
    LOG(ERROR) << "INSERT INTO dict_task[" << count << "] faildure!";
    fclose(file);
    return;
    }
    count ++;
  }
  LOG(INFO) << "file line count = " << count;
  if (line)
    free(line);

  fclose(file);

  gettimeofday(&end, NULL);
  LOG(INFO) << "end.tv_sec:" << end.tv_sec;
  LOG(INFO) << "end.tv_usec:" << end.tv_usec;
  time_use = (end.tv_sec-start.tv_sec)*1000000+(end.tv_usec-start.tv_usec); // ms
  LOG(INFO) << "time_use is " << time_use/1000000 << "s"; 
}

// 清空开发实例中的表中的所有数据
void TruncateDictData() {
  MysqlQuery query(&connection);
  if (!query.TryExecute("TRUNCATE TABLE dict_task;")) {
  LOG(ERROR) << "TRUNCATE TABLE dict_task faildure!";
  } else {
  LOG(INFO) << "TRUNCATE TABLE dict_task success.";   
  }

  if (!query.TryExecute("TRUNCATE TABLE dict_word;")) {
  LOG(ERROR) << "TRUNCATE TABLE dict_word faildure!";
  } else {
  LOG(INFO) << "TRUNCATE TABLE dict_word success.";   
  }
}

// 查看开发实例数据的一些常用脚本
// SELECT * FROM dict_word order by count desc limit 0, 10;
// SELECT count(1), MAX(count) FROM dict_word;
// SELECT count(1), task_status FROM dict_task GROUP BY task_status;

2.2.1 实例部署

  1. 按照/src/demo/dict.sql文件中提供的SQL脚本,在mysql中建立用于保存每个计算任务的dict_task表以及用于保存单词排序结果的dict_word表;
  2. 参照《1.6.1 编译和安装说明》将单词排序实例在Linux环境下进行编译和安装,实例程序将会安装到/home/liuxun/FastDCS/demo中,其中liuxun可以修改成你的工作目录;
  3. 通过运行dict_mock程序将dict.txt文件拆分成计算任务,保存到mysql数据库中;
  4. 你可以在本机上运行多个 master 和 worker 节点程序来模拟一个FastDCS服务集群,或者将 master 和 worker 节点程序复制到不同的Linux服务器上;
  5. 在编译安装的目录中,可以通过修改配置文件,让FastDCS系统以应用程序的方式运行并将日志输出到控制台

需要修改的配置项

run_by_daemon = false
max_core_file = true
info_log = 
warn_log = 
err_log  = 

// 服务集群的IP地址,可以配置一个或者多个,必须和你的master节点的tracker_server相对应
tracker_group = 127.0.0.1:32301;127.0.0.1:32302;127.0.0.1:32303

// 当前节点的IP地址,如果在同一台Linux服务器上需要配置不同的端口
tracker_server = 127.0.0.1:32301

// mysql参数,需要根据你的运行环境进行配置
mysql_database = FastDCS
mysql_host = localhost
mysql_user = FastDCS
mysql_passwd = fastdcs
mysql_port = 3306

使用多个控制台分别运行master和worker节点程序

$ ./master_32301.sh

$ ./master_32302.sh

$ ./master_32303.sh

$ ./worker.sh

如果配置正确,应该就可以看见其中一台 master 节点不停的获取计算任务,分配给其他的 master 节点,worker 节点接受计算任务的调度进行计算,你可以通过/src/demo/dict.sql中提供的SQL工具脚本在mysql中查看数据的运行情况。

2.2 如何进行实际运用

通过对单词排序应用实例的讲解,将分布式计算系统FastDCS中的计算任务分解、计算任务导入、任务的计算、计算结果保存到外部存储系统等方面进行了阐述,在将FastDCS运用于实际业务的开发中时和对单词排序应用实例的开发流程完全一致,所需要自定义的功能模块有以下区别:

  1. DictMock程序在实际运用中一般都是计算任务的输入源,可能是一个固定大小和时间长度的计算任务,也可能是一个在线业务永不停息进行输入的数据流,例如:2008-2012年瑞读网每天大量的用户上传的文件;
  2. 外部存储系统一般分为二个部分,一个是用于保存计算任务的数据库,如:mysql、hbase等。另一个是用于保存计算数据的存储系统,如:FastDFS、hdfs等;
  3. DictMaster程序中的计算任务的导入和导出功能的具体实现,需要根据实际运用的具体情况进行自定义开发;
  4. DictWorker程序中的计算功能的具体实现,需要根据实际运用的具体情况进行自定义开发;

关于作者

FastDCS 的作者刘勋,有着 17 年的软件行业和互联网行业开发和架构经验,2008年-2012年曾创办瑞读网,是国内最早期的数字出版云服务提供商,在创业期间开发的 FastDCS 分布式计算系统为瑞读网的客户提供了长期稳定的服务。

FastDCS 分布式计算系统介绍(上)

本文已由作者授权网易云社区发布,未经允许不得转载。