关于图查询中的多线程查询优化记录一、背景二、串行改并行

一、背景

在查询图数据库(nebula)过程中,我们往往需要具有拓扑关系的连续多次查询来作为业务的一次有效查询,比如我需要查询某点的一阶关系,获取到其一阶关系表,判断结果进行业务逻辑处理并对每个结果进行二阶关系甚至三阶的查询,经过几个for循环后也往往需要上百ms返回。需要自行使用client进行开发api查询.

实际实时业务中,尤其是支付业务,对图查询的时延要求比风控对图的要求要高,即一连串的查询(十几条)需要控制在20ms内返回,如果全部按业务逻辑串行执行,即使全内存图,也要上百ms。

因此,为了满足业务方需求,需要优化图查询。将可并行部分进行并行化,来缩短时延。因涉及到多线程,正好学习一下,也为之后工作遇到相关查询并行化问题提供个原型,特此记录产生的问题和解决方式以及相关的思考。

二、串行改并行

首先,需要构思出查询的逻辑图,以我自身的业务为例子:

我需要进行多种edgeType的查询,经过一跳查询后会进行一些逻辑处理判断在进行第2/3跳查询,每次查询会产生一个list,串行执行导致多种查询依次进行,总执行时间为 edgeTpe1 cost time + edgeTpe2 cost time + edgeTpe3 cost time + ···+ edgeTpeN cost time ,在离线批查询中,对时延要求并不高,但是在单条实时查询时,要求在20ms以内返回结果,所以就需要这些查询一起执行才能满足,当然前提是你的 server tps 满足要求。
需要注意:对于一个点出发的查询在一个session中不可并行

image.png

