从Hive源码解读大数据开发为什么可以脱离SQL、Java、Scala

从Hive源码解读大数据开发为什么可以脱离SQL、Java、Scala

前言

【本文适合有一定计算机基础/半年工作经验的读者食用。立个Flg,愿天下不再有肤浅的SQL Boy】

谈到大数据开发,占据绝大多数人口的就是SQL Boy,不接受反驳,毕竟大数据主要就是为机器学习和统计报表服务的,自然从Oracle数据库开发转过来并且还是只会写几句SQL的人不在少数,个别会Python写个spark.sql(“一个sql字符串”)的已经是SQL Boy中的人才。这种只能处理结构化表的最基础的大数据开发人员,就是我们常提到的梗:肤浅的SQL Boy。。。对大数据完全不懂,思想还停留在数据库时代,大数据组件也都是拿来当RDBMS来用。。。这种业务开发人员的技术水平其实不敢恭维。

还有从Java后端开发转过来的,虽然不适应,但还是可以一个Main方法流畅地操作Spark、Flink,手写个JDBC,做点简单的二开,这种就是平台开发人员,技术水平要更高一些。Java写得好,Scala其实上手也快。

但是。。。这并不代表做大数据只能用SQL/Java/Scala。。。这么局限的话,也不比SQL Boy强到哪里去。

笔者最早还搞过嵌入式开发,自然明白C/C#/C++也可以搞大数据。。。

本文将以大数据开发中最常见的数仓组件Hive的drop table为例,抛砖引玉,解读为神马大数据开发可以脱离SQL、Java、Scala。

为神马可以脱离SQL

数据不外乎结构化数据和非结构化数据,SQL只能处理极其有限的结构化表【RDBMS、整齐的csv/tsv等】,绝大多数的半结构化、非结构化数据SQL是无能为力的【log日志文件、音图等】。古代的MapReduce本身就不可以用SQL,Spark和Flink老版本都是基于API的,没有SQL的年代大家也活得好好的。大数据组件对SQL的支持日渐友好都是后来的事情,主要是为了降低门槛,让SQL Boy也可以用上大数据技术。

肤浅的SQL Boy们当然只知道:

drop table db_name.tb_name;

正常情况这个Hive表就会被drop掉,认知也就局限于Hive是个数据库。

但是大数据平台开发知道去翻看Hive的Java API:

https://svn.apache.org/repos/infra/websites/production/hive/content/javadocs/r3.1.3/api/index.html

知道还有这种方式:

package com.zhiyong;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

/**
 * @program: zhiyong_study
 * @description: 测试MetaStore
 * @author: zhiyong
 * @create: 2023-03-22 22:57
 **/
public class MetaStoreDemo {
    public static void main(String[] args) throws Exception{
        HiveConf hiveConf = new HiveConf();
        HiveMetaStoreClient client = new HiveMetaStoreClient(hiveConf);
        client.dropTable("db_name","tb_name");
    }
}

通过调用API的方式,同样可以drop掉表。显然不一定要用DDL。通过HiveMetaStoreClient的方式,还可以create建表等操作。

懂大数据底层的平台开发当然还有更狠的方式:直接连Hive存元数据的MySQL,对元数据表的数据做精准crud。。。

对结构化表的ETL或者其它的运算处理完全可以用Spark的DataFrame、Flink的DataStream编程,纯API方式实现,SQL能实现的Java和Scala都能实现,至于SQL实现不了的Java和Scala也能实现。。。

笔者实在是想不到除了RDBMS和各类包皮产品【在开源的Apache组件基础上做一些封装】,还有哪些场景是只能用SQL的。。。

至此,可以说明大数据可以脱离SQL。

为神马可以脱离Java

虽然Hive底层是Java写的,但是这并不意味着只能用Java操作Hive。认知这么肤浅的话,也就活该一辈子调参调API了。。。

找到dropTable的实际入口

从Hive3.1.2源码,可以找到dropTable方法:

@Override
  public void dropTable(String dbname, String name, boolean deleteData,
      boolean ignoreUnknownTab) throws MetaException, TException,
      NoSuchObjectException, UnsupportedOperationException {
    dropTable(getDefaultCatalog(conf), dbname, name, deleteData, ignoreUnknownTab, null);
  }

  @Override
  public void dropTable(String dbname, String name, boolean deleteData,
      boolean ignoreUnknownTab, boolean ifPurge) throws TException {
    dropTable(getDefaultCatalog(conf), dbname, name, deleteData, ignoreUnknownTab, ifPurge);
  }

  @Override
  public void dropTable(String dbname, String name) throws TException {
    dropTable(getDefaultCatalog(conf), dbname, name, true, true, null);
  }

  @Override
  public void dropTable(String catName, String dbName, String tableName, boolean deleteData,
                        boolean ignoreUnknownTable, boolean ifPurge) throws TException {
    //build new environmentContext with ifPurge;
    EnvironmentContext envContext = null;
    if(ifPurge){
      Map<String, String> warehouseOptions;
      warehouseOptions = new HashMap<>();
      warehouseOptions.put("ifPurge", "TRUE");
      envContext = new EnvironmentContext(warehouseOptions);
    }
    dropTable(catName, dbName, tableName, deleteData, ignoreUnknownTable, envContext);

  }

