Skip to content

Commit 5633f4e

Browse files
committed
fix(glue): Support create_table for S3 Tables federated databases
GlueCatalog.create_table() fails for S3 Tables because it tries to write Iceberg metadata to a warehouse-derived location before registering the table in Glue. S3 Tables manages storage internally, so the location is not known until the table exists in the service. Detect S3 Tables federated databases (FederatedDatabase.ConnectionType == "aws:s3tables") and use a two-phase create: first create a minimal table entry in Glue to have S3 Tables allocate storage, then write Iceberg metadata to the managed location and update the Glue table with the metadata pointer. On failure, clean up the table entry. This follows the same "pre-create then update" pattern used by the Java GlueCatalog when LakeFormation is enabled (GlueTableOperations.createGlueTempTableIfNecessary).
1 parent 89a129c commit 5633f4e

File tree

1 file changed

+129
-2
lines changed

1 file changed

+129
-2
lines changed

pyiceberg/catalog/glue.py

Lines changed: 129 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
cast,
2525
)
2626

27+
import logging
28+
2729
import boto3
2830
from botocore.config import Config
2931

@@ -120,6 +122,8 @@
120122
ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional"
121123
ICEBERG_FIELD_CURRENT = "iceberg.field.current"
122124

125+
logger = logging.getLogger(__name__)
126+
123127
GLUE_PROFILE_NAME = "glue.profile-name"
124128
GLUE_REGION = "glue.region"
125129
GLUE_ACCESS_KEY_ID = "glue.access-key-id"
@@ -417,6 +421,114 @@ def _get_glue_table(self, database_name: str, table_name: str) -> "TableTypeDef"
417421
except self.glue.exceptions.EntityNotFoundException as e:
418422
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e
419423

424+
def _is_s3tables_database(self, database_name: str) -> bool:
425+
"""Check if a Glue database is federated with S3 Tables.
426+
427+
S3 Tables databases have a FederatedDatabase property with
428+
ConnectionType set to aws:s3tables.
429+
430+
Args:
431+
database_name: The name of the Glue database.
432+
433+
Returns:
434+
True if the database is an S3 Tables federated database.
435+
"""
436+
try:
437+
database_response = self.glue.get_database(Name=database_name)
438+
except self.glue.exceptions.EntityNotFoundException:
439+
return False
440+
database = database_response["Database"]
441+
federated = database.get("FederatedDatabase", {})
442+
return (federated.get("ConnectionType") or "").lower() == "aws:s3tables"
443+
444+
def _create_table_s3tables(
445+
self,
446+
identifier: str | Identifier,
447+
database_name: str,
448+
table_name: str,
449+
schema: Union[Schema, "pa.Schema"],
450+
location: str | None,
451+
partition_spec: PartitionSpec,
452+
sort_order: SortOrder,
453+
properties: Properties,
454+
) -> Table:
455+
"""Create an Iceberg table in an S3 Tables federated database.
456+
457+
S3 Tables manages storage internally, so the table location is not known until the
458+
table is created in the service. This method:
459+
1. Creates a minimal table entry in Glue (format=ICEBERG), which causes S3 Tables
460+
to allocate storage.
461+
2. Retrieves the managed storage location via GetTable.
462+
3. Writes Iceberg metadata to that location.
463+
4. Updates the Glue table entry with the metadata pointer.
464+
465+
On failure, the table created in step 1 is deleted.
466+
"""
467+
if location is not None:
468+
raise ValueError(
469+
f"Cannot specify a location for S3 Tables table {database_name}.{table_name}. "
470+
"S3 Tables manages the storage location automatically."
471+
)
472+
473+
# Create a minimal table in Glue so S3 Tables allocates storage.
474+
self._create_glue_table(
475+
database_name=database_name,
476+
table_name=table_name,
477+
table_input={
478+
"Name": table_name,
479+
"Parameters": {"format": "ICEBERG"},
480+
},
481+
)
482+
483+
try:
484+
# Retrieve the managed storage location.
485+
glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
486+
storage_descriptor = glue_table.get("StorageDescriptor", {})
487+
managed_location = storage_descriptor.get("Location")
488+
if not managed_location:
489+
raise ValueError(
490+
f"S3 Tables did not assign a storage location for {database_name}.{table_name}"
491+
)
492+
493+
# Build the Iceberg metadata targeting the managed location.
494+
staged_table = self._create_staged_table(
495+
identifier=identifier,
496+
schema=schema,
497+
location=managed_location,
498+
partition_spec=partition_spec,
499+
sort_order=sort_order,
500+
properties=properties,
501+
)
502+
503+
# Write metadata and update the Glue table with the metadata pointer.
504+
self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
505+
table_input = _construct_table_input(
506+
table_name, staged_table.metadata_location, properties, staged_table.metadata
507+
)
508+
version_id = glue_table.get("VersionId")
509+
if not version_id:
510+
raise CommitFailedException(
511+
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
512+
)
513+
self._update_glue_table(
514+
database_name=database_name,
515+
table_name=table_name,
516+
table_input=table_input,
517+
version_id=version_id,
518+
)
519+
except Exception:
520+
# Clean up the table created in step 1.
521+
try:
522+
self.glue.delete_table(DatabaseName=database_name, Name=table_name)
523+
except Exception:
524+
logger.warning(
525+
f"Failed to clean up S3 Tables table {database_name}.{table_name}",
526+
exc_info=logger.isEnabledFor(logging.DEBUG),
527+
)
528+
raise
529+
530+
return self.load_table(identifier=identifier)
531+
420532
def create_table(
421533
self,
422534
identifier: str | Identifier,
@@ -433,6 +545,7 @@ def create_table(
433545
identifier: Table identifier.
434546
schema: Table's schema.
435547
location: Location for the table. Optional Argument.
548+
Must not be set for S3 Tables, which manage their own storage.
436549
partition_spec: PartitionSpec for the table.
437550
sort_order: SortOrder for the table.
438551
properties: Table properties that can be a string based dictionary.
@@ -442,9 +555,24 @@ def create_table(
442555
443556
Raises:
444557
AlreadyExistsError: If a table with the name already exists.
445-
ValueError: If the identifier is invalid, or no path is given to store metadata.
558+
ValueError: If the identifier is invalid, no path is given to store metadata,
559+
or a location is specified for an S3 Tables table.
446560
447561
"""
562+
database_name, table_name = self.identifier_to_database_and_table(identifier)
563+
564+
if self._is_s3tables_database(database_name):
565+
return self._create_table_s3tables(
566+
identifier=identifier,
567+
database_name=database_name,
568+
table_name=table_name,
569+
schema=schema,
570+
location=location,
571+
partition_spec=partition_spec,
572+
sort_order=sort_order,
573+
properties=properties,
574+
)
575+
448576
staged_table = self._create_staged_table(
449577
identifier=identifier,
450578
schema=schema,
@@ -453,7 +581,6 @@ def create_table(
453581
sort_order=sort_order,
454582
properties=properties,
455583
)
456-
database_name, table_name = self.identifier_to_database_and_table(identifier)
457584

458585
self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
459586
table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata)

0 commit comments

Comments
 (0)