流装饰器参考

cartesianProduct

cartesianProduct 函数将具有多值字段(即数组)的单个元组转换为多个元组,数组中的每个值对应一个元组。也就是说,给定一个包含 fieldA 的 N 个值数组的单个元组,cartesianProduct 函数将输出 N 个元组,每个元组都包含原始元组数组中的一个值。本质上,您可以展平数组以进行进一步处理。

例如,使用 cartesianProduct,您可以将此元组

{
  "fieldA": "foo",
  "fieldB": ["bar","baz","bat"]
}

转换为以下 3 个元组

{
  "fieldA": "foo",
  "fieldB": "bar"
}
{
  "fieldA": "foo",
  "fieldB": "baz"
}
{
  "fieldA": "foo",
  "fieldB": "bat"
}

cartesianProduct 参数

  • incoming stream:(必需)单个传入流。

  • fieldName 或 evaluator:(必需)要展平值的字段名称,或者应该展平其结果的评估器。

  • productSort='fieldName ASC|DESC':(可选)新生成的元组的排序顺序。

cartesianProduct 语法

cartesianProduct(
  <stream>,
  <fieldName | evaluator> [as newFieldName],
  productSort='fieldName ASC|DESC'
)

cartesianProduct 示例

以下示例显示此源元组的不同输出

{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3]
}

单字段,无排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB
)

{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": [1,2,3]
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": [1,2,3]
}

单评估器,无排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  sequence(3,4,5) as fieldE
)

{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 4
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 9
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 14
}

单字段,按值排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB,
  productSort="fieldB desc"
)

{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": [1,2,3]
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": [1,2,3]
}

单评估器,按评估器值排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  sequence(3,4,5) as fieldE,
  productSort="newFieldE desc"
)

{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 14
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 9
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3],
  "fieldE": 4
}

重命名单字段,按值排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB as newFieldB,
  productSort="fieldB desc"
)

{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3]
  "newFieldB": "valueB2",
}
{
  "fieldA": "valueA",
  "fieldB": ["valueB1","valueB2"],
  "fieldC": [1,2,3]
  "newFieldB": "valueB1",
}

多字段,无排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB,
  fieldC
)

{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 3
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 3
}

多字段,按单字段排序

cartesianProduct(
  search(collection1, qt="/export", q="*:*", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB,
  fieldC,
  productSort="fieldC asc"
)

{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 3
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 3
}

多字段,按多字段排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  fieldB,
  fieldC,
  productSort="fieldC asc, fieldB desc"
)

{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 1
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 2
}
{
  "fieldA": "valueA",
  "fieldB": "valueB2",
  "fieldC": 3
}
{
  "fieldA": "valueA",
  "fieldB": "valueB1",
  "fieldC": 3
}

字段和评估器,无排序

cartesianProduct(
  search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
  sequence(3,4,5) as fieldE,
  fieldB
)

{
  "fieldA": "valueA",
  "fieldB": valueB1,
  "fieldC": [1,2,3],
  "fieldE": 4
}
{
  "fieldA": "valueA",
  "fieldB": valueB2,
  "fieldC": [1,2,3],
  "fieldE": 4
}
{
  "fieldA": "valueA",
  "fieldB": valueB1,
  "fieldC": [1,2,3],
  "fieldE": 9
}
{
  "fieldA": "valueA",
  "fieldB": valueB2,
  "fieldC": [1,2,3],
  "fieldE": 9
}
{
  "fieldA": "valueA",
  "fieldB": valueB1,
  "fieldC": [1,2,3],
  "fieldE": 14
}
{
  "fieldA": "valueA",
  "fieldB": valueB2,
  "fieldC": [1,2,3],
  "fieldE": 14
}

正如您在上面的示例中看到的那样,cartesianProduct 函数确实支持跨多个字段和/或评估器展平元组。

classify

classify 函数使用逻辑回归文本分类模型对元组进行分类。它专门设计用于与使用 train 函数训练的模型一起工作。 classify 函数使用 model 函数检索存储的模型,然后使用该模型对元组流进行评分。分类器读取的元组必须包含可用于分类的文本字段。classify 函数使用 Lucene 分析器从文本中提取特征,以便可以应用模型。默认情况下,classify 函数使用元组中文本字段的名称查找分析器。如果工作节点上的 Solr 模式不包含此字段,则可以通过指定 analyzerField 参数在另一个字段中查找分析器。

每个分类的元组都会分配两个分数

  • probability_d*:一个介于 0 和 1 之间的浮点数,描述元组属于该类的概率。这在分类用例中很有用。

  • score_d*:文档的分数,该分数没有被压缩到 0 到 1 之间。该分数可以是正数或负数。分数越高,文档越适合该类。这种未压缩的分数在查询重新排序和推荐用例中很有用。当多个高排名文档的 probability_d 分数为 1 时,此分数尤其有用,因为这不会提供文档之间有意义的排名。

classify 参数

  • model expression:(必需)检索存储的逻辑回归模型。

  • field:(必需)要应用分类器的元组中的字段。默认情况下,将使用模式中此字段的分析器提取特征。

  • analyzerField:(可选)指定在模式中查找分析器的其他字段。

