diff --git a/src/backend/commands/creatinh.c b/src/backend/commands/creatinh.c index 602ceb54328..7aaaa3cdebf 100644 --- a/src/backend/commands/creatinh.c +++ b/src/backend/commands/creatinh.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/Attic/creatinh.c,v 1.74 2001/03/22 06:16:11 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/Attic/creatinh.c,v 1.75 2001/03/30 20:50:36 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -34,11 +34,11 @@ * ---------------- */ -static int checkAttrExists(const char *attributeName, - const char *attributeType, List *schema); static List *MergeAttributes(List *schema, List *supers, bool istemp, List **supOids, List **supconstr); +static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno); static void StoreCatalogInheritance(Oid relationId, List *supers); +static int findAttrByName(const char *attributeName, List *schema); static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass); @@ -63,9 +63,10 @@ DefineRelation(CreateStmt *stmt, char relkind) int i; AttrNumber attnum; - if (strlen(stmt->relname) >= NAMEDATALEN) - elog(ERROR, "the relation name %s is >= %d characters long", - stmt->relname, NAMEDATALEN); + /* + * Truncate relname to appropriate length (probably a waste of time, + * as parser should have done this already). + */ StrNCpy(relname, stmt->relname, NAMEDATALEN); /* @@ -80,7 +81,7 @@ DefineRelation(CreateStmt *stmt, char relkind) elog(ERROR, "DefineRelation: please inherit from a relation or define an attribute"); /* - * create a relation descriptor from the relation schema and create + * Create a relation descriptor from the relation schema and create * the relation. Note that in this stage only inherited (pre-cooked) * defaults and constraints will be included into the new relation. * (BuildDescForRelation takes care of the inherited defaults, but we @@ -234,12 +235,339 @@ TruncateRelation(char *name) heap_truncate(name); } +/*---------- + * MergeAttributes + * Returns new schema given initial schema and superclasses. + * + * Input arguments: + * 'schema' is the column/attribute definition for the table. (It's a list + * of ColumnDef's.) It is destructively changed. + * 'supers' is a list of names (as Value objects) of parent relations. + * 'istemp' is TRUE if we are creating a temp relation. + * + * Output arguments: + * 'supOids' receives an integer list of the OIDs of the parent relations. + * 'supconstr' receives a list of constraints belonging to the parents, + * updated as necessary to be valid for the child. + * + * Return value: + * Completed schema list. + * + * Notes: + * The order in which the attributes are inherited is very important. + * Intuitively, the inherited attributes should come first. If a table + * inherits from multiple parents, the order of those attributes are + * according to the order of the parents specified in CREATE TABLE. + * + * Here's an example: + * + * create table person (name text, age int4, location point); + * create table emp (salary int4, manager text) inherits(person); + * create table student (gpa float8) inherits (person); + * create table stud_emp (percent int4) inherits (emp, student); + * + * The order of the attributes of stud_emp is: + * + * person {1:name, 2:age, 3:location} + * / \ + * {6:gpa} student emp {4:salary, 5:manager} + * \ / + * stud_emp {7:percent} + * + * If the same attribute name appears multiple times, then it appears + * in the result table in the proper location for its first appearance. + *---------- + */ +static List * +MergeAttributes(List *schema, List *supers, bool istemp, + List **supOids, List **supconstr) +{ + List *entry; + List *inhSchema = NIL; + List *parentOids = NIL; + List *constraints = NIL; + int child_attno; + + /* + * Check for duplicate names in the explicit list of attributes. + * + * Although we might consider merging such entries in the same way that + * we handle name conflicts for inherited attributes, it seems to make + * more sense to assume such conflicts are errors. + */ + foreach(entry, schema) + { + ColumnDef *coldef = lfirst(entry); + List *rest; + + foreach(rest, lnext(entry)) + { + ColumnDef *restdef = lfirst(rest); + + if (strcmp(coldef->colname, restdef->colname) == 0) + { + elog(ERROR, "CREATE TABLE: attribute \"%s\" duplicated", + coldef->colname); + } + } + } + /* + * Reject duplicate names in the list of parents, too. + */ + foreach(entry, supers) + { + List *rest; + + foreach(rest, lnext(entry)) + { + if (strcmp(strVal(lfirst(entry)), strVal(lfirst(rest))) == 0) + { + elog(ERROR, "CREATE TABLE: inherited relation \"%s\" duplicated", + strVal(lfirst(entry))); + } + } + } + + /* + * Scan the parents left-to-right, and merge their attributes to form + * a list of inherited attributes (inhSchema). + */ + child_attno = 0; + foreach(entry, supers) + { + char *name = strVal(lfirst(entry)); + Relation relation; + TupleDesc tupleDesc; + TupleConstr *constr; + AttrNumber *newattno; + AttrNumber parent_attno; + + relation = heap_openr(name, AccessShareLock); + + if (relation->rd_rel->relkind != RELKIND_RELATION) + elog(ERROR, "CREATE TABLE: inherited relation \"%s\" is not a table", name); + /* Permanent rels cannot inherit from temporary ones */ + if (!istemp && is_temp_rel_name(name)) + elog(ERROR, "CREATE TABLE: cannot inherit from temp relation \"%s\"", name); + + /* + * We should have an UNDER permission flag for this, but for now, + * demand that creator of a child table own the parent. + */ + if (!pg_ownercheck(GetUserId(), name, RELNAME)) + elog(ERROR, "you do not own table \"%s\"", name); + + parentOids = lappendi(parentOids, relation->rd_id); + setRelhassubclassInRelation(relation->rd_id, true); + + tupleDesc = RelationGetDescr(relation); + constr = tupleDesc->constr; + + /* + * newattno[] will contain the child-table attribute numbers for + * the attributes of this parent table. (They are not the same + * for parents after the first one.) + */ + newattno = (AttrNumber *) palloc(tupleDesc->natts*sizeof(AttrNumber)); + + for (parent_attno = 1; parent_attno <= tupleDesc->natts; + parent_attno++) + { + Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1]; + char *attributeName; + char *attributeType; + HeapTuple tuple; + int exist_attno; + ColumnDef *def; + TypeName *typename; + + /* + * Get name and type name of attribute + */ + attributeName = NameStr(attribute->attname); + tuple = SearchSysCache(TYPEOID, + ObjectIdGetDatum(attribute->atttypid), + 0, 0, 0); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "CREATE TABLE: cache lookup failed for type %u", + attribute->atttypid); + attributeType = pstrdup(NameStr(((Form_pg_type) GETSTRUCT(tuple))->typname)); + ReleaseSysCache(tuple); + + /* + * Does it conflict with some previously inherited column? + */ + exist_attno = findAttrByName(attributeName, inhSchema); + if (exist_attno > 0) + { + /* + * Yes, try to merge the two column definitions. + * They must have the same type and typmod. + */ + elog(NOTICE, "CREATE TABLE: merging multiple inherited definitions of attribute \"%s\"", + attributeName); + def = (ColumnDef *) nth(exist_attno - 1, inhSchema); + if (strcmp(def->typename->name, attributeType) != 0 || + def->typename->typmod != attribute->atttypmod) + elog(ERROR, "CREATE TABLE: inherited attribute \"%s\" type conflict (%s and %s)", + attributeName, def->typename->name, attributeType); + /* Merge of NOT NULL constraints = OR 'em together */ + def->is_not_null |= attribute->attnotnull; + /* Default and other constraints are handled below */ + newattno[parent_attno - 1] = exist_attno; + } + else + { + /* + * No, create a new inherited column + */ + def = makeNode(ColumnDef); + def->colname = pstrdup(attributeName); + typename = makeNode(TypeName); + typename->name = attributeType; + typename->typmod = attribute->atttypmod; + def->typename = typename; + def->is_not_null = attribute->attnotnull; + def->is_sequence = false; + def->raw_default = NULL; + def->cooked_default = NULL; + def->constraints = NIL; + inhSchema = lappend(inhSchema, def); + newattno[parent_attno - 1] = ++child_attno; + } + /* + * Copy default if any, overriding any default from earlier parent + */ + if (attribute->atthasdef) + { + AttrDefault *attrdef; + int i; + + def->raw_default = NULL; + def->cooked_default = NULL; + + Assert(constr != NULL); + attrdef = constr->defval; + for (i = 0; i < constr->num_defval; i++) + { + if (attrdef[i].adnum == parent_attno) + { + /* + * if default expr could contain any vars, we'd + * need to fix 'em, but it can't ... + */ + def->cooked_default = pstrdup(attrdef[i].adbin); + break; + } + } + Assert(def->cooked_default != NULL); + } + } + /* + * Now copy the constraints of this parent, adjusting attnos using + * the completed newattno[] map + */ + if (constr && constr->num_check > 0) + { + ConstrCheck *check = constr->check; + int i; + + for (i = 0; i < constr->num_check; i++) + { + Constraint *cdef = makeNode(Constraint); + Node *expr; + + cdef->contype = CONSTR_CHECK; + if (check[i].ccname[0] == '$') + cdef->name = NULL; + else + cdef->name = pstrdup(check[i].ccname); + cdef->raw_expr = NULL; + /* adjust varattnos of ccbin here */ + expr = stringToNode(check[i].ccbin); + change_varattnos_of_a_node(expr, newattno); + cdef->cooked_expr = nodeToString(expr); + constraints = lappend(constraints, cdef); + } + } + + pfree(newattno); + + /* + * Close the parent rel, but keep our AccessShareLock on it until + * xact commit. That will prevent someone else from deleting or + * ALTERing the parent before the child is committed. + */ + heap_close(relation, NoLock); + } + + /* + * If we had no inherited attributes, the result schema is just the + * explicitly declared columns. Otherwise, we need to merge the + * declared columns into the inherited schema list. + */ + if (inhSchema != NIL) + { + foreach(entry, schema) + { + ColumnDef *newdef = lfirst(entry); + char *attributeName = newdef->colname; + char *attributeType = newdef->typename->name; + int exist_attno; + + /* + * Does it conflict with some previously inherited column? + */ + exist_attno = findAttrByName(attributeName, inhSchema); + if (exist_attno > 0) + { + ColumnDef *def; + + /* + * Yes, try to merge the two column definitions. + * They must have the same type and typmod. + */ + elog(NOTICE, "CREATE TABLE: merging attribute \"%s\" with inherited definition", + attributeName); + def = (ColumnDef *) nth(exist_attno - 1, inhSchema); + if (strcmp(def->typename->name, attributeType) != 0 || + def->typename->typmod != newdef->typename->typmod) + elog(ERROR, "CREATE TABLE: attribute \"%s\" type conflict (%s and %s)", + attributeName, def->typename->name, attributeType); + /* Merge of NOT NULL constraints = OR 'em together */ + def->is_not_null |= newdef->is_not_null; + /* If new def has a default, override previous default */ + if (newdef->raw_default != NULL) + { + def->raw_default = newdef->raw_default; + def->cooked_default = newdef->cooked_default; + } + } + else + { + /* + * No, attach new column to result schema + */ + inhSchema = lappend(inhSchema, newdef); + } + } + + schema = inhSchema; + } + + *supOids = parentOids; + *supconstr = constraints; + return schema; +} + /* * complementary static functions for MergeAttributes(). - * Varattnos of pg_relcheck.rcbin should be rewritten when - * subclasses inherit the constraints from the super class. - * Note that these functions rewrite varattnos while walking - * through a node tree. + * + * Varattnos of pg_relcheck.rcbin must be rewritten when subclasses inherit + * constraints from parent classes, since the inherited attributes could + * be given different column numbers in multiple-inheritance cases. + * + * Note that the passed node tree is modified in place! */ static bool change_varattnos_walker(Node *node, const AttrNumber *newattno) @@ -250,7 +578,8 @@ change_varattnos_walker(Node *node, const AttrNumber *newattno) { Var *var = (Var *) node; - if (var->varlevelsup == 0 && var->varno == 1) + if (var->varlevelsup == 0 && var->varno == 1 && + var->varattno > 0) { /* @@ -273,273 +602,6 @@ change_varattnos_of_a_node(Node *node, const AttrNumber *newattno) return change_varattnos_walker(node, newattno); } -/* - * MergeAttributes - * Returns new schema given initial schema and supers. - * - * Input arguments: - * - * 'schema' is the column/attribute definition for the table. (It's a list - * of ColumnDef's.) It is destructively changed. - * 'supers' is a list of names (as Value objects) of parent relations. - * 'istemp' is TRUE if we are creating a temp relation. - * - * Output arguments: - * 'supOids' receives an integer list of the OIDs of the parent relations. - * 'supconstr' receives a list of constraints belonging to the parents. - * - * Notes: - * The order in which the attributes are inherited is very important. - * Intuitively, the inherited attributes should come first. If a table - * inherits from multiple parents, the order of those attributes are - * according to the order of the parents specified in CREATE TABLE. - * - * Here's an example: - * - * create table person (name text, age int4, location point); - * create table emp (salary int4, manager text) inherits(person); - * create table student (gpa float8) inherits (person); - * create table stud_emp (percent int4) inherits (emp, student); - * - * the order of the attributes of stud_emp is as follow: - * - * - * person {1:name, 2:age, 3:location} - * / \ - * {6:gpa} student emp {4:salary, 5:manager} - * \ / - * stud_emp {7:percent} - */ -static List * -MergeAttributes(List *schema, List *supers, bool istemp, - List **supOids, List **supconstr) -{ - List *entry; - List *inhSchema = NIL; - List *parentOids = NIL; - List *constraints = NIL; - int attnums; - - /* - * Validates that there are no duplications. Validity checking of - * types occurs later. - */ - foreach(entry, schema) - { - ColumnDef *coldef = lfirst(entry); - List *rest; - - foreach(rest, lnext(entry)) - { - - /* - * check for duplicated names within the new relation - */ - ColumnDef *restdef = lfirst(rest); - - if (strcmp(coldef->colname, restdef->colname) == 0) - { - elog(ERROR, "CREATE TABLE: attribute \"%s\" duplicated", - coldef->colname); - } - } - } - foreach(entry, supers) - { - List *rest; - - foreach(rest, lnext(entry)) - { - if (strcmp(strVal(lfirst(entry)), strVal(lfirst(rest))) == 0) - { - elog(ERROR, "CREATE TABLE: inherited relation \"%s\" duplicated", - strVal(lfirst(entry))); - } - } - } - - /* - * merge the inherited attributes into the schema - */ - attnums = 0; - foreach(entry, supers) - { - char *name = strVal(lfirst(entry)); - Relation relation; - List *partialResult = NIL; - AttrNumber attrno; - TupleDesc tupleDesc; - TupleConstr *constr; - AttrNumber *newattno, - *partialAttidx; - Node *expr; - int i, - attidx, - attno_exist; - - relation = heap_openr(name, AccessShareLock); - - if (relation->rd_rel->relkind != RELKIND_RELATION) - elog(ERROR, "CREATE TABLE: inherited relation \"%s\" is not a table", name); - /* Permanent rels cannot inherit from temporary ones */ - if (!istemp && is_temp_rel_name(name)) - elog(ERROR, "CREATE TABLE: cannot inherit from temp relation \"%s\"", name); - - /* - * We should have an UNDER permission flag for this, but for now, - * demand that creator of a child table own the parent. - */ - if (!pg_ownercheck(GetUserId(), name, RELNAME)) - elog(ERROR, "you do not own table \"%s\"", name); - - parentOids = lappendi(parentOids, relation->rd_id); - setRelhassubclassInRelation(relation->rd_id, true); - tupleDesc = RelationGetDescr(relation); - /* allocate a new attribute number table and initialize */ - newattno = (AttrNumber *) palloc(tupleDesc->natts * sizeof(AttrNumber)); - for (i = 0; i < tupleDesc->natts; i++) - newattno[i] = 0; - - /* - * searching and storing order are different. another table is - * needed. - */ - partialAttidx = (AttrNumber *) palloc(tupleDesc->natts * sizeof(AttrNumber)); - for (i = 0; i < tupleDesc->natts; i++) - partialAttidx[i] = 0; - constr = tupleDesc->constr; - - attidx = 0; - for (attrno = relation->rd_rel->relnatts - 1; attrno >= 0; attrno--) - { - Form_pg_attribute attribute = tupleDesc->attrs[attrno]; - char *attributeName; - char *attributeType; - HeapTuple tuple; - ColumnDef *def; - TypeName *typename; - - /* - * form name, type and constraints - */ - attributeName = NameStr(attribute->attname); - tuple = SearchSysCache(TYPEOID, - ObjectIdGetDatum(attribute->atttypid), - 0, 0, 0); - if (!HeapTupleIsValid(tuple)) - elog(ERROR, "CREATE TABLE: cache lookup failed for type %u", - attribute->atttypid); - attributeType = pstrdup(NameStr(((Form_pg_type) GETSTRUCT(tuple))->typname)); - ReleaseSysCache(tuple); - - /* - * check validity - * - */ - if (checkAttrExists(attributeName, attributeType, schema) != 0) - elog(ERROR, "CREATE TABLE: attribute \"%s\" already exists in inherited schema", - attributeName); - - if (0 < (attno_exist = checkAttrExists(attributeName, attributeType, inhSchema))) - { - - /* - * this entry already exists - */ - newattno[attribute->attnum - 1] = attno_exist; - continue; - } - attidx++; - partialAttidx[attribute->attnum - 1] = attidx; - - /* - * add an entry to the schema - */ - def = makeNode(ColumnDef); - typename = makeNode(TypeName); - def->colname = pstrdup(attributeName); - typename->name = pstrdup(attributeType); - typename->typmod = attribute->atttypmod; - def->typename = typename; - def->is_not_null = attribute->attnotnull; - def->raw_default = NULL; - def->cooked_default = NULL; - if (attribute->atthasdef) - { - AttrDefault *attrdef; - int i; - - Assert(constr != NULL); - - attrdef = constr->defval; - for (i = 0; i < constr->num_defval; i++) - { - if (attrdef[i].adnum == attrno + 1) - { - def->cooked_default = pstrdup(attrdef[i].adbin); - break; - } - } - Assert(def->cooked_default != NULL); - } - partialResult = lcons(def, partialResult); - } - for (i = 0; i < tupleDesc->natts; i++) - { - if (partialAttidx[i] > 0) - newattno[i] = attnums + attidx + 1 - partialAttidx[i]; - } - attnums += attidx; - pfree(partialAttidx); - - if (constr && constr->num_check > 0) - { - ConstrCheck *check = constr->check; - int i; - - for (i = 0; i < constr->num_check; i++) - { - Constraint *cdef = makeNode(Constraint); - - cdef->contype = CONSTR_CHECK; - if (check[i].ccname[0] == '$') - cdef->name = NULL; - else - cdef->name = pstrdup(check[i].ccname); - cdef->raw_expr = NULL; - /* adjust varattnos of ccbin here */ - expr = stringToNode(check[i].ccbin); - change_varattnos_of_a_node(expr, newattno); - cdef->cooked_expr = nodeToString(expr); - constraints = lappend(constraints, cdef); - } - } - pfree(newattno); - - /* - * Close the parent rel, but keep our AccessShareLock on it until - * xact commit. That will prevent someone else from deleting or - * ALTERing the parent before the child is committed. - */ - heap_close(relation, NoLock); - - /* - * wants the inherited schema to appear in the order they are - * specified in CREATE TABLE - */ - inhSchema = nconc(inhSchema, partialResult); - } - - /* - * put the inherited schema before our the schema for this table - */ - schema = nconc(inhSchema, schema); - - *supOids = parentOids; - *supconstr = constraints; - return schema; -} - /* * StoreCatalogInheritance * Updates the system catalogs with proper inheritance information. @@ -716,15 +778,14 @@ again: heap_close(relation, RowExclusiveLock); } - - /* - * returns the index (starting with 1) if attribute already exists in schema, + * Look for an existing schema entry with the given name. + * + * Returns the index (starting with 1) if attribute already exists in schema, * 0 if it doesn't. */ static int -checkAttrExists(const char *attributeName, const char *attributeType, - List *schema) +findAttrByName(const char *attributeName, List *schema) { List *s; int i = 0; @@ -735,21 +796,14 @@ checkAttrExists(const char *attributeName, const char *attributeType, ++i; if (strcmp(attributeName, def->colname) == 0) - { - - /* - * attribute exists. Make sure the types are the same. - */ - if (strcmp(attributeType, def->typename->name) != 0) - elog(ERROR, "CREATE TABLE: attribute \"%s\" type conflict (%s and %s)", - attributeName, attributeType, def->typename->name); return i; - } } return 0; } - +/* + * Update a relation's pg_class.relhassubclass entry to the given value + */ static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass) { diff --git a/src/test/regress/expected/create_table.out b/src/test/regress/expected/create_table.out index 6a3fc96ff13..a0ddd5ef609 100644 --- a/src/test/regress/expected/create_table.out +++ b/src/test/regress/expected/create_table.out @@ -81,6 +81,9 @@ CREATE TABLE student ( CREATE TABLE stud_emp ( percent int4 ) INHERITS (emp, student); +NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "name" +NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "age" +NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "location" CREATE TABLE city ( name name, location box, @@ -132,6 +135,8 @@ CREATE TABLE c_star ( CREATE TABLE d_star ( d float8 ) INHERITS (b_star, c_star); +NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "class" +NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "a" CREATE TABLE e_star ( e int2 ) INHERITS (c_star); diff --git a/src/test/regress/expected/inherit.out b/src/test/regress/expected/inherit.out index 3bc4199e118..b56798124f3 100644 --- a/src/test/regress/expected/inherit.out +++ b/src/test/regress/expected/inherit.out @@ -5,6 +5,8 @@ CREATE TABLE a (aa TEXT); CREATE TABLE b (bb TEXT) INHERITS (a); CREATE TABLE c (cc TEXT) INHERITS (a); CREATE TABLE d (dd TEXT) INHERITS (b,c,a); +NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "aa" +NOTICE: CREATE TABLE: merging multiple inherited definitions of attribute "aa" INSERT INTO a(aa) VALUES('aaa'); INSERT INTO a(aa) VALUES('aaaa'); INSERT INTO a(aa) VALUES('aaaaa');