流装饰器参考
cartesianProduct
cartesianProduct
函数将具有多值字段(即数组)的单个元组转换为多个元组,数组中的每个值对应一个元组。也就是说,给定一个包含 fieldA 的 N 个值数组的单个元组,cartesianProduct
函数将输出 N 个元组,每个元组都包含原始元组数组中的一个值。本质上,您可以展平数组以进行进一步处理。
例如,使用 cartesianProduct
,您可以将此元组
{
"fieldA": "foo",
"fieldB": ["bar","baz","bat"]
}
转换为以下 3 个元组
{
"fieldA": "foo",
"fieldB": "bar"
}
{
"fieldA": "foo",
"fieldB": "baz"
}
{
"fieldA": "foo",
"fieldB": "bat"
}
cartesianProduct 参数
-
incoming stream
:(必需)单个传入流。 -
fieldName 或 evaluator
:(必需)要展平值的字段名称,或者应该展平其结果的评估器。 -
productSort='fieldName ASC|DESC'
:(可选)新生成的元组的排序顺序。
cartesianProduct 语法
cartesianProduct(
<stream>,
<fieldName | evaluator> [as newFieldName],
productSort='fieldName ASC|DESC'
)
cartesianProduct 示例
以下示例显示此源元组的不同输出
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3]
}
单字段,无排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB
)
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": [1,2,3]
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": [1,2,3]
}
单评估器,无排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
sequence(3,4,5) as fieldE
)
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 4
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 14
}
单字段,按值排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
productSort="fieldB desc"
)
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": [1,2,3]
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": [1,2,3]
}
单评估器,按评估器值排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
sequence(3,4,5) as fieldE,
productSort="newFieldE desc"
)
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 14
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3],
"fieldE": 4
}
重命名单字段,按值排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB as newFieldB,
productSort="fieldB desc"
)
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3]
"newFieldB": "valueB2",
}
{
"fieldA": "valueA",
"fieldB": ["valueB1","valueB2"],
"fieldC": [1,2,3]
"newFieldB": "valueB1",
}
多字段,无排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
fieldC
)
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 3
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 3
}
多字段,按单字段排序
cartesianProduct(
search(collection1, qt="/export", q="*:*", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
fieldC,
productSort="fieldC asc"
)
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 3
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 3
}
多字段,按多字段排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
fieldB,
fieldC,
productSort="fieldC asc, fieldB desc"
)
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 1
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 2
}
{
"fieldA": "valueA",
"fieldB": "valueB2",
"fieldC": 3
}
{
"fieldA": "valueA",
"fieldB": "valueB1",
"fieldC": 3
}
字段和评估器,无排序
cartesianProduct(
search(collection1, q="*:*", qt="/export", fl="fieldA, fieldB, fieldC", sort="fieldA asc"),
sequence(3,4,5) as fieldE,
fieldB
)
{
"fieldA": "valueA",
"fieldB": valueB1,
"fieldC": [1,2,3],
"fieldE": 4
}
{
"fieldA": "valueA",
"fieldB": valueB2,
"fieldC": [1,2,3],
"fieldE": 4
}
{
"fieldA": "valueA",
"fieldB": valueB1,
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": valueB2,
"fieldC": [1,2,3],
"fieldE": 9
}
{
"fieldA": "valueA",
"fieldB": valueB1,
"fieldC": [1,2,3],
"fieldE": 14
}
{
"fieldA": "valueA",
"fieldB": valueB2,
"fieldC": [1,2,3],
"fieldE": 14
}
正如您在上面的示例中看到的那样,cartesianProduct
函数确实支持跨多个字段和/或评估器展平元组。
classify
classify
函数使用逻辑回归文本分类模型对元组进行分类。它专门设计用于与使用 train
函数训练的模型一起工作。 classify
函数使用 model
函数检索存储的模型,然后使用该模型对元组流进行评分。分类器读取的元组必须包含可用于分类的文本字段。classify
函数使用 Lucene 分析器从文本中提取特征,以便可以应用模型。默认情况下,classify
函数使用元组中文本字段的名称查找分析器。如果工作节点上的 Solr 模式不包含此字段,则可以通过指定 analyzerField
参数在另一个字段中查找分析器。
每个分类的元组都会分配两个分数
-
probability_d*:一个介于 0 和 1 之间的浮点数,描述元组属于该类的概率。这在分类用例中很有用。
-
score_d*:文档的分数,该分数没有被压缩到 0 到 1 之间。该分数可以是正数或负数。分数越高,文档越适合该类。这种未压缩的分数在查询重新排序和推荐用例中很有用。当多个高排名文档的 probability_d 分数为 1 时,此分数尤其有用,因为这不会提供文档之间有意义的排名。
commit
commit
函数包装单个流 (A),并在给定集合和批量大小时,当满足批量大小或到达流末尾时,向集合发送提交消息。提交流最常与更新流一起使用,因此提交将考虑来自更新流的可能的摘要元组。进入提交流的所有元组都将从提交流中返回 - 不会删除任何元组,也不会添加任何元组。
complement
complement
函数包装两个流(A 和 B),并从 A 中发出 B 中不存在的元组。元组按照它们在流 A 中出现的顺序发出。两个流都必须按用于确定相等性的字段(使用 on
参数)进行排序。
complement 参数
-
StreamExpression for StreamA
-
StreamExpression for StreamB
-
on
:用于检查 A 和 B 之间元组相等性的字段。可以是on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
格式。
complement 语法
complement(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
on="a_i"
)
complement(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
on="a_i,a_s"
)
daemon
daemon
函数包装另一个函数,并使用内部线程按时间间隔运行它。daemon
函数可用于提供连续的推送和拉取流。
连续推送流
使用连续推送流时,daemon
函数会包装另一个函数,然后将其发送到 /stream
处理程序以执行。/stream
处理程序识别 daemon
函数并将其保留在内存中,以便它可以按时间间隔运行其内部函数。
为了方便元组的推送,daemon
函数必须包装另一个将元组推送到某处的流装饰器。其中一个示例是 update
函数,它包装一个流并将元组发送到另一个 SolrCloud 集合进行索引。
daemon 语法
daemon(id="uniqueId",
runInterval="1000",
terminate="true",
update(destinationCollection,
batchSize=100,
topic(checkpointCollection,
topicCollection,
q="topic query",
fl="id, title, abstract, text",
id="topicId",
initialCheckpoint=0)
)
)
上面的示例代码显示了一个 daemon
函数包装一个 update
函数,该函数包装一个 topic
函数。当此表达式发送到 /stream
处理程序时,/stream
处理程序会看到 daemon
函数并将其保留在内存中,它将在其中按时间间隔运行。在此特定示例中,daemon
函数将每秒运行一次 update
函数。update
函数正在包装一个 topic
函数,该函数将分批流式传输与 topic
函数查询匹配的元组。对主题的每个后续调用将返回主题的下一批元组。update
函数会将所有与主题匹配的元组发送到另一个集合进行索引。terminate
参数告诉守护程序在 topic
函数停止发送元组时终止。
这样做的效果是将与特定查询匹配的文档推送到另一个集合中。可以插入自定义推送函数,将文档从 Solr 中推出到其他系统,例如 Kafka 或电子邮件系统。
推送流还可用于连续后台聚合场景,其中聚合在后台按时间间隔汇总并推送到其他 Solr 集合。另一个用例是连续的后台机器学习模型优化,其中优化的模型被推送到另一个 Solr 集合,可以在其中集成到查询中。
/stream
处理程序支持一小组命令,用于列出和控制守护程序函数
https://127.0.0.1:8983/solr/collection/stream?action=list
此命令将提供特定节点上当前正在运行的守护程序及其当前状态的列表。
https://127.0.0.1:8983/solr/collection/stream?action=stop&id=daemonId
此命令将停止特定的守护程序函数,但将其保留在内存中。
https://127.0.0.1:8983/solr/collection/stream?action=start&id=daemonId
此命令将启动已停止的特定守护程序函数。
https://127.0.0.1:8983/solr/collection/stream?action=kill&id=daemonId
此命令将停止特定的守护程序函数并将其从内存中删除。
连续拉取流
DaemonStream java 类(SolrJ 库的一部分)也可以嵌入到 java 应用程序中以提供连续拉取流。示例代码
StreamContext context = new StreamContext()
SolrClientCache cache = new SolrClientCache();
context.setSolrClientCache(cache);
Map topicQueryParams = new HashMap();
topicQueryParams.put("q","hello"); // The query for the topic
topicQueryparams.put("rows", "500"); // How many rows to fetch during each run
topicQueryparams.put("fl", "id", "title"); // The field list to return with the documents
TopicStream topicStream = new TopicStream(zkHost, // Host address for the ZooKeeper service housing the collections
"checkpoints", // The collection to store the topic checkpoints
"topicData", // The collection to query for the topic records
"topicId", // The id of the topic
-1, // checkpoint every X tuples, if set -1 it will checkpoint after each run.
topicQueryParams); // The query parameters for the TopicStream
DaemonStream daemonStream = new DaemonStream(topicStream, // The underlying stream to run.
"daemonId", // The id of the daemon
1000, // The interval at which to run the internal stream
500); // The internal queue size for the daemon stream. Tuples will be placed in the queue
// as they are read by the internal thread.
// Calling read() on the daemon stream reads records from the internal queue.
daemonStream.setStreamContext(context);
daemonStream.open();
//Read until it's time to shutdown the DaemonStream. You can define the shutdown criteria.
while(!shutdown()) {
Tuple tuple = daemonStream.read() // This will block until tuples become available from the underlying stream (TopicStream)
// The EOF tuple (signaling the end of the stream) will never occur until the DaemonStream has been shutdown.
//Do something with the tuples
}
// Shutdown the DaemonStream.
daemonStream.shutdown();
//Read the DaemonStream until the EOF Tuple is found.
//This allows the underlying stream to perform an orderly shutdown.
while(true) {
Tuple tuple = daemonStream.read();
if(tuple.EOF) {
break;
} else {
//Do something with the tuples.
}
}
//Finally close the stream
daemonStream.close();
delete
delete
函数包装其他函数,并使用找到的 id
和 _version_
值将元组作为 按 Id 删除命令发送到 SolrCloud 集合。
这类似于下面描述的 update()
函数。
delete 参数
-
destinationCollection
:(必需)将删除元组的集合。 -
batchSize
:(可选,默认为250
)删除批量大小。 -
pruneVersionField
:(可选,默认为false
)是否从元组中修剪_version_
值 -
StreamExpression
:(必需)
delete 语法
delete(collection1,
batchSize=500,
search(collection1,
q=old_data:true,
qt="/export",
fl="id",
sort="a_f asc, a_i asc"))
上面的示例使用针对 collection1
的 search
函数返回的元组,并将找到的每个文档的 id
值转换为针对同一 collection1
的删除请求。
与 希望忽略并发更新并删除所有匹配文档的用户应设置 预计会出现并发更新并希望“跳过”任何失败删除的用户应考虑配置 |
eval
eval
函数允许在动态生成然后评估新的流式表达式的情况下使用。eval
函数包装一个流式表达式并从底层流读取单个元组。然后,eval
函数从元组的 expr_s
字段中检索字符串流式表达式。然后,eval
函数编译字符串流式表达式并发出元组。
executor
executor
函数包装包含流式表达式的流源,并并行执行表达式。executor
函数在每个元组的 expr_s
字段中查找表达式。executor
函数具有一个内部线程池,该线程池运行在同一工作节点上并行编译和运行表达式的任务。此函数也可以通过将其包装在 parallel
函数中来跨工作节点并行化,从而提供跨集群的表达式并行执行。
executor
函数不会对其运行的表达式的输出进行任何特定操作。因此,执行的表达式必须包含将元组推送到其目的地的逻辑。可以将 update 函数包含在正在执行的表达式中,以将元组发送到 SolrCloud 集合进行存储。
此模型允许异步执行作业,其中输出存储在 SolrCloud 集合中,可以在作业进行时访问该集合。
fetch
fetch
函数迭代一个流并获取额外的字段,并将它们添加到元组中。fetch
函数以批处理方式获取,以限制对 Solr 的调用次数。从 fetch
函数流出的元组将包含原始字段和获取的额外字段。fetch
函数支持一对一的获取。多对一的获取(其中流源包含重复的键)也有效,但是此函数当前不支持一对多的获取。
having
having
表达式包装一个流,并将一个布尔运算应用于每个元组。它仅发出布尔运算返回 true 的元组。
leftOuterJoin
leftOuterJoin
函数包装两个流,左流和右流,并从左流发出元组。如果右流中存在一个相等(由 on
定义)的元组,则该元组中的值将包含在发出的元组中。左流元组不需要存在一个相等的右流元组才能被发出。这支持一对一、一对多、多对一和多对多的左外连接场景。元组按它们在左流中出现的顺序发出。两个流都必须按用于确定相等性的字段排序(使用 on
参数)。如果两个元组都包含同名字段,则在发出的元组中使用来自右流的值。
您可以将传入的流包装在 select
函数中,以明确哪些字段值包含在发出的元组中。
leftOuterJoin 参数
-
StreamLeft 的 StreamExpression
-
StreamRight 的 StreamExpression
-
on
: 用于检查左流和右流之间元组的相等性的字段。可以是on="字段名"
,on="左流中的字段名=右流中的字段名"
或on="字段名, 其他字段名=右流中其他字段名"
格式。
leftOuterJoin 语法
leftOuterJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
leftOuterJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
leftOuterJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
hashJoin
hashJoin
函数包装两个流,左流和右流,并且对于左流中在右流中存在的每个元组,将发出一个包含两个元组字段的元组。这支持一对一、一对多、多对一和多对多的内连接场景。元组按它们在左流中出现的顺序发出。流的顺序无关紧要。如果两个元组都包含同名字段,则在发出的元组中使用来自右流的值。
您可以将传入的流包装在 select
函数中,以明确哪些字段值包含在发出的元组中。
当左流和右流的元组不能按相同顺序排列时,可以使用 hashJoin 函数。由于元组是无序的,因此此流函数在打开操作期间读取右流中的所有值,并将所有元组存储在内存中。结果是内存占用量等于右流的大小。
hashJoin 参数
-
StreamLeft 的 StreamExpression
-
hashed=StreamRight 的 StreamExpression
-
on
: 用于检查左流和右流之间元组的相等性的字段。可以是on="字段名"
,on="左流中的字段名=右流中的字段名"
或on="字段名, 其他字段名=右流中其他字段名"
格式。
hashJoin 语法
hashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
hashJoin(
search(people, q="*:*", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
hashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
innerJoin
包装两个流,左流和右流。对于左流中在右流中存在的每个元组,将发出一个包含两个元组字段的元组。这支持一对一、一对多、多对一和多对多的内连接场景。元组按它们在左流中出现的顺序发出。两个流都必须按用于确定相等性的字段排序('on' 参数)。如果两个元组都包含同名字段,则在发出的元组中使用来自右流的值。您可以使用 select(…)
表达式包装传入的流,以明确哪些字段值包含在发出的元组中。
innerJoin 参数
-
StreamLeft 的 StreamExpression
-
StreamRight 的 StreamExpression
-
on
: 用于检查左流和右流之间元组的相等性的字段。可以是on="字段名"
,on="左流中的字段名=右流中的字段名"
或on="字段名, 其他字段名=右流中其他字段名"
格式。
innerJoin 语法
innerJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
innerJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
innerJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
intersect
intersect
函数包装两个流 A 和 B,并发出 确实 存在于 B 中的 A 中的元组。元组按它们在流 A 中出现的顺序发出。两个流都必须按用于确定相等性的字段排序(on
参数)。仅发出来自 A 的元组。
intersect 参数
-
StreamExpression for StreamA
-
StreamExpression for StreamB
-
on
:用于检查 A 和 B 之间元组相等性的字段。可以是on="fieldName"
、on="fieldNameInLeft=fieldNameInRight"
或on="fieldName, otherFieldName=rightOtherFieldName"
格式。
intersect 语法
intersect(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc"),
on="a_i"
)
intersect(
search(collection1, q="a_s:(setA || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
search(collection1, q="a_s:(setB || setAB)", qt="/export", fl="id,a_s,a_i", sort="a_i asc, a_s asc"),
on="a_i,a_s"
)
list
list
函数包装 N 个流表达式,并按顺序打开和迭代每个流。这具有连接多个流表达式结果的效果。
list 语法
list(tuple(a="hello world"), tuple(a="HELLO WORLD"))
list(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))
list(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))
merge
merge
函数合并两个或多个流表达式,并保持底层流的顺序。由于保持了顺序,因此底层流的排序必须与提供给 merge 函数的 on 参数对齐。
merge 参数
-
StreamExpression A
-
StreamExpression B
-
可选 StreamExpression C,D,….Z
-
on
: 用于执行合并的排序条件。形式为字段名 顺序
,其中顺序为asc
或desc
。可以以字段A 顺序, 字段B 顺序
的形式提供多个字段。
merge 语法
# Merging two stream expressions together
merge(
search(collection1,
q="id:(0 3 4)",
qt="/export",
fl="id,a_s,a_i,a_f",
sort="a_f asc"),
search(collection1,
q="id:(1)",
qt="/export",
fl="id,a_s,a_i,a_f",
sort="a_f asc"),
on="a_f asc")
# Merging four stream expressions together. Notice that while the sorts of each stream are not identical they are
# comparable. That is to say the first N fields in each stream's sort matches the N fields in the merge's on clause.
merge(
search(collection1,
q="id:(0 3 4)",
qt="/export",
fl="id,fieldA,fieldB,fieldC",
sort="fieldA asc, fieldB desc"),
search(collection1,
q="id:(1)",
qt="/export",
fl="id,fieldA",
sort="fieldA asc"),
search(collection2,
q="id:(10 11 13)",
qt="/export",
fl="id,fieldA,fieldC",
sort="fieldA asc"),
search(collection3,
q="id:(987)",
qt="/export",
fl="id,fieldA,fieldC",
sort="fieldA asc"),
on="fieldA asc")
null
当执行并行关系代数(连接、交集、汇总等)时,null 表达式是一个有用的实用程序函数,用于了解瓶颈。null 函数读取底层流中的所有元组,并返回一个包含计数和处理时间的单个元组。由于 null 流本身添加的开销最小,因此它可以用于隔离 Solr 的 /export
处理程序的性能。如果 /export 处理程序的性能不是瓶颈,则瓶颈很可能发生在运行流装饰器的工作节点中。
null 表达式可以由并行函数包装并发送到工作节点。在这种情况下,每个工作节点将返回一个包含工作节点上处理的元组计数和该工作节点计时信息的元组。这提供了有价值的信息,例如:
-
随着添加更多的工作节点,/export 处理程序的性能是否提高。
-
元组是否均匀地分布在各个工作节点上,或者哈希分区是否将更多文档发送到单个工作节点。
-
所有工作节点是否以相同的速度处理数据,或者其中一个工作节点是否是瓶颈的来源。
outerHashJoin
outerHashJoin
函数包装两个流,左流和右流,并从左流发出元组。如果右流中存在一个相等(由 on
参数定义)的元组,则该元组中的值将包含在发出的元组中。左流元组不需要存在一个相等的右流元组才能被发出。这支持一对一、一对多、多对一和多对多的左外连接场景。元组按它们在左流中出现的顺序发出。流的顺序无关紧要。如果两个元组都包含同名字段,则在发出的元组中使用来自右流的值。
您可以将传入的流包装在 select
函数中,以明确哪些字段值包含在发出的元组中。
当左侧和右侧的元组不能按相同顺序排列时,可以使用 outerHashJoin 流。由于元组是无序的,此流的工作方式是在打开操作期间读取右侧流中的所有值,并将所有元组存储在内存中。这样做的结果是内存占用量等于右侧流的大小。
outerHashJoin 参数
-
StreamLeft 的 StreamExpression
-
hashed=StreamRight 的 StreamExpression
-
on
: 用于检查左流和右流之间元组的相等性的字段。可以是on="字段名"
,on="左流中的字段名=右流中的字段名"
或on="字段名, 其他字段名=右流中其他字段名"
格式。
outerHashJoin 语法
outerHashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="personId,petName", sort="personId asc"),
on="personId"
)
outerHashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=search(pets, q="type:cat", qt="/export", fl="ownerId,petName", sort="ownerId asc"),
on="personId=ownerId"
)
outerHashJoin(
search(people, q="*:*", qt="/export", fl="personId,name", sort="personId asc"),
hashed=select(
search(pets, q="type:cat", qt="/export", fl="ownerId,name", sort="ownerId asc"),
ownerId,
name as petName
),
on="personId=ownerId"
)
parallel
parallel
函数封装一个流式表达式,并将其发送到 N 个工作节点进行并行处理。
parallel
函数要求为底层搜索提供 partitionKeys
参数。partitionKeys
参数会将搜索结果(元组)在工作节点之间进行分区。具有相同 partitionKeys
值的元组将被洗牌到同一个工作节点。
parallel
函数会维护工作节点返回的元组的排序顺序,因此排序条件必须包含工作节点返回的元组的排序顺序。
例如,如果您按年、月和日进行排序,只要有足够多的不同年份来将元组分散到工作节点上,您就可以只按年份进行分区。
Solr 允许按 4 个以上的字段进行排序,但出于速度考虑,您不能指定超过 4 个 partitionKeys
。此外,当一两个键足以分散元组时,指定多个 partitionKeys
是过分的。
当底层搜索流将从集合中发出大量元组时,会设计并行流。如果搜索流仅使用 parallel
发出集合的一小部分数据,则可能会更慢。
工作节点集合
工作节点可以来自与数据相同的集合,也可以是完全不同的集合,甚至是仅为 |
parallel 参数
-
collection
:要将 StreamExpression 发送到的工作节点集合的名称。 -
StreamExpression
:要发送到工作节点集合的表达式。 -
workers
:要将表达式发送到的工作节点集合中的工作节点数量。 -
zkHost
:(可选)工作节点集合所在的 ZooKeeper 连接字符串。仅当使用与您连接的 Solr 实例相同的 ZkHost 时(chroot
可以不同),才会包含 Zookeeper 凭据和 ACL。 -
sort
:用于对工作节点返回的元组进行排序的排序条件。
parallel 语法
parallel(workerCollection,
rollup(search(collection1, q="*:*", fl="id,year_i,month_i,day_i", qt="/export", sort="year_i desc,month_i desc,day_i asc", partitionKeys="year_i"),
over="year_i", count(*)),
workers="20",
zkHost="localhost:9983",
sort="year_i desc")
上面的表达式显示了一个封装了 rollup
函数的 parallel
函数。这将导致 rollup
函数在 20 个工作节点上并行运行。
预热
对于具有相同数量的工作节点和
|
plist
plist
函数封装 N 个 Stream Expressions,并并行打开流,然后依次迭代每个流。list
和 plist
的区别在于,流是并行打开的。由于许多流(如 facet
、stats
和 significantTerms
)在打开时会将繁重的操作推送到 Solr,因此 plist 函数可以通过并行执行这些操作来显着提高性能。
plist 语法
plist(tuple(a="hello world"), tuple(a="HELLO WORLD"))
plist(search(collection1, q="*:*", fl="id, prod_ss", sort="id asc"),
search(collection2, q="*:*", fl="id, prod_ss", sort="id asc"))
plist(tuple(a=search(collection1, q="*:*", fl="id, prod_ss", sort="id asc")),
tuple(a=search(collection2, q="*:*", fl="id, prod_ss", sort="id asc")))
priority
priority
函数是 executor 函数的简单优先级调度器。executor
函数没有直接的任务优先级概念;相反,它只是按照从底层流中读取的顺序执行任务。priority
函数提供了在较早提交的较低优先级任务之前调度较高优先级任务的能力。
priority
函数封装了两个 topic
函数,这两个函数都发出包含要执行的流式表达式的元组。第一个主题被认为是更高优先级的任务队列。
每次调用 priority
函数时,它都会检查更高优先级的任务队列,查看是否有任何任务要执行。如果更高优先级的队列中有任务在等待,则 priority 函数将发出更高优先级的任务。如果没有高优先级任务要运行,则会发出较低优先级队列的任务。
每次调用 priority
函数时,它只会从其中一个队列中发出一批任务。这确保在更高优先级的队列没有要运行的任务之前,不会执行任何较低优先级的任务。
priority 语法
daemon(id="myDaemon",
executor(threads=10,
priority(topic(checkpointCollection, storedExpressions, q="priority:high", fl="id, expr_s", initialCheckPoint=0,id="highPriorityTasks"),
topic(checkpointCollection, storedExpressions, q="priority:low", fl="id, expr_s", initialCheckPoint=0,id="lowPriorityTasks"))))
在上面的示例中,daemon
函数迭代地调用执行器。每次调用时,executor
函数都会执行 priority
函数发出的任务。priority
函数封装了两个主题。第一个主题是更高优先级的任务队列,第二个主题是较低优先级的主题。
reduce
reduce
函数封装了一个内部流,并按公共字段对元组进行分组。
每个元组组都作为一个单独的块被可插拔的 reduce 操作处理。Solr 提供的组操作实现了分布式分组功能。组操作还可以作为构建自定义 reduce 操作时可以参考的示例 reduce 操作。
reduce 函数依赖于底层流的排序顺序。因此,底层流的排序顺序必须与按字段分组对齐。 |
rollup
rollup
函数封装另一个流函数,并在桶字段上汇总聚合。rollup 函数依赖于底层流的排序顺序,以一次对一个分组汇总聚合。因此,底层流的排序顺序必须与 rollup 函数的 over
参数中的字段匹配。
rollup 函数还需要处理整个结果集才能执行其聚合。当底层流是 search
函数时,可以使用 /export
处理程序向 rollup 函数提供完整的排序结果集。这种排序方法允许 rollup 函数对非常高基数的字段执行聚合。这种方法的缺点是,必须对元组进行排序,并通过网络流式传输到工作节点进行聚合。为了更快地聚合低到中等基数的字段,可以使用 facet
函数。
rollup 参数
-
StreamExpression
(必需) -
over
:(必需)按其分组的字段列表。 -
metrics
:(必需)要计算的指标列表。当前支持的指标是sum(col)
、avg(col)
、min(col)
、max(col)
、count(*)
。
rollup 语法
rollup(
search(collection1, q="*:*", qt="/export", fl="a_s,a_i,a_f", qt="/export", sort="a_s asc"),
over="a_s",
sum(a_i),
sum(a_f),
min(a_i),
min(a_f),
max(a_i),
max(a_f),
avg(a_i),
avg(a_f),
count(*)
)
上面的示例显示了封装搜索函数的 rollup 函数。请注意,搜索函数正在使用 /export
处理程序向 rollup 流提供整个结果集。另请注意,搜索函数的 sort
参数与 rollup 的 over
参数匹配。这允许 rollup 函数一次对一个组汇总 a_s
字段。
scoreNodes
请参阅图遍历中的部分。
select
select
函数封装一个流式表达式,并输出包含来自传入元组的字段子集或修改集的元组。输出元组中包含的字段列表可以包含别名以有效地重命名字段。select
流支持操作和评估器。可以提供一个操作和评估器列表,以对任何字段执行操作,例如 replace
、add
、if
等。
select 参数
-
StreamExpression
-
fieldName
:要包含在输出元组中的字段的名称(可以包含多个),例如outputTuple[fieldName] = inputTuple[fieldName]
。fieldName
可以是通配符模式,例如a_*
选择所有以a_
开头的字段。 -
fieldName as aliasFieldName
:要包含在输出元组中的别名字段名称(可以包含多个),例如outputTuple[aliasFieldName] = incomingTuple[fieldName]
-
replace(fieldName, value, withValue=replacementValue)
:如果incomingTuple[fieldName] == value
,则outgoingTuple[fieldName]
将设置为replacementValue
。value
可以是字符串 "null",以将 null 值替换为其他值。 -
replace(fieldName, value, withField=otherFieldName)
:如果incomingTuple[fieldName] == value
,则outgoingTuple[fieldName]
将设置为incomingTuple[otherFieldName]
的值。value
可以是字符串 "null",以将 null 值替换为其他值。
select 语法
// output tuples with fields teamName, wins, losses, and winPercentages where a null value for wins or losses is translated to the value of 0
select(
search(collection1, fl="id,teamName_s,wins,losses", q="*:*", qt="/export", sort="id asc"),
teamName_s as teamName,
wins,
losses,
replace(wins,null,withValue=0),
replace(losses,null,withValue=0),
if(eq(0,wins), 0, div(add(wins,losses), wins)) as winPercentage
)
sort
sort
函数封装一个流式表达式,并对元组重新排序。sort 函数以新的排序顺序发出所有传入的元组。sort 函数从传入的流中读取所有元组,使用具有 O(nlog(n))
性能特征的算法重新排序它们,其中 n 是传入流中元组的总数,然后以新的排序顺序输出元组。由于所有元组都读入内存,因此此函数的内存消耗会随着传入流中元组数量的增加而线性增长。
sort 语法
下面的表达式查找狗主人,并按主人和宠物名称对结果进行排序。请注意,它使用高效的 innerJoin,首先按人/主人 ID 排序,然后按主人和宠物名称重新排序最终输出。
sort(
innerJoin(
search(people, q="*:*", qt="/export", fl="id,name", sort="id asc"),
search(pets, q="type:dog", qt="/export", fl="owner,petName", sort="owner asc"),
on="id=owner"
),
by="name asc, petName asc"
)
唯一
unique
函数包装一个流式表达式,并根据 over
参数发出唯一的元组流。 unique 函数依赖于底层流的排序顺序。 over
参数必须与底层流的排序顺序匹配。
unique 函数实现了一个非同位的唯一算法。这意味着具有相同唯一 over
字段的记录不需要位于同一个分片上。在并行执行时,partitionKeys
参数必须与唯一 over
字段相同,以便具有相同键的记录将被洗牌到同一个工作进程。
更新
update
函数包装另一个函数,并将元组发送到 SolrCloud 集合以作为文档进行索引。
update 参数
-
destinationCollection
: (必需) 将要索引元组的集合。 -
batchSize
: (可选,默认为250
) 索引批次大小。 -
pruneVersionField
: (可选,默认为true
) 是否从元组中删除_version_
值 -
StreamExpression
:(必需)
update 语法
update(destinationCollection,
batchSize=500,
search(collection1,
q=*:*,
qt="/export",
fl="id,a_s,a_i,a_f,s_multi,i_multi",
sort="a_f asc, a_i asc"))
上面的示例将 search
函数返回的元组发送到 destinationCollection
进行索引。
如此示例中所示包装 search(…)
是此装饰器的常见用法:从集合中读取文档作为元组,以某种方式处理或修改它们,然后将它们添加回新集合。因此,pruneVersionField=true
是默认行为 — 在将元组转换为 Solr 文档时,剥离内部流中找到的任何 _version_
值,以防止 乐观并发约束导致任何意外错误。