在串行改并行过程中,我们主要考虑两部分:

  1. 对每种edgetype查询并行,将此部分并行,查询时间取决于 max(edgeTpe1 cost time ,edgeTpe2 cost time ,edgeTpe3 cost time , ··· ,edgeTpeN cost time
  2. 对每跳结果list的查询进行并行化,避免foreach查询串行等待,将此部分并行,可将每种查询内部耗时缩短
    • 如果一跳查询有10个结果处理后需要进行2跳查询,那么我们将此部分并行,可以将这10个结果并行查询,但是有时可能只有一两个结果,这时如果依然并行,那么开启线程可能反而耗时更久。

    • 因此需要验证,找到最佳分割点进行list并行化

    ---此处应有官方查询加速代码

三、并行选择:

1. callable和runnable多线程

先说结论,经过对比,接口并发访问使用callable更快些,只是为何会出现不同:

  1. 在结果的聚合时,若使用runnable则需要进行多线程同步,写入一个list,否则可能会丢失数据,因此加锁就变相的耗费了时间,而callable则是等所有结果返回后聚合,不存在同步问题

2.多线程下的list,callable返回时如何保持一致,等待其他线程:

feture.get会阻塞主线程,但是在返回结果仍然发现有部分数据可能会不返回,这时我加了countDownLatch,也可以使用while done来判断每个线程的查询否结束,才进行下一步。

// 进行线程等待
// 1. countDownLatch 保证全部线程执行完毕在进行主线程程序
class MyCallableQueryOrder(pool: NebulaPool, srcId: String, hotKeySrc: Int, hotKeyUid: Int, edgeType: String, countDownLatch: CountDownLatch) extends Callable[ArrayBuffer[String]] {
  @throws[Exception]
  override def call: ArrayBuffer[String] = {
    // 图客户端查询
    val result = orderUid(pool, srcId, hotKeySrc, hotKeyUid, edgeType)
    countDownLatch.countDown()
    result
  }
}
// 或者采取不断等待结果的方式来等待线程,不过没有countDownLatch优雅
var isDone = future.isDone && future1.isDone && future2.isDone && future3.isDone && future4.isDone
while (!isDone) {
     isDone = future.isDone && future1.isDone && future2.isDone && future3.isDone && future4.isDone
}

// 收集多线程执行结果
    ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
    List<ListenableFuture<String>> taskList = new ArrayList<ListenableFuture<String>>();
    for(int i = 0;i<10;i++){
        Callable call = new MyCallableQueryOrder(pool,hotKeySrc,hotKeyUid,edgeType,countDownLatch);
        ListenableFuture future = service.submit(call);
        taskList.add(future)
    }
    countDownLatch.await()
    // 收集全部线程结果
    ListenableFuture<List<String>> listFuture = Futures.successfulAsList(taskList);

复制代码

四、 客户端连接使用

nebula 连接池需要在主线程申请完毕,客户端session不可以多线程共享。因此在多线程query时,需要每个线程获取session,进行执行。执行完毕后session需要进行释放,然后pool中维护session关系进行复用
这里可以看一下session源码,我们每次getsession时,会获得一个session id,这个线程的内所有的查询操作都会通过这个id进行,所有session共享一个pool,execute则是对ngql进行通过session提交执行查询连接,以及释放、重连的逻辑代码

public class Session {
    private final long sessionID;
    private SyncConnection connection;
    private final GenericObjectPool<SyncConnection> pool;
    private final Boolean retryConnect;
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    public Session(SyncConnection connection, long sessionID, GenericObjectPool<SyncConnection> connPool, Boolean retryConnect) {
        this.connection = connection;
        this.sessionID = sessionID;
        this.pool = connPool;
        this.retryConnect = retryConnect;
    }

    public ResultSet execute(String stmt) throws IOErrorException, UnsupportedEncodingException {
        try {
            if (this.connection == null) {
                throw new IOErrorException(2, "Connection is null");
            } else {
                ExecutionResponse resp = this.connection.execute(this.sessionID, stmt);
                return new ResultSet(resp);
            }
        } catch (IOErrorException var4) {
            if (var4.getType() == 2) {
                if (this.pool.getFactory() instanceof ConnObjectPool) {
                    ((ConnObjectPool)this.pool.getFactory()).updateServerStatus();
                }

                if (this.retryConnect) {
                    if (this.retryConnect()) {
                        ExecutionResponse resp = this.connection.execute(this.sessionID, stmt);
                        return new ResultSet(resp);
                    }

                    throw new IOErrorException(1, "All servers are broken.");
                }
            }

            throw var4;
        }
    }

    private boolean retryConnect() {
        try {
            try {
                this.pool.invalidateObject(this.connection);
            } catch (Exception var2) {
                this.log.error("Return object failed");
            }

            SyncConnection newConn = (SyncConnection)this.pool.borrowObject();
            if (newConn == null) {
                this.log.error("Get connection object failed.");
            }

            this.connection = newConn;
            return true;
        } catch (Exception var3) {
            return false;
        }
    }

    public boolean ping() {
        return this.connection == null ? false : this.connection.ping();
    }

    public void release() {
        if (this.connection != null) {
            this.connection.signout(this.sessionID);

            try {
                this.pool.returnObject(this.connection);
            } catch (Exception var2) {
                this.log.warn("Return object to pool failed.");
            }

        }
    }
}
复制代码

这里再看下如何使用官方给出的Session连接:

NebulaPool pool = GraphSingleQueryApi.initGraphClient(hostAndPort, 50);

def initGraphClient(hostAndPort: String, maxConnSize: Int): NebulaPool = {
  val hostAndPorts = getGraphHostPort(hostAndPort)
  val pool = new NebulaPool()
  val nebulaPoolConfig = new NebulaPoolConfig()
  nebulaPoolConfig.setMaxConnSize(maxConnSize)
  pool.init(hostAndPorts, nebulaPoolConfig)
  pool
}

def getGraphHostPort(hostAndPort: String): util.List[HostAddress] = {
  val list: Array[String] = hostAndPort.split(",")
  val result = new util.ArrayList[HostAddress]()
  list.toStream.map(item => {
    val stringList: Array[String] = item.split(":")
    result.add(new HostAddress(stringList(0), stringList(1).toInt))
  }).toList
  result
}
复制代码

使用时从 poolgetsession即可。目前2.5.1版本及以上已支持sessionid在metaed的持久化看了下是6月合的pr,即使服务端重启也会保存状态,重启后仍然可以正常连接使用。算是一个比较重要的服务支持,毕竟业务需要可能会偶尔重启,之前是需要业务应用也重启才能重新连接。