Skip to content

Commit

Permalink
[core] Introduce expire_tags action and procedure with olderThan user…
Browse files Browse the repository at this point in the history
… custom time (#4138)
  • Loading branch information
askwang authored Oct 15, 2024
1 parent 55f2102 commit 6f033f0
Show file tree
Hide file tree
Showing 15 changed files with 832 additions and 6 deletions.
14 changes: 14 additions & 0 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,20 @@ All available procedures are listed below.
CALL sys.delete_tag(`table` => 'default.T', tag => 'my_tag')
</td>
</tr>
<tr>
<td>expire_tags</td>
<td>
CALL [catalog.]sys.expire_tags('identifier', 'older_than')
</td>
<td>
To expire tags by time. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>older_than: tagCreateTime before which tags will be removed.</li>
</td>
<td>
CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00')
</td>
</tr>
<tr>
<td>merge_into</td>
<td>
Expand Down
11 changes: 11 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,17 @@ This section introduce all available spark procedures about paimon.
</td>
<td>CALL sys.delete_tag(table => 'default.T', tag => 'my_tag')</td>
</tr>
<tr>
<td>expire_tags</td>
<td>
To expire tags by time. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>older_than: tagCreateTime before which tags will be removed.</li>
</td>
<td>
CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00')
</td>
</tr>
<tr>
<td>rollback</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void run() {
tagAutoCreation.run();
}
if (tagTimeExpire != null) {
tagTimeExpire.run();
tagTimeExpire.expire();
}
}

Expand All @@ -65,4 +65,8 @@ public static TagAutoManager create(
public TagAutoCreation getTagAutoCreation() {
return tagAutoCreation;
}

public TagTimeExpire getTagTimeExpire() {
return tagTimeExpire;
}
}
45 changes: 40 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@

package org.apache.paimon.tag;