classify 语法

classify(model(modelCollection,
             id="model1",
             cacheMillis=5000),
         search(contentCollection,
             q="id:(a b c)",
             qt="/export",
             fl="text_t, id",
             sort="id asc"),
             field="text_t")

在上面的示例中,classify expression 使用 api 函数检索模型。然后,它对 search 函数返回的元组进行分类。text_t 字段用于文本分类,Solr 模式中 text_t 字段的分析器用于分析文本并提取特征。

commit

commit 函数包装单个流 (A),并在给定集合和批量大小时,当满足批量大小或到达流末尾时,向集合发送提交消息。提交流最常与更新流一起使用,因此提交将考虑来自更新流的可能的摘要元组。进入提交流的所有元组都将从提交流中返回 - 不会删除任何元组,也不会添加任何元组。

commit 参数

  • collection:要向其发送提交消息的集合(必需)

  • batchSize:提交批量大小,当达到批量大小时发送提交消息。如果未提供(或提供的值为 0),则仅在传入流结束时发送提交。

  • waitFlush:直接传递给提交处理程序的值(true/false,默认值:false)

  • waitSearcher:直接传递给提交处理程序的值(true/false,默认值:false)

  • softCommit:直接传递给提交处理程序的值(true/false,默认值:false)

  • StreamExpression for StreamA(必需)

commit 语法

commit(
    destinationCollection,
    batchSize=2,
    update(
        destinationCollection,
        batchSize=5,
        search(collection1, q="*:*", qt="/export", fl="id,a_s,a_i,a_f,s_multi,i_multi", sort="a_f asc, a_i asc")
    )
)

complement

complement 函数包装两个流(A 和 B),并从 A 中发出 B 中不存在的元组。元组按照它们在流 A 中出现的顺序发出。两个流都必须按用于确定相等性的字段(使用 on 参数)进行排序。

complement 参数

  • StreamExpression for StreamA

  • StreamExpression for StreamB

  • on:用于检查 A 和 B 之间元组相等性的字段。可以是 on="fieldName"on="fieldNameInLeft=fieldNameInRight"on="fieldName, otherFieldName=rightOtherFieldName" 格式。

complement 语法

complement(
  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
  on="a_i"
)

complement(
  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  on="a_i,a_s"
)

daemon

daemon 函数包装另一个函数,并使用内部线程按时间间隔运行它。daemon 函数可用于提供连续的推送和拉取流。

连续推送流

使用连续推送流时,daemon 函数会包装另一个函数,然后将其发送到 /stream 处理程序以执行。/stream 处理程序识别 daemon 函数并将其保留在内存中,以便它可以按时间间隔运行其内部函数。

为了方便元组的推送,daemon 函数必须包装另一个将元组推送到某处的流装饰器。其中一个示例是 update 函数,它包装一个流并将元组发送到另一个 SolrCloud 集合进行索引。

daemon 语法

daemon(id="uniqueId",
       runInterval="1000",
       terminate="true",
       update(destinationCollection,
              batchSize=100,
              topic(checkpointCollection,
                    topicCollection,
                    q="topic query",
                    fl="id, title, abstract, text",
                    id="topicId",
                    initialCheckpoint=0)
               )
        )

上面的示例代码显示了一个 daemon 函数包装一个 update 函数,该函数包装一个 topic 函数。当此表达式发送到 /stream 处理程序时,/stream 处理程序会看到 daemon 函数并将其保留在内存中,它将在其中按时间间隔运行。在此特定示例中,daemon 函数将每秒运行一次 update 函数。update 函数正在包装一个 topic 函数,该函数将分批流式传输与 topic 函数查询匹配的元组。对主题的每个后续调用将返回主题的下一批元组。update 函数会将所有与主题匹配的元组发送到另一个集合进行索引。terminate 参数告诉守护程序在 topic 函数停止发送元组时终止。

这样做的效果是将与特定查询匹配的文档推送到另一个集合中。可以插入自定义推送函数,将文档从 Solr 中推出到其他系统,例如 Kafka 或电子邮件系统。

推送流还可用于连续后台聚合场景,其中聚合在后台按时间间隔汇总并推送到其他 Solr 集合。另一个用例是连续的后台机器学习模型优化,其中优化的模型被推送到另一个 Solr 集合,可以在其中集成到查询中。

/stream 处理程序支持一小组命令,用于列出和控制守护程序函数

https://127.0.0.1:8983/solr/collection/stream?action=list

此命令将提供特定节点上当前正在运行的守护程序及其当前状态的列表。

https://127.0.0.1:8983/solr/collection/stream?action=stop&id=daemonId

此命令将停止特定的守护程序函数,但将其保留在内存中。

https://127.0.0.1:8983/solr/collection/stream?action=start&id=daemonId

此命令将启动已停止的特定守护程序函数。

https://127.0.0.1:8983/solr/collection/stream?action=kill&id=daemonId

此命令将停止特定的守护程序函数并将其从内存中删除。

连续拉取流

