Impala实践之三:详解invalidate metadata
前言
这次主要是想通过源码找到invalidate metadata执行的原理,由于不太懂java和c++的互调的细节,目前只能查找到如下阶段,后面会专门看一下java的jni以及thrift的原理。
分析
目前主要定位到三个类,com.cloudera.impala.service.JniCatalog ,com.cloudera.impala.service.CatalogOpExecutor和com.cloudera.impala.catalog.CatalogServiceCatalog。
大致的过程就是在JniCatalog中调用了CatalogOpExecutor的execResetMetadata()方法,然后再判断参数,如果没有表参数的时候会执行CatalogServiceCatalog的reset方法,就完成了一次invalidate metadata操作。
com.cloudera.impala.service.JniCatalog
public class JniCatalog { private final CatalogOpExecutor catalogOpExecutor_; public byte[] resetMetadata(byte[] thriftResetMetadataReq) throws ImpalaException, TException { TResetMetadataRequest req = new TResetMetadataRequest(); JniUtil.deserializeThrift(protocolFactory_, req, thriftResetMetadataReq); TSerializer serializer = new TSerializer(protocolFactory_); return serializer.serialize(catalogOpExecutor_.execResetMetadata(req)); }}com.cloudera.impala.service.CatalogOpExecutor
这个方法的最后一部分有一句注释”Invalidate the entire catalog if no table name is provided.”,可以看出,当使用invalidate metadata没有带参数的时候,会调用CatalogServiceCatalo的reset方法。
public class CatalogOpExecutor { //这里新建了CatalogServiceCatalog对象 private final CatalogServiceCatalog catalog_;/** * Executes a TResetMetadataRequest and returns the result as a * TResetMetadataResponse. Based on the request parameters, this operation * may do one of three things: * 1) invalidate the entire catalog, forcing the metadata for all catalog * objects to be reloaded. * 2) invalidate a specific table, forcing the metadata to be reloaded * on the next access. * 3) perform a synchronous incremental refresh of a specific table. * * For details on the specific commands see comments on their respective * methods in CatalogServiceCatalog.java. */ public TResetMetadataResponse execResetMetadata(TResetMetadataRequest req) throws CatalogException { TResetMetadataResponse resp = new TResetMetadataResponse(); resp.setResult(new TCatalogUpdateResult()); resp.getResult().setCatalog_service_id(JniCatalog.getServiceId()); if (req.isSetTable_name()) { // Tracks any CatalogObjects updated/added/removed as a result of // the invalidate metadata or refresh call. For refresh() it is only expected // that a table be modified, but for invalidateTable() the table's parent database // may have also been added if it did not previously exist in the catalog. Pair<Db, Table> modifiedObjects = new Pair<Db, Table>(null, null); boolean wasRemoved = false; if (req.isIs_refresh()) { TableName tblName = TableName.fromThrift(req.getTable_name()); Table tbl = getExistingTable(tblName.getDb(), tblName.getTbl()); if (tbl == null) { modifiedObjects.second = null; } else { modifiedObjects.second = catalog_.reloadTable(tbl); } } else { wasRemoved = catalog_.invalidateTable(req.getTable_name(), modifiedObjects); } if (modifiedObjects.first == null) { TCatalogObject thriftTable = TableToTCatalogObject(modifiedObjects.second); if (modifiedObjects.second != null) { // Return the TCatalogObject in the result to indicate this request can be // processed as a direct DDL operation. if (wasRemoved) { resp.getResult().setRemoved_catalog_object_DEPRECATED(thriftTable); } else { resp.getResult().setUpdated_catalog_object_DEPRECATED(thriftTable); } } else { // Table does not exist in the meta store and Impala catalog, throw error. throw new TableNotFoundException("Table not found: " + req.getTable_name().getDb_name() + "." + req.getTable_name().getTable_name()); } resp.getResult().setVersion(thriftTable.getCatalog_version()); } else { // If there were two catalog objects modified it indicates there was an // "invalidateTable()" call that added a new table AND database to the catalog. Preconditions.checkState(!req.isIs_refresh()); Preconditions.checkNotNull(modifiedObjects.first); Preconditions.checkNotNull(modifiedObjects.second); // The database should always have a lower catalog version than the table because // it needs to be created before the table can be added. Preconditions.checkState(modifiedObjects.first.getCatalogVersion() < modifiedObjects.second.getCatalogVersion()); // Since multiple catalog objects were modified, don't treat this as a direct DDL // operation. Just set the overall catalog version and the impalad will wait for // a statestore heartbeat that contains the update. resp.getResult().setVersion(modifiedObjects.second.getCatalogVersion()); } } else { // Invalidate the entire catalog if no table name is provided. Preconditions.checkArgument(!req.isIs_refresh()); //reset元数据信息 catalog_.reset(); resp.result.setVersion(catalog_.getCatalogVersion()); } resp.getResult().setStatus( new TStatus(TErrorCode.OK, new ArrayList<String>())); return resp; }}com.cloudera.impala.catalog.CatalogServiceCatalog
根据猜测,reset方法中dbCache_应该是impalad本地的缓存,可以看到,程序先new了一下新的newDbCache,ConcurrentHashMap<String, Db> newDbCache = new ConcurrentHashMap<String, Db>();。然后在for循环中获取所有的元数据信息并put到newDbCache中,newDbCache.put(db.getName().toLowerCase(), db);。最后会把newDbCache的内容set到dbCache_中。
更新元数据的方式也是通过HiveClient来获取所有的信息。
比较好奇的地方是,在这里居然没有发现和分区相关的地方,所有的partition信息是通过Catalog服务自动同步的?
public class CatalogServiceCatalog extends Catalog {/** * Resets this catalog instance by clearing all cached table and database metadata. */ public void reset() throws CatalogException { // First update the policy metadata. if (sentryProxy_ != null) { // Sentry Service is enabled. try { // Update the authorization policy, waiting for the result to complete. sentryProxy_.refresh(); } catch (Exception e) { throw new CatalogException("Error updating authorization policy: ", e); } } catalogLock_.writeLock().lock(); try { nextTableId_.set(0); // Not all Java UDFs are persisted to the metastore. The ones which aren't // should be restored once the catalog has been invalidated. Map<String, Db> oldDbCache = dbCache_.get(); // Build a new DB cache, populate it, and replace the existing cache in one // step. ConcurrentHashMap<String, Db> newDbCache = new ConcurrentHashMap<String, Db>(); List<TTableName> tblsToBackgroundLoad = Lists.newArrayList(); MetaStoreClient msClient = metaStoreClientPool_.getClient(); try { for (String dbName: msClient.getHiveClient().getAllDatabases()) { List<org.apache.hadoop.hive.metastore.api.Function> javaFns = Lists.newArrayList(); for (String javaFn: msClient.getHiveClient().getFunctions(dbName, "*")) { javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn)); } org.apache.hadoop.hive.metastore.api.Database msDb = msClient.getHiveClient().getDatabase(dbName); Db db = new Db(dbName, this, msDb); // Restore UDFs that aren't persisted. Db oldDb = oldDbCache.get(db.getName().toLowerCase()); if (oldDb != null) { for (Function fn: oldDb.getTransientFunctions()) { db.addFunction(fn); fn.setCatalogVersion(incrementAndGetCatalogVersion()); } } loadFunctionsFromDbParams(db, msDb); loadJavaFunctions(db, javaFns); db.setCatalogVersion(incrementAndGetCatalogVersion()); newDbCache.put(db.getName().toLowerCase(), db); for (String tableName: msClient.getHiveClient().getAllTables(dbName)) { Table incompleteTbl = IncompleteTable.createUninitializedTable( getNextTableId(), db, tableName); incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion()); db.addTable(incompleteTbl); if (loadInBackground_) { tblsToBackgroundLoad.add( new TTableName(dbName.toLowerCase(), tableName.toLowerCase())); } } } } finally { msClient.release(); } dbCache_.set(newDbCache); // Submit tables for background loading. for (TTableName tblName: tblsToBackgroundLoad) { tableLoadingMgr_.backgroundLoad(tblName); } } catch (Exception e) { LOG.error(e); throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e); } finally { catalogLock_.writeLock().unlock(); }