Split过程源码分析

2017-09-14 11:37:55来源:CSDN作者:qq_17864929人点击

分享

split来源:

    1、Memstore flush时直接CompactSplitThread.requestSplit。    2、HBaseAdmin客户端发起的请求,HRegionServer收到后,转CompactSplitThread.requestSplit处理。
    下面介绍HBaseAdmin发起请求的实现细节:    client通过界面发起action请求,调用org.apache.hadoop.hbase.generated.master类中的_jspService方法,
    下面为方法中的代码片段:
1
...
2
    
3
if (action.equals("split")) {
4
    if (key != null && key.length() > 0) {
5
        hbadmin.split(key);
6
    } else {
7
        hbadmin.split(fqtn);
8
    }
9
    out.write(" Split request accepted. ");
10
11
} 
12
...
    可以看到如果发起split请求,会调用HbaseAdmin的split方法,下面进入到org.apache.hadoop.hbase.client.HbaseAdmin的split方法中,此时的第二个参数默认为null。
1
/**
2
   * Split a table or an individual region.
3
   * Asynchronous operation.
4
   */
5
public void split(final byte[] tableNameOrRegionName,
6
          final byte [] splitPoint) throws IOException, InterruptedException {
7
    CatalogTracker ct = getCatalogTracker();
8
    try {
9
        Pair<HRegionInfo, ServerName> regionServerPair = getRegion(tableNameOrRegionName, ct);
10
        if (regionServerPair != null) {
11
            if (regionServerPair.getSecond() == null) {
12
                throw new NoServerForRegionException(Bytes.toStringBinary(tableNameOrRegionName));
13
            } else {
14
                split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint);
15
            }
16
        } 
17
        ...
18
}
    接着进入到split(pair.getSecond(), pair.getFirst(), splitPoint)方法中去,在此方法中获取到相应的regionserver,并在ProtobufUtil内部调用此regionserver的split方法:
1
private void split(final ServerName sn, final HRegionInfo hri,
2
                   byte[] splitPoint) throws IOException {
3
    if (hri.getStartKey() != null && splitPoint != null &&
4
        Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
5
        throw new IOException("should not give a splitkey which equals to startkey!");
6
    }
7
    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
8
    ProtobufUtil.split(admin, hri, splitPoint);
9
}
    下面ProtobufUtil的split方法内:
1
/**
2
   * A helper to split a region using admin protocol.
3
   */
4
public static void split(final AdminService.BlockingInterface admin,
5
                         final HRegionInfo hri, byte[] splitPoint) throws IOException {
6
    SplitRegionRequest request = RequestConverter.buildSplitRegionRequest(hri.getRegionName(), splitPoint);
7
    try {
8
        admin.splitRegion(null, request);
9
    } catch (ServiceException se) {
10
        throw ProtobufUtil.getRemoteException(se);
11
    }
12
}
    再进入BlockingInterface的splitRegion方法内如下1
public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse splitRegion(
2
    com.google.protobuf.RpcController controller,
3
    org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest request)
4
    throws com.google.protobuf.ServiceException;
    可以看到HRegionServer类实现了BlockingInterface接口,并且实现了相应的splitRegion方法。
1
/**
2
 * HRegionServer makes a set of HRegions available to clients. It checks in with
3
 * the HMaster. There are many HRegionServers in a single HBase deployment.
4
 */
5
@InterfaceAudience.Private
6
@SuppressWarnings("deprecation")
7
public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
8
  AdminProtos.AdminService.BlockingInterface, Runnable, RegionServerServices,