DaemonStream java 类(SolrJ 库的一部分)也可以嵌入到 java 应用程序中以提供连续拉取流。示例代码

StreamContext context = new StreamContext()
SolrClientCache cache = new SolrClientCache();
context.setSolrClientCache(cache);

Map topicQueryParams = new HashMap();
topicQueryParams.put("q","hello");  // The query for the topic
topicQueryparams.put("rows", "500"); // How many rows to fetch during each run
topicQueryparams.put("fl", "id", "title"); // The field list to return with the documents

TopicStream topicStream = new TopicStream(zkHost,        // Host address for the ZooKeeper service housing the collections
                                         "checkpoints",  // The collection to store the topic checkpoints
                                         "topicData",    // The collection to query for the topic records
                                         "topicId",      // The id of the topic
                                         -1,             // checkpoint every X tuples, if set -1 it will checkpoint after each run.
                                          topicQueryParams); // The query parameters for the TopicStream

DaemonStream daemonStream = new DaemonStream(topicStream,             // The underlying stream to run.
                                             "daemonId",              // The id of the daemon
                                             1000,                    // The interval at which to run the internal stream
                                             500);                    // The internal queue size for the daemon stream. Tuples will be placed in the queue
                                                                      // as they are read by the internal thread.
                                                                      // Calling read() on the daemon stream reads records from the internal queue.

daemonStream.setStreamContext(context);

daemonStream.open();

//Read until it's time to shutdown the DaemonStream. You can define the shutdown criteria.
while(!shutdown()) {
    Tuple tuple = daemonStream.read() // This will block until tuples become available from the underlying stream (TopicStream)
                                      // The EOF tuple (signaling the end of the stream) will never occur until the DaemonStream has been shutdown.
    //Do something with the tuples
}

// Shutdown the DaemonStream.
daemonStream.shutdown();

//Read the DaemonStream until the EOF Tuple is found.
//This allows the underlying stream to perform an orderly shutdown.

while(true) {
    Tuple tuple = daemonStream.read();
    if(tuple.EOF) {
        break;
    } else {
        //Do something with the tuples.
    }
}
//Finally close the stream
daemonStream.close();

delete

delete 函数包装其他函数,并使用找到的 id_version_ 值将元组作为 按 Id 删除命令发送到 SolrCloud 集合。

这类似于下面描述的 update() 函数。

delete 参数

  • destinationCollection:(必需)将删除元组的集合。

  • batchSize:(可选,默认为 250)删除批量大小。

  • pruneVersionField:(可选,默认为 false)是否从元组中修剪 _version_

  • StreamExpression:(必需)

delete 语法

 delete(collection1,
        batchSize=500,
        search(collection1,
               q=old_data:true,
               qt="/export",
               fl="id",
               sort="a_f asc, a_i asc"))

上面的示例使用针对 collection1search 函数返回的元组,并将找到的每个文档的 id 值转换为针对同一 collection1 的删除请求。

update() 函数不同,delete() 默认为 pruneVersionField=false - 在将元组转换为“按 ID 删除”请求时保留内部流中找到的任何 _version_ 值。这确保了使用此流(默认情况下)不会导致删除在执行 search(…​) *之后*但在 delete(…​) 处理该元组*之前*更新的任何文档(利用 乐观并发 约束)。

希望忽略并发更新并删除所有匹配文档的用户应设置 pruneVersionField=true (或确保内部流元组不包含任何 _version_ 值)。

预计会出现并发更新并希望“跳过”任何失败删除的用户应考虑配置 TolerantUpdateProcessorFactory

eval

eval 函数允许在动态生成然后评估新的流式表达式的情况下使用。eval 函数包装一个流式表达式并从底层流读取单个元组。然后,eval 函数从元组的 expr_s 字段中检索字符串流式表达式。然后,eval 函数编译字符串流式表达式并发出元组。

eval 参数

  • StreamExpression:(必需)提供要评估的流式表达式的流。

eval 语法

eval(expr)

在上面的示例中,eval 表达式从底层表达式读取第一个元组。然后,它编译并执行 expr_s 字段中的字符串流式表达式。

示例

eval(select(
  echo("tuple(answer=42)"),
  echo as expr_s
))

输出

{
  "result-set": {
    "docs": [
      {
        "answer": "42"
      },
      {
        "EOF": true,
        "RESPONSE_TIME": 0
      }
    ]
  }
}

executor

executor 函数包装包含流式表达式的流源,并并行执行表达式。executor 函数在每个元组的 expr_s 字段中查找表达式。executor 函数具有一个内部线程池,该线程池运行在同一工作节点上并行编译和运行表达式的任务。此函数也可以通过将其包装在 parallel 函数中来跨工作节点并行化,从而提供跨集群的表达式并行执行。

executor 函数不会对其运行的表达式的输出进行任何特定操作。因此,执行的表达式必须包含将元组推送到其目的地的逻辑。可以将 update 函数包含在正在执行的表达式中,以将元组发送到 SolrCloud 集合进行存储。

此模型允许异步执行作业,其中输出存储在 SolrCloud 集合中,可以在作业进行时访问该集合。

