这是在工作中用到脚本,按设计思路是任何数据源都可以导入!
写的不太好,大佬莫介意!

drop table if exists `jdbc`;
CREATE TABLE if not exists `jdbc` (
  `jdbc_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `jdbc_url` text COMMENT 'jdbc连接',
  `user_name` text COMMENT '用户名',
  `pass_word` text COMMENT '密码',
  `state` int(11) DEFAULT '0' COMMENT '状态',
  `desc_name` text COMMENT '注释',
  `jdbc_type` varchar(20) DEFAULT NULL COMMENT 'jdbc类型',
  PRIMARY KEY (`jdbc_id`)
) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8;

drop table if exists `tablename_pk`;
CREATE TABLE if not exists  `tablename_pk` (
  `table_name` varchar(50) NOT NULL COMMENT '表名',
  `kudu_pk_name` varchar(500) DEFAULT NULL COMMENT 'kudu主键名',
  `mysql_pk_name` varchar(500) DEFAULT NULL COMMENT 'mysql主键名',
  `mysql_key_name` varchar(500) DEFAULT NULL COMMENT 'mysql key名',
  `mysql_index_name` varchar(500) DEFAULT NULL COMMENT 'mysql 索引名',
  PRIMARY KEY (`table_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

drop table if exists `oralce_special_field`;
CREATE TABLE if not exists `oralce_special_field` (
  `table_name` varchar(100) NOT NULL COMMENT '表名',
  `special_field` varchar(100) NOT NULL COMMENT '特殊字段',
  `field_type` varchar(20) DEFAULT NULL COMMENT '字段类型',
  PRIMARY KEY (`table_name`,`special_field`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

drop table if exists `import_type`;
CREATE TABLE if not exists `import_type` (
  `import_type_id` int(11) NOT NULL DEFAULT '0' COMMENT '主键',
  `tmp_table_name` text COMMENT '临时库名',
  `desc_name` text COMMENT '注释',
  PRIMARY KEY (`import_type_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

drop table if exists `import_str_sql`;
CREATE TABLE if not exists `import_str_sql` (
  `str_sql_id` int(11) NOT NULL COMMENT '主键',
  `str_sql` text COMMENT 'sql语句',
  `desc_name` text COMMENT '注释',
  PRIMARY KEY (`str_sql_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

drop table if exists `import_config_incr_prod`;
CREATE TABLE if not exists `import_config_incr_prod` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `import_type_id` int(11) NOT NULL DEFAULT '0' COMMENT '临时库id',
  `data_source_jdbc_id` int(11) NOT NULL COMMENT '数据源jdbc id',
  `data_source_name` text NOT NULL COMMENT '数据源表名',
  `source_str_sql_id` int(11) DEFAULT NULL COMMENT '提取数据sql',
  `special_field` int(11) DEFAULT '0' COMMENT '是否包含特殊字段',
  `data_sink_jdbc_id` int(11) NOT NULL COMMENT '结果表jdbc id',
  `data_sink_name` text NOT NULL COMMENT '结果表名',
  `sink_str_sql_id` int(11) DEFAULT NULL COMMENT '结果表sql',
  `field_type` varchar(20) NOT NULL DEFAULT 'kudu_field_type1' COMMENT '字段类型',
  `shell_name` varchar(20) DEFAULT 'oracle_to_kudu' COMMENT '脚本名',
  `desc_name` text COMMENT '注释',
  `state` int(11) NOT NULL DEFAULT '0' COMMENT '状态',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=267 DEFAULT CHARSET=utf8;

drop table if exists `field_type`;
CREATE TABLE if not exists `field_type` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `table_name` varchar(100) NOT NULL COMMENT '表名',
  `field_name` varchar(500) NOT NULL COMMENT '字段名',
  `mysql_field_type` varchar(50) DEFAULT NULL COMMENT 'mysql字段类型',
  `kudu_field_type1` varchar(50) DEFAULT NULL COMMENT 'kudu字段类型1(跟源表保持一致)',
  `kudu_field_type2` varchar(50) DEFAULT NULL COMMENT 'kudu字段类型2',
  `kudu_state` int(11) DEFAULT NULL COMMENT 'kudu表中状态',
  `state` int(11) DEFAULT NULL COMMENT '状态',
  PRIMARY KEY (`table_name`,`field_name`),
  KEY `id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10314 DEFAULT CHARSET=utf8;

增量到处到大数据平台脚本 oracle_to_kudu_incr:

#!/bin/bash
#set -x
#执行多个sql文件
#export PGSQL_HOME=/usr/local/pgsql
#export MYSQL_HOME=/usr/local/mysql
export ORACLE_SID=orcl
export ORACLE_OWNER=oracle
export ORACLE_HOME=/home/batsom/instantclient_12_1/
export ORACLE_HOME_LISTNER=$ORACLE_HOME
PATH=$PATH:$HOME/bin:$ORACLE_HOME
PATH=$PATH:$HOME/bin:$MYSQL_HOME/bin
#PATH=$PATH:$HOME/bin:$PGSQL_HOME/bin
export PATH
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${ORACLE_HOME}
export NLS_LANG=AMERICAN_AMERICA.AL32UTF8
export HADOOP_USER_NAME=hdfs
export LANG=zh_CN.gbk

database_type="$1"
sd_date="$2"

############################启用脚本个数#######################
number=""
shell_name="oracle_to_kudu_incr"
sql_config="_data_import"
confile_file_dir="/home/batsom/shell"
shell_dir="/home/batsom/shell"
log_dir="${shell_dir}/log"
mkdir -p ${log_dir}
fun_lib_dir="/home/batsom/shell/inc"
out_file_dir="/home/batsom/out_file"
mkdir -p ${out_file_dir}
in_file_dir="/home/batsom/in_file"
mkdir -p ${in_file_dir}
exec_log="${log_dir}/${shell_name}${database_type}_exec.log"
err_log="${log_dir}/${shell_name}${database_type}_err.log"
############################自定义参数区#######################
#orgid="1"
#orgname="删除表"
###########################读取mysql配置#######################
mysql_config="${confile_file_dir}/readfile/mysql_config${sql_config}.txt"
mysql_host=`cat ${mysql_config}|awk -F '#' '{print $1}'`
mysql_port=`cat ${mysql_config}|awk -F '#' '{print $2}'`
mysql_dbname=`cat ${mysql_config}|awk -F '#' '{print $3}'`
mysql_user=`cat ${mysql_config}|awk -F '#' '{print $4}'`
mysql_passwd=`cat ${mysql_config}|awk -F '#' '{print $5}'`    
###########################读取oracle配置######################
#oracle_config="${confile_file_dir}/readfile/oracle_config${sql_config}.txt"
#oracle_host=`cat ${oracle_config}|awk -F '#' '{print $1}'`
#oracle_port=`cat ${oracle_config}|awk -F '#' '{print $2}'`
#oracle_orcl=`cat ${oracle_config}|awk -F '#' '{print $3}'`
#oracle_user=`cat ${oracle_config}|awk -F '#' '{print $4}'`
#oracle_passwd=`cat ${oracle_config}|awk -F '#' '{print $5}'`
########################读取PostgreSQL配置#####################
#免密登录(将密码写在客户端服务器的用户家目录下,创建一个.pgpass文件,并将权限设置为0600,就可以实现了)
#文件的格式如下:
#hostname:port:database:username:password
###############################################################
#psql_config="${confile_file_dir}/readfile/psql_config${sql_config}.txt"
#psql_host=`cat ${mysql_config}|awk -F '#' '{print $1}'`
#psql_port=`cat ${mysql_config}|awk -F '#' '{print $2}'`
#psql_user=`cat ${mysql_config}|awk -F '#' '{print $3}'`
###############################################################

if test ! -f ${fun_lib_dir}/error_resolve.sh -o ! -f ${fun_lib_dir}/function.sh ;then
  exit
fi

source ${fun_lib_dir}/error_resolve.sh
source ${fun_lib_dir}/function.sh

exec_num=`ps aux|grep "${confile_file_dir}/${shell_name}.sh"|grep "${database_type}"|grep -v "grep"|wc -l`
if test ${exec_num} -gt 4 ;then
   exit
fi

start_time_s=`date +%s`

str_sql="SELECT CONCAT_WS('#',ic.id,it.tmp_table_name,ic.data_source_name,data_source_jdbc.jdbc_url,data_source_jdbc.user_name,data_source_jdbc.pass_word,iss1.str_sql,ic.data_sink_name,data_sink_jdbc.jdbc_url,data_sink_jdbc.user_name,data_sink_jdbc.pass_word,ic.field_type,ic.special_field,iss2.str_sql) AS return_txt FROM import_config_incr_${database_type} ic LEFT JOIN import_type it ON ic.import_type_id=it.import_type_id LEFT JOIN jdbc data_source_jdbc ON ic.data_source_jdbc_id=data_source_jdbc.jdbc_id LEFT JOIN import_str_sql iss1 ON ic.source_str_sql_id=iss1.str_sql_id LEFT JOIN jdbc data_sink_jdbc ON ic.data_sink_jdbc_id=data_sink_jdbc.jdbc_id LEFT JOIN import_str_sql iss2 ON ic.sink_str_sql_id=iss2.str_sql_id WHERE ic.state=1 AND ic.shell_name='oracle_to_kudu_incr' order by  ic.id " 
return_txt=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
      if test -z "${return_txt}" ;then
          exit
      fi

while read line
do
   #主键
   id=`echo "${line}"|awk -F '#' '{print $1}'`
   #hive临时库
   hive_database=`echo "${line}"|awk -F '#' '{print $2}'`
   #数据源表名
   data_source_name=`echo "${line}"|awk -F '#' '{print $3}'| tr a-z A-Z`
   #数据源jdbc
   data_source_jdbc=`echo "${line}"|awk -F '#' '{print $4}'`
   #数据源用户名
   source_user_name=`echo "${line}"|awk -F '#' '{print $5}'`
   #数据源密码
   source_pass_word=`echo "${line}"|awk -F '#' '{print $6}'`
   #数据源sql
   source_str_sql=`echo "${line}"|awk -F '#' '{print $7}'`
   #结果源表名
   data_sink_name=`echo "${line}"|awk -F '#' '{print $8}'`
   #结果源jdbc
   data_sink_jdbc=`echo "${line}"|awk -F '#' '{print $9}'`
   #结果源用户名
   sink_user_name=`echo "${line}"|awk -F '#' '{print $10}'`
   #结果源密码
   sink_pass_word=`echo "${line}"|awk -F '#' '{print $11}'`
   #字段类型
   field_type_name=`echo "${line}"|awk -F '#' '{print $12}'`
   #是否包含特殊字段
   special_field=`echo "${line}"|awk -F '#' '{print $13}'`
   #结果源sql
   sink_str_sql=`echo "${line}"|awk -F '#' '{print $14}'`

   sink_database=`echo "${data_sink_name}"|awk -F '.' '{print $1}'`
   sink_tablename=`echo "${data_sink_name}"|awk -F '.' '{print $2}'`

           hadoop fs -rmr hdfs://nameservice1/user/hdfs/${data_source_name}
 
           str_sql="SELECT count(*) as txt FROM field_type WHERE UPPER(table_name)=UPPER('${data_source_name}') AND state=1 ORDER BY id"
           echo "${str_sql}"
           return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`

             if test "${return_txt_tmp}" = "0" ;then
                   echo "insert table field"
                   /usr/bin/sqoop import --hive-import --connect  jdbc:oracle:thin:@${data_source_jdbc} --username ${source_user_name} --password ${source_pass_word}  --table ${data_source_name} --hive-database ${hive_database} --hive-table ${sink_tablename} --hive-drop-import-delims --null-string '\\N' --null-non-string '\\N' --hive-drop-import-delims ${map_column_hive}  -m 1 --where "1=0"
                   mv -f ${data_source_name}.java ${log_dir}/

                  str_sql="invalidate metadata ${hive_database}.${sink_tablename}"
                  echo "${str_sql}"
                  /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}"

                  str_sql="DESCRIBE ${hive_database}.${sink_tablename}"
                  /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}" -B --output_delimiter=","|awk -F ',' -v table_name="${data_source_name}" '{print table_name","$1","$2",string,2"}' > ${out_file_dir}/${sink_tablename}.csv

                  return_txt=`mysqlload "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${out_file_dir}/${sink_tablename}.csv" "data_import.field_type" "," "(table_name,field_name,kudu_field_type1,kudu_field_type2,state)"`
                  mv -f ${out_file_dir}/${sink_tablename}.csv /tmp

                  str_sql="update data_import.import_config_${database_type} set state=2 where id=${id}"
                  echo "${str_sql}"
                  return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
                  continue
             fi
 
           str_sql="drop table if exists ${hive_database}.${sink_tablename}_${database_type}"
           /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}"
          
           i=0
           if test "${special_field}" = "1" ;then
               str_sql="SELECT GROUP_CONCAT(UPPER(special_field),'=',field_type) txt FROM data_import.oralce_special_field WHERE UPPER(table_name)=UPPER('${data_source_name}') GROUP BY table_name"
               return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
               map_column_hive="--map-column-hive ${return_txt_tmp}"
           else
               map_column_hive=""
           fi

          
           str_sql="drop table if exists ${hive_database}.${sink_tablename}_${database_type}"
           /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}"
          
           i=0
           if test "${special_field}" = "1" ;then
               str_sql="SELECT GROUP_CONCAT(UPPER(special_field),'=',field_type) txt FROM oralce_special_field WHERE UPPER(table_name)=UPPER('${data_source_name}') GROUP BY table_name"
               return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
               map_column_hive="--map-column-hive ${return_txt_tmp}"
           else
               map_column_hive=""
           fi

           str_sql="SELECT kudu_pk_name FROM tablename_pk WHERE UPPER(table_name)=UPPER('${data_source_name}')"
           return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
           table_pk_tmp="${return_txt_tmp}"

           table_pk_notin=""
           create_kudu_field1=""
           create_hive_field1=""
           create_hive_field_and_type=""
           array_field=(${table_pk_tmp//,/ })
           table_pk_one="${array_field[0]}"
           for LINE in "${array_field[@]}"
           do
               if test ${i} -eq 0 ;then
                  #create_hive_field=`echo "${create_hive_field}"|sed "s# ${LINE} ##g"`
                  table_pk_notin="${table_pk_notin}\"${LINE}\""
                  create_kudu_field1="${create_kudu_field1}cast(\`${LINE}\` as string) as \`${LINE}\`"
                  create_hive_field1="${create_hive_field1}\`${LINE}\` string"
                  i=`expr ${i} + 1`
               else
                  table_pk_notin="${table_pk_notin},\"${LINE}\""
                  create_kudu_field1="${create_kudu_field1},cast(\`${LINE}\` as string) as \`${LINE}\`"
                  create_hive_field1="${create_hive_field1},\`${LINE}\` string"
               fi
           done
           table_pk=`echo "${table_pk_notin}"|sed 's#\"#\`#g'`
           echo "${table_pk}"
           
           i=0
           create_kudu_field2=""
           import_feild_name=""
           str_sql="SELECT GROUP_CONCAT(upper(field_name) ORDER BY id) as txt  FROM field_type WHERE UPPER(table_name)=UPPER('${data_source_name}') AND state=1"
           echo "${str_sql}"
           return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
           import_feild_name="${return_txt_tmp}"

           str_sql="SELECT CONCAT_WS('#',field_name,kudu_field_type1,${field_type_name}) as txt  FROM field_type WHERE UPPER(table_name)=UPPER('${data_source_name}') AND field_name not in (${table_pk_notin}) AND state=1 ORDER BY id"
           echo "${str_sql}"
           return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
 
           while read line_tmp
           do
             #echo "${line_tmp}"
             table_field_name=`echo "${line_tmp}"|awk -F '#' '{print $1}'| tr a-z A-Z`
             table_field_type1=`echo "${line_tmp}"|awk -F '#' '{print $2}'|sed "s/\r//"`
             table_field_type=`echo "${line_tmp}"|awk -F '#' '{print $3}'|sed "s/\r//"`
              #kudu_state=`echo "${line_tmp}"|awk -F '#' '{print $4}'`

             if test ${i} -eq 0 ;then
                create_kudu_field2="${create_kudu_field2}\`${table_field_name}\` "
                create_hive_field_and_type="${create_hive_field_and_type}\`${table_field_name}\` ${table_field_type} "
                i=`expr ${i} + 1`
             else
                create_kudu_field2="${create_kudu_field2},\`${table_field_name}\` "
                create_hive_field_and_type="${create_hive_field_and_type},\`${table_field_name}\` ${table_field_type} "
             fi              

           done <<< "${return_txt_tmp}"

           upsert_kudu_field="${table_pk_tmp},${create_kudu_field2}"
           create_kudu_field="${create_kudu_field1},${create_kudu_field2}"

           str_sql="create table if not exists ${hive_database}.${sink_tablename}_${database_type}(${create_hive_field1},${create_hive_field_and_type}) stored as parquet"
           echo "${str_sql}"
           /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}"

           echo "`date +%Y-%m-%d\ %H:%M`---${data_sink_name}---" >> ${out_file_dir}/${shell_name}_${database_type}.log
           str_sql="select count(*) as text from ${data_source_name}"
           oracle_count=`oracletxt "${data_source_jdbc}" "${source_user_name}" "${source_pass_word}" "${str_sql}"`

           echo "oracle : ${oracle_count}" >> ${out_file_dir}/${shell_name}_${database_type}.log


            str_sql=`echo "${source_str_sql}"|sed "s#\[replace_data_sink_table_name\]#${data_source_name}#g"|sed "s#\[replace_data_source_table_name\]#${hive_database}.${sink_tablename}_${database_type}#g"|sed "s#\[replace_table_field\]#${import_feild_name}#g"|sed "s#\[table_pk\]#${table_pk}#g"|sed "s#\[table_field_name\]#${table_pk_one}#g"|sed "s#\[sd_date\]#${sd_date}#g"`
            
           echo "${str_sql}"  >> ${exec_log}
   
/usr/bin/sqoop import --connect jdbc:oracle:thin:@${data_source_jdbc} --username ${source_user_name} --password ${source_pass_word} --hcatalog-database ${hive_database} --hcatalog-table ${sink_tablename}_${database_type}  --null-string '\\N' --null-non-string '\\N' -m 1  --query "${str_sql}"
           error=`echo $?`
           
 echo "/usr/bin/sqoop import --connect jdbc:oracle:thin:@${data_source_jdbc} --username ${source_user_name} --password ${source_pass_word} --hcatalog-database ${hive_database} --hcatalog-table ${sink_tablename}_${database_type}  --null-string '\\N' --null-non-string '\\N' -m 1  --query \"${str_sql}\""


           if test ${error} -ne 0 ;then
              continue
           fi

           mv -f QueryResult.java ${log_dir}/
          
           str_sql="invalidate metadata ${hive_database}.${sink_tablename}_${database_type};refresh ${hive_database}.${sink_tablename}_${database_type};"
           /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}"

           data_sink_name_tmp="${sink_database}.`echo "${sink_tablename}"| tr a-z A-Z`"
           str_sql=`echo "${sink_str_sql}"|sed "s#\[replace_data_sink_table_name\]#${data_sink_name_tmp}#g"|sed "s#\[replace_data_source_table_name\]#${hive_database}.${sink_tablename}_${database_type}#g"|sed "s#\[database_type\]#_${database_type}#g"|sed "s#\[replace_table_field1\]#${upsert_kudu_field}#g"|sed "s#\[replace_table_field2\]#${create_kudu_field}#g"|sed "s#\[replace_table_field\]#${create_kudu_field}#g"|sed "s#\[replace_table_field\]#${import_feild_name}#g"|sed "s#\[table_pk\]#${table_pk}#g"|sed "s#\[table_field_name\]#${table_pk_one}#g"|sed "s#\[sd_date\]#${sd_date}#g"|sed "s#\[replace_sink_tablename\]#${data_source_name}#"`
           echo "${str_sql}"
           /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}"

           str_sql="invalidate metadata ${data_sink_name};refresh ${data_sink_name};compute stats ${data_sink_name};"
           /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}"

           str_sql="select count(*) from ${data_sink_name}"
           impala_count=`/usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}" -B`
           echo "impala :    ${impala_count}" >> ${out_file_dir}/${shell_name}_${database_type}.log

           str_sql="update import_config_incr_${database_type} set state=0 where id=${id}"
           return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`

           
done <<< "${return_txt}"



end_time_s=`date +%s`
spent_s=`expr ${end_time_s} - ${start_time_s} `
echo "${shell_name}.sh `date +%Y-%m-%d\ %H:%M` 处理完毕,耗时:${spent_s}" >> ${exec_log}


#set +x

增量同步到mysql hive_to_mysql_incr_v4.sh:

#!/bin/bash
#set -x
#执行多个sql文件
#export PGSQL_HOME=/usr/local/pgsql
export MYSQL_HOME=/usr/local/mysql
#export ORACLE_SID=orcl
#export ORACLE_OWNER=oracle
#export ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1
#export ORACLE_BASE=/u01/app/oracle
#export ORACLE_HOME_LISTNER=$ORACLE_HOME
#PATH=$PATH:$HOME/bin:$ORACLE_HOME/bin
PATH=$PATH:$HOME/bin:$MYSQL_HOME/bin
#PATH=$PATH:$HOME/bin:$PGSQL_HOME/bin
export PATH
#export LD_LIBRARY_PATH="${ORACLE_HOME}/lib"
#export NLS_LANG=AMERICAN_AMERICA.AL32UTF8
export HADOOP_USER_NAME=hdfs
#export LANG=zh_CN.gbk

data_source_name="$1"
database_type="$2"

if test "${database_type}" = "dwd_data_prod" ;then
   mysql_database="uzt"
else
   mysql_database="${database_type}"
fi

mysql_type=`echo "${database_type}"|sed "s#dwd_data##g"`


############################启用脚本个数#######################
number=""
shell_name="hive_to_mysql_incr_v4"
sql_config="_data_import"
confile_file_dir="/home/batsom/shell"
shell_dir="/home/batsom/shell"
log_dir="${shell_dir}/log"
mkdir -p ${log_dir}
fun_lib_dir="/home/batsom/shell/inc"
out_file_dir="/home/batsom/out_file"
mkdir -p ${out_file_dir}
in_file_dir="/home/batsom/in_file"
mkdir -p ${in_file_dir}
exec_log="${log_dir}/${shell_name}${mysql_type}_exec.log"
err_log="${log_dir}/${shell_name}${mysql_type}_err.log"
############################自定义参数区#######################
#orgid="1"
#orgname="删除表"
###########################读取mysql配置#######################
mysql_config="${confile_file_dir}/readfile/mysql_config${sql_config}.txt"
mysql_host=`cat ${mysql_config}|awk -F '#' '{print $1}'`
mysql_port=`cat ${mysql_config}|awk -F '#' '{print $2}'`
mysql_dbname=`cat ${mysql_config}|awk -F '#' '{print $3}'`
mysql_user=`cat ${mysql_config}|awk -F '#' '{print $4}'`
mysql_passwd=`cat ${mysql_config}|awk -F '#' '{print $5}'`    
###########################读取oracle配置######################
#oracle_config="${confile_file_dir}/readfile/oracle_config${sql_config}.txt"
#oracle_host=`cat ${oracle_config}|awk -F '#' '{print $1}'`
#oracle_port=`cat ${oracle_config}|awk -F '#' '{print $2}'`
#oracle_orcl=`cat ${oracle_config}|awk -F '#' '{print $3}'`
#oracle_user=`cat ${oracle_config}|awk -F '#' '{print $4}'`
#oracle_passwd=`cat ${oracle_config}|awk -F '#' '{print $5}'`
########################读取PostgreSQL配置#####################
#免密登录(将密码写在客户端服务器的用户家目录下,创建一个.pgpass文件,并将权限设置为0600,就可以实现了)
#文件的格式如下:
#hostname:port:database:username:password
###############################################################
#psql_config="${confile_file_dir}/readfile/psql_config${sql_config}.txt"
#psql_host=`cat ${mysql_config}|awk -F '#' '{print $1}'`
#psql_port=`cat ${mysql_config}|awk -F '#' '{print $2}'`
#psql_user=`cat ${mysql_config}|awk -F '#' '{print $3}'`
###############################################################

if test ! -f ${fun_lib_dir}/error_resolve.sh -o ! -f ${fun_lib_dir}/function.sh ;then
  exit
fi

source ${fun_lib_dir}/error_resolve.sh
source ${fun_lib_dir}/function.sh

exec_num=`ps aux|grep "${confile_file_dir}/${shell_name}.sh"|grep "${mysql_type}"|grep -v "grep"|wc -l`
#if test ${exec_num} -gt 4 ;then
#   exit
#fi

start_time_s=`date +%s`

str_sql="SELECT CONCAT_WS('#',ic.id,it.tmp_table_name,ic.data_source_name,data_source_jdbc.jdbc_url,data_source_jdbc.user_name,data_source_jdbc.pass_word,iss1.str_sql,ic.data_sink_name,data_sink_jdbc.jdbc_url,data_sink_jdbc.user_name,data_sink_jdbc.pass_word,ic.field_type,ic.special_field,iss2.str_sql) AS return_txt FROM import_config_incr${mysql_type} ic LEFT JOIN import_type it ON ic.import_type_id=it.import_type_id LEFT JOIN jdbc data_source_jdbc ON ic.data_source_jdbc_id=data_source_jdbc.jdbc_id LEFT JOIN import_str_sql iss1 ON ic.source_str_sql_id=iss1.str_sql_id LEFT JOIN jdbc data_sink_jdbc ON ic.data_sink_jdbc_id=data_sink_jdbc.jdbc_id LEFT JOIN import_str_sql iss2 ON ic.sink_str_sql_id=iss2.str_sql_id WHERE ic.state=1 AND ic.shell_name='hive_to_mysql_incr' and ic.data_source_name='${data_source_name}' " 
return_txt=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`

      if test -z "${return_txt}" ;then
          exit
      fi