9
  HBaseRPCErrorHandler, LastSequenceId {
10
    
11
  ...
12
      
13
  /**
14
   * Split a region on the region server.
15
   *
16
   * @param controller the RPC controller
17
   * @param request the request
18
   * @throws ServiceException
19
   */
20
  @Override
21
  @QosPriority(priority=HConstants.HIGH_QOS)
22
  public SplitRegionResponse splitRegion(final RpcController controller,
23
      final SplitRegionRequest request) throws ServiceException {
24
    try {
25
      checkOpen();
26
      requestCount.increment();
27
      HRegion region = getRegion(request.getRegion());
28
      region.startRegionOperation(Operation.SPLIT_REGION);
29
      LOG.info("Splitting " + region.getRegionNameAsString());
30
      long startTime = EnvironmentEdgeManager.currentTimeMillis();
31
      HRegion.FlushResult flushResult = region.flushcache();
32
      if (flushResult.isFlushSucceeded()) {
33
        long endTime = EnvironmentEdgeManager.currentTimeMillis();
34
        metricsRegionServer.updateFlushTime(endTime - startTime);
35
      }
36
      byte[] splitPoint = null;
37
      if (request.hasSplitPoint()) {
38
        splitPoint = request.getSplitPoint().toByteArray();
39
      }
40
      region.forceSplit(splitPoint);
41
      //CompactSplitThread发起split请求,region.checkSplit()会获取到split的midkey
42
      compactSplitThread.requestSplit(region, region.checkSplit(), RpcServer.getRequestUser());
43
      ...
44
 
45
  }
46
}
    首先进入到region.checkSplit()内部:
1
/**
2
   * Return the splitpoint. null indicates the region isn't splittable
3
   * If the splitpoint isn't explicitly specified, it will go over the stores
4
   * to find the best splitpoint. Currently the criteria of best splitpoint
5
   * is based on the size of the store.
6
   */
7
public byte[] checkSplit() {
8
    ...
9
    if (!splitPolicy.shouldSplit()) {
10
        return null;
11
    }
12
13
    byte[] ret = splitPolicy.getSplitPoint();
14
    ...
15
    return ret;
16
}
    进入getSplitPoint内,此方法会获取到regionserver中最大的一个store,并拿到该store的:
1
/**
2
   * @return the key at which the region should be split, or null
3
   * if it cannot be split. This will only be called if shouldSplit
4
   * previously returned true.
5
   */
6
protected byte[] getSplitPoint() {
7
    byte[] explicitSplitPoint = this.region.getExplicitSplitPoint();
8
    if (explicitSplitPoint != null) {
9
        return explicitSplitPoint;
10
    }
11
    Map<byte[], Store> stores = region.getStores();
12
13
    byte[] splitPointFromLargestStore = null;
14
    long largestStoreSize = 0;
15
    for (Store s : stores.values()) {
16
        byte[] splitPoint = s.getSplitPoint();
17
        long storeSize = s.getSize();
18
        if (splitPoint != null && largestStoreSize < storeSize) {
19
            splitPointFromLargestStore = splitPoint;
20
            largestStoreSize = storeSize;
21
        }
22
    }
23
24
    return splitPointFromLargestStore;
25
}
    进到HStore的getSplitPoint方法内:
1
@Override
2
public byte[] getSplitPoint() {
3
    this.lock.readLock().lock();
4
    try {
5
        // Should already be enforced by the split policy!
6
        assert !this.getRegionInfo().isMetaRegion();
7
        // Not split-able if we find a reference store file present in the store.
8
        if (hasReferences()) {
9
            return null;
10
        }
11
        return this.storeEngine.getStoreFileManager().getSplitPoint();
12
    } catch(IOException e) {
13
        LOG.warn("Failed getting store size for " + this, e);
14
    } finally {
15
        this.lock.readLock().unlock();
16
    }
17
    return null;
18
}
       接着进入到StoreFile的getFileSplitPoint方法内,此方法中主要是返回最大store的最大一个storefile的中间一个block的第一个key:
1
@Override
2
public final byte[] getSplitPoint() throws IOException {
3
    if (this.storefiles.isEmpty()) {
4
        return null;
5
    }
6
    return StoreUtils.getLargestFile(this.storefiles).getFileSplitPoint(this.kvComparator);
7
}
1
/**
2
   * Gets the approximate mid-point of this file that is optimal for use in splitting it.
3
   * @param comparator Comparator used to compare KVs.
4
   * @return The split point row, or null if splitting is not possible, or reader is null.
5
   */
6
@SuppressWarnings("deprecation")
7
byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
8
    if (this.reader == null) {
9
        LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
10
        return null;
11
    }
12
    // Get first, last, and mid keys.  Midkey is the key that starts block
13
    // in middle of hfile.  Has column and timestamp.  Need to return just
14
    // the row we want to split on as midkey.
15
    byte [] midkey = this.reader.midkey();
16
    if (midkey != null) {
17
        KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
18
        byte [] fk = this.reader.getFirstKey();
19
        KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
20
        byte [] lk = this.reader.getLastKey();
21
        KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
22
        // if the midkey is the same as the first or last keys, we cannot (ever) split this region.
23
        if (comparator.compareRows(mk, firstKey) == 0 || comparator.compareRows(mk, lastKey) == 0) {
24
            if (LOG.isDebugEnabled()) {
25
                LOG.debug("cannot split because midkey is the same as first or last row");
26
            }
27
            return null;
28
        }
29
        return mk.getRow();
30
    }
31
    return null;
32
}
    

    HBase还规定,如果定位到的rowkey是整个文件的首个rowkey或者最后一个rowkey的话,就认为没有切分点。

    什么情况下会出现没有切分点的场景呢?最常见的就是一个文件只有一个block,执行split的时候就会发现无法切分。很多新同学在测试split的时候往往都是新建一张新表,然后往新表中插入几条数据并执行一下flush,再执行split,奇迹般地发现数据表并没有真正执行切分。原因就在这里,这个时候仔细的话你翻看debug日志是可以看到这样的日志滴:


    再回到之前CompactSplitThread的requestSplit方法内:
1
/*
2
   * The User parameter allows the split thread to assume the correct user identity
3
   */
4
public synchronized void requestSplit(final HRegion r, byte[] midKey, User user) {
5
    if (midKey == null) {
6
        LOG.debug("Region " + r.getRegionNameAsString() +
7
                  " not splittable because midkey=null");
8
        if (r.shouldForceSplit()) {
9
            r.clearSplit();
10
        }
11
        return;
12
    }
13
    try {
14
        //线程池中调用SplitRequest类的doSplitting方法
15
        this.splits.execute(new SplitRequest(r, midKey, this.server, user));
16
        if (LOG.isDebugEnabled()) {
17
            LOG.debug("Split requested for " + r + ".  " + this);
18
        }
19
    } catch (RejectedExecutionException ree) {
20
        LOG.info("Could not execute split for " + r, ree);
21
    }
22
}
    下面进入到SplitRequest的soSplitting方法内,此方法中主要有两个阶段prepare方法和execute方法,在这之前会把midkey赋值给SplitTransaction类的splitrow变量,这个变量会在后面创建reference files的时候用到,用于比较store
1
private void doSplitting(User user) {
2
    boolean success = false;
3
    server.getMetrics().incrSplitRequest();
4
    long startTime = EnvironmentEdgeManager.currentTimeMillis();
5
    SplitTransaction st = new SplitTransaction(parent, midKey);
6
    try {
7
      //acquire a shared read lock on the table, so that table schema modifications
8
      //do not happen concurrently
9
      tableLock = server.getTableLockManager().readLock(parent.getTableDesc().getTableName()
10
          , "SPLIT_REGION:" + parent.getRegionNameAsString());
11
      try {
12
        tableLock.acquire();
13
      } catch (IOException ex) {
14
        tableLock = null;
15
        throw ex;
16
      }
17
18
      // If prepare does not return true, for some reason -- logged inside in
19
      // the prepare call -- we are not ready to split just now. Just return.
20
      if (!st.prepare()) return;
21
      try {
22
        st.execute(this.server, this.server, user);
23
        success = true;
24
      } catch (Exception e) {
25
          ...
26
    } catch (IOException ex) {
27
      LOG.error("Split failed " + this, RemoteExceptionHandler.checkIOException(ex));
28
      server.checkFileSystem();
29
    } finally {
30
      if (this.parent.getCoprocessorHost() != null) {
31
        try {
32
          this.parent.getCoprocessorHost().postCompleteSplit();
33
        } catch (IOException io) {
34
          LOG.error("Split failed " + this,
35
              RemoteExceptionHandler.checkIOException(io));
36
        }
37
      }
38
      if (parent.shouldForceSplit()) {
39
        parent.clearSplit();
40
      }
41
      releaseTableLock();
42
      long endTime = EnvironmentEdgeManager.currentTimeMillis();
43
      // Update regionserver metrics with the split transaction total running time
44
      server.getMetrics().updateSplitTime(endTime - startTime);
45
      if (success) {
46
        ...
47
    }
48
}
    关于SplitTransaction的prepare方法主要是用于初始化两个子region:
1
/**
2
   * Does checks on split inputs.
3
   * @return <code>true</code> if the region is splittable else
4
   * <code>false</code> if it is not (e.g. its already closed, etc.).
5
   */
6
public boolean prepare() {
7
    if (!this.parent.isSplittable()) return false;
8
    // Split key can be null if this region is unsplittable; i.e. has refs.
9
    if (this.splitrow == null) return false;
10
    HRegionInfo hri = this.parent.getRegionInfo();
11
    parent.prepareToSplit();
12
    // Check splitrow.
13
    byte [] startKey = hri.getStartKey();
14
    byte [] endKey = hri.getEndKey();
15
    if (Bytes.equals(startKey, splitrow) ||
16
        !this.parent.getRegionInfo().containsRow(splitrow)) {
17
        LOG.info("Split row is not inside region key range or is equal to " +
18
                 "startkey: " + Bytes.toStringBinary(this.splitrow));
19
        return false;
20
    }
21
    long rid = getDaughterRegionIdTimestamp(hri);
22
    this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
23
    this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
24
    this.journal.add(new JournalEntry(JournalEntryType.PREPARED));
25
    return true;
26
}
    重点在于SplitTransaction的execute方法内部,在执行完createDaughters方法,会执行stepsAfterPONR方法,这个方法内部会执行openDaughters方法,在这个方法中会调用openDaughterRegion方法,然后对子region做一些初始化操作
1
/**
2
   * Run the transaction.
3
   * @param server Hosting server instance.  Can be null when testing (won't try
4
   * and update in zk if a null server)
5
   * @param services Used to online/offline regions.
6
   * @throws IOException If thrown, transaction failed.
7
   *          Call {@link #rollback(Server, RegionServerServices)}
8
   * @return Regions created
9
   * @throws IOException
10
   * @see #rollback(Server, RegionServerServices)
11
   */
12
public PairOfSameType<HRegion> execute(final Server server,
13
                                       final RegionServerServices services, User user)
14
    throws IOException {
15
    useZKForAssignment =
16
        server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration());
17
    PairOfSameType<HRegion> regions = createDaughters(server, services, user);
18
    ...
19
    return stepsAfterPONR(server, services, regions, user);
20
}
    进入到SplitTransaction的createDaughters方法内,获取parent region的写锁
1
/* package */PairOfSameType<HRegion> createDaughters(final Server server,
2
      final RegionServerServices services, User user) throws IOException {
3
    ...
4
5
    PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing);
6
7
    final List<Mutation> metaEntries = new ArrayList<Mutation>();
8
    boolean ret = false;
9
    
10
    ...
11
    return daughterRegions;
12
}
       进入到SplitTransaction的stepsBeforePONR方法内,在这个方法内会通过SplitTransaction的createNodeSplitting方法标识父region在splitting,在journal中加入开始split的日志,在zookeeper上创建splitting文件夹。并且通过splitStoreFiles方法创建reference files去引用两个子region中的hfile在父region中的位置。