executor 参数

  • threads: (可选) 执行表达式的执行器线程池中的线程数。

  • StreamExpression: (必需) 包含要执行的流表达式的流源。

执行器语法

daemon(id="myDaemon",
       terminate="true",
       executor(threads=10,
                topic(checkpointCollection
                      storedExpressions,
                      q="*:*",
                      fl="id, expr_s",
                      initialCheckPoint=0,
                      id="myTopic")))

在上面的示例中,daemon 包装了一个执行器,该执行器又包装了一个 topic,该 topic 返回包含要执行的表达式的元组。当发送到流处理程序时,守护进程将以一定的时间间隔调用执行器,这将导致执行器从主题读取并执行 expr_s 字段中找到的表达式。守护进程将重复调用执行器,直到迭代完与该主题匹配的所有元组,然后它将终止。这是从 topic 队列执行批处理流表达式的方法。

fetch

fetch 函数迭代一个流并获取额外的字段,并将它们添加到元组中。fetch 函数以批处理方式获取,以限制对 Solr 的调用次数。从 fetch 函数流出的元组将包含原始字段和获取的额外字段。fetch 函数支持一对一的获取。多对一的获取(其中流源包含重复的键)也有效,但是此函数当前不支持一对多的获取。

fetch 参数

  • Collection: (必需) 要从中获取字段的集合。

  • StreamExpression: (必需) fetch 函数的流源。

  • fl: (必需) 要获取的字段。

  • on: 用于检查流源和获取的记录之间元组的相等性的字段。格式为 on="元组中的字段名=集合中的字段名"

  • batchSize: (可选) 批处理获取大小。

fetch 语法

fetch(addresses,
      search(people, q="*:*", qt="/export", fl="username, firstName, lastName", sort="username asc"),
      fl="streetAddress, city, state, country, zip",
      on="username=userId")

上面的示例通过将元组中的用户名与地址集合中的 userId 字段匹配来获取用户的地址。

having

having 表达式包装一个流,并将一个布尔运算应用于每个元组。它仅发出布尔运算返回 true 的元组。

having 参数

  • StreamExpression: (必需) having 函数的流源。

  • booleanEvaluator: (必需) 支持以下布尔运算:eq (等于),gt (大于),lt (小于),gteq (大于或等于),lteq (小于或等于),andoreor (异或) 和 not。布尔求值器可以与其他求值器嵌套,以形成复杂的布尔逻辑。

比较求值器将特定字段中的值与一个值进行比较,该值可以是字符串、数字或布尔值。例如:eq(field1, 10),如果 field1 等于 10,则返回 true

having 语法

having(rollup(over=a_s,
              sum(a_i),
              search(collection1,
                     q="*:*",
                     qt="/export",
                     fl="id,a_s,a_i,a_f",
                     sort="a_s asc")),
       and(gt(sum(a_i), 100), lt(sum(a_i), 110)))

在此示例中,having 表达式迭代来自 rollup 表达式的聚合元组,并发出字段 sum(a_i) 大于 100 且小于 110 的所有元组。

leftOuterJoin

leftOuterJoin 函数包装两个流,左流和右流,并从左流发出元组。如果右流中存在一个相等(由 on 定义)的元组,则该元组中的值将包含在发出的元组中。左流元组不需要存在一个相等的右流元组才能被发出。这支持一对一、一对多、多对一和多对多的左外连接场景。元组按它们在左流中出现的顺序发出。两个流都必须按用于确定相等性的字段排序(使用 on 参数)。如果两个元组都包含同名字段,则在发出的元组中使用来自右流的值。

您可以将传入的流包装在 select 函数中,以明确哪些字段值包含在发出的元组中。

leftOuterJoin 参数

  • StreamLeft 的 StreamExpression

  • StreamRight 的 StreamExpression

  • on: 用于检查左流和右流之间元组的相等性的字段。可以是 on="字段名"on="左流中的字段名=右流中的字段名"on="字段名, 其他字段名=右流中其他字段名" 格式。

leftOuterJoin 语法

leftOuterJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
  on="personId"
)

leftOuterJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
  on="personId=ownerId"
)

leftOuterJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  select(
    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
    ownerId,
    name as petName
  ),
  on="personId=ownerId"
)

hashJoin

hashJoin 函数包装两个流,左流和右流,并且对于左流中在右流中存在的每个元组,将发出一个包含两个元组字段的元组。这支持一对一、一对多、多对一和多对多的内连接场景。元组按它们在左流中出现的顺序发出。流的顺序无关紧要。如果两个元组都包含同名字段,则在发出的元组中使用来自右流的值。

您可以将传入的流包装在 select 函数中,以明确哪些字段值包含在发出的元组中。

当左流和右流的元组不能按相同顺序排列时,可以使用 hashJoin 函数。由于元组是无序的,因此此流函数在打开操作期间读取右流中的所有值,并将所有元组存储在内存中。结果是内存占用量等于右流的大小。