while read line
do
   #主键
   id=`echo "${line}"|awk -F '#' '{print $1}'`
   #hive临时库
   hive_database=`echo "${line}"|awk -F '#' '{print $2}'`
   #数据源表名
   data_source_name=`echo "${line}"|awk -F '#' '{print $3}'| tr a-z A-Z`
   data_source_name_tmp=`echo "${line}"|awk -F '#' '{print $3}'|awk -F '.' '{print $2}'`
   #数据源jdbc
   data_source_jdbc=`echo "${line}"|awk -F '#' '{print $4}'`
   #数据源用户名
   source_user_name=`echo "${line}"|awk -F '#' '{print $5}'`
   #数据源密码
   source_pass_word=`echo "${line}"|awk -F '#' '{print $6}'`
   #数据源sql
   source_str_sql=`echo "${line}"|awk -F '#' '{print $7}'`
   #结果源表名
   data_sink_name=`echo "${line}"|awk -F '#' '{print $8}'`
   data_sink_name_tmp=`echo "${line}"|awk -F '#' '{print $8}'|awk -F '.' '{print $2}'`
   #结果源jdbc
   data_sink_jdbc=`echo "${line}"|awk -F '#' '{print $9}'`
   #结果源用户名
   sink_user_name=`echo "${line}"|awk -F '#' '{print $10}'`
   #结果源密码
   sink_pass_word=`echo "${line}"|awk -F '#' '{print $11}'`
   #字段类型
   field_type_name=`echo "${line}"|awk -F '#' '{print $12}'`
   #是否包含特殊字段
   special_field=`echo "${line}"|awk -F '#' '{print $13}'`
   #结果源sql
   sink_str_sql=`echo "${line}"|awk -F '#' '{print $14}'`


   source_database=`echo "${data_source_name}"|awk -F '.' '{print $1}'`
   source_tablename=`echo "${data_source_name}"|awk -F '.' '{print $2}'`
   sink_database=`echo "${data_sink_name}"|awk -F '.' '{print $1}'`
   sink_tablename=`echo "${data_sink_name}"|awk -F '.' '{print $2}'`
   data_sink_jdbc_ip=`echo "${data_sink_jdbc}" |awk -F '/' '{print $1}'`
   data_sink_jdbc="${data_sink_jdbc_ip}/${mysql_database}"
   
           i=0
           j=0
           create_mysql_field_and_type=""
           create_mysql_field=""
           mysql_index=""
   
           str_sql="SELECT CONCAT_WS('#',field_name,mysql_field_type) as txt  FROM field_type WHERE UPPER(table_name)=UPPER('${sink_tablename}') AND state=1 ORDER BY id"
           echo "${str_sql}"
           return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`

           while read line_tmp
           do
             #echo "${line_tmp}"
             table_field_name=`echo "${line_tmp}"|awk -F '#' '{print $1}'`
             table_field_type=`echo "${line_tmp}"|awk -F '#' '{print $2}'|sed "s/\r//"`

             if test "${table_field_name}" = "" -o "${table_field_type}" = "" ;then
                   echo "insert table field"
                   export_file_dir="${out_file_dir}/${shell_name}_${data_source_name}.csv"

                   str_sql="SELECT CONCAT_WS(',',table_name,COLUMN_NAME,COLUMN_TYPE,'string','2') txt FROM INFORMATION_SCHEMA.COLUMNS t WHERE upper(TABLE_NAME)=upper('${sink_tablename}') AND TABLE_SCHEMA='dwd_data' ORDER BY ORDINAL_POSITION"
                   return_txt=`mysqlout "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}" "${export_file_dir}.tmp"`
                   
                   mv ${export_file_dir}.tmp ${export_file_dir}
                   
                   return_txt=`mysqlload "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${export_file_dir}" "data_import.field_type" "," "(table_name,field_name,mysql_field_type,kudu_field_type2,state)"`
                   mv -f ${export_file_dir} /tmp
                   j=`expr ${j} + 1`
             fi

             if test ${i} -eq 0 ;then
                create_mysql_field_and_type="${create_mysql_field_and_type}${table_field_name} ${table_field_type} "
                create_mysql_field="${create_mysql_field}${table_field_name} "
                i=`expr ${i} + 1`
             else
                create_mysql_field_and_type="${create_mysql_field_and_type},${table_field_name} ${table_field_type} "
                create_mysql_field="${create_mysql_field},${table_field_name} "
             fi
             


            done <<< "${return_txt_tmp}"
            
           #echo "------------------${create_mysql_field_and_type}--------------"
           #echo "-----------------------${create_mysql_field}------------------"
           echo "--`date +%Y-%m-%d\ %H:%M`-${data_source_name}---" >> ${out_file_dir}/${shell_name}${mysql_type}.log

           if test ${j} != "0" ;then
             continue
           fi
           
           str_sql=`echo "${source_str_sql}"|sed "s#\[replace_data_sink_table_name\]#${hive_database}.${source_tablename}${mysql_type}#g"|sed "s#\[replace_data_source_table_name\]#${database_type}.${source_tablename}#g"|sed "s#\[replace_table_field\]#${create_mysql_field}#g"`
           echo "${str_sql}"
           /usr/bin/impala-shell -i "${data_source_jdbc}" -d "${source_database}" -q "${str_sql}"

           str_sql="invalidate metadata ${hive_database}.${source_tablename}${mysql_type};refresh ${hive_database}.${source_tablename}${mysql_type};"
           echo "${str_sql}"
           /usr/bin/impala-shell -i "${data_source_jdbc}" -d "${source_database}" -q "${str_sql}"

           str_sql="select count(*) from ${database_type}.${source_tablename}"
           return_txt_tmp=`/usr/bin/impala-shell -i "${data_source_jdbc}" -d "${source_database}" -q "${str_sql}" -B`
           echo "impala : ${return_txt_tmp}" >> ${out_file_dir}/${shell_name}${mysql_type}.log
           
           str_sql="SELECT CONCAT_WS('#',mysql_pk_name,IFNULL(mysql_key_name,''),IFNULL(mysql_index_name,'')) txt FROM tablename_pk WHERE UPPER(table_name)=UPPER('${sink_tablename}')"
           return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
           mysql_pk_name=`echo "${return_txt_tmp}"|awk -F '#' '{print $1}'`
           mysql_key_name=`echo "${return_txt_tmp}"|awk -F '#' '{print $2}'`
           mysql_index_name=`echo "${return_txt_tmp}"|awk -F '#' '{print $3}'`

           if test "${mysql_pk_name}" ;then
              mysql_pkey=",PRIMARY KEY (${mysql_pk_name})"
           else
              continue
           fi

           if test "${mysql_key_name}" ;then
              mysql_key=",KEY ${sink_tablename} (${mysql_key_name})"
           else
              mysql_key=""
           fi

    
           if test "${mysql_index_name}" ;then
              i=0
              array_field=(${mysql_index_name//,/ })
              for LINE in "${array_field[@]}"
              do   
               #echo "------------------"
                 echo "${LINE}"
                 if test ${i} -eq 0 ;then
                   mysql_index=",${mysql_index} index ${sink_tablename}_${LINE}(${LINE})"
                   i=`expr ${i} + 1`
                 else
                   mysql_index="${mysql_index},index ${sink_tablename}_${LINE}(${LINE})"
                 fi
 
              done

           else
              mysql_index=""
           fi

           mysql_load_dbname=${data_sink_jdbc##*/}
           jdbc_url_tmp=${data_sink_jdbc%/*}
           mysql_host_port_tmp=${jdbc_url_tmp##*/}
           mysql_load_host=`echo "${mysql_host_port_tmp}"|awk -F ':' '{print $1}'`
           mysql_load_port=`echo "${mysql_host_port_tmp}"|awk -F ':' '{print $2}'`
           mysql_load_user=${sink_user_name}
           mysql_load_passwd=${sink_pass_word}

           #str_sql="drop table if exists ${data_sink_name};create table ${data_sink_name}(${create_mysql_field_and_type} ${mysql_pkey} ${mysql_key} ${mysql_index}) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4 ;"
           str_sql=`echo "${sink_str_sql}"|sed "s#\[data_sink_name\]#${data_sink_name}#g"|sed "s#\[create_mysql_field_and_type\]#${create_mysql_field_and_type}#g"|sed "s#\[mysql_pkey\]#${mysql_pkey}#g"|sed "s#\[mysql_key\]#${mysql_key}#g"|sed "s#\[mysql_index\]#${mysql_index}#g"`
           
           echo "${str_sql}"
           return_txt_tmp=`mysqltxt "${mysql_load_host}" "${mysql_load_port}" "${mysql_load_user}" "${mysql_load_passwd}" "${mysql_load_dbname}" "${str_sql}"`


           /usr/bin/sqoop export -Dmapreduce.map.memory.mb=2049 --connect jdbc:mysql://${data_sink_jdbc}?useUnicode=true --username ${sink_user_name} --password ${sink_pass_word} --table ${sink_tablename} --update-key ${mysql_pk_name} --update-mode allowinsert --hcatalog-database ${hive_database} --hcatalog-table ${source_tablename}${mysql_type}
           mv -f ${sink_tablename}.java ${log_dir}/
           echo "/usr/bin/sqoop export -Dmapreduce.map.memory.mb=2049 --connect jdbc:mysql://${data_sink_jdbc}?useUnicode=true --username ${sink_user_name} --password ${sink_pass_word} --table ${sink_tablename} --update-key ${mysql_pk_name} --update-mode allowinsert --hcatalog-database ${hive_database} --hcatalog-table ${source_tablename}${mysql_type}"
           
           str_sql="select count(*) from ${data_sink_name}"
           return_txt_tmp=`mysqltxt "${mysql_load_host}" "${mysql_load_port}" "${mysql_load_user}" "${mysql_load_passwd}" "${mysql_load_dbname}" "${str_sql}"`
           echo "mysql  : ${return_txt_tmp}" >> ${out_file_dir}/${shell_name}${mysql_type}.log
 
           str_sql="update import_config_incr${mysql_type} set state=0 where id=${id}"
           #return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`

done <<< "${return_txt}"

end_time_s=`date +%s`
spent_s=`expr ${end_time_s} - ${start_time_s} `
echo "${shell_name}.sh `date +%Y-%m-%d\ %H:%M` 处理完毕,耗时:${spent_s}" >> ${exec_log}

#set +x