import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/** A manager to expire tags by time. */
Expand All @@ -41,6 +45,8 @@ public class TagTimeExpire {
private final TagDeletion tagDeletion;
private final List<TagCallback> callbacks;

private LocalDateTime olderThanTime;

private TagTimeExpire(
SnapshotManager snapshotManager,
TagManager tagManager,
Expand All @@ -52,24 +58,53 @@ private TagTimeExpire(
this.callbacks = callbacks;
}

public void run() {
public List<String> expire() {
List<Pair<Tag, String>> tags = tagManager.tagObjects();
List<String> expired = new ArrayList<>();
for (Pair<Tag, String> pair : tags) {
Tag tag = pair.getLeft();
String tagName = pair.getRight();
LocalDateTime createTime = tag.getTagCreateTime();
Duration timeRetained = tag.getTagTimeRetained();
if (createTime == null || timeRetained == null) {
continue;
if (olderThanTime != null) {
FileStatus tagFileStatus;
try {
tagFileStatus =
snapshotManager.fileIO().getFileStatus(tagManager.tagPath(tagName));
} catch (IOException e) {
LOG.warn(
"Tag path {} not exist, skip expire it.",
tagManager.tagPath(tagName));
continue;
}
createTime = DateTimeUtils.toLocalDateTime(tagFileStatus.getModificationTime());
} else {
continue;
}
}
if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) {
boolean isReachTimeRetained =
timeRetained != null
&& LocalDateTime.now().isAfter(createTime.plus(timeRetained));
boolean isOlderThan = olderThanTime != null && olderThanTime.isAfter(createTime);
if (isReachTimeRetained || isOlderThan) {
LOG.info(
"Delete tag {}, because its existence time has reached its timeRetained of {}.",
"Delete tag {}, because its existence time has reached its timeRetained of {} or"
+ " its createTime {} is olderThan olderThanTime {}.",
tagName,
timeRetained);
timeRetained,
createTime,
olderThanTime);
tagManager.deleteTag(tagName, tagDeletion, snapshotManager, callbacks);
expired.add(tagName);
}
}
return expired;
}

public TagTimeExpire withOlderThanTime(LocalDateTime olderThanTime) {
this.olderThanTime = olderThanTime;
return this;
}

public static TagTimeExpire create(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.DateTimeUtils;

import org.apache.flink.table.procedure.ProcedureContext;

import java.time.LocalDateTime;
import java.util.List;
import java.util.TimeZone;

/** A procedure to expire tags by time. */
public class ExpireTagsProcedure extends ProcedureBase {

private static final String IDENTIFIER = "expire_tags";

public String[] call(ProcedureContext procedureContext, String tableId)
throws Catalog.TableNotExistException {
return call(procedureContext, tableId, null);
}

public String[] call(ProcedureContext procedureContext, String tableId, String olderThanStr)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
TagTimeExpire tagTimeExpire =
fileStoreTable.store().newTagCreationManager().getTagTimeExpire();
if (olderThanStr != null) {
LocalDateTime olderThanTime =
DateTimeUtils.parseTimestampData(olderThanStr, 3, TimeZone.getDefault())
.toLocalDateTime();
tagTimeExpire.withOlderThanTime(olderThanTime);
}
List<String> expired = tagTimeExpire.expire();
return expired.isEmpty()
? new String[] {"No expired tags."}
: expired.toArray(new String[0]);
}

@Override
public String identifier() {
return IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -489,4 +489,31 @@ public void testRewriteFileIndex() {
assertThatCode(() -> sql("CALL sys.rewrite_file_index('default.T', 'pt = 0')"))
.doesNotThrowAnyException();
}

@Test
public void testExpireTags() throws Exception {
sql(
"CREATE TABLE T ("
+ " k STRING,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt) NOT ENFORCED"
+ ") PARTITIONED BY (dt) WITH ("
+ " 'bucket' = '1'"
+ ")");
FileStoreTable table = paimonTable("T");
for (int i = 1; i <= 3; i++) {
sql("INSERT INTO T VALUES ('" + i + "', '" + i + "')");
}
assertThat(table.snapshotManager().snapshotCount()).isEqualTo(3L);

sql("CALL sys.create_tag('default.T', 'tag-1', 1)");
sql("CALL sys.create_tag('default.T', 'tag-2', 2, '1d')");
sql("CALL sys.create_tag('default.T', 'tag-3', 3, '1s')");

assertThat(sql("select count(*) from `T$tags`")).containsExactly(Row.of(3L));

Thread.sleep(1000);
assertThat(sql("CALL sys.expire_tags('default.T')"))
.containsExactlyInAnyOrder(Row.of("tag-3"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.flink.procedure.ExpireTagsProcedure;

import org.apache.flink.table.procedure.DefaultProcedureContext;

import java.util.Map;

/** Expire tags action for Flink. */
public class ExpireTagsAction extends ActionBase {

private final String table;
private final String olderThan;

public ExpireTagsAction(
String warehouse, String table, String olderThan, Map<String, String> catalogConfig) {
super(warehouse, catalogConfig);
this.table = table;
this.olderThan = olderThan;
}

@Override
public void run() throws Exception {
ExpireTagsProcedure expireTagsProcedure = new ExpireTagsProcedure();
expireTagsProcedure.withCatalog(catalog);
expireTagsProcedure.call(new DefaultProcedureContext(env), table, olderThan);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import java.util.Map;
import java.util.Optional;

/** Factory to create {@link ExpireTagsAction}. */
public class ExpireTagsActionFactory implements ActionFactory {

private static final String IDENTIFIER = "expire_tags";

private static final String OLDER_THAN = "older_than";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
String warehouse = params.get(WAREHOUSE);
String table = params.get(TABLE);
String olderThan = params.get(OLDER_THAN);
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);

ExpireTagsAction expireTagsAction =
new ExpireTagsAction(warehouse, table, olderThan, catalogConfig);
return Optional.of(expireTagsAction);
}

@Override
public void printHelp() {
System.out.println("Action \"expire_tags\" expire tags by time.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" expire_tags --warehouse <warehouse_path> "
+ "--table <database.table_name> "
+ "[--older_than <older_than>]");
}
}
Loading

0 comments on commit 6f033f0

Please sign in to comment.