hashJoin 参数

  • StreamLeft 的 StreamExpression

  • hashed=StreamRight 的 StreamExpression

  • on: 用于检查左流和右流之间元组的相等性的字段。可以是 on="字段名"on="左流中的字段名=右流中的字段名"on="字段名, 其他字段名=右流中其他字段名" 格式。

hashJoin 语法

hashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
  on="personId"
)

hashJoin(
  search(people, q="*:*", fl="personId,name", sort="personId asc"),
  hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
  on="personId=ownerId"
)

hashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=select(
    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
    ownerId,
    name as petName
  ),
  on="personId=ownerId"
)

innerJoin

包装两个流,左流和右流。对于左流中在右流中存在的每个元组,将发出一个包含两个元组字段的元组。这支持一对一、一对多、多对一和多对多的内连接场景。元组按它们在左流中出现的顺序发出。两个流都必须按用于确定相等性的字段排序('on' 参数)。如果两个元组都包含同名字段,则在发出的元组中使用来自右流的值。您可以使用 select(…​) 表达式包装传入的流,以明确哪些字段值包含在发出的元组中。

innerJoin 参数

  • StreamLeft 的 StreamExpression

  • StreamRight 的 StreamExpression

  • on: 用于检查左流和右流之间元组的相等性的字段。可以是 on="字段名"on="左流中的字段名=右流中的字段名"on="字段名, 其他字段名=右流中其他字段名" 格式。

innerJoin 语法

innerJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
  on="personId"
)

innerJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
  on="personId=ownerId"
)

innerJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  select(
    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
    ownerId,
    name as petName
  ),
  on="personId=ownerId"
)

intersect

intersect 函数包装两个流 A 和 B,并发出 确实 存在于 B 中的 A 中的元组。元组按它们在流 A 中出现的顺序发出。两个流都必须按用于确定相等性的字段排序(on 参数)。仅发出来自 A 的元组。

intersect 参数

  • StreamExpression for StreamA

  • StreamExpression for StreamB

  • on:用于检查 A 和 B 之间元组相等性的字段。可以是 on="fieldName"on="fieldNameInLeft=fieldNameInRight"on="fieldName, otherFieldName=rightOtherFieldName" 格式。

intersect 语法

intersect(
  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
  on="a_i"
)

intersect(
  search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
  on="a_i,a_s"
)

list

list 函数包装 N 个流表达式,并按顺序打开和迭代每个流。这具有连接多个流表达式结果的效果。

list 参数

  • StreamExpressions …​:N 个流表达式

list 语法

list(tuple(a="hello world"), tuple(a="HELLO WORLD"))

list(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
     search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))

list(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
     tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))

merge

merge 函数合并两个或多个流表达式,并保持底层流的顺序。由于保持了顺序,因此底层流的排序必须与提供给 merge 函数的 on 参数对齐。

merge 参数

  • StreamExpression A

  • StreamExpression B

  • 可选 StreamExpression C,D,…​.Z

  • on: 用于执行合并的排序条件。形式为 字段名 顺序,其中顺序为 ascdesc。可以以 字段A 顺序, 字段B 顺序 的形式提供多个字段。

merge 语法

# Merging two stream expressions together
merge(
      search(collection1,
             q="id:(0 3 4)",
             qt="/export",
             fl="id,a_s,a_i,a_f",
             sort="a_f asc"),
      search(collection1,
             q="id:(1)",
             qt="/export",
             fl="id,a_s,a_i,a_f",
             sort="a_f asc"),
      on="a_f asc")
# Merging four stream expressions together. Notice that while the sorts of each stream are not identical they are
# comparable. That is to say the first N fields in each stream's sort matches the N fields in the merge's on clause.
merge(
      search(collection1,
             q="id:(0 3 4)",
             qt="/export",
             fl="id,fieldA,fieldB,fieldC",
             sort="fieldA asc, fieldB desc"),
      search(collection1,
             q="id:(1)",
             qt="/export",
             fl="id,fieldA",
             sort="fieldA asc"),
      search(collection2,
             q="id:(10 11 13)",
             qt="/export",
             fl="id,fieldA,fieldC",
             sort="fieldA asc"),
      search(collection3,
             q="id:(987)",
             qt="/export",
             fl="id,fieldA,fieldC",
             sort="fieldA asc"),
      on="fieldA asc")

null

当执行并行关系代数(连接、交集、汇总等)时,null 表达式是一个有用的实用程序函数,用于了解瓶颈。null 函数读取底层流中的所有元组,并返回一个包含计数和处理时间的单个元组。由于 null 流本身添加的开销最小,因此它可以用于隔离 Solr 的 /export 处理程序的性能。如果 /export 处理程序的性能不是瓶颈,则瓶颈很可能发生在运行流装饰器的工作节点中。

null 表达式可以由并行函数包装并发送到工作节点。在这种情况下,每个工作节点将返回一个包含工作节点上处理的元组计数和该工作节点计时信息的元组。这提供了有价值的信息,例如:

  1. 随着添加更多的工作节点,/export 处理程序的性能是否提高。

  2. 元组是否均匀地分布在各个工作节点上,或者哈希分区是否将更多文档发送到单个工作节点。

  3. 所有工作节点是否以相同的速度处理数据,或者其中一个工作节点是否是瓶颈的来源。

