HCatalog - Input Output Format



The HCatInputFormat and HCatOutputFormat interfaces are used to read data from HDFS and, after processing, write the resulting data into HDFS using a MapReduce job. Let us elaborate on the input and output format interfaces.

HCatInputFormat

HCatInputFormat is used with MapReduce jobs to read data from HCatalog-managed tables. HCatInputFormat exposes a Hadoop 0.20 MapReduce API for reading data as if it had been published to a table.

Sr.No.   Method Name & Description

1. public static HCatInputFormat setInput(Job job, String dbName, String tableName) throws IOException
   Set inputs to use for the job. It queries the metastore with the given input specification and serializes matching partitions into the job configuration for MapReduce tasks.

2. public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException
   Set inputs to use for the job. It queries the metastore with the given input specification and serializes matching partitions into the job configuration for MapReduce tasks.

3. public HCatInputFormat setFilter(String filter) throws IOException
   Set a filter on the input table.

4. public HCatInputFormat setProperties(Properties properties) throws IOException
   Set properties for the input format.
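
Rows 1 and 3 above can be chained, because setInput returns the HCatInputFormat instance. As a rough sketch only (it assumes a Job object already configured as in the full program below, the Job-based setInput overload of newer HCatalog releases, and hypothetical database, table, and partition column names), a read restricted to one partition looks like this −

// Sketch only: read just the partitions matching a filter.
// "default", "mytable" and the partition column "ds" are hypothetical.
HCatInputFormat.setInput(job, "default", "mytable")
               .setFilter("ds=\"20201130\"");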

The HCatInputFormat API includes the following methods −

  • setInput
  • setOutputSchema
  • getTableSchema

To read data using HCatInputFormat, first instantiate an InputJobInfo with the necessary information about the table being read, and then call setInput with the InputJobInfo.

You can use the setOutputSchema method to include a projection schema to specify the output fields. If a schema is not specified, all the columns in the table will be returned. You can use the getTableSchema method to determine the table schema for a specified input table.
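
As an illustration of the projection just described, the fragment below is a minimal sketch only (it assumes the same imports and driver structure as the full program later in this section; the table name mytable and column col1 are hypothetical, and the exact static-method signatures vary slightly between HCatalog releases) −

// Sketch only: read-side setup with a projection schema (hypothetical names).
Job job = new Job(getConf(), "read-example");
job.setInputFormatClass(HCatInputFormat.class);

// Point HCatInputFormat at the table to read.
HCatInputFormat.setInput(job, InputJobInfo.create("default", "mytable", null));

// Project only the column we need; without setOutputSchema,
// every column of the table would be returned.
HCatSchema tableSchema = HCatInputFormat.getTableSchema(job);
HCatSchema projection = new HCatSchema(
      java.util.Arrays.asList(tableSchema.get("col1")));
HCatInputFormat.setOutputSchema(job, projection);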

HCatOutputFormat

HCatOutputFormat is used with MapReduce jobs to write data to HCatalog-managed tables. HCatOutputFormat exposes a Hadoop 0.20 MapReduce API for writing data to a table. When a MapReduce job uses HCatOutputFormat to write output, the default OutputFormat configured for the table is used, and the new partition is published to the table after the job completes.

Sr.No.   Method Name & Description

1. public static void setOutput(Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException
   Set the information about the output to write for the job. It queries the metadata server to find the StorageHandler to use for the table. It throws an error if the partition is already published.

2. public static void setSchema(Configuration conf, HCatSchema schema) throws IOException
   Set the schema for the data being written out to the partition. The table schema is used by default for the partition if this is not called.

3. public RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
   Get the record writer for the job. It uses the StorageHandler's default OutputFormat to get the record writer.

4. public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
   Get the output committer for this output format. It ensures that the output is committed correctly.

The HCatOutputFormat API includes the following methods −

  • setOutput
  • setSchema
  • getTableSchema

The first call on HCatOutputFormat must be setOutput; any other call will throw an exception saying the output format is not initialized.

The schema for the data being written out is specified by the setSchema method. You must call this method, providing the schema of the data you are writing. If your data has the same schema as the table schema, you can use HCatOutputFormat.getTableSchema() to get the table schema and then pass it along to setSchema().
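
Put together, a minimal write-side sketch looks like the fragment below (the output table name outtable is a hypothetical example; the full program that follows shows the same calls in context) −

// Sketch only: setOutput must be the first HCatOutputFormat call on the job.
HCatOutputFormat.setOutput(job, OutputJobInfo.create(null, "outtable", null));

// The records match the table layout, so reuse the table's own schema.
HCatSchema schema = HCatOutputFormat.getTableSchema(job);
HCatOutputFormat.setSchema(job, schema);

job.setOutputFormatClass(HCatOutputFormat.class);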

The following MapReduce program reads data from one table, which it assumes to have an integer in the second column ("column 1"), and counts how many instances of each distinct value it finds. That is, it is the equivalent of "select col1, count(*) from $table group by col1;".

For example, if the values in the second column are {1, 1, 1, 3, 3, 5}, then the program produces the following output of values and counts −

1, 3
3, 2
5, 1

Now let us take a look at the program code −

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;

import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   public static class Map extends Mapper<WritableComparable, HCatRecord, IntWritable, IntWritable> {
      int age;
        
      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         Context context
      ) throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }
    
   public static class Reduce extends Reducer<IntWritable, IntWritable, WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, Iterable<IntWritable> values,
         Context context
      ) throws IOException, InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();
            
         while (iter.hasNext()) {
            sum++;
            iter.next();
         }
            
         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }
    
   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();
        
      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System.getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
         conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);

      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

      // initialize HCatInputFormat
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
        
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);
        
      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }
    
   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}

Before compiling the above program, you have to download some jars and add them to the classpath for this application. You need to download all the Hive jars and HCatalog jars (HCatalog-core-0.5.0.jar, hive-metastore-0.10.0.jar, libthrift-0.7.0.jar, hive-exec-0.10.0.jar, libfb303-0.7.0.jar, jdo2-api-2.3-ec.jar, slf4j-api-1.6.1.jar).

Use the following commands to copy these jar files from local to HDFS and add them to the classpath.

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,\
hdfs:///tmp/hive-metastore-0.10.0.jar,\
hdfs:///tmp/libthrift-0.7.0.jar,\
hdfs:///tmp/hive-exec-0.10.0.jar,\
hdfs:///tmp/libfb303-0.7.0.jar,\
hdfs:///tmp/jdo2-api-2.3-ec.jar,\
hdfs:///tmp/slf4j-api-1.6.1.jar

Use the following command to compile and execute the given program.

$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

Now, check your output directory (hdfs: user/tmp/hive) for the output (part_0000, part_0001).