Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.app.function;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.api.IDatasourceFunction;
import org.apache.asterix.metadata.declared.DataSourceId;
import org.apache.asterix.metadata.declared.FunctionDataSource;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.Function;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;

/**
 * Datasource backing the {@code function-metadata} function.
 *
 * <p>UDF rows are resolved while the datasource function is being created (they come from the metadata
 * catalog via the {@link MetadataProvider}'s transaction context) and are handed, already rendered, to
 * {@link FunctionMetadataFunction}. Builtin rows are produced later by that function itself.
 */
public class FunctionMetadataDatasource extends FunctionDataSource {

    private static final DataSourceId FUNCTION_METADATA_DATASOURCE_ID =
            createDataSourceId(FunctionMetadataRewriter.FUNCTION_METADATA);

    /**
     * @param domain the node domain passed through to {@link FunctionDataSource}
     * @throws AlgebricksException if the superclass constructor fails
     */
    public FunctionMetadataDatasource(INodeDomain domain) throws AlgebricksException {
        super(FUNCTION_METADATA_DATASOURCE_ID, FunctionMetadataRewriter.FUNCTION_METADATA, domain);
    }

    /**
     * Creates the runtime function, pinned to one randomly chosen location out of {@code locations}.
     *
     * @param metadataProvider source of the metadata transaction context used to look up UDFs
     * @param locations candidate locations; exactly one of them is selected
     * @return a {@link FunctionMetadataFunction} carrying the pre-built UDF rows
     * @throws AlgebricksException if the metadata lookup fails
     */
    @Override
    protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
            AlgebricksAbsolutePartitionConstraint locations) throws AlgebricksException {
        // UDFs are stored in the metadata catalog (only reachable from the CC), so they are rendered
        // here and travel with the function; builtins come from the static registry on the node.
        final List<String> shippedUdfRows = collectUdfRecords(metadataProvider);
        // Every node has the same builtin registry, so a single location is enough; running on more
        // than one would emit duplicate rows.
        final AlgebricksAbsolutePartitionConstraint singleLocation =
                AlgebricksAbsolutePartitionConstraint.randomLocation(locations.getLocations());
        return new FunctionMetadataFunction(singleLocation, shippedUdfRows);
    }

    /**
     * Renders one row per function of every dataverse visible through the provider's metadata
     * transaction context. Returns an empty list when there is no transaction context.
     */
    private static List<String> collectUdfRecords(MetadataProvider metadataProvider) throws AlgebricksException {
        final List<String> rows = new ArrayList<>();
        final MetadataTransactionContext txn = metadataProvider.getMetadataTxnContext();
        if (txn != null) {
            for (Dataverse dataverse : MetadataManager.INSTANCE.getDataverses(txn)) {
                final List<Function> udfs = MetadataManager.INSTANCE.getDataverseFunctions(txn,
                        dataverse.getDatabaseName(), dataverse.getDataverseName());
                for (Function udf : udfs) {
                    rows.add(toUdfRecord(udf));
                }
            }
        }
        return rows;
    }

    /**
     * Renders a single UDF as a row. The category is the function's kind in lower case, or
     * {@code "scalar"} when the kind is absent; UDF rows are never private and carry no aliases.
     */
    private static String toUdfRecord(Function udf) {
        final String category;
        if (udf.getKind() != null) {
            category = udf.getKind().toLowerCase(Locale.ROOT);
        } else {
            category = "scalar";
        }
        return FunctionMetadataFunction.buildRecord(udf.getName(), udf.getArity(), category, false,
                FunctionMetadataFunction.KIND_UDF, udf.getDataverseName().toString(), Collections.emptyList());
    }

    /** Two function datasources are the same when their function identifiers are equal. */
    @Override
    protected boolean sameFunctionDatasource(FunctionDataSource other) {
        return Objects.equals(functionId, other.getFunctionId());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.app.function;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.lang.common.util.CommonFunctionMapUtil;
import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
import org.apache.asterix.om.functions.BuiltinFunctionInfo;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;

/**
 * Runtime function behind {@code function-metadata}. Its record reader emits one JSON row per
 * builtin function identifier reported by {@link BuiltinFunctions}, followed by the pre-built UDF
 * rows supplied at construction time.
 */
public class FunctionMetadataFunction extends AbstractDatasourceFunction {

    private static final long serialVersionUID = 2L;

    static final String KIND_BUILTIN = "builtin";
    static final String KIND_UDF = "udf";

    // UDF rows are resolved on the CC (they live in the metadata catalog) and shipped here.
    private final List<String> udfRecords;

    /**
     * @param locations the locations this function runs on
     * @param udfRecords pre-rendered UDF rows; {@code null} is treated as "no UDF rows"
     */
    public FunctionMetadataFunction(AlgebricksAbsolutePartitionConstraint locations, List<String> udfRecords) {
        super(locations);
        // Copy into a private ArrayList: the rows travel with this object, so the field must not
        // alias a caller-owned list (which could be mutated later or be a non-serializable view),
        // and a null argument should not surface as an NPE only when the reader is created.
        if (udfRecords == null) {
            this.udfRecords = new ArrayList<>();
        } else {
            this.udfRecords = new ArrayList<>(udfRecords);
        }
    }

    /**
     * Builds the full row set (builtins first, then UDF rows) and wraps it in a
     * {@link FunctionMetadataReader}.
     */
    @Override
    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
            throws HyracksDataException {
        List<String> records = new ArrayList<>();
        // Built once per reader; every builtin row consults it for its alias list.
        Map<String, List<String>> aliasIndex = buildAliasIndex();
        for (FunctionIdentifier fi : BuiltinFunctions.getBuiltinFunctionIdentifiers()) {
            records.add(toBuiltinRecord(fi, aliasIndex));
        }
        records.addAll(udfRecords);
        return new FunctionMetadataReader(records.toArray(new String[0]));
    }

    /**
     * Renders one builtin as a row: underscore-normalized name, arity, category, privacy flag
     * (false when no function info is registered), kind {@code "builtin"}, no dataverse, and the
     * aliases indexed under the identifier's name.
     */
    private static String toBuiltinRecord(FunctionIdentifier fi, Map<String, List<String>> aliasIndex) {
        BuiltinFunctionInfo info = BuiltinFunctions.getBuiltinFunctionInfo(fi);
        boolean isPrivate = info != null && info.isPrivate();
        List<String> aliases = aliasIndex.getOrDefault(fi.getName(), Collections.emptyList());
        return buildRecord(underscore(fi.getName()), fi.getArity(), category(fi), isPrivate, KIND_BUILTIN, null,
                aliases);
    }

    /**
     * Builds a single JSON row. Shared by builtin rows (here) and UDF rows (resolved on the CC in
     * {@link FunctionMetadataDatasource}) so both sides emit an identical schema.
     *
     * @param name function name (string-escaped)
     * @param arity function arity, written as a bare number
     * @param category function category (string-escaped)
     * @param isPrivate written as a bare boolean under the {@code private} key
     * @param kind row kind, e.g. {@link #KIND_BUILTIN} or {@link #KIND_UDF} (string-escaped)
     * @param dataverse dataverse name, or {@code null} to emit a JSON {@code null}
     * @param aliases alias names in output order; {@code null} is treated as an empty list
     * @return the row as a JSON object string
     */
    static String buildRecord(String name, int arity, String category, boolean isPrivate, String kind, String dataverse,
            List<String> aliases) {
        // A missing alias list means "no aliases" rather than a failure.
        List<String> safeAliases = aliases == null ? Collections.<String> emptyList() : aliases;
        StringBuilder sb = new StringBuilder();
        sb.append("{\"name\":\"").append(escape(name)).append("\",\"arity\":").append(arity).append(",\"category\":\"")
                .append(escape(category)).append("\",\"private\":").append(isPrivate).append(",\"kind\":\"")
                .append(escape(kind)).append("\",\"dataverse\":");
        if (dataverse == null) {
            sb.append("null");
        } else {
            sb.append('"').append(escape(dataverse)).append('"');
        }
        sb.append(",\"aliases\":[");
        for (int i = 0; i < safeAliases.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(escape(safeAliases.get(i))).append('"');
        }
        sb.append("]}");
        return sb.toString();
    }

    /**
     * Classifies a builtin. The checks are ordered; the first match wins and {@code "scalar"} is the
     * fallback.
     */
    static String category(FunctionIdentifier fi) {
        if (BuiltinFunctions.isWindowFunction(fi) || BuiltinFunctions.getWindowFunction(fi) != null) {
            // isWindowFunction matches the internal *-impl ids; getWindowFunction matches the
            // user-facing names (row_number, rank, ...) that map to those impls.
            return "window";
        }
        if (BuiltinFunctions.isBuiltinAggregateFunction(fi)) {
            return "aggregate";
        }
        if (BuiltinFunctions.isBuiltinScalarAggregateFunction(fi)) {
            return "aggregate-scalar";
        }
        // Datasource (table-valued) functions are registered as BOTH unnest and datasource, so the
        // datasource check MUST come before the unnest check; otherwise every datasource function
        // (function_metadata, active_requests, jobs, ...) would be mislabeled "unnest".
        if (BuiltinFunctions.getDatasourceTransformer(fi) != null) {
            return "datasource";
        }
        if (BuiltinFunctions.isBuiltinUnnestingFunction(fi)) {
            return "unnest";
        }
        return "scalar";
    }

    /**
     * Builds an index from an internal (hyphenated) function name to its callable aliases, derived
     * from {@link CommonFunctionMapUtil}. Alias spellings are normalized to the underscore form so
     * they line up with the {@code name} column. Each alias list is sorted.
     */
    static Map<String, List<String>> buildAliasIndex() {
        Map<String, List<String>> index = new HashMap<>();
        for (Map.Entry<String, String> e : CommonFunctionMapUtil.getFunctionMappings().entrySet()) {
            // Mapping entries are alias -> internal name; invert them here.
            index.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(underscore(e.getKey()));
        }
        for (List<String> aliases : index.values()) {
            Collections.sort(aliases);
        }
        return index;
    }

    /** Replaces every hyphen in {@code name} with an underscore. */
    static String underscore(String name) {
        return name.replace('-', '_');
    }

    /**
     * Escapes a value for use inside a JSON string literal: double quotes and backslashes are
     * prefixed with a backslash, and characters below 0x20 are written as four-hex-digit Unicode
     * escapes. All other characters are copied unchanged.
     */
    private static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length());
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.app.function;

import java.io.IOException;

import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.input.record.CharArrayRecord;

/**
 * Reader over a fixed array of pre-rendered rows. Each call to {@link #next()} wraps the next row
 * in a fresh {@link CharArrayRecord} and advances the cursor.
 */
public class FunctionMetadataReader extends FunctionReader {

    private final String[] rows;
    private int cursor = 0;

    /**
     * @param functions the rows to hand out, in order; the array is used as-is (not copied)
     */
    public FunctionMetadataReader(String[] functions) {
        this.rows = functions;
    }

    /** @return {@code true} while at least one row has not been returned yet */
    @Override
    public boolean hasNext() throws Exception {
        return cursor < rows.length;
    }

    /**
     * Returns the row at the cursor as a new record and moves the cursor forward by one.
     */
    @Override
    public IRawRecord<char[]> next() throws IOException, InterruptedException {
        final CharArrayRecord row = new CharArrayRecord();
        // Advance before reading so the cursor moves even if the index is out of range.
        final int current = cursor++;
        row.set(rows[current]);
        row.endRecord();
        return row;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.app.function;

import org.apache.asterix.common.functions.FunctionConstants;
import org.apache.asterix.metadata.declared.FunctionDataSource;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;

/**
 * Rewriter for the zero-argument {@code function-metadata} function; it maps a call to a
 * {@link FunctionMetadataDatasource}.
 */
public class FunctionMetadataRewriter extends FunctionRewriter {

    /** Identifier of the function: name {@code function-metadata}, arity 0. */
    public static final FunctionIdentifier FUNCTION_METADATA = FunctionConstants.newAsterix("function-metadata", 0);

    /** Shared instance; declared after {@link #FUNCTION_METADATA} because it is initialized from it. */
    public static final FunctionMetadataRewriter INSTANCE = new FunctionMetadataRewriter(FUNCTION_METADATA);

    private FunctionMetadataRewriter(FunctionIdentifier functionId) {
        super(functionId);
    }

    /**
     * Creates the datasource over the optimization context's computation node domain. The call
     * expression itself is not inspected.
     */
    @Override
    protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
            throws AlgebricksException {
        final FunctionDataSource datasource = new FunctionMetadataDatasource(context.getComputationNodeDomain());
        return datasource;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.asterix.app.function.DatasetRewriter;
import org.apache.asterix.app.function.DumpIndexRewriter;
import org.apache.asterix.app.function.FeedRewriter;
import org.apache.asterix.app.function.FunctionMetadataRewriter;
import org.apache.asterix.app.function.JobSummariesRewriter;
import org.apache.asterix.app.function.PingRewriter;
import org.apache.asterix.app.function.QueryIndexRewriter;
Expand Down Expand Up @@ -119,6 +120,12 @@ public class MetadataBuiltinFunctions {
BuiltinFunctions.addUnnestFun(CollectionEstimateColumnCountRewriter.ESTIMATED_COLLECTION_COLUMN_COUNT, true);
BuiltinFunctions.addDatasourceFunction(CollectionEstimateColumnCountRewriter.ESTIMATED_COLLECTION_COLUMN_COUNT,
CollectionEstimateColumnCountRewriter.INSTANCE);
// Function metadata function
BuiltinFunctions.addFunction(FunctionMetadataRewriter.FUNCTION_METADATA,
(expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
BuiltinFunctions.addUnnestFun(FunctionMetadataRewriter.FUNCTION_METADATA, true);
BuiltinFunctions.addDatasourceFunction(FunctionMetadataRewriter.FUNCTION_METADATA,
FunctionMetadataRewriter.INSTANCE, BuiltinFunctions.DataSourceFunctionProperty.MIN_MEMORY_BUDGET);
}

private MetadataBuiltinFunctions() {
Expand Down
Loading