null 参数

  • StreamExpression: (必需) null 函数读取的表达式。

null 语法

 parallel(workerCollection,
          null(search(collection1, q="*:*", fl="id,a_s,a_i,a_f", sort="a_s desc", qt="/export", partitionKeys="a_s")),
          workers="20",
          zkHost="localhost:9983",
          sort="a_s desc")

上面的表达式显示了一个包装 null 函数的并行函数。这将导致 null 函数在 20 个工作节点上并行运行。每个工作节点将返回一个包含处理的元组数和迭代元组所用时间的单个元组。

outerHashJoin

outerHashJoin 函数包装两个流,左流和右流,并从左流发出元组。如果右流中存在一个相等(由 on 参数定义)的元组,则该元组中的值将包含在发出的元组中。左流元组不需要存在一个相等的右流元组才能被发出。这支持一对一、一对多、多对一和多对多的左外连接场景。元组按它们在左流中出现的顺序发出。流的顺序无关紧要。如果两个元组都包含同名字段,则在发出的元组中使用来自右流的值。

您可以将传入的流包装在 select 函数中,以明确哪些字段值包含在发出的元组中。

当左侧和右侧的元组不能按相同顺序排列时,可以使用 outerHashJoin 流。由于元组是无序的,此流的工作方式是在打开操作期间读取右侧流中的所有值,并将所有元组存储在内存中。这样做的结果是内存占用量等于右侧流的大小。

outerHashJoin 参数

  • StreamLeft 的 StreamExpression

  • hashed=StreamRight 的 StreamExpression

  • on: 用于检查左流和右流之间元组的相等性的字段。可以是 on="字段名"on="左流中的字段名=右流中的字段名"on="字段名, 其他字段名=右流中其他字段名" 格式。

outerHashJoin 语法

outerHashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
  on="personId"
)

outerHashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
  on="personId=ownerId"
)

outerHashJoin(
  search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
  hashed=select(
    search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
    ownerId,
    name as petName
  ),
  on="personId=ownerId"
)

parallel

parallel 函数封装一个流式表达式,并将其发送到 N 个工作节点进行并行处理。

parallel 函数要求为底层搜索提供 partitionKeys 参数。partitionKeys 参数会将搜索结果(元组)在工作节点之间进行分区。具有相同 partitionKeys 值的元组将被洗牌到同一个工作节点。

parallel 函数会维护工作节点返回的元组的排序顺序,因此排序条件必须包含工作节点返回的元组的排序顺序。

例如,如果您按年、月和日进行排序,只要有足够多的不同年份来将元组分散到工作节点上,您就可以只按年份进行分区。

Solr 允许按 4 个以上的字段进行排序,但出于速度考虑,您不能指定超过 4 个 partitionKeys。此外,当一两个键足以分散元组时,指定多个 partitionKeys 是过分的。

当底层搜索流将从集合中发出大量元组时,会设计并行流。如果搜索流仅使用 parallel 发出集合的一小部分数据,则可能会更慢。

工作节点集合

工作节点可以来自与数据相同的集合,也可以是完全不同的集合,甚至是仅为 parallel 流式表达式存在的集合。工作节点集合可以是任何配置了 /stream 处理程序的 SolrCloud 集合。与普通的 SolrCloud 集合不同,工作节点集合不必保存任何数据。工作节点集合可以是仅用于执行流式表达式的空集合。

parallel 参数

  • collection:要将 StreamExpression 发送到的工作节点集合的名称。

  • StreamExpression:要发送到工作节点集合的表达式。

  • workers:要将表达式发送到的工作节点集合中的工作节点数量。

  • zkHost:(可选)工作节点集合所在的 ZooKeeper 连接字符串。仅当使用与您连接的 Solr 实例相同的 ZkHost 时(chroot 可以不同),才会包含 Zookeeper 凭据和 ACL。

  • sort:用于对工作节点返回的元组进行排序的排序条件。

parallel 语法

 parallel(workerCollection,
          rollup(search(collection1, q="*:*", fl="id,year_i,month_i,day_i", qt="/export", sort="year_i desc,month_i desc,day_i asc", partitionKeys="year_i"),
                 over="year_i", count(*)),
          workers="20",
          zkHost="localhost:9983",
          sort="year_i desc")

上面的表达式显示了一个封装了 rollup 函数的 parallel 函数。这将导致 rollup 函数在 20 个工作节点上并行运行。

预热

parallel 函数使用哈希查询解析器在工作节点之间拆分数据。它对所有文档执行,结果位集缓存在 filterCache 中。

对于具有相同数量的工作节点和 partitionKeysparallel 流,第一个查询将比后续查询慢。为了避免为第一个慢查询付出代价,可以使用每个新搜索器的预热查询。以下是 2 个工作节点和 "year_i" 作为 partionKeyssolrconfig.xml 代码片段。