虽然有多个同名方法,但是底层调用的还是同一个方法:

  /**
   * Drop the table and choose whether to: delete the underlying table data;
   * throw if the table doesn't exist; save the data in the trash.
   *
   * @param catName catalog name
   * @param dbname database name
   * @param name table name
   * @param deleteData
   *          delete the underlying data or just delete the table in metadata
   * @param ignoreUnknownTab
   *          don't throw if the requested table doesn't exist
   * @param envContext
   *          for communicating with thrift
   * @throws MetaException
   *           could not drop table properly
   * @throws NoSuchObjectException
   *           the table wasn't found
   * @throws TException
   *           a thrift communication error occurred
   * @throws UnsupportedOperationException
   *           dropping an index table is not allowed
   * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#drop_table(java.lang.String,
   *      java.lang.String, boolean)
   */
  public void dropTable(String catName, String dbname, String name, boolean deleteData,
      boolean ignoreUnknownTab, EnvironmentContext envContext) throws MetaException, TException,
      NoSuchObjectException, UnsupportedOperationException {
    Table tbl;
    try {
      tbl = getTable(catName, dbname, name);
    } catch (NoSuchObjectException e) {
      if (!ignoreUnknownTab) {
        throw e;
      }
      return;
    }
    HiveMetaHook hook = getHook(tbl);
    if (hook != null) {
      hook.preDropTable(tbl);
    }
    boolean success = false;
    try {
      drop_table_with_environment_context(catName, dbname, name, deleteData, envContext);
      if (hook != null) {
        hook.commitDropTable(tbl, deleteData || (envContext != null && "TRUE".equals(envContext.getProperties().get("ifPurge"))));
      }
      success=true;
    } catch (NoSuchObjectException e) {
      if (!ignoreUnknownTab) {
        throw e;
      }
    } finally {
      if (!success && (hook != null)) {
        hook.rollbackDropTable(tbl);
      }
    }
  }

主要就是获取了表对象,然后做了preDropTable预提交和commitDropTable实际的提交。这种2PC方式表面上还是很严谨。。。

可以发现HiveMetaHook这其实是个接口:

package org.apache.hadoop.hive.metastore;

/**
 * HiveMetaHook defines notification methods which are invoked as part
 * of transactions against the metastore, allowing external catalogs
 * such as HBase to be kept in sync with Hive's metastore.
 *
 *<p>
 *
 * Implementations can use {@link MetaStoreUtils#isExternalTable} to
 * distinguish external tables from managed tables.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface HiveMetaHook {

  public String ALTER_TABLE_OPERATION_TYPE = "alterTableOpType";

  public List<String> allowedAlterTypes = ImmutableList.of("ADDPROPS", "DROPPROPS");

  /**
   * Called before a table definition is removed from the metastore
   * during DROP TABLE.
   *
   * @param table table definition
   */
  public void preDropTable(Table table)
    throws MetaException;

  /**
   * Called after failure removing a table definition from the metastore
   * during DROP TABLE.
   *
   * @param table table definition
   */
  public void rollbackDropTable(Table table)
    throws MetaException;

  /**
   * Called after successfully removing a table definition from the metastore
   * during DROP TABLE.
   *
   * @param table table definition
   *
   * @param deleteData whether to delete data as well; this should typically
   * be ignored in the case of an external table
   */
  public void commitDropTable(Table table, boolean deleteData)
    throws MetaException;
}

继承关系:

在这里插入图片描述

显然不是这个:

package org.apache.hadoop.hive.metastore;

public abstract class DefaultHiveMetaHook implements HiveMetaHook {
  /**
   * Called after successfully INSERT [OVERWRITE] statement is executed.
   * @param table table definition
   * @param overwrite true if it is INSERT OVERWRITE
   *
   * @throws MetaException
   */
  public abstract void commitInsertTable(Table table, boolean overwrite) throws MetaException;