1
public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
2
      final RegionServerServices services, boolean testing) throws IOException {
3
    // Set ephemeral SPLITTING znode up in zk.  Mocked servers sometimes don't
4
    // have zookeeper so don't do zk stuff if server or zookeeper is null
5
    if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
6
        try {
7
            createNodeSplitting(server.getZooKeeper(), parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
8
        } catch (KeeperException e) {
9
            throw new IOException("Failed creating PENDING_SPLIT znode on " +
10
                                  this.parent.getRegionNameAsString(), e);
11
        }
12
    } else if (services != null && !useZKForAssignment) {
13
        if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT, parent.getRegionInfo(), hri_a, hri_b)) {
14
            throw new IOException("Failed to get ok from master to split " + parent.getRegionNameAsString());
15
        }
16
    }
17
    this.journal.add(new JournalEntry(JournalEntryType.SET_SPLITTING_IN_ZK));
18
    if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
19
        // After creating the split node, wait for master to transition it
20
        // from PENDING_SPLIT to SPLITTING so that we can move on. We want master
21
        // knows about it and won't transition any region which is splitting.
22
        znodeVersion = getZKNode(server, services);
23
    }
24
25
    this.parent.getRegionFileSystem().createSplitsDir();
26
    this.journal.add(new JournalEntry(JournalEntryType.CREATE_SPLIT_DIR));
