读取任务一中序列文件,统计每个用户每天的访问次数,最终将2021/1和2021/2的数据分别输出在两个文件中。
一、创建项目步骤:
1.创建项目

2.修改pom.xml文件
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>

3.在resources目录下创建日志文件log4j.properties
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=D:\\countLog.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

二、编写程序:
1.自定义键的类型 MemberLogTime 包含两个属性(memberId,memberLogTime) 实现WritableComparable接口


2.编写Mapper模块:(在Mapper中计数器,使用枚举)
package com.maidu.countlog;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**@author:yt
* @since:2024-05-15
* 定义Mapper模块
*/
public class LogCountMapper extends Mapper<Text,Text,MemberLogTime, IntWritable> {
private MemberLogTime mt =new MemberLogTime();
private IntWritable one =new IntWritable(1);
//使用枚举
enum LogCounter{
January,
February
}
@Override
protected void map(Text key, Text value, Mapper<Text, Text, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
String memberId =key.toString();
String logTime =value.toString();
//计数器
if(logTime.contains("2021/1")){
//一月计数器值+1
context.getCounter(LogCounter.January).increment(1);
}else{
context.getCounter(LogCounter.February).increment(1);
}
//将用户ID和访问时间存到MemberLogTime对象中
mt.setMemberId(memberId);
mt.setLogTime(logTime);
context.write(mt,one);
}
}

3.编写Combiner:
package com.maidu.countlog;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author:yt
* @since:2024-05-15
* 合并的类
*/
public class LogCoutCombiner extends Reducer<MemberLogTime, IntWritable,MemberLogTime,IntWritable> {
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> values, Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum =0;
for(IntWritable val:values){
sum+=val.get();
}
context.write(key,new IntWritable(sum));
}
}

4.编写Patitioner:
package com.maidu.countlog;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author:yt
* @since:2024-05-15
*/
public class LogCountPartitioner extends Partitioner<MemberLogTime, IntWritable> {
@Override
public int getPartition(MemberLogTime memberLogTime, IntWritable intWritable, int numPartitions) {
String date = memberLogTime.getLogTime();
if( date.contains( "2021/1") ){
return 0%numPartitions;
}else{
return 1%numPartitions;
}
}
}

5.编写Reduce模块:
package com.maidu.countlog;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author:yt
* @since:2024-05-15
*/
public class LogCountReducer extends Reducer<MemberLogTime, IntWritable,MemberLogTime,IntWritable> {
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> values, Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
//计数器(动态计数器)
if(key.getLogTime().contains("2021/1")){
context.getCounter("OuputCounter","JanuaryResult").increment(1);
} else{
context.getCounter("OuputCounter","FabruaryResult").increment(1);
}
int sum =0;
for(IntWritable val:values){
sum+=val.get();
}
context.write(key,new IntWritable(sum));
}
}

6.编写Driver模块:
package com.maidu.countlog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* @author:yt
* @since:2024-05-15
*/
public class LogCount {
public static void main(String[] args) throws Exception {
Configuration conf =new Configuration();
Job job =Job.getInstance(conf,"Log count");
job.setJarByClass(LogCount.class);
job.setMapperClass(LogCountMapper.class);
job.setReducerClass(LogCountReducer.class);
job.setCombinerClass(LogCoutCombiner.class);
job.setPartitionerClass(LogCountPartitioner.class);
//设置reduce任务数2
job.setNumReduceTasks(2);
job.setOutputKeyClass(MemberLogTime.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//添加输入路径
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}

7.使用Maven打包为Jar文件,上传到master节点上执行

[yt@master ~]$ hadoop jar countLog-1.0-SNAPSHOT.jar com.maidu.countlog.LogCount /ouput-2301-select/part-m-00000 /logcount/2301/

8.查看运行结果
(1)计数器

(2)结果

(3)查看其中一个文件内容
[yt@master ~]$ hdfs dfs -cat /logcount/2301/part-r-00001


