  /**
   * called before commit insert method is called
   * @param table table definition
   * @param overwrite true if it is INSERT OVERWRITE
   *
   * @throws MetaException
   */
  public abstract void preInsertTable(Table table, boolean overwrite) throws MetaException;

  /**
   * called in case pre commit or commit insert fail.
   * @param table table definition
   * @param overwrite true if it is INSERT OVERWRITE
   *
   * @throws MetaException
   */
  public abstract void rollbackInsertTable(Table table, boolean overwrite) throws MetaException;
}

更不可能是这个test的Mock类:

/**
 * Mock class used for unit testing.
 * {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2#testLockingOnInsertIntoNonNativeTables()}
 */
public class StorageHandlerMock extends DefaultStorageHandler {
 
}

所以是AccumuloStorageHandler这个类:

package org.apache.hadoop.hive.accumulo;

/**
 * Create table mapping to Accumulo for Hive. Handle predicate pushdown if necessary.
 */
public class AccumuloStorageHandler extends DefaultStorageHandler implements HiveMetaHook,
    HiveStoragePredicateHandler {
    }

但是:

  @Override
  public void preDropTable(Table table) throws MetaException {
    // do nothing
  }

这个do nothing!!!一言难尽。这种2PC方式表面上确实很严谨。。。

所以dropTable的入口是:

  @Override
  public void commitDropTable(Table table, boolean deleteData) throws MetaException {
    String tblName = getTableName(table);
    if (!isExternalTable(table)) {
      try {
        if (deleteData) {
          TableOperations tblOpts = connectionParams.getConnector().tableOperations();
          if (tblOpts.exists(tblName)) {
            tblOpts.delete(tblName);
          }
        }
      } catch (AccumuloException e) {
        throw new MetaException(StringUtils.stringifyException(e));
      } catch (AccumuloSecurityException e) {
        throw new MetaException(StringUtils.stringifyException(e));
      } catch (TableNotFoundException e) {
        throw new MetaException(StringUtils.stringifyException(e));
      }
    }
  }

按照最简单的内部表、需要删数据来看,实际上调用的是这个delete方法。而TableOperations又是个接口:

package org.apache.accumulo.core.client.admin;

/**
 * Provides a class for administering tables
 *
 */

public interface TableOperations {
  /**
   * Delete a table
   *
   * @param tableName
   *          the name of the table
   * @throws AccumuloException
   *           if a general error occurs
   * @throws AccumuloSecurityException
   *           if the user does not have permission
   * @throws TableNotFoundException
   *           if the table does not exist
   */
  void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
}

继承关系简单:

在这里插入图片描述

当然就是这个实现类:

package org.apache.accumulo.core.client.impl;

public class TableOperationsImpl extends TableOperationsHelper {
  @Override
  public void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
    checkArgument(tableName != null, "tableName is null");

    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)));
    Map<String,String> opts = new HashMap<>();

    try {
      doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_DELETE, args, opts);
    } catch (TableExistsException e) {
      // should not happen
      throw new AssertionError(e);
    }

  }
}

所以实际入口是这里的doTableFateOperation方法。枚举体的FateOperation.TABLE_DELETE=2。

找到doTableFateOperation方法的调用栈

跳转到:

  private void doTableFateOperation(String tableOrNamespaceName, Class<? extends Exception> namespaceNotFoundExceptionClass, FateOperation op,
      List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException, AccumuloException, TableExistsException, TableNotFoundException {
    try {
      doFateOperation(op, args, opts, tableOrNamespaceName);
    } 
  }

继续跳转:

  String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts, String tableOrNamespaceName) throws AccumuloSecurityException,
      TableExistsException, TableNotFoundException, AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
    return doFateOperation(op, args, opts, tableOrNamespaceName, true);
  }

