欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

对 MapReduce 中调用 reduce 方法时键值变化的分析和源代码分析。

最编程 2024-03-15 17:18:52
...
public class ReduceContextImpl { private RawKeyValueIterator input;//这个迭代器里面存储的key-value对元素。 private KEYIN key; // current key private VALUEIN value; // current value private boolean firstValue = false; // first value in key private boolean nextKeyIsSame = false; // more w/ this key private boolean hasMore; // more in file private ValueIterable iterable = new ValueIterable();//访问自己的内部类 public ReduceContextImpl() throws InterruptedException, IOException{ hasMore = input.next();//对象创建的时候,就先判断reduce接收的key-value迭代器是否有元素,并获取下一个元素 } /** 创建完成就调用该方法 ,开始处理下一个唯一的key*/ public boolean nextKey() throws IOException,InterruptedException { while (hasMore && nextKeyIsSame) { //判断迭代器是否还有下一个元素已经下一个元素是否和上一个已经遍历出来的key-value元素的key是不是一样 nextKeyValue(); } if (hasMore) { if (inputKeyCounter != null) { inputKeyCounter.increment(1); } return nextKeyValue(); } else { return false; } } /** * Advance to the next key/value pair. */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!hasMore) { key = null; value = null; return false; } firstValue = !nextKeyIsSame; //获取迭代器下一个元素的key DataInputBuffer nextKey = input.getKey(); //设置当前key的坐标 currentRawKey.set(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition()); buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength()); //反序列化得到当前key对象 key = keyDeserializer.deserialize(key); //获取迭代器下一个元素的value DataInputBuffer nextVal = input.getValue(); buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength() - nextVal.getPosition()); //反序列化value value = valueDeserializer.deserialize(value); currentKeyLength = nextKey.getLength() - nextKey.getPosition(); currentValueLength = nextVal.getLength() - nextVal.getPosition(); if (isMarked) { //存储下一个key和value backupStore.write(nextKey, nextVal); } //迭代器向下迭代一次 hasMore = input.next(); //如果还有元素,则进行比较,判断key是否相同 if (hasMore) { nextKey = input.getKey(); //这个地方也是比较关键的: nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, currentRawKey.getLength(), nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition() ) == 0; } else { nextKeyIsSame = false; } inputValueCounter.increment(1); return true; } //一个迭代器模式的内部类 protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> { private boolean inReset = false; private boolean clearMarkFlag = false; @Override//它并不仅仅是判断迭代器是否还有下一个元素,而且还要判断下一个元素和上一个元素是不是相同的key public boolean hasNext() { if (inReset && backupStore.hasNext()) { return true; } return firstValue || nextKeyIsSame; } @Override //这个地方要注意了,其实在获取下一个元素的时候主要调用的是nextKeyValue(); public VALUEIN next() { if (inReset) { if (backupStore.hasNext()) { backupStore.next(); DataInputBuffer next = backupStore.nextValue(); buffer.reset(next.getData(), next.getPosition(), next.getLength() - next.getPosition()); value = valueDeserializer.deserialize(value); return value; } else { inReset = false; backupStore.exitResetMode(); if (clearMarkFlag) { clearMarkFlag = false; isMarked = false; } } } // if this is the first record, we don't need to advance if (firstValue) { firstValue = false; return value; } // otherwise, go to the next key/value pair nextKeyValue();//该方法就是获取下一个key,value对,key值的变化也就在这里表现出来了。 return value; } } //内部类,实现迭代器,具备迭代器功能 protected class ValueIterable implements Iterable<VALUEIN> { private ValueIterator iterator = new ValueIterator(); @Override public Iterator<VALUEIN> iterator() { return iterator; } } public Iterable<VALUEIN> getValues() throws IOException, InterruptedException { return iterable; } }

推荐阅读