<listener event="newSearcher" class="solr.QuerySenderListener">
<arr name="queries">
    <lst><str name="q">:</str><str name="fq">{!hash workers=2 worker=0}</str><str name="partitionKeys">year_i</str></lst>
    <lst><str name="q">:</str><str name="fq">{!hash workers=2 worker=1}</str><str name="partitionKeys">year_i</str></lst>
</arr>
</listener>

plist

plist 函数封装 N 个 Stream Expressions,并并行打开流,然后依次迭代每个流。listplist 的区别在于,流是并行打开的。由于许多流(如 facetstatssignificantTerms)在打开时会将繁重的操作推送到 Solr,因此 plist 函数可以通过并行执行这些操作来显着提高性能。

plist 参数

  • StreamExpressions …​:N 个流表达式

plist 语法

plist(tuple(a="hello world"), tuple(a="HELLO WORLD"))

plist(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
      search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))

plist(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
      tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))

priority

priority 函数是 executor 函数的简单优先级调度器。executor 函数没有直接的任务优先级概念;相反,它只是按照从底层流中读取的顺序执行任务。priority 函数提供了在较早提交的较低优先级任务之前调度较高优先级任务的能力。

priority 函数封装了两个 topic 函数,这两个函数都发出包含要执行的流式表达式的元组。第一个主题被认为是更高优先级的任务队列。

每次调用 priority 函数时,它都会检查更高优先级的任务队列,查看是否有任何任务要执行。如果更高优先级的队列中有任务在等待,则 priority 函数将发出更高优先级的任务。如果没有高优先级任务要运行,则会发出较低优先级队列的任务。

每次调用 priority 函数时,它只会从其中一个队列中发出一批任务。这确保在更高优先级的队列没有要运行的任务之前,不会执行任何较低优先级的任务。

priority 参数

  • topic expression:(必需)高优先级任务队列

  • topic expression:(必需)较低优先级任务队列

priority 语法

daemon(id="myDaemon",
       executor(threads=10,
                priority(topic(checkpointCollection, storedExpressions, q="priority:high", fl="id, expr_s", initialCheckPoint=0,id="highPriorityTasks"),
                         topic(checkpointCollection, storedExpressions, q="priority:low", fl="id, expr_s", initialCheckPoint=0,id="lowPriorityTasks"))))

在上面的示例中,daemon 函数迭代地调用执行器。每次调用时,executor 函数都会执行 priority 函数发出的任务。priority 函数封装了两个主题。第一个主题是更高优先级的任务队列,第二个主题是较低优先级的主题。

reduce

reduce 函数封装了一个内部流,并按公共字段对元组进行分组。

每个元组组都作为一个单独的块被可插拔的 reduce 操作处理。Solr 提供的组操作实现了分布式分组功能。组操作还可以作为构建自定义 reduce 操作时可以参考的示例 reduce 操作。

reduce 函数依赖于底层流的排序顺序。因此,底层流的排序顺序必须与按字段分组对齐。

reduce 参数

  • StreamExpression:(必需)

  • by:(必需)按其分组的字段的逗号分隔列表。

  • Reduce Operation:(必需)

reduce 语法

reduce(search(collection1, q="*:*", qt="/export", fl="id,a_s,a_i,a_f", sort="a_s asc, a_f asc"),
       by="a_s",
       group(sort="a_f desc", n="4")
)

rollup

rollup 函数封装另一个流函数,并在桶字段上汇总聚合。rollup 函数依赖于底层流的排序顺序,以一次对一个分组汇总聚合。因此,底层流的排序顺序必须与 rollup 函数的 over 参数中的字段匹配。

rollup 函数还需要处理整个结果集才能执行其聚合。当底层流是 search 函数时,可以使用 /export 处理程序向 rollup 函数提供完整的排序结果集。这种排序方法允许 rollup 函数对非常高基数的字段执行聚合。这种方法的缺点是,必须对元组进行排序,并通过网络流式传输到工作节点进行聚合。为了更快地聚合低到中等基数的字段,可以使用 facet 函数。

rollup 参数

  • StreamExpression (必需)

  • over:(必需)按其分组的字段列表。

  • metrics:(必需)要计算的指标列表。当前支持的指标是 sum(col)avg(col)min(col)max(col)count(*)

rollup 语法

rollup(
   search(collection1, q="*:*", qt="/export", fl="a_s,a_i,a_f", qt="/export", sort="a_s asc"),
   over="a_s",
   sum(a_i),
   sum(a_f),
   min(a_i),
   min(a_f),
   max(a_i),
   max(a_f),
   avg(a_i),
   avg(a_f),
   count(*)
)

上面的示例显示了封装搜索函数的 rollup 函数。请注意,搜索函数正在使用 /export 处理程序向 rollup 流提供整个结果集。另请注意,搜索函数的 sort 参数与 rollup 的 over 参数匹配。这允许 rollup 函数一次对一个组汇总 a_s 字段。

scoreNodes

请参阅图遍历中的部分。

select

select 函数封装一个流式表达式,并输出包含来自传入元组的字段子集或修改集的元组。输出元组中包含的字段列表可以包含别名以有效地重命名字段。select 流支持操作和评估器。可以提供一个操作和评估器列表,以对任何字段执行操作,例如 replaceaddif 等。