继续跳转:

  String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts, String tableOrNamespaceName, boolean wait)
      throws AccumuloSecurityException, TableExistsException, TableNotFoundException, AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
    Long opid = null;

    try {
      opid = beginFateOperation();
      executeFateOperation(opid, op, args, opts, !wait);
      if (!wait) {
        opid = null;
        return null;
      }
      String ret = waitForFateOperation(opid);
      return ret;
    } catch (ThriftSecurityException e) {
      switch (e.getCode()) {
        case TABLE_DOESNT_EXIST:
          throw new TableNotFoundException(null, tableOrNamespaceName, "Target table does not exist");
        case NAMESPACE_DOESNT_EXIST:
          throw new NamespaceNotFoundException(null, tableOrNamespaceName, "Target namespace does not exist");
        default:
          String tableInfo = Tables.getPrintableTableInfoFromName(context.getInstance(), tableOrNamespaceName);
          throw new AccumuloSecurityException(e.user, e.code, tableInfo, e);
      }
    } catch (ThriftTableOperationException e) {
      switch (e.getType()) {
        case EXISTS:
          throw new TableExistsException(e);
        case NOTFOUND:
          throw new TableNotFoundException(e);
        case NAMESPACE_EXISTS:
          throw new NamespaceExistsException(e);
        case NAMESPACE_NOTFOUND:
          throw new NamespaceNotFoundException(e);
        case OFFLINE:
          throw new TableOfflineException(context.getInstance(), Tables.getTableId(context.getInstance(), tableOrNamespaceName));
        default:
          throw new AccumuloException(e.description, e);
      }
    } catch (Exception e) {
      throw new AccumuloException(e.getMessage(), e);
    } finally {
      Tables.clearCache(context.getInstance());
      // always finish table op, even when exception
      if (opid != null)
        try {
          finishFateOperation(opid);
        } catch (Exception e) {
          log.warn(e.getMessage(), e);
        }
    }
  }

在这里可以发现一些奇怪的现象,居然catch了好多Thrift相关的Exception。继续跳转:

  // This method is for retrying in the case of network failures; anything else it passes to the caller to deal with
  private void executeFateOperation(long opid, FateOperation op, List<ByteBuffer> args, Map<String,String> opts, boolean autoCleanUp)
      throws ThriftSecurityException, TException, ThriftTableOperationException {
    while (true) {
      MasterClientService.Iface client = null;
      try {
        client = MasterClient.getConnectionWithRetry(context);
        client.executeFateOperation(Tracer.traceInfo(), context.rpcCreds(), opid, op, args, opts, autoCleanUp);
        break;
      } catch (TTransportException tte) {
        log.debug("Failed to call executeFateOperation(), retrying ... ", tte);
        UtilWaitThread.sleep(100);
      } finally {
        MasterClient.close(client);
      }
    }
  }

这个死循环里获取了Client对象。但是这个Client一看就没那么简单。。。调用的executeFateOperation方法还不能直接Idea点开,需要手动定位。

分析client对象

package org.apache.accumulo.core.client.impl;

import com.google.common.net.HostAndPort;

public class MasterClient {
  private static final Logger log = LoggerFactory.getLogger(MasterClient.class);

  public static MasterClientService.Client getConnectionWithRetry(ClientContext context) {
    while (true) {

      MasterClientService.Client result = getConnection(context);
      if (result != null)
        return result;
      UtilWaitThread.sleep(250);
    }
  }
}

实际上又是这个:

public static class Client extends FateService.Client implements Iface {
}

所以其父类是:

package org.apache.accumulo.core.master.thrift;

@SuppressWarnings({"unchecked", "serial", "rawtypes", "unused"}) public class FateService {
    public interface Iface {

    public void executeFateOperation(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, long opid, FateOperation op, List<ByteBuffer> arguments, Map<String,String> options, boolean autoClean) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException, org.apache.thrift.TException;

  }
    public void executeFateOperation(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, long opid, FateOperation op, List<ByteBuffer> arguments, Map<String,String> options, boolean autoClean) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException, org.apache.thrift.TException
{
  send_executeFateOperation(tinfo, credentials, opid, op, arguments, options, autoClean);
  recv_executeFateOperation();
}
    
    public static class Client extends org.apache.thrift.TServiceClient implements Iface {
    }
}

所以这种client对象才可以执行executeFateOperation方法。

查看executeFateOperation方法

分为2步,字面意思send_executeFateOperation方法发送了啥,recv_executeFateOperation方法又接收了啥。显然发送消息是需要重点关心的:

public void send_executeFateOperation(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, long opid, FateOperation op, List<ByteBuffer> arguments, Map<String,String> options, boolean autoClean) throws org.apache.thrift.TException
{
  executeFateOperation_args args = new executeFateOperation_args();
  args.setTinfo(tinfo);
  args.setCredentials(credentials);
  args.setOpid(opid);
  args.setOp(op);
  args.setArguments(arguments);
  args.setOptions(options);
  args.setAutoClean(autoClean);
  sendBase("executeFateOperation", args);
}

这个发送的方法把入参的表名、操作类型【Drop表】设置为sendBase方法的入参。

package org.apache.thrift;

/**
 * A TServiceClient is used to communicate with a TService implementation
 * across protocols and transports.
 */
public abstract class TServiceClient {

  protected void sendBase(String methodName, TBase<?,?> args) throws TException {
    sendBase(methodName, args, TMessageType.CALL);
  }


  private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
    args.write(oprot_);
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
  }

}

其中:

package org.apache.thrift.protocol;

/**
 * Message type constants in the Thrift protocol.
 *
 */
public final class TMessageType {
  public static final byte CALL  = 1;
  public static final byte REPLY = 2;
  public static final byte EXCEPTION = 3;
  public static final byte ONEWAY = 4;
}

这个type传入的其实是1。用于构造方法:

package org.apache.thrift.protocol;

/**
 * Helper class that encapsulates struct metadata.
 *
 */
public final class TMessage {

  public TMessage(String n, byte t, int s) {
    name = n;
    type = t;
    seqid = s;
  }

  public final String name;
  public final byte type;
  public final int seqid;


}

另一个泛型TBase:

package org.apache.thrift;

import java.io.Serializable;

import org.apache.thrift.protocol.TProtocol;

/**
 * Generic base interface for generated Thrift objects.
 *
 */
public interface TBase<T extends TBase<?,?>, F extends TFieldIdEnum> extends Comparable<T>,  Serializable {

  /**
   * Reads the TObject from the given input protocol.
   *
   * @param iprot Input protocol
   */
  public void read(TProtocol iprot) throws TException;

  /**
   * Writes the objects out to the protocol
   *
   * @param oprot Output protocol
   */
  public void write(TProtocol oprot) throws TException;
}

按照注释可以知道write方法是把Java的对象输出给协议。

executeFateOperation_args类:

public static class executeFateOperation_args implements org.apache.thrift.TBase<executeFateOperation_args, executeFateOperation_args._Fields>, java.io.Serializable, Cloneable, Comparable<executeFateOperation_args>   {
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
    }   
}

它的write方法:

package org.apache.thrift.scheme;

import org.apache.thrift.TBase;

public interface IScheme<T extends TBase> {

  public void read(org.apache.thrift.protocol.TProtocol iproto, T struct) throws org.apache.thrift.TException;

  public void write(org.apache.thrift.protocol.TProtocol oproto, T struct) throws org.apache.thrift.TException;

}

又是跳转到接口。。。

在这里插入图片描述

可以看到有2大抽象类。

getScheme拿到的:


package org.apache.thrift.protocol;

import java.nio.ByteBuffer;

import org.apache.thrift.TException;
import org.apache.thrift.scheme.IScheme;
import org.apache.thrift.scheme.StandardScheme;
import org.apache.thrift.transport.TTransport;

/**
 * Protocol interface definition.
 *
 */
public abstract class TProtocol {
  public Class<? extends IScheme> getScheme() {
    return StandardScheme.class;
  }
    
  public abstract void writeMessageBegin(TMessage message) throws TException;
}

显然get到的是StandardScheme类。而writeMessageBegin又是这个抽象类的抽象方法。

该抽象类的继承关系:

在这里插入图片描述

至此可以知道原生支持的协议有这些。最常用的当然就是二进制协议:TBinaryProtocol。

查看TBinaryProtocol二进制协议

package org.apache.thrift.protocol;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;

import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;

/**
 * Binary protocol implementation for thrift.
 *
 */
public class TBinaryProtocol extends TProtocol {
  public void writeMessageBegin(TMessage message) throws TException {
    if (strictWrite_) {
      int version = VERSION_1 | message.type;
      writeI32(version);
      writeString(message.name);
      writeI32(message.seqid);
    } else {
      writeString(message.name);
      writeByte(message.type);
      writeI32(message.seqid);
    }
  }
}

可以看出writeMessageBegin方法就是实际的写数据操作,把消息拆分后写出。

public void writeString(String str) throws TException {
  try {
    byte[] dat = str.getBytes("UTF-8");
    writeI32(dat.length);
    trans_.write(dat, 0, dat.length);
  } catch (UnsupportedEncodingException uex) {
    throw new TException("JVM DOES NOT SUPPORT UTF-8");
  }
}

以此为例。会去把数据作为字节数组写出:

package org.apache.thrift.transport;

import java.io.Closeable;

/**
 * Generic class that encapsulates the I/O layer. This is basically a thin
 * wrapper around the combined functionality of Java input/output streams.
 *
 */
public abstract class TTransport implements Closeable {

  /**
   * Reads up to len bytes into buffer buf, starting at offset off.
   *
   * @param buf Array to read into
   * @param off Index to start reading at
   * @param len Maximum number of bytes to read
   * @return The number of bytes actually read
   * @throws TTransportException if there was an error reading data
   */
  public abstract int read(byte[] buf, int off, int len)
    throws TTransportException;