27
28
    Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
29
    Exception exceptionToThrow = null;
30
    try{
31
        // 获取到父region的所有sotre
32
        hstoreFilesToSplit = this.parent.close(false);
33
    } catch (Exception e) {
34
        exceptionToThrow = e;
35
    }
36
    ...
37
    this.journal.add(new JournalEntry(JournalEntryType.OFFLINED_PARENT));
38
39
    // splitStoreFiles creates daughter region dirs under the parent splits dir
40
    // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
41
    // clean this up.
42
    Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
43
44
    // Log to the journal that we are creating region A, the first daughter
45
    // region.  We could fail halfway through.  If we do, we could have left
46
    // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
47
    // add entry to journal BEFORE rather than AFTER the change.
48
    this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_A_CREATION));
49
    assertReferenceFileCount(expectedReferences.getFirst(),
50
                             this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
51
    HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
52
    assertReferenceFileCount(expectedReferences.getFirst(),
53
            new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
54
55
    // Ditto
56
    this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_B_CREATION));
57
    assertReferenceFileCount(expectedReferences.getSecond(), this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
58
    HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
59
    assertReferenceFileCount(expectedReferences.getSecond(),
60
            new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
61
62
    return new PairOfSameType<HRegion>(a, b);
63
}
    进入splitStoreFiles内部,其中会遍历整个hstoreFilesToSplit,这里面是父region的所有store,这个值是从上一层传过来的:
1
/**
2
   * Creates reference files for top and bottom half of the
3
   * @param hstoreFilesToSplit map of store files to create half file references for.
4
   * @return the number of reference files that were created.
5
   * @throws IOException
6
   */
7
private Pair<Integer, Integer> splitStoreFiles(final Map<byte[], 
8
      List<StoreFile>> hstoreFilesToSplit) throws IOException {
9
    if (hstoreFilesToSplit == null) {
10
        // Could be null because close didn't succeed -- for now consider it fatal
11
        throw new IOException("Close returned empty list of StoreFiles");
12
    }
13
    // The following code sets up a thread pool executor with as many slots as
14
    // there's files to split. It then fires up everything, waits for
15
    // completion and finally checks for any exception
16
    int nbFiles = 0;
17
    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
18
        nbFiles += entry.getValue().size();
19
    }
20
    if (nbFiles == 0) {
21
        // no file needs to be splitted.
22
        return new Pair<Integer, Integer>(0,0);
23
    }
24
    // Default max #threads to use is the smaller of table's configured number of blocking store
25
    // files or the available number of logical cores.
26
    int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
27
                HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
28
                Runtime.getRuntime().availableProcessors());
29
    // Max #threads is the smaller of the number of storefiles or the default max determined above.
30
    int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX, defMaxThreads), nbFiles);
31
    LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent +
32
             " using " + maxThreads + " threads");
33
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
34
    builder.setNameFormat("StoreFileSplitter-%1$d");
35
    ThreadFactory factory = builder.build();
36
    ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory);
37
    List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
38
39
    // Split each store file.
40
    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
41
        for (StoreFile sf: entry.getValue()) {
42
            StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
43
            futures.add(threadPool.submit(sfs));
44
        }
45
    }
46
    // Shutdown the pool
47
    threadPool.shutdown();
48
49
    // Wait for all the tasks to finish
50
    try {
51
        boolean stillRunning = !threadPool.awaitTermination(this.fileSplitTimeout, TimeUnit.MILLISECONDS);
52
        if (stillRunning) {
53
            threadPool.shutdownNow();
54
            // wait for the thread to shutdown completely.
55
            while (!threadPool.isTerminated()) {
56
                Thread.sleep(50);
57
            }
58
            throw new IOException("Took too long to split the" + " files and create the references, aborting split");
59
        }
60
    } catch (InterruptedException e) {
61
        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
62
    }
63
64
    int created_a = 0;
65
    int created_b = 0;
66
    // Look for any exception
67
    for (Future<Pair<Path, Path>> future : futures) {
68
        try {
69
            Pair<Path, Path> p = future.get();
70
            created_a += p.getFirst() != null ? 1 : 0;
71
            created_b += p.getSecond() != null ? 1 : 0;
72
        } catch (InterruptedException e) {
73
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
74
        } catch (ExecutionException e) {
75
            throw new IOException(e);
76
        }
77
    }