select 参数

  • StreamExpression

  • fieldName:要包含在输出元组中的字段的名称(可以包含多个),例如 outputTuple[fieldName] = inputTuple[fieldName]fieldName 可以是通配符模式,例如 a_* 选择所有以 a_ 开头的字段。

  • fieldName as aliasFieldName:要包含在输出元组中的别名字段名称(可以包含多个),例如 outputTuple[aliasFieldName] = incomingTuple[fieldName]

  • replace(fieldName, value, withValue=replacementValue):如果 incomingTuple[fieldName] == value,则 outgoingTuple[fieldName] 将设置为 replacementValuevalue 可以是字符串 "null",以将 null 值替换为其他值。

  • replace(fieldName, value, withField=otherFieldName):如果 incomingTuple[fieldName] == value,则 outgoingTuple[fieldName] 将设置为 incomingTuple[otherFieldName] 的值。value 可以是字符串 "null",以将 null 值替换为其他值。

select 语法

// output tuples with fields teamName, wins, losses, and winPercentages where a null value for wins or losses is translated to the value of 0
select(
  search(collection1, fl="id,teamName_s,wins,losses", q="*:*", qt="/export", sort="id asc"),
  teamName_s as teamName,
  wins,
  losses,
  replace(wins,null,withValue=0),
  replace(losses,null,withValue=0),
  if(eq(0,wins), 0, div(add(wins,losses), wins)) as winPercentage
)

sort

sort 函数封装一个流式表达式,并对元组重新排序。sort 函数以新的排序顺序发出所有传入的元组。sort 函数从传入的流中读取所有元组,使用具有 O(nlog(n)) 性能特征的算法重新排序它们,其中 n 是传入流中元组的总数,然后以新的排序顺序输出元组。由于所有元组都读入内存,因此此函数的内存消耗会随着传入流中元组数量的增加而线性增长。

sort 参数

  • StreamExpression

  • by:用于重新排序元组的排序条件

sort 语法

下面的表达式查找狗主人,并按主人和宠物名称对结果进行排序。请注意,它使用高效的 innerJoin,首先按人/主人 ID 排序,然后按主人和宠物名称重新排序最终输出。

sort(
  innerJoin(
    search(people, q="*:*", qt="/export", fl="id,name", sort="id asc"),
    search(pets, q="type:dog", qt="/export", fl="owner,petName", sort="owner asc"),
    on="id=owner"
  ),
  by="name asc, petName asc"
)

顶部

top 函数包装一个流式表达式并重新排序元组。 top 函数仅按新的排序顺序发出前 N 个元组。 top 函数会重新排序底层流,因此排序标准不必与底层流匹配。

top 参数

  • n: 要返回的顶部元组的数量。

  • StreamExpression

  • sort: 用于选择前 N 个元组的排序标准。

top 语法

下面的表达式查找底层搜索的前 3 个结果。请注意,它反转了排序顺序。 top 函数会重新排序底层流的结果。

top(n=3,
     search(collection1,
            q="*:*",
            qt="/export",
            fl="id,a_s,a_i,a_f",
            sort="a_f desc, a_i desc"),
      sort="a_f asc, a_i asc")

唯一

unique 函数包装一个流式表达式,并根据 over 参数发出唯一的元组流。 unique 函数依赖于底层流的排序顺序。 over 参数必须与底层流的排序顺序匹配。

unique 函数实现了一个非同位的唯一算法。这意味着具有相同唯一 over 字段的记录不需要位于同一个分片上。在并行执行时,partitionKeys 参数必须与唯一 over 字段相同,以便具有相同键的记录将被洗牌到同一个工作进程。

unique 参数

  • StreamExpression

  • over: 唯一的标准。

unique 语法

unique(
  search(collection1,
         q="*:*",
         qt="/export",
         fl="id,a_s,a_i,a_f",
         sort="a_f asc, a_i asc"),
  over="a_f")

更新

update 函数包装另一个函数,并将元组发送到 SolrCloud 集合以作为文档进行索引。

update 参数

  • destinationCollection: (必需) 将要索引元组的集合。

  • batchSize: (可选,默认为 250) 索引批次大小。

  • pruneVersionField: (可选,默认为 true) 是否从元组中删除 _version_

  • StreamExpression:(必需)

update 语法

 update(destinationCollection,
        batchSize=500,
        search(collection1,
               q=*:*,
               qt="/export",
               fl="id,a_s,a_i,a_f,s_multi,i_multi",
               sort="a_f asc, a_i asc"))

上面的示例将 search 函数返回的元组发送到 destinationCollection 进行索引。

如此示例中所示包装 search(…​) 是此装饰器的常见用法:从集合中读取文档作为元组,以某种方式处理或修改它们,然后将它们添加回新集合。因此,pruneVersionField=true 是默认行为 — 在将元组转换为 Solr 文档时,剥离内部流中找到的任何 _version_ 值,以防止 乐观并发约束导致任何意外错误。