  /**
   * Writes up to len bytes from the buffer.
   *
   * @param buf The output data buffer
   * @param off The offset to start writing from
   * @param len The number of bytes to write
   * @throws TTransportException if there was an error writing data
   */
  public abstract void write(byte[] buf, int off, int len)
    throws TTransportException;
}

这才是真正的传输对象。其继承关系:

在这里插入图片描述

搞过嵌入式开发的一定很熟悉这个Socket!!!就是IP+port的那个Socket。应用层与TCP/IP传输层间的抽象层。。。

查看TIOStreamTransport传输类

package org.apache.thrift.transport;

/**
 * This is the most commonly used base transport. It takes an InputStream
 * and an OutputStream and uses those to perform all transport operations.
 * This allows for compatibility with all the nice constructs Java already
 * has to provide a variety of types of streams.
 *
 */
public class TIOStreamTransport extends TTransport {

  public int read(byte[] buf, int off, int len) throws TTransportException {
    if (inputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from null inputStream");
    }
    int bytesRead;
    try {
      bytesRead = inputStream_.read(buf, off, len);
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
    if (bytesRead < 0) {
      throw new TTransportException(TTransportException.END_OF_FILE);
    }
    return bytesRead;
  }

  /**
   * Writes to the underlying output stream if not null.
   */
  public void write(byte[] buf, int off, int len) throws TTransportException {
    if (outputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");
    }
    try {
      outputStream_.write(buf, off, len);
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }

  /**
   * Flushes the underlying output stream if not null.
   */
  public void flush() throws TTransportException {
    if (outputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream");
    }
    try {
      outputStream_.flush();
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }
}

其子类TSocket重写了IP、Port和init等。

小结Drop表的流程

至此可以得知Java用API操作Hive的原理,大致是这样:

顶层API【dropTable】→表操作实现类【TableOperationsImpl】的删表方法【doTableFateOperation】
→executeFateOperation方法→Client类的实例对象的executeFateOperation方法
→sendBase方法→executeFateOperation_args静态类的实例对象的write方法输出数据给传输协议TProtocol
→传输协议类的write方法具体把数据写出给ThriftServerThriftServer接收到消息后执行对应的操作

最出名的Thrift当然是Hive自己的Hive Server【Standalone】和Hive Server2,还有Spark的Thrift Server,借助它们,可以用JDBC或者Cli的方式去操作Hive。

但是!!!Thrift的初衷就是实现语言无关,毕竟底层只需要能把数据传输到位即可,数据传输并不是Java的特权。

其它语言的Thrift

在这里插入图片描述

service-rpc这个路径下,可以发现有cpp、Java、php、py,rb的包!!!

Hive的官方文档写的很明白:

https://cwiki.apache.org/confluence/display/Hive/HiveClient#HiveClient-ThriftJavaClient

The command line client currently only supports an embedded server. The JDBC and Thrift-Java clients support both embedded and standalone servers. Clients in other languages only support standalone servers.

命令行模式目前只能用于嵌入式服务,JDBC和Thrift-Java的Client可以支持嵌入式和独立部署的服务。别的语言的Client只支持在独立部署的服务使用。

Connection con = DriverManager.getConnection("jdbc:hive://localhost:10000/default", "", "");
Statement stmt = con.createStatement();

这种古代的Hive Server就是嵌入模式。。。

Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "");

这种Hive Server2就是独立部署模式。

官方还给出了python的案例:

#!/usr/bin/env python
 
import sys
 
from hive import ThriftHive
from hive.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
 
try:
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
 
    client = ThriftHive.Client(protocol)
    transport.open()
 
    client.execute("CREATE TABLE r(a STRING, b INT, c DOUBLE)")
    client.execute("LOAD TABLE LOCAL INPATH '/path' INTO TABLE r")
    client.execute("SELECT * FROM r")
    while (1):
      row = client.fetchOne()
      if (row == None):
        break
      print row
    client.execute("SELECT * FROM r")
    print client.fetchAll()
 
    transport.close()
 
except Thrift.TException, tx:
    print '%s' % (tx.message)

以及PHP的案例:

<?php
// set THRIFT_ROOT to php directory of the hive distribution
$GLOBALS['THRIFT_ROOT'] = '/lib/php/';
// load the required files for connecting to Hive
require_once $GLOBALS['THRIFT_ROOT'] . 'packages/hive_service/ThriftHive.php';
require_once $GLOBALS['THRIFT_ROOT'] . 'transport/TSocket.php';
require_once $GLOBALS['THRIFT_ROOT'] . 'protocol/TBinaryProtocol.php';
// Set up the transport/protocol/client
$transport = new TSocket('localhost', 10000);
$protocol = new TBinaryProtocol($transport);
$client = new ThriftHiveClient($protocol);
$transport->open();
 
// run queries, metadata calls etc
$client->execute('SELECT * from src');
var_dump($client->fetchAll());
$transport->close();

Ruby好歹也给了个参考: https://github.com/forward3d/rbhive

至于Java、C++就不给Client的案例了。。。也是很容易理解。。。毕竟Java有JDBC和高层API,一般不会有人去用底层API了。

如果是做平台开发或者组件开发这种真正用得上底层API的情况,地方支援中央发型的老Java程序猿,查API填参数让程序跑起来,这点工程能力还是有的。

至于C++程序猿强悍的造轮子功力,没准像临摹Kafka的Red Panda那样,哪天也照猫画虎折腾出个C++版的Hive。。。

既然可以通过Thrift实现语言无关,那么调用组件就不必局限于Java或者Scala。而造轮子从来也不是Java和Scala的专利。

这就是为神马大数据开发可以脱离Java和Scala。

尾言

大数据并不是趋向SQL化,只是为了扩大受众群体,让广大技术水平不高的业务开发人员也能吃上大数据技术的红利。且SQL在处理结构化表的特定场景下开发效率更高。
但是。。。哪怕是这种极度细分的场景,SQL还是有很多缺陷,虽然API的方式也没有好到哪里去。

造轮子和组件调用,就更是语言无关的事情了。。。编程语言往往只是个表达思想的载体,技术栈足够全面才有做选择的权力。

转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/129742904
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/12168.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

开发者笑疯了! LLaMa惊天泄露引爆ChatGPT平替狂潮,开源LLM领域变天

来源: 新智源 微信号&#xff1a;AI-era Meta的LLaMA模型开源&#xff0c;让文本大模型迎来了Stable Diffustion时刻。谁都没想 谁能想到&#xff0c;一次意外的LLaMA泄漏&#xff0c;竟点燃了开源LLM领域最大的创新火花。 一系列表现出色的ChatGPT开源替代品——「羊驼家族」…

全网最详细,Jmeter性能测试-性能基础详解,接口关联与编写Java脚本(三)

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 接口关联 接口关联…

水塘抽样解决随机选择问题

1.简介 水塘抽样是一系列的随机算法&#xff0c;其目的在于从包含n个项目的集合S中选取k个样本&#xff0c;其中n为一很大或未知的数量&#xff0c;尤其适用于不能把所有n个项目都存放到内存的情况。最常见例子为Jeffrey Vitter在其论文中所提及的算法R。 2.算法步骤&#xff1…

机器学习算法系列(三)

机器学习算法之–对数几率回归&#xff08;逻辑斯蒂回归&#xff09;算法 上个算法&#xff08;算法系列二&#xff09;介绍了如何使用线性模型进行回归学习&#xff0c;但若要做的是分类任务&#xff0c;则需要找一个单调可微函数将分类任务的真实标记y与线性回归模型的预测值…

一次etcd变更引发的惨案

问题描述 在做etcd的数据变更时候&#xff0c;etcd在组成集群的时候出现leader不断切换问题&#xff0c;导致集群不稳定&#xff0c;都面将不健康的etcd节点踢出&#xff0c;只剩etcd单节点&#xff0c;后面将踢出的etcd节点重新加入现有etcd&#xff0c;导致etcd集群奔溃&…

【故障诊断】基于 KPCA 进行降维、故障检测和故障诊断研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

快速搭建第一个SpringCloud程序

目录 1、Spring Boot项目脚手架快速搭建 1.1 生成工程基本配置 1.2 生成工程。 1.3 导入开发工具&#xff08;此处为Idea&#xff09; 1.4 运行代码 1.5 验证是否能访问 2、Spring Cloud环境搭建 2.1 版本匹配问题 2.2 Spring Cloud环境测试 3、引入Eureka Server 3…

运行时内存数据区之虚拟机栈——局部变量表

这篇内容十分重要,文字也很多,仔细阅读后,你必定有所收获! 基本内容 与程序计数器一样&#xff0c;Java虚拟机栈&#xff08;Java Virtual Machine Stack&#xff09;也是线程私有的&#xff0c;它的生命周期与线程相同。虚拟机栈描述的是Java方法执行的线程内存模型&#xf…

【从零开始学Skynet】基础篇(六):MySql数据库安装操作

游戏服务端的另一项重要功能是保存玩家数据&#xff0c;Skynet提供了操作MySQL数据库、MongoDB数据库的模块。1、数据库安装 首先安装Mysql服务器&#xff0c;打开终端输入如下指令&#xff1a; sudo apt-get install mysql-server 按下回车&#xff0c;输入密码后开始安装&a…

项目1实现login登录功能方案设计第三版

需求优化点:MySQL表常用功能模块实现方案index页面home页面需求 实现一个登录功能 实现的功能 注册(邮箱注册)登录(邮箱密码)重置密码查看操作记录(登录, 注册, 重置密码, 登出. 都算操作)登出在第2版的基础上进行优化:\ 优化点: VerificationCode(验证码储存库): 增加时间字段…

青藤首提“业安融合”理念,正式发布先进云安全方案CNAPP

4月18日&#xff0c;以“云时代&#xff0c;安全变了”为主题的2023年云安全高峰论坛在北京举行。会上&#xff0c;青藤首次提出“业安融合”理念&#xff0c;正式发布先进云安全方案CNAPP。 中国全面进入云和数字化时代 当前&#xff0c;全球已进入数字经济时代&#xff0c;…

前端自动化测试之葵花宝典

首先聊一下概念&#xff0c;Web 前端自动化测试是一种通过编写代码来自动化执行 Web 应用程序的测试任务的方法&#xff0c;它通常使用 JavaScript 和测试框架 (如 Selenium、Appium 等) 来实现。 Web 前端自动化测试的优点是可以提高测试效率、减少测试时间和测试成本&#x…

工业机器人远程监控解决方案

一、项目背景 随着我国科技不断进步发展和产业升级的不断进行&#xff0c;现阶段机器人应用在生产制造行业以及运输行业已经变得越来越广泛。工业机器人机构复杂、维护成本高&#xff0c;机器人应用的这一行业现状&#xff0c;对工业机器人生产企业的产品高品质服务能力提出了…

Mac远程控制工具有哪些

适用于Mac的远程控制工具有很多&#xff0c;这里我们给大家列举五个常用软件。 1、Apple Remote Desktop 苹果自带远程桌面正如其名称所承诺的那样。作为 Apple 出品的应用程序&#xff0c;您可以想象它的配置和上手是多么容易。从 App Store 下载 Apple Remote Desktop 后&a…

数据结构初阶(算法的复杂度 + 包装类 + 泛型)

文章目录一、算法复杂度1. 算法效率2. 时间复杂度&#xff08;1&#xff09; O的渐进表示法3. 空间复杂度二、包装2.1 为什么会出现包装2.2 分类2.3 装箱和拆箱&#xff08;1&#xff09;装箱/装包&#xff08;2&#xff09;拆箱/拆箱三、泛型3.1 泛型的基本概念3.2 泛型的使用…

【Elastic (ELK) Stack 实战教程】10、ELK 架构升级-引入消息队列 Redis、Kafka

目录 一、ELK 架构面临的问题 1.1 耦合度过高 1.2 性能瓶颈 二、ELK 对接 Redis 实践 2.1 配置 Redis 2.1.1 安装 Redis 2.1.2 配置 Redis 2.1.3 启动 Redis 2.2 配置 Filebeat 2.3 配置 Logstash 2.4 数据消费 2.5 配置 kibana 三、消息队列基本概述 3.1 什么是…

Spring Cloud Gateway: 网关

文章目录 网关Hello world路由: Route谓词: Predicate过滤器: FilterGateway实现限流: RequestRateLimiter过滤器使用Gateway实现服务降级 自定义全局过滤器GateWay中执行流程 网关 API网关就是实现了前端项目和服务端项目之间的统一入口 Nginx实现的是用户和前端项目之间调用…

Spring AOP

目录 AOP 为什么使用AOP Spring AOP AOP的组成 实现Spring AOP AOP表达式 Spring AOP的实现原理 在介绍Spring AOP之前需要先介绍AOP AOP AOP(面向切面编程)就像我们之前学习的OOP(面向对象编程)它是一种思想,它是对某一类事情的集中处理,比如用户登录的校验,在没学AOP…

BUUCTF-rip

https://www.cnblogs.com/refrain-again/p/15001283.html 看了这个文章 我起码能理解我们栈溢出的目的 在做题之前 我们需要先理解 栈的存储方法 从上往下看 就能理解入栈 说回这道题目 为什么这道题目是栈溢出 1.查看基本信息 checksec file 是kali下的elf文件 相当于w…

场景搭建、素材库、在线标绘等,四维轻云地理空间数据云管理平台新增了这些功能

四维轻云是一款地理空间数据云管理平台&#xff0c;具有地理空间数据在线管理、展示及分享等功能。在四维轻云平台中&#xff0c;用户可以不受时间地点的限制&#xff0c;随时随地管理、查看及分享各类地理空间数据。 为了更好地满足用户需求和进行地理空间数据在线管理&#…