78
79
    if (LOG.isDebugEnabled()) {
80
        LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a
81
                  + " storefiles, Daugther B: " + created_b + " storefiles.");
82
    }
83
    return new Pair<Integer, Integer>(created_a, created_b);
84
}
    回到上一层hstoreFilesToSplit的值的来源,进入this.parent.close(false)方法,最终这个doClose方法在HRegion内部,所有的store来源于HRegion内的 stores成员变量,如下所示:
1
  protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(Bytes.BYTES_RAWCOMPARATOR);
1
private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
2
    throws IOException {
3
    if (isClosed()) {
4
        LOG.warn("Region " + this + " already closed");
5
        return null;
6
    }
7
8
    if (coprocessorHost != null) {
9
        status.setStatus("Running coprocessor pre-close hooks");
10
        this.coprocessorHost.preClose(abort);
11
    }
12
13
    status.setStatus("Disabling compacts and flushes for region");
14
    synchronized (writestate) {
15
        // Disable compacting and flushing by background threads for this
16
        // region.
17
        writestate.writesEnabled = false;
18
        LOG.debug("Closing " + this + ": disabling compactions & flushes");
19
        waitForFlushesAndCompactions();
20
    }
21
    // If we were not just flushing, is it worth doing a preflush...one
22
    // that will clear out of the bulk of the memstore before we put up
23
    // the close flag?
24
    if (!abort && worthPreFlushing()) {
25
        status.setStatus("Pre-flushing region before close");
26
        LOG.info("Running close preflush of " + this.getRegionNameAsString());
27
        try {
28
            internalFlushcache(status);
29
        } catch (IOException ioe) {
30
            // Failed to flush the region. Keep going.
31
            status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
32
        }
33
    }
34
35
    // block waiting for the lock for closing
36
    lock.writeLock().lock();
37
    this.closing.set(true);
38
    status.setStatus("Disabling writes for close");
39
    try {
40
        if (this.isClosed()) {
41
            status.abort("Already got closed by another process");
42
            // SplitTransaction handles the null
43
            return null;
44
        }
45
        LOG.debug("Updates disabled for region " + this);
46
        // Don't flush the cache if we are aborting
47
        if (!abort) {
48
            int flushCount = 0;
49
            while (this.getMemstoreSize().get() > 0) {
50
                try {
51
                    if (flushCount++ > 0) {
52
                        int actualFlushes = flushCount - 1;
53
                        if (actualFlushes > 5) {
54
                            // If we tried 5 times and are unable to clear memory, abort
55
                            // so we do not lose data
56
                            throw new DroppedSnapshotException("Failed clearing memory after " +
57
                                  actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
58
                        }
59
                        LOG.info("Running extra flush, " + actualFlushes + " (carrying snapshot?) " + this);
60
                    }
61
                    internalFlushcache(status);
62
                } catch (IOException ioe) {
63
                    status.setStatus("Failed flush " + this + ", putting online again");
64
                    synchronized (writestate) {
65
                        writestate.writesEnabled = true;
66
                    }
67
                    // Have to throw to upper layers.  I can't abort server from here.
68
                    throw ioe;
69
                }
70
            }
71
        }
72
73
        Map<byte[], List<StoreFile>> result =
74
            new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
75
        if (!stores.isEmpty()) {
76
            // initialize the thread pool for closing stores in parallel.
77
            ThreadPoolExecutor storeCloserThreadPool =
78
                getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
79
            CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
80
                new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
81
82
            // close each store in parallel
83
            for (final Store store : stores.values()) {
84
                long flushableSize = store.getFlushableSize();
85
                if (!(abort || flushableSize == 0)) {
86
                    getRegionServerServices().abort("Assertion failed while closing store "
87
                         + getRegionInfo().getRegionNameAsString() + " " + store
88
                         + ". flushableSize expected=0, actual= " + flushableSize
89
                         + ". Current memstoreSize=" + getMemstoreSize() + ". Maybe a coprocessor "
90
                         + "operation failed and left the memstore in a partially updated state.", null);
91
                }
92
                completionService.submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
93
                        @Override
94
                        public Pair<byte[], Collection<StoreFile>> call() throws IOException {
95
                            return new Pair<byte[], Collection<StoreFile>>(
96
                                store.getFamily().getName(), store.close());
97
                        }
98
                    });
99
            }
100
            try {
101
                for (int i = 0; i < stores.size(); i++) {
102
                    Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
103
                    Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
104
                    List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
105
                    if (familyFiles == null) {
106
                        familyFiles = new ArrayList<StoreFile>();
107
                        result.put(storeFiles.getFirst(), familyFiles);
108
                    }
109
                    familyFiles.addAll(storeFiles.getSecond());
110
                }
111
            } catch (InterruptedException e) {
112
                throw (InterruptedIOException)new InterruptedIOException().initCause(e);
113
            } catch (ExecutionException e) {
114
                throw new IOException(e.getCause());
115
            } finally {
116
                storeCloserThreadPool.shutdownNow();
117
            }
118
        }
119
        this.closed.set(true);
120
        if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
121
        if (coprocessorHost != null) {
122
            status.setStatus("Running coprocessor post-close hooks");
123
            this.coprocessorHost.postClose(abort);
124
        }
125
        if ( this.metricsRegion != null) {
126
            this.metricsRegion.close();
127
        }
128
        if ( this.metricsRegionWrapper != null) {
129
            Closeables.closeQuietly(this.metricsRegionWrapper);
130
        }
131
        status.markComplete("Closed");
132
        LOG.info("Closed " + this);
133
        return result;
134
    } finally {
135
        lock.writeLock().unlock();
136
    }
137
}
138
    sotres这个成员变量会在初始化regionStores的时候赋值,此方法在HRegion类中:
1
private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
2
    throws IOException, UnsupportedEncodingException {
3
    // Load in all the HStores.
4
5
    long maxSeqId = -1;
6
    // initialized to -1 so that we pick up MemstoreTS from column families
7
    long maxMemstoreTS = -1;
8
9
    if (!htableDescriptor.getFamilies().isEmpty()) {
10
        // initialize the thread pool for opening stores in parallel.
11
        ThreadPoolExecutor storeOpenerThreadPool =
12
            getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
13
        CompletionService<HStore> completionService =
14
            new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
15
16
        // initialize each store in parallel
17
        for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
18
            status.setStatus("Instantiating store for column family " + family);
19
            completionService.submit(new Callable<HStore>() {
20
                @Override
21
                public HStore call() throws IOException {
22
                    return instantiateHStore(family);
23
                }
24
            });
25
        }
26
        boolean allStoresOpened = false;
27
        try {
28
            for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
29
                Future<HStore> future = completionService.take();
30
                HStore store = future.get();
31
                this.stores.put(store.getColumnFamilyName().getBytes(), store);
32
33
                long storeMaxSequenceId = store.getMaxSequenceId();
34
                maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
35
                                     storeMaxSequenceId);
36
                if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
37
                    maxSeqId = storeMaxSequenceId;
38
                }
39
                long maxStoreMemstoreTS = store.getMaxMemstoreTS();
40
                if (maxStoreMemstoreTS > maxMemstoreTS) {
41
                    maxMemstoreTS = maxStoreMemstoreTS;
42
                }
43
            }
44
            allStoresOpened = true;
45
        } catch (InterruptedException e) {
46
            throw (InterruptedIOException)new InterruptedIOException().initCause(e);
47
        } catch (ExecutionException e) {
48
            throw new IOException(e.getCause());
49
        } finally {
50
            storeOpenerThreadPool.shutdownNow();
51
            if (!allStoresOpened) {
52
                // something went wrong, close all opened stores
53
                LOG.error("Could not initialize all stores for the region=" + this);
54
                for (Store store : this.stores.values()) {
55
                    try {
56
                        store.close();
57
                    } catch (IOException e) {
58
                        LOG.warn(e.getMessage());
59
                    }
60
                }
61
            }
62
        }
63
    }
64
    mvcc.initialize(maxMemstoreTS + 1);
65
    // Recover any edits if available.
66
    maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
67
    return maxSeqId;
68
}
    上面就是parent.close()的获取stores的流程,下面进入到splitStoreFile内部,查看如何创建reference files的:
1
private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
2
    throws IOException {
3
    if (LOG.isDebugEnabled()) {
4
        LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " +
5
                  this.parent);
6
    }
7
    HRegionFileSystem fs = this.parent.getRegionFileSystem();
8
    String familyName = Bytes.toString(family);
9
    Path path_a = fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
10
                          this.parent.getSplitPolicy());
11
    Path path_b = fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
12
                          this.parent.getSplitPolicy());
13
    if (LOG.isDebugEnabled()) {
14
        LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " +
15
                  this.parent);
16
    }
17
    return new Pair<Path,Path>(path_a, path_b);
18
}
    在下面的方法中,如果top为true,则splitKey会与storefile中的firstKey做比较,如果大于firstKey则会为第一个子region添加reference file,如果top为false,则splitKey会与storefile中的lastKey做比较,如果小于lastKey则会为第二个子region添加reference file,大部分hifle会分别在a,b中生成一个reference file。
1
/**
2
   * Write out a split reference. Package local so it doesnt leak out of
3
   * regionserver.
4
   * @param hri {@link HRegionInfo} of the destination
5
   * @param familyName Column Family Name
6
   * @param f File to split.
7
   * @param splitRow Split Row
8
   * @param top True if we are referring to the top half of the hfile.
9
   * @param splitPolicy
10
   * @return Path to created reference.
11
   * @throws IOException
12
   */
13
Path splitStoreFile(final HRegionInfo hri, final String familyName, final StoreFile f,
14
                    final byte[] splitRow, final boolean top, RegionSplitPolicy splitPolicy) throws IOException {
15
16
    if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
17
        // Check whether the split row lies in the range of the store file
18
        // If it is outside the range, return directly.
19
        try {
20
            if (top) {
21
                //check if larger than last key.
22
                KeyValue splitKey = KeyValue.createFirstOnRow(splitRow);
23
                byte[] lastKey = f.createReader().getLastKey();
24
                // If lastKey is null means storefile is empty.
25
                if (lastKey == null) return null;
26
                if (f.getReader().getComparator().compareFlatKey(splitKey.getBuffer(),
27
                       splitKey.getKeyOffset(), splitKey.getKeyLength(), lastKey, 0, lastKey.length) > 0) {
28
                    return null;
29
                }
30
            } else {
31
                //check if smaller than first key
32
                KeyValue splitKey = KeyValue.createLastOnRow(splitRow);
33
                byte[] firstKey = f.createReader().getFirstKey();
34
                // If firstKey is null means storefile is empty.
35
                if (firstKey == null) return null;
36
                if (f.getReader().getComparator().compareFlatKey(splitKey.getBuffer(),
37
                        splitKey.getKeyOffset(), splitKey.getKeyLength(), firstKey, 0, firstKey.length) < 0) {
38
                    return null;
39
                }
40
            }
41
        } finally {
42
            f.closeReader(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true);
43
        }
44
    }
45
46
    Path splitDir = new Path(getSplitsDir(hri), familyName);
47
    // A reference to the bottom half of the hsf store file.
48
    Reference r =
49
        top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
50
    // Add the referred-to regions name as a dot separated suffix.
51
    // See REF_NAME_REGEX regex above.  The referred-to regions name is
52
    // up in the path of the passed in <code>f</code> -- parentdir is family,
53
    // then the directory above is the region name.
54
    String parentRegionName = regionInfo.getEncodedName();
55
    // Write reference with same file id only with the other region name as
56
    // suffix and into the new region location (under same family).
57
    Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
58
    return r.write(fs, p);
59
}
    下面再进入createDaughterRegionFromSplits方法内部,会将reference files移到相应的子region目录下去,然后创建子region实例并返回:
1
/**
2
   * Create a daughter region from given a temp directory with the region data.
3
   * @param hri Spec. for daughter region to open.
4
   * @throws IOException
5
   */
6
HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
7
    // Move the files from the temporary .splits to the final /table/region directory
8
    fs.commitDaughterRegion(hri);
9
10
    // Create the daughter HRegion instance
11
    HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
12
                                   this.getBaseConf(), hri, this.getTableDesc(), rsServices);
13
    r.readRequestsCount.set(this.getReadRequestsCount() / 2);
14
    r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
15
    return r;
16
}
    以上便是split一个region的主要过程,整个流程图